Selaa lähdekoodia

chore: merge lagging revisions when close the document

appflowy 2 vuotta sitten
vanhempi
commit
6425997508

+ 7 - 1
frontend/rust-lib/flowy-document/src/editor/editor.rs

@@ -83,7 +83,13 @@ fn spawn_edit_queue(
 }
 }
 
 
 impl DocumentEditor for Arc<AppFlowyDocumentEditor> {
 impl DocumentEditor for Arc<AppFlowyDocumentEditor> {
-    fn close(&self) {}
+    #[tracing::instrument(name = "close document editor", level = "trace", skip_all)]
+    fn close(&self) {
+        let rev_manager = self.rev_manager.clone();
+        tokio::spawn(async move {
+            rev_manager.close().await;
+        });
+    }
 
 
     fn export(&self) -> FutureResult<String, FlowyError> {
     fn export(&self) -> FutureResult<String, FlowyError> {
         let this = self.clone();
         let this = self.clone();

+ 36 - 36
frontend/rust-lib/flowy-document/src/manager.rs

@@ -5,7 +5,7 @@ use crate::services::rev_sqlite::{SQLiteDeltaDocumentRevisionPersistence, SQLite
 use crate::services::DocumentPersistence;
 use crate::services::DocumentPersistence;
 use crate::{errors::FlowyError, DocumentCloudService};
 use crate::{errors::FlowyError, DocumentCloudService};
 use bytes::Bytes;
 use bytes::Bytes;
-use dashmap::DashMap;
+
 use flowy_database::ConnectionPool;
 use flowy_database::ConnectionPool;
 use flowy_error::FlowyResult;
 use flowy_error::FlowyResult;
 use flowy_revision::{
 use flowy_revision::{
@@ -16,9 +16,11 @@ use flowy_sync::client_document::initial_delta_document_content;
 use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
 use flowy_sync::entities::{document::DocumentIdPB, revision::Revision, ws_data::ServerRevisionWSData};
 use flowy_sync::util::md5;
 use flowy_sync::util::md5;
 use lib_infra::future::FutureResult;
 use lib_infra::future::FutureResult;
+use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
 use lib_ws::WSConnectState;
 use lib_ws::WSConnectState;
 use std::any::Any;
 use std::any::Any;
 use std::{convert::TryInto, sync::Arc};
 use std::{convert::TryInto, sync::Arc};
+use tokio::sync::RwLock;
 
 
 pub trait DocumentUser: Send + Sync {
 pub trait DocumentUser: Send + Sync {
     fn user_dir(&self) -> Result<String, FlowyError>;
     fn user_dir(&self) -> Result<String, FlowyError>;
@@ -76,7 +78,7 @@ impl std::default::Default for DocumentConfig {
 pub struct DocumentManager {
 pub struct DocumentManager {
     cloud_service: Arc<dyn DocumentCloudService>,
     cloud_service: Arc<dyn DocumentCloudService>,
     rev_web_socket: Arc<dyn RevisionWebSocket>,
     rev_web_socket: Arc<dyn RevisionWebSocket>,
-    editor_map: Arc<DocumentEditorMap>,
+    editor_map: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
     user: Arc<dyn DocumentUser>,
     user: Arc<dyn DocumentUser>,
     persistence: Arc<DocumentPersistence>,
     persistence: Arc<DocumentPersistence>,
     #[allow(dead_code)]
     #[allow(dead_code)]
@@ -94,7 +96,7 @@ impl DocumentManager {
         Self {
         Self {
             cloud_service,
             cloud_service,
             rev_web_socket,
             rev_web_socket,
-            editor_map: Arc::new(DocumentEditorMap::new()),
+            editor_map: Arc::new(RwLock::new(RefCountHashMap::new())),
             user: document_user,
             user: document_user,
             persistence: Arc::new(DocumentPersistence::new(database)),
             persistence: Arc::new(DocumentPersistence::new(database)),
             config,
             config,
@@ -124,10 +126,10 @@ impl DocumentManager {
     }
     }
 
 
     #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
     #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
-    pub fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
+    pub async fn close_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<(), FlowyError> {
         let editor_id = editor_id.as_ref();
         let editor_id = editor_id.as_ref();
         tracing::Span::current().record("editor_id", &editor_id);
         tracing::Span::current().record("editor_id", &editor_id);
-        self.editor_map.remove(editor_id);
+        self.editor_map.write().await.remove(editor_id);
         Ok(())
         Ok(())
     }
     }
 
 
@@ -149,9 +151,9 @@ impl DocumentManager {
     pub async fn receive_ws_data(&self, data: Bytes) {
     pub async fn receive_ws_data(&self, data: Bytes) {
         let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
         let result: Result<ServerRevisionWSData, protobuf::ProtobufError> = data.try_into();
         match result {
         match result {
-            Ok(data) => match self.editor_map.get(&data.object_id) {
+            Ok(data) => match self.editor_map.read().await.get(&data.object_id) {
                 None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
                 None => tracing::error!("Can't find any source handler for {:?}-{:?}", data.object_id, data.ty),
-                Some(editor) => match editor.receive_ws_data(data).await {
+                Some(handler) => match handler.0.receive_ws_data(data).await {
                     Ok(_) => {}
                     Ok(_) => {}
                     Err(e) => tracing::error!("{}", e),
                     Err(e) => tracing::error!("{}", e),
                 },
                 },
@@ -180,13 +182,13 @@ impl DocumentManager {
     /// returns: Result<Arc<DocumentEditor>, FlowyError>
     /// returns: Result<Arc<DocumentEditor>, FlowyError>
     ///
     ///
     async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
     async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
-        match self.editor_map.get(doc_id) {
+        match self.editor_map.read().await.get(doc_id) {
             None => {
             None => {
                 //
                 //
                 tracing::warn!("Should call init_document_editor first");
                 tracing::warn!("Should call init_document_editor first");
                 self.init_document_editor(doc_id).await
                 self.init_document_editor(doc_id).await
             }
             }
-            Some(editor) => Ok(editor),
+            Some(handler) => Ok(handler.0.clone()),
         }
         }
     }
     }
 
 
@@ -216,14 +218,20 @@ impl DocumentManager {
                     DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service)
                     DeltaDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service)
                         .await?,
                         .await?,
                 );
                 );
-                self.editor_map.insert(doc_id, editor.clone());
+                self.editor_map
+                    .write()
+                    .await
+                    .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
                 Ok(editor)
                 Ok(editor)
             }
             }
             DocumentVersionPB::V1 => {
             DocumentVersionPB::V1 => {
                 let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
                 let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
                 let editor: Arc<dyn DocumentEditor> =
                 let editor: Arc<dyn DocumentEditor> =
                     Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?);
                     Arc::new(AppFlowyDocumentEditor::new(doc_id, user, rev_manager, cloud_service).await?);
-                self.editor_map.insert(doc_id, editor.clone());
+                self.editor_map
+                    .write()
+                    .await
+                    .insert(doc_id.to_string(), RefCountDocumentHandler(editor.clone()));
                 Ok(editor)
                 Ok(editor)
             }
             }
         }
         }
@@ -247,7 +255,7 @@ impl DocumentManager {
     ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
     ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
         let user_id = self.user.user_id()?;
         let user_id = self.user.user_id()?;
         let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
         let disk_cache = SQLiteDocumentRevisionPersistence::new(&user_id, pool.clone());
-        let configuration = RevisionPersistenceConfiguration::new(100);
+        let configuration = RevisionPersistenceConfiguration::new(100, true);
         let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
         let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
@@ -268,7 +276,7 @@ impl DocumentManager {
     ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
     ) -> Result<RevisionManager<Arc<ConnectionPool>>, FlowyError> {
         let user_id = self.user.user_id()?;
         let user_id = self.user.user_id()?;
         let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone());
         let disk_cache = SQLiteDeltaDocumentRevisionPersistence::new(&user_id, pool.clone());
-        let configuration = RevisionPersistenceConfiguration::new(100);
+        let configuration = RevisionPersistenceConfiguration::new(100, true);
         let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
         let rev_persistence = RevisionPersistence::new(&user_id, doc_id, disk_cache, configuration);
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(doc_id, pool.clone());
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(doc_id, pool);
@@ -309,40 +317,32 @@ impl RevisionCloudService for DocumentRevisionCloudService {
     }
     }
 }
 }
 
 
-pub struct DocumentEditorMap {
-    inner: DashMap<String, Arc<dyn DocumentEditor>>,
-}
-
-impl DocumentEditorMap {
-    fn new() -> Self {
-        Self { inner: DashMap::new() }
-    }
+#[derive(Clone)]
+struct RefCountDocumentHandler(Arc<dyn DocumentEditor>);
 
 
-    pub(crate) fn insert(&self, editor_id: &str, editor: Arc<dyn DocumentEditor>) {
-        if self.inner.contains_key(editor_id) {
-            log::warn!("Editor:{} already open", editor_id);
-        }
-        self.inner.insert(editor_id.to_string(), editor);
+impl RefCountValue for RefCountDocumentHandler {
+    fn did_remove(&self) {
+        self.0.close();
     }
     }
+}
 
 
-    pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<dyn DocumentEditor>> {
-        Some(self.inner.get(editor_id)?.clone())
-    }
+impl std::ops::Deref for RefCountDocumentHandler {
+    type Target = Arc<dyn DocumentEditor>;
 
 
-    pub(crate) fn remove(&self, editor_id: &str) {
-        if let Some(editor) = self.get(editor_id) {
-            editor.close()
-        }
-        self.inner.remove(editor_id);
+    fn deref(&self) -> &Self::Target {
+        &self.0
     }
     }
 }
 }
 
 
 #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
 #[tracing::instrument(level = "trace", skip(web_socket, handlers))]
-fn listen_ws_state_changed(web_socket: Arc<dyn RevisionWebSocket>, handlers: Arc<DocumentEditorMap>) {
+fn listen_ws_state_changed(
+    web_socket: Arc<dyn RevisionWebSocket>,
+    handlers: Arc<RwLock<RefCountHashMap<RefCountDocumentHandler>>>,
+) {
     tokio::spawn(async move {
     tokio::spawn(async move {
         let mut notify = web_socket.subscribe_state_changed().await;
         let mut notify = web_socket.subscribe_state_changed().await;
         while let Ok(state) = notify.recv().await {
         while let Ok(state) = notify.recv().await {
-            handlers.inner.iter().for_each(|handler| {
+            handlers.read().await.values().iter().for_each(|handler| {
                 handler.receive_ws_state(&state);
                 handler.receive_ws_state(&state);
             })
             })
         }
         }

+ 1 - 1
frontend/rust-lib/flowy-folder/src/manager.rs

@@ -168,7 +168,7 @@ impl FolderManager {
         let pool = self.persistence.db_pool()?;
         let pool = self.persistence.db_pool()?;
         let object_id = folder_id.as_ref();
         let object_id = folder_id.as_ref();
         let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone());
         let disk_cache = SQLiteFolderRevisionPersistence::new(user_id, pool.clone());
-        let configuration = RevisionPersistenceConfiguration::new(100);
+        let configuration = RevisionPersistenceConfiguration::new(100, false);
         let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
         let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
         let rev_compactor = FolderRevisionCompress();
         let rev_compactor = FolderRevisionCompress();
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());
         // let history_persistence = SQLiteRevisionHistoryPersistence::new(object_id, pool.clone());

+ 1 - 0
frontend/rust-lib/flowy-folder/tests/workspace/script.rs

@@ -70,6 +70,7 @@ pub enum FolderScript {
     DeleteAllTrash,
     DeleteAllTrash,
 
 
     // Sync
     // Sync
+    #[allow(dead_code)]
     AssertCurrentRevId(i64),
     AssertCurrentRevId(i64),
     AssertNextSyncRevId(Option<i64>),
     AssertNextSyncRevId(Option<i64>),
     AssertRevisionState {
     AssertRevisionState {

+ 9 - 25
frontend/rust-lib/flowy-grid/src/manager.rs

@@ -1,15 +1,15 @@
 use crate::entities::GridLayout;
 use crate::entities::GridLayout;
-use crate::services::block_editor::GridBlockRevisionCompress;
+
 use crate::services::grid_editor::{GridRevisionCompress, GridRevisionEditor};
 use crate::services::grid_editor::{GridRevisionCompress, GridRevisionEditor};
 use crate::services::grid_view_manager::make_grid_view_rev_manager;
 use crate::services::grid_view_manager::make_grid_view_rev_manager;
 use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::persistence::kv::GridKVPersistence;
 use crate::services::persistence::kv::GridKVPersistence;
 use crate::services::persistence::migration::GridMigration;
 use crate::services::persistence::migration::GridMigration;
-use crate::services::persistence::rev_sqlite::{SQLiteGridBlockRevisionPersistence, SQLiteGridRevisionPersistence};
+use crate::services::persistence::rev_sqlite::SQLiteGridRevisionPersistence;
 use crate::services::persistence::GridDatabase;
 use crate::services::persistence::GridDatabase;
 use crate::services::tasks::GridTaskScheduler;
 use crate::services::tasks::GridTaskScheduler;
 use bytes::Bytes;
 use bytes::Bytes;
-use dashmap::DashMap;
+
 use flowy_database::ConnectionPool;
 use flowy_database::ConnectionPool;
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision};
 use flowy_grid_data_model::revision::{BuildGridContext, GridRevision, GridViewRevision};
@@ -20,7 +20,8 @@ use flowy_revision::{
 use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations};
 use flowy_sync::client_grid::{make_grid_block_operations, make_grid_operations, make_grid_view_operations};
 use flowy_sync::entities::revision::Revision;
 use flowy_sync::entities::revision::Revision;
 use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
 use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
-use std::collections::HashMap;
+
+use crate::services::block_manager::make_grid_block_rev_manager;
 use std::sync::Arc;
 use std::sync::Arc;
 use tokio::sync::RwLock;
 use tokio::sync::RwLock;
 
 
@@ -92,8 +93,7 @@ impl GridManager {
     #[tracing::instrument(level = "debug", skip_all, err)]
     #[tracing::instrument(level = "debug", skip_all, err)]
     pub async fn create_grid_block<T: AsRef<str>>(&self, block_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
     pub async fn create_grid_block<T: AsRef<str>>(&self, block_id: T, revisions: Vec<Revision>) -> FlowyResult<()> {
         let block_id = block_id.as_ref();
         let block_id = block_id.as_ref();
-        let db_pool = self.grid_user.db_pool()?;
-        let rev_manager = self.make_grid_block_rev_manager(block_id, db_pool)?;
+        let rev_manager = make_grid_block_rev_manager(&self.grid_user, block_id)?;
         let _ = rev_manager.reset_object(revisions).await?;
         let _ = rev_manager.reset_object(revisions).await?;
         Ok(())
         Ok(())
     }
     }
@@ -119,13 +119,13 @@ impl GridManager {
     pub async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
     pub async fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
         match self.grid_editors.read().await.get(grid_id) {
         match self.grid_editors.read().await.get(grid_id) {
             None => Err(FlowyError::internal().context("Should call open_grid function first")),
             None => Err(FlowyError::internal().context("Should call open_grid function first")),
-            Some(editor) => Ok(editor.clone()),
+            Some(editor) => Ok(editor),
         }
         }
     }
     }
 
 
     async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
     async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<GridRevisionEditor>> {
         if let Some(editor) = self.grid_editors.read().await.get(grid_id) {
         if let Some(editor) = self.grid_editors.read().await.get(grid_id) {
-            return Ok(editor.clone());
+            return Ok(editor);
         }
         }
 
 
         let db_pool = self.grid_user.db_pool()?;
         let db_pool = self.grid_user.db_pool()?;
@@ -164,29 +164,13 @@ impl GridManager {
     ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
     ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
         let user_id = self.grid_user.user_id()?;
         let user_id = self.grid_user.user_id()?;
         let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
         let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool.clone());
-        let configuration = RevisionPersistenceConfiguration::new(2);
+        let configuration = RevisionPersistenceConfiguration::new(2, false);
         let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration);
         let rev_persistence = RevisionPersistence::new(&user_id, grid_id, disk_cache, configuration);
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool);
         let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(grid_id, pool);
         let rev_compactor = GridRevisionCompress();
         let rev_compactor = GridRevisionCompress();
         let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence);
         let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence, rev_compactor, snapshot_persistence);
         Ok(rev_manager)
         Ok(rev_manager)
     }
     }
-
-    fn make_grid_block_rev_manager(
-        &self,
-        block_id: &str,
-        pool: Arc<ConnectionPool>,
-    ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
-        let user_id = self.grid_user.user_id()?;
-        let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
-        let configuration = RevisionPersistenceConfiguration::new(4);
-        let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
-        let rev_compactor = GridBlockRevisionCompress();
-        let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
-        let rev_manager =
-            RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
-        Ok(rev_manager)
-    }
 }
 }
 
 
 pub async fn make_grid_view_data(
 pub async fn make_grid_view_data(

+ 15 - 6
frontend/rust-lib/flowy-grid/src/services/block_manager.rs

@@ -6,6 +6,7 @@ use crate::services::persistence::block_index::BlockIndexCache;
 use crate::services::persistence::rev_sqlite::SQLiteGridBlockRevisionPersistence;
 use crate::services::persistence::rev_sqlite::SQLiteGridBlockRevisionPersistence;
 use crate::services::row::{block_from_row_orders, make_row_from_row_rev, GridBlockSnapshot};
 use crate::services::row::{block_from_row_orders, make_row_from_row_rev, GridBlockSnapshot};
 use dashmap::DashMap;
 use dashmap::DashMap;
+use flowy_database::ConnectionPool;
 use flowy_error::FlowyResult;
 use flowy_error::FlowyResult;
 use flowy_grid_data_model::revision::{
 use flowy_grid_data_model::revision::{
     GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowChangeset, RowRevision,
     GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowChangeset, RowRevision,
@@ -46,7 +47,7 @@ impl GridBlockManager {
         match self.block_editors.get(block_id) {
         match self.block_editors.get(block_id) {
             None => {
             None => {
                 tracing::error!("This is a fatal error, block with id:{} is not exist", block_id);
                 tracing::error!("This is a fatal error, block with id:{} is not exist", block_id);
-                let editor = Arc::new(make_block_editor(&self.user, block_id).await?);
+                let editor = Arc::new(make_grid_block_editor(&self.user, block_id).await?);
                 self.block_editors.insert(block_id.to_owned(), editor.clone());
                 self.block_editors.insert(block_id.to_owned(), editor.clone());
                 Ok(editor)
                 Ok(editor)
             }
             }
@@ -261,24 +262,32 @@ async fn make_block_editors(
 ) -> FlowyResult<DashMap<String, Arc<GridBlockRevisionEditor>>> {
 ) -> FlowyResult<DashMap<String, Arc<GridBlockRevisionEditor>>> {
     let editor_map = DashMap::new();
     let editor_map = DashMap::new();
     for block_meta_rev in block_meta_revs {
     for block_meta_rev in block_meta_revs {
-        let editor = make_block_editor(user, &block_meta_rev.block_id).await?;
+        let editor = make_grid_block_editor(user, &block_meta_rev.block_id).await?;
         editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor));
         editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor));
     }
     }
 
 
     Ok(editor_map)
     Ok(editor_map)
 }
 }
 
 
-async fn make_block_editor(user: &Arc<dyn GridUser>, block_id: &str) -> FlowyResult<GridBlockRevisionEditor> {
+async fn make_grid_block_editor(user: &Arc<dyn GridUser>, block_id: &str) -> FlowyResult<GridBlockRevisionEditor> {
     tracing::trace!("Open block:{} editor", block_id);
     tracing::trace!("Open block:{} editor", block_id);
     let token = user.token()?;
     let token = user.token()?;
     let user_id = user.user_id()?;
     let user_id = user.user_id()?;
-    let pool = user.db_pool()?;
+    let rev_manager = make_grid_block_rev_manager(user, block_id)?;
+    GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await
+}
 
 
+pub fn make_grid_block_rev_manager(
+    user: &Arc<dyn GridUser>,
+    block_id: &str,
+) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
+    let user_id = user.user_id()?;
+    let pool = user.db_pool()?;
     let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
     let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
-    let configuration = RevisionPersistenceConfiguration::new(4);
+    let configuration = RevisionPersistenceConfiguration::new(4, false);
     let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
     let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
     let rev_compactor = GridBlockRevisionCompress();
     let rev_compactor = GridBlockRevisionCompress();
     let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
     let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
     let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
     let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
-    GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await
+    Ok(rev_manager)
 }
 }

+ 7 - 1
frontend/rust-lib/flowy-grid/src/services/grid_editor.rs

@@ -94,7 +94,13 @@ impl GridRevisionEditor {
         Ok(editor)
         Ok(editor)
     }
     }
 
 
-    pub fn close(&self) {}
+    #[tracing::instrument(name = "close grid editor", level = "trace", skip_all)]
+    pub fn close(&self) {
+        let rev_manager = self.rev_manager.clone();
+        tokio::spawn(async move {
+            rev_manager.close().await;
+        });
+    }
 
 
     /// Save the type-option data to disk and send a `GridNotification::DidUpdateField` notification
     /// Save the type-option data to disk and send a `GridNotification::DidUpdateField` notification
     /// to dart side.
     /// to dart side.

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/grid_view_manager.rs

@@ -255,7 +255,7 @@ pub async fn make_grid_view_rev_manager(
     let pool = user.db_pool()?;
     let pool = user.db_pool()?;
 
 
     let disk_cache = SQLiteGridViewRevisionPersistence::new(&user_id, pool.clone());
     let disk_cache = SQLiteGridViewRevisionPersistence::new(&user_id, pool.clone());
-    let configuration = RevisionPersistenceConfiguration::new(2);
+    let configuration = RevisionPersistenceConfiguration::new(2, false);
     let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration);
     let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration);
     let rev_compactor = GridViewRevisionCompress();
     let rev_compactor = GridViewRevisionCompress();
 
 

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/mod.rs

@@ -1,7 +1,7 @@
 mod util;
 mod util;
 
 
 pub mod block_editor;
 pub mod block_editor;
-mod block_manager;
+pub mod block_manager;
 mod block_manager_trait_impl;
 mod block_manager_trait_impl;
 pub mod cell;
 pub mod cell;
 pub mod field;
 pub mod field;

+ 12 - 6
frontend/rust-lib/flowy-grid/src/services/tasks/scheduler.rs

@@ -1,4 +1,4 @@
-use crate::services::tasks::queue::{GridTaskQueue, TaskHandlerId};
+use crate::services::tasks::queue::GridTaskQueue;
 use crate::services::tasks::runner::GridTaskRunner;
 use crate::services::tasks::runner::GridTaskRunner;
 use crate::services::tasks::store::GridTaskStore;
 use crate::services::tasks::store::GridTaskStore;
 use crate::services::tasks::task::Task;
 use crate::services::tasks::task::Task;
@@ -7,22 +7,28 @@ use crate::services::tasks::{TaskContent, TaskId, TaskStatus};
 use flowy_error::FlowyError;
 use flowy_error::FlowyError;
 use lib_infra::future::BoxResultFuture;
 use lib_infra::future::BoxResultFuture;
 use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
 use lib_infra::ref_map::{RefCountHashMap, RefCountValue};
-use std::collections::HashMap;
+
 use std::sync::Arc;
 use std::sync::Arc;
 use std::time::Duration;
 use std::time::Duration;
 use tokio::sync::{watch, RwLock};
 use tokio::sync::{watch, RwLock};
 
 
-pub(crate) trait GridTaskHandler: Send + Sync + 'static + RefCountValue {
+pub(crate) trait GridTaskHandler: Send + Sync + 'static {
     fn handler_id(&self) -> &str;
     fn handler_id(&self) -> &str;
 
 
     fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>;
     fn process_content(&self, content: TaskContent) -> BoxResultFuture<(), FlowyError>;
 }
 }
 
 
+#[derive(Clone)]
+struct RefCountTaskHandler(Arc<dyn GridTaskHandler>);
+impl RefCountValue for RefCountTaskHandler {
+    fn did_remove(&self) {}
+}
+
 pub struct GridTaskScheduler {
 pub struct GridTaskScheduler {
     queue: GridTaskQueue,
     queue: GridTaskQueue,
     store: GridTaskStore,
     store: GridTaskStore,
     notifier: watch::Sender<bool>,
     notifier: watch::Sender<bool>,
-    handlers: RefCountHashMap<Arc<dyn GridTaskHandler>>,
+    handlers: RefCountHashMap<RefCountTaskHandler>,
 }
 }
 
 
 impl GridTaskScheduler {
 impl GridTaskScheduler {
@@ -51,7 +57,7 @@ impl GridTaskScheduler {
         T: GridTaskHandler,
         T: GridTaskHandler,
     {
     {
         let handler_id = handler.handler_id().to_owned();
         let handler_id = handler.handler_id().to_owned();
-        self.handlers.insert(handler_id, handler);
+        self.handlers.insert(handler_id, RefCountTaskHandler(handler));
     }
     }
 
 
     pub(crate) fn unregister_handler<T: AsRef<str>>(&mut self, handler_id: T) {
     pub(crate) fn unregister_handler<T: AsRef<str>>(&mut self, handler_id: T) {
@@ -74,7 +80,7 @@ impl GridTaskScheduler {
         let content = task.content.take()?;
         let content = task.content.take()?;
 
 
         task.set_status(TaskStatus::Processing);
         task.set_status(TaskStatus::Processing);
-        let _ = match handler.process_content(content).await {
+        let _ = match handler.0.process_content(content).await {
             Ok(_) => {
             Ok(_) => {
                 task.set_status(TaskStatus::Done);
                 task.set_status(TaskStatus::Done);
                 let _ = ret.send(task.into());
                 let _ = ret.send(task.into());

+ 1 - 1
frontend/rust-lib/flowy-revision/src/cache/reset.rs

@@ -60,7 +60,7 @@ where
     }
     }
 
 
     async fn reset_object(&self) -> FlowyResult<()> {
     async fn reset_object(&self) -> FlowyResult<()> {
-        let configuration = RevisionPersistenceConfiguration::new(2);
+        let configuration = RevisionPersistenceConfiguration::new(2, false);
         let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache(
         let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache(
             &self.user_id,
             &self.user_id,
             self.target.target_id(),
             self.target.target_id(),

+ 4 - 0
frontend/rust-lib/flowy-revision/src/rev_manager.rs

@@ -123,6 +123,10 @@ impl<Connection: 'static> RevisionManager<Connection> {
         B::deserialize_revisions(&self.object_id, revisions)
         B::deserialize_revisions(&self.object_id, revisions)
     }
     }
 
 
+    pub async fn close(&self) {
+        let _ = self.rev_persistence.compact_lagging_revisions(&self.rev_compress).await;
+    }
+
     pub async fn load_revisions(&self) -> FlowyResult<Vec<Revision>> {
     pub async fn load_revisions(&self) -> FlowyResult<Vec<Revision>> {
         let revisions = RevisionLoader {
         let revisions = RevisionLoader {
             object_id: self.object_id.clone(),
             object_id: self.object_id.clone(),

+ 45 - 5
frontend/rust-lib/flowy-revision/src/rev_persistence.rs

@@ -17,22 +17,32 @@ pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 #[derive(Clone)]
 #[derive(Clone)]
 pub struct RevisionPersistenceConfiguration {
 pub struct RevisionPersistenceConfiguration {
     merge_threshold: usize,
     merge_threshold: usize,
+    merge_lagging: bool,
 }
 }
 
 
 impl RevisionPersistenceConfiguration {
 impl RevisionPersistenceConfiguration {
-    pub fn new(merge_threshold: usize) -> Self {
+    pub fn new(merge_threshold: usize, merge_lagging: bool) -> Self {
         debug_assert!(merge_threshold > 1);
         debug_assert!(merge_threshold > 1);
         if merge_threshold > 1 {
         if merge_threshold > 1 {
-            Self { merge_threshold }
+            Self {
+                merge_threshold,
+                merge_lagging,
+            }
         } else {
         } else {
-            Self { merge_threshold: 100 }
+            Self {
+                merge_threshold: 100,
+                merge_lagging,
+            }
         }
         }
     }
     }
 }
 }
 
 
 impl std::default::Default for RevisionPersistenceConfiguration {
 impl std::default::Default for RevisionPersistenceConfiguration {
     fn default() -> Self {
     fn default() -> Self {
-        Self { merge_threshold: 100 }
+        Self {
+            merge_threshold: 100,
+            merge_lagging: false,
+        }
     }
     }
 }
 }
 
 
@@ -98,6 +108,36 @@ where
         Ok(())
         Ok(())
     }
     }
 
 
+    #[tracing::instrument(level = "trace", skip_all, err)]
+    pub async fn compact_lagging_revisions<'a>(
+        &'a self,
+        rev_compress: &Arc<dyn RevisionMergeable + 'a>,
+    ) -> FlowyResult<()> {
+        if !self.configuration.merge_lagging {
+            return Ok(());
+        }
+
+        let mut sync_seq = self.sync_seq.write().await;
+        let compact_seq = sync_seq.compact();
+        if !compact_seq.is_empty() {
+            let range = RevisionRange {
+                start: *compact_seq.front().unwrap(),
+                end: *compact_seq.back().unwrap(),
+            };
+
+            let revisions = self.revisions_in_range(&range).await?;
+            debug_assert_eq!(range.len() as usize, revisions.len());
+            // compact multiple revisions into one
+            let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
+            tracing::Span::current().record("rev_id", &merged_revision.rev_id);
+            let _ = sync_seq.recv(merged_revision.rev_id)?;
+
+            // replace the revisions in range with compact revision
+            self.compact(&range, merged_revision).await?;
+        }
+        Ok(())
+    }
+
     /// Save the revision to disk and append it to the end of the sync sequence.
     /// Save the revision to disk and append it to the end of the sync sequence.
     #[tracing::instrument(level = "trace", skip_all, fields(rev_id, compact_range, object_id=%self.object_id), err)]
     #[tracing::instrument(level = "trace", skip_all, fields(rev_id, compact_range, object_id=%self.object_id), err)]
     pub(crate) async fn add_sync_revision<'a>(
     pub(crate) async fn add_sync_revision<'a>(
@@ -108,7 +148,7 @@ where
         let mut sync_seq = self.sync_seq.write().await;
         let mut sync_seq = self.sync_seq.write().await;
         let compact_length = sync_seq.compact_length;
         let compact_length = sync_seq.compact_length;
 
 
-        // Before the new_revision is pushed into the sync_seq, we check if the current `step` of the
+        // Before the new_revision is pushed into the sync_seq, we check if the current `compact_length` of the
         // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged
         // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged
         // with the new_revision into one revision.
         // with the new_revision into one revision.
         let mut compact_seq = VecDeque::default();
         let mut compact_seq = VecDeque::default();

+ 1 - 1
frontend/rust-lib/flowy-revision/tests/revision_test/script.rs

@@ -63,7 +63,7 @@ impl RevisionTest {
     pub async fn new_with_configuration(merge_threshold: i64) -> Self {
     pub async fn new_with_configuration(merge_threshold: i64) -> Self {
         let user_id = nanoid!(10);
         let user_id = nanoid!(10);
         let object_id = nanoid!(6);
         let object_id = nanoid!(6);
-        let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize);
+        let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize, false);
         let disk_cache = RevisionDiskCacheMock::new(vec![]);
         let disk_cache = RevisionDiskCacheMock::new(vec![]);
         let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
         let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
         let compress = RevisionCompressMock {};
         let compress = RevisionCompressMock {};

+ 1 - 1
frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs

@@ -165,7 +165,7 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
         let manager = self.0.clone();
         let manager = self.0.clone();
         let view_id = view_id.to_string();
         let view_id = view_id.to_string();
         FutureResult::new(async move {
         FutureResult::new(async move {
-            let _ = manager.close_document_editor(view_id)?;
+            let _ = manager.close_document_editor(view_id).await?;
             Ok(())
             Ok(())
         })
         })
     }
     }

+ 12 - 2
shared-lib/lib-infra/src/ref_map.rs

@@ -22,16 +22,26 @@ impl<T> RefCountHandler<T> {
 
 
 pub struct RefCountHashMap<T>(HashMap<String, RefCountHandler<T>>);
 pub struct RefCountHashMap<T>(HashMap<String, RefCountHandler<T>>);
 
 
+impl<T> std::default::Default for RefCountHashMap<T> {
+    fn default() -> Self {
+        Self(HashMap::new())
+    }
+}
+
 impl<T> RefCountHashMap<T>
 impl<T> RefCountHashMap<T>
 where
 where
     T: Clone + Send + Sync + RefCountValue,
     T: Clone + Send + Sync + RefCountValue,
 {
 {
     pub fn new() -> Self {
     pub fn new() -> Self {
-        Self(Default::default())
+        Self::default()
     }
     }
 
 
     pub fn get(&self, key: &str) -> Option<T> {
     pub fn get(&self, key: &str) -> Option<T> {
-        self.0.get(key).and_then(|handler| Some(handler.inner.clone()))
+        self.0.get(key).map(|handler| handler.inner.clone())
+    }
+
+    pub fn values(&self) -> Vec<T> {
+        self.0.values().map(|value| value.inner.clone()).collect::<Vec<T>>()
     }
     }
 
 
     pub fn insert(&mut self, key: String, value: T) {
     pub fn insert(&mut self, key: String, value: T) {