Browse Source

chore: calculate the compact length after receiving ack

appflowy 2 years ago
parent
commit
b3b24d0cc0

+ 1 - 0
frontend/rust-lib/Cargo.lock

@@ -1072,6 +1072,7 @@ dependencies = [
  "bytes",
  "bytes",
  "dashmap",
  "dashmap",
  "flowy-error",
  "flowy-error",
+ "flowy-revision",
  "flowy-sync",
  "flowy-sync",
  "futures-util",
  "futures-util",
  "lib-infra",
  "lib-infra",

+ 3 - 1
frontend/rust-lib/flowy-document/src/editor/editor.rs

@@ -29,7 +29,9 @@ impl AppFlowyDocumentEditor {
         mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
         mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
         cloud_service: Arc<dyn RevisionCloudService>,
         cloud_service: Arc<dyn RevisionCloudService>,
     ) -> FlowyResult<Arc<Self>> {
     ) -> FlowyResult<Arc<Self>> {
-        let document = rev_manager.load::<DocumentRevisionSerde>(Some(cloud_service)).await?;
+        let document = rev_manager
+            .initialize::<DocumentRevisionSerde>(Some(cloud_service))
+            .await?;
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);
         let command_sender = spawn_edit_queue(user, rev_manager.clone(), document);
         let command_sender = spawn_edit_queue(user, rev_manager.clone(), document);
         let doc_id = doc_id.to_string();
         let doc_id = doc_id.to_string();

+ 1 - 1
frontend/rust-lib/flowy-document/src/old_editor/editor.rs

@@ -45,7 +45,7 @@ impl DeltaDocumentEditor {
         cloud_service: Arc<dyn RevisionCloudService>,
         cloud_service: Arc<dyn RevisionCloudService>,
     ) -> FlowyResult<Arc<Self>> {
     ) -> FlowyResult<Arc<Self>> {
         let document = rev_manager
         let document = rev_manager
-            .load::<DeltaDocumentRevisionSerde>(Some(cloud_service))
+            .initialize::<DeltaDocumentRevisionSerde>(Some(cloud_service))
             .await?;
             .await?;
         let operations = DeltaTextOperations::from_bytes(&document.content)?;
         let operations = DeltaTextOperations::from_bytes(&document.content)?;
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);

+ 3 - 1
frontend/rust-lib/flowy-folder/src/services/folder_editor.rs

@@ -38,7 +38,9 @@ impl FolderEditor {
         let cloud = Arc::new(FolderRevisionCloudService {
         let cloud = Arc::new(FolderRevisionCloudService {
             token: token.to_string(),
             token: token.to_string(),
         });
         });
-        let folder = Arc::new(RwLock::new(rev_manager.load::<FolderRevisionSerde>(Some(cloud)).await?));
+        let folder = Arc::new(RwLock::new(
+            rev_manager.initialize::<FolderRevisionSerde>(Some(cloud)).await?,
+        ));
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);
 
 
         #[cfg(feature = "sync")]
         #[cfg(feature = "sync")]

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/block_editor.rs

@@ -34,7 +34,7 @@ impl GridBlockRevisionEditor {
         let cloud = Arc::new(GridBlockRevisionCloudService {
         let cloud = Arc::new(GridBlockRevisionCloudService {
             token: token.to_owned(),
             token: token.to_owned(),
         });
         });
-        let block_revision_pad = rev_manager.load::<GridBlockRevisionSerde>(Some(cloud)).await?;
+        let block_revision_pad = rev_manager.initialize::<GridBlockRevisionSerde>(Some(cloud)).await?;
         let pad = Arc::new(RwLock::new(block_revision_pad));
         let pad = Arc::new(RwLock::new(block_revision_pad));
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);
         let user_id = user_id.to_owned();
         let user_id = user_id.to_owned();

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/grid_editor.rs

@@ -60,7 +60,7 @@ impl GridRevisionEditor {
     ) -> FlowyResult<Arc<Self>> {
     ) -> FlowyResult<Arc<Self>> {
         let token = user.token()?;
         let token = user.token()?;
         let cloud = Arc::new(GridRevisionCloudService { token });
         let cloud = Arc::new(GridRevisionCloudService { token });
-        let grid_pad = rev_manager.load::<GridRevisionSerde>(Some(cloud)).await?;
+        let grid_pad = rev_manager.initialize::<GridRevisionSerde>(Some(cloud)).await?;
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);
         let grid_pad = Arc::new(RwLock::new(grid_pad));
         let grid_pad = Arc::new(RwLock::new(grid_pad));
 
 

+ 1 - 1
frontend/rust-lib/flowy-grid/src/services/grid_view_editor.rs

@@ -55,7 +55,7 @@ impl GridViewRevisionEditor {
         let cloud = Arc::new(GridViewRevisionCloudService {
         let cloud = Arc::new(GridViewRevisionCloudService {
             token: token.to_owned(),
             token: token.to_owned(),
         });
         });
-        let view_revision_pad = rev_manager.load::<GridViewRevisionSerde>(Some(cloud)).await?;
+        let view_revision_pad = rev_manager.initialize::<GridViewRevisionSerde>(Some(cloud)).await?;
         let pad = Arc::new(RwLock::new(view_revision_pad));
         let pad = Arc::new(RwLock::new(view_revision_pad));
         let rev_manager = Arc::new(rev_manager);
         let rev_manager = Arc::new(rev_manager);
         let group_controller = new_group_controller(
         let group_controller = new_group_controller(

+ 1 - 0
frontend/rust-lib/flowy-revision/Cargo.toml

@@ -23,6 +23,7 @@ serde_json = {version = "1.0"}
 
 
 [dev-dependencies]
 [dev-dependencies]
 nanoid = "0.4.0"
 nanoid = "0.4.0"
+flowy-revision = {path = ".", features = ["flowy_unit_test"]}
 serde = { version = "1.0", features = ["derive"] }
 serde = { version = "1.0", features = ["derive"] }
 serde_json = { version = "1.0" }
 serde_json = { version = "1.0" }
 parking_lot = "0.11"
 parking_lot = "0.11"

+ 11 - 4
frontend/rust-lib/flowy-revision/src/rev_manager.rs

@@ -108,7 +108,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
     }
     }
 
 
     #[tracing::instrument(level = "debug", skip_all, fields(object_id) err)]
     #[tracing::instrument(level = "debug", skip_all, fields(object_id) err)]
-    pub async fn load<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
+    pub async fn initialize<B>(&mut self, cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
     where
     where
         B: RevisionObjectDeserializer,
         B: RevisionObjectDeserializer,
     {
     {
@@ -199,6 +199,10 @@ impl<Connection: 'static> RevisionManager<Connection> {
         self.rev_persistence.number_of_sync_records()
         self.rev_persistence.number_of_sync_records()
     }
     }
 
 
+    pub fn number_of_revisions_in_disk(&self) -> usize {
+        self.rev_persistence.number_of_records_in_disk()
+    }
+
     pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
     pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
         let revisions = self.rev_persistence.revisions_in_range(&range).await?;
         let revisions = self.rev_persistence.revisions_in_range(&range).await?;
         Ok(revisions)
         Ok(revisions)
@@ -230,13 +234,16 @@ impl<Connection: 'static> WSDataProviderDataSource for Arc<RevisionManager<Conne
 }
 }
 
 
 #[cfg(feature = "flowy_unit_test")]
 #[cfg(feature = "flowy_unit_test")]
-impl<Connection> RevisionManager<Connection> {
+impl<Connection: 'static> RevisionManager<Connection> {
     pub async fn revision_cache(&self) -> Arc<RevisionPersistence<Connection>> {
     pub async fn revision_cache(&self) -> Arc<RevisionPersistence<Connection>> {
         self.rev_persistence.clone()
         self.rev_persistence.clone()
     }
     }
     pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
     pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
         self.rev_ack_notifier.subscribe()
         self.rev_ack_notifier.subscribe()
     }
     }
+    pub fn get_all_revision_records(&self) -> FlowyResult<Vec<crate::disk::SyncRecord>> {
+        self.rev_persistence.load_all_records(&self.object_id)
+    }
 }
 }
 
 
 pub struct RevisionLoader<Connection> {
 pub struct RevisionLoader<Connection> {
@@ -248,7 +255,7 @@ pub struct RevisionLoader<Connection> {
 
 
 impl<Connection: 'static> RevisionLoader<Connection> {
 impl<Connection: 'static> RevisionLoader<Connection> {
     pub async fn load(&self) -> Result<(Vec<Revision>, i64), FlowyError> {
     pub async fn load(&self) -> Result<(Vec<Revision>, i64), FlowyError> {
-        let records = self.rev_persistence.batch_get(&self.object_id)?;
+        let records = self.rev_persistence.load_all_records(&self.object_id)?;
         let revisions: Vec<Revision>;
         let revisions: Vec<Revision>;
         let mut rev_id = 0;
         let mut rev_id = 0;
         if records.is_empty() && self.cloud.is_some() {
         if records.is_empty() && self.cloud.is_some() {
@@ -282,7 +289,7 @@ impl<Connection: 'static> RevisionLoader<Connection> {
     }
     }
 
 
     pub async fn load_revisions(&self) -> Result<Vec<Revision>, FlowyError> {
     pub async fn load_revisions(&self) -> Result<Vec<Revision>, FlowyError> {
-        let records = self.rev_persistence.batch_get(&self.object_id)?;
+        let records = self.rev_persistence.load_all_records(&self.object_id)?;
         let revisions = records.into_iter().map(|record| record.revision).collect::<_>();
         let revisions = records.into_iter().map(|record| record.revision).collect::<_>();
         Ok(revisions)
         Ok(revisions)
     }
     }

+ 65 - 37
frontend/rust-lib/flowy-revision/src/rev_persistence.rs

@@ -14,6 +14,7 @@ use tokio::task::spawn_blocking;
 
 
 pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 
 
+#[derive(Clone)]
 pub struct RevisionPersistenceConfiguration {
 pub struct RevisionPersistenceConfiguration {
     merge_threshold: usize,
     merge_threshold: usize,
 }
 }
@@ -24,14 +25,14 @@ impl RevisionPersistenceConfiguration {
         if merge_threshold > 1 {
         if merge_threshold > 1 {
             Self { merge_threshold }
             Self { merge_threshold }
         } else {
         } else {
-            Self { merge_threshold: 2 }
+            Self { merge_threshold: 100 }
         }
         }
     }
     }
 }
 }
 
 
 impl std::default::Default for RevisionPersistenceConfiguration {
 impl std::default::Default for RevisionPersistenceConfiguration {
     fn default() -> Self {
     fn default() -> Self {
-        Self { merge_threshold: 2 }
+        Self { merge_threshold: 100 }
     }
     }
 }
 }
 
 
@@ -93,7 +94,7 @@ where
     pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> {
     pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> {
         tracing::Span::current().record("rev_id", &revision.rev_id);
         tracing::Span::current().record("rev_id", &revision.rev_id);
         self.add(revision.clone(), RevisionState::Sync, false).await?;
         self.add(revision.clone(), RevisionState::Sync, false).await?;
-        self.sync_seq.write().await.dry_push(revision.rev_id)?;
+        self.sync_seq.write().await.recv(revision.rev_id)?;
         Ok(())
         Ok(())
     }
     }
 
 
@@ -105,13 +106,17 @@ where
         rev_compress: &Arc<dyn RevisionMergeable + 'a>,
         rev_compress: &Arc<dyn RevisionMergeable + 'a>,
     ) -> FlowyResult<i64> {
     ) -> FlowyResult<i64> {
         let mut sync_seq = self.sync_seq.write().await;
         let mut sync_seq = self.sync_seq.write().await;
-        let step = sync_seq.step;
+        let compact_length = sync_seq.compact_length;
 
 
-        // Before the new_revision pushed into the sync_seq, we check if the current `step` of the
-        // sync_seq is less equal or greater than the merge threshold. If yes, it's need to merged
+        // Before the new_revision is pushed into the sync_seq, we check if the current `step` of the
+        // sync_seq is less equal to or greater than the merge threshold. If yes, it's needs to merged
         // with the new_revision into one revision.
         // with the new_revision into one revision.
-        if step >= self.configuration.merge_threshold - 1 {
-            let compact_seq = sync_seq.compact();
+        let mut compact_seq = VecDeque::default();
+        // tracing::info!("{}", compact_seq)
+        if compact_length >= self.configuration.merge_threshold - 1 {
+            compact_seq.extend(sync_seq.compact());
+        }
+        if !compact_seq.is_empty() {
             let range = RevisionRange {
             let range = RevisionRange {
                 start: *compact_seq.front().unwrap(),
                 start: *compact_seq.front().unwrap(),
                 end: *compact_seq.back().unwrap(),
                 end: *compact_seq.back().unwrap(),
@@ -127,7 +132,7 @@ where
             let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
             let merged_revision = rev_compress.merge_revisions(&self.user_id, &self.object_id, revisions)?;
             let rev_id = merged_revision.rev_id;
             let rev_id = merged_revision.rev_id;
             tracing::Span::current().record("rev_id", &merged_revision.rev_id);
             tracing::Span::current().record("rev_id", &merged_revision.rev_id);
-            let _ = sync_seq.dry_push(merged_revision.rev_id)?;
+            let _ = sync_seq.recv(merged_revision.rev_id)?;
 
 
             // replace the revisions in range with compact revision
             // replace the revisions in range with compact revision
             self.compact(&range, merged_revision).await?;
             self.compact(&range, merged_revision).await?;
@@ -135,7 +140,7 @@ where
         } else {
         } else {
             tracing::Span::current().record("rev_id", &new_revision.rev_id);
             tracing::Span::current().record("rev_id", &new_revision.rev_id);
             self.add(new_revision.clone(), RevisionState::Sync, true).await?;
             self.add(new_revision.clone(), RevisionState::Sync, true).await?;
-            sync_seq.push(new_revision.rev_id)?;
+            sync_seq.merge_recv(new_revision.rev_id)?;
             Ok(new_revision.rev_id)
             Ok(new_revision.rev_id)
         }
         }
     }
     }
@@ -163,6 +168,16 @@ where
         self.memory_cache.number_of_sync_records()
         self.memory_cache.number_of_sync_records()
     }
     }
 
 
+    pub(crate) fn number_of_records_in_disk(&self) -> usize {
+        match self.disk_cache.read_revision_records(&self.object_id, None) {
+            Ok(records) => records.len(),
+            Err(e) => {
+                tracing::error!("Read revision records failed: {:?}", e);
+                0
+            }
+        }
+    }
+
     /// The cache gets reset while it conflicts with the remote revisions.
     /// The cache gets reset while it conflicts with the remote revisions.
     #[tracing::instrument(level = "trace", skip(self, revisions), err)]
     #[tracing::instrument(level = "trace", skip(self, revisions), err)]
     pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
     pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
@@ -228,8 +243,8 @@ where
         }
         }
     }
     }
 
 
-    pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<SyncRecord>> {
-        self.disk_cache.read_revision_records(doc_id, None)
+    pub fn load_all_records(&self, object_id: &str) -> FlowyResult<Vec<SyncRecord>> {
+        self.disk_cache.read_revision_records(object_id, None)
     }
     }
 
 
     // Read the revision which rev_id >= range.start && rev_id <= range.end
     // Read the revision which rev_id >= range.start && rev_id <= range.end
@@ -289,8 +304,8 @@ impl<C> RevisionMemoryCacheDelegate for Arc<dyn RevisionDiskCache<C, Error = Flo
 #[derive(Default)]
 #[derive(Default)]
 struct DeferSyncSequence {
 struct DeferSyncSequence {
     rev_ids: VecDeque<i64>,
     rev_ids: VecDeque<i64>,
-    start: Option<usize>,
-    step: usize,
+    compact_index: Option<usize>,
+    compact_length: usize,
 }
 }
 
 
 impl DeferSyncSequence {
 impl DeferSyncSequence {
@@ -298,17 +313,22 @@ impl DeferSyncSequence {
         DeferSyncSequence::default()
         DeferSyncSequence::default()
     }
     }
 
 
-    fn push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
-        let _ = self.dry_push(new_rev_id)?;
+    /// Pushes the new_rev_id to the end of the list and marks this new_rev_id is mergeable.
+    ///
+    /// When calling `compact` method, it will return a list of revision ids started from
+    /// the `compact_start_pos`, and ends with the `compact_length`.
+    fn merge_recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
+        let _ = self.recv(new_rev_id)?;
 
 
-        self.step += 1;
-        if self.start.is_none() && !self.rev_ids.is_empty() {
-            self.start = Some(self.rev_ids.len() - 1);
+        self.compact_length += 1;
+        if self.compact_index.is_none() && !self.rev_ids.is_empty() {
+            self.compact_index = Some(self.rev_ids.len() - 1);
         }
         }
         Ok(())
         Ok(())
     }
     }
 
 
-    fn dry_push(&mut self, new_rev_id: i64) -> FlowyResult<()> {
+    /// Pushes the new_rev_id to the end of the list.
+    fn recv(&mut self, new_rev_id: i64) -> FlowyResult<()> {
         // The last revision's rev_id must be greater than the new one.
         // The last revision's rev_id must be greater than the new one.
         if let Some(rev_id) = self.rev_ids.back() {
         if let Some(rev_id) = self.rev_ids.back() {
             if *rev_id >= new_rev_id {
             if *rev_id >= new_rev_id {
@@ -321,6 +341,7 @@ impl DeferSyncSequence {
         Ok(())
         Ok(())
     }
     }
 
 
+    /// Removes the rev_id from the list
     fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
     fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
         let cur_rev_id = self.rev_ids.front().cloned();
         let cur_rev_id = self.rev_ids.front().cloned();
         if let Some(pop_rev_id) = cur_rev_id {
         if let Some(pop_rev_id) = cur_rev_id {
@@ -331,7 +352,20 @@ impl DeferSyncSequence {
                 );
                 );
                 return Err(FlowyError::internal().context(desc));
                 return Err(FlowyError::internal().context(desc));
             }
             }
-            let _ = self.rev_ids.pop_front();
+
+            let mut compact_rev_id = None;
+            if let Some(compact_index) = self.compact_index {
+                compact_rev_id = self.rev_ids.get(compact_index).cloned();
+            }
+
+            let pop_rev_id = self.rev_ids.pop_front();
+            if let (Some(compact_rev_id), Some(pop_rev_id)) = (compact_rev_id, pop_rev_id) {
+                if compact_rev_id <= pop_rev_id {
+                    if self.compact_length > 0 {
+                        self.compact_length -= 1;
+                    }
+                }
+            }
         }
         }
         Ok(())
         Ok(())
     }
     }
@@ -341,28 +375,22 @@ impl DeferSyncSequence {
     }
     }
 
 
     fn clear(&mut self) {
     fn clear(&mut self) {
-        self.start = None;
-        self.step = 0;
+        self.compact_index = None;
+        self.compact_length = 0;
         self.rev_ids.clear();
         self.rev_ids.clear();
     }
     }
 
 
     // Compact the rev_ids into one except the current synchronizing rev_id.
     // Compact the rev_ids into one except the current synchronizing rev_id.
     fn compact(&mut self) -> VecDeque<i64> {
     fn compact(&mut self) -> VecDeque<i64> {
-        if self.start.is_none() {
-            return VecDeque::default();
+        let mut compact_seq = VecDeque::with_capacity(self.rev_ids.len());
+        if let Some(start) = self.compact_index {
+            if start < self.rev_ids.len() {
+                let seq = self.rev_ids.split_off(start);
+                compact_seq.extend(seq);
+            }
         }
         }
-
-        let start = self.start.unwrap();
-        let compact_seq = self.rev_ids.split_off(start);
-        self.start = None;
-        self.step = 0;
+        self.compact_index = None;
+        self.compact_length = 0;
         compact_seq
         compact_seq
-
-        // let mut new_seq = self.rev_ids.clone();
-        // let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
-        //
-        // let start = drained.pop_front()?;
-        // let end = drained.pop_back().unwrap_or(start);
-        // Some((RevisionRange { start, end }, new_seq))
     }
     }
 }
 }

+ 101 - 25
frontend/rust-lib/flowy-revision/tests/revision_test/local_revision_test.rs

@@ -19,37 +19,31 @@ async fn revision_sync_test() {
 }
 }
 
 
 #[tokio::test]
 #[tokio::test]
-async fn revision_sync_multiple_revisions() {
+async fn revision_compress_2_revisions_with_2_threshold_test() {
     let test = RevisionTest::new_with_configuration(2).await;
     let test = RevisionTest::new_with_configuration(2).await;
-    let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
 
 
-    test.run_script(AddLocalRevision {
+    test.run_script(AddLocalRevision2 {
         content: "123".to_string(),
         content: "123".to_string(),
-        base_rev_id,
-        rev_id: rev_id_1,
+        pair_rev_id: test.next_rev_id_pair(),
     })
     })
     .await;
     .await;
 
 
-    let (base_rev_id, rev_id_2) = test.next_rev_id_pair();
-    test.run_script(AddLocalRevision {
+    test.run_script(AddLocalRevision2 {
         content: "456".to_string(),
         content: "456".to_string(),
-        base_rev_id,
-        rev_id: rev_id_2,
+        pair_rev_id: test.next_rev_id_pair(),
     })
     })
     .await;
     .await;
 
 
     test.run_scripts(vec![
     test.run_scripts(vec![
-        AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
-        AckRevision { rev_id: rev_id_1 },
-        AssertNextSyncRevisionId { rev_id: Some(rev_id_2) },
-        AckRevision { rev_id: rev_id_2 },
+        AssertNextSyncRevisionId { rev_id: Some(1) },
+        AckRevision { rev_id: 1 },
         AssertNextSyncRevisionId { rev_id: None },
         AssertNextSyncRevisionId { rev_id: None },
     ])
     ])
     .await;
     .await;
 }
 }
 
 
 #[tokio::test]
 #[tokio::test]
-async fn revision_compress_three_revisions_test() {
+async fn revision_compress_4_revisions_with_threshold_2_test() {
     let test = RevisionTest::new_with_configuration(2).await;
     let test = RevisionTest::new_with_configuration(2).await;
     let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
     let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
 
 
@@ -86,23 +80,23 @@ async fn revision_compress_three_revisions_test() {
 
 
     // rev_id_2,rev_id_3,rev_id4 will be merged into rev_id_1
     // rev_id_2,rev_id_3,rev_id4 will be merged into rev_id_1
     test.run_scripts(vec![
     test.run_scripts(vec![
-        Wait {
-            milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS,
-        },
-        AssertNumberOfSyncRevisions { num: 1 },
+        AssertNumberOfSyncRevisions { num: 2 },
         AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
         AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
         AssertNextSyncRevisionContent {
         AssertNextSyncRevisionContent {
-            expected: "1234".to_string(),
+            expected: "12".to_string(),
         },
         },
         AckRevision { rev_id: rev_id_1 },
         AckRevision { rev_id: rev_id_1 },
-        AssertNextSyncRevisionId { rev_id: None },
+        AssertNextSyncRevisionId { rev_id: Some(rev_id_2) },
+        AssertNextSyncRevisionContent {
+            expected: "34".to_string(),
+        },
     ])
     ])
     .await;
     .await;
 }
 }
 
 
 #[tokio::test]
 #[tokio::test]
-async fn revision_compress_three_revisions_test2() {
-    let test = RevisionTest::new_with_configuration(2).await;
+async fn revision_compress_8_revisions_with_threshold_4_test() {
+    let test = RevisionTest::new_with_configuration(4).await;
     let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
     let (base_rev_id, rev_id_1) = test.next_rev_id_pair();
 
 
     test.run_script(AddLocalRevision {
     test.run_script(AddLocalRevision {
@@ -169,9 +163,6 @@ async fn revision_compress_three_revisions_test2() {
     .await;
     .await;
 
 
     test.run_scripts(vec![
     test.run_scripts(vec![
-        // Wait {
-        //     milliseconds: REVISION_WRITE_INTERVAL_IN_MILLIS,
-        // },
         AssertNumberOfSyncRevisions { num: 2 },
         AssertNumberOfSyncRevisions { num: 2 },
         AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
         AssertNextSyncRevisionId { rev_id: Some(rev_id_1) },
         AssertNextSyncRevisionContent {
         AssertNextSyncRevisionContent {
@@ -241,3 +232,88 @@ async fn revision_merge_per_100_revision_test() {
 
 
     test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 10 }]).await;
     test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 10 }]).await;
 }
 }
+
+#[tokio::test]
+async fn revision_merge_per_100_revision_test2() {
+    let test = RevisionTest::new_with_configuration(100).await;
+    for i in 0..50 {
+        let (base_rev_id, rev_id) = test.next_rev_id_pair();
+        test.run_script(AddLocalRevision {
+            content: format!("{}", i),
+            base_rev_id,
+            rev_id,
+        })
+        .await;
+    }
+
+    test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 50 }]).await;
+}
+
+#[tokio::test]
+async fn revision_merge_per_1000_revision_test() {
+    let test = RevisionTest::new_with_configuration(1000).await;
+    for i in 0..100000 {
+        let (base_rev_id, rev_id) = test.next_rev_id_pair();
+        test.run_script(AddLocalRevision {
+            content: format!("{}", i),
+            base_rev_id,
+            rev_id,
+        })
+        .await;
+    }
+
+    test.run_scripts(vec![AssertNumberOfSyncRevisions { num: 100 }]).await;
+}
+
+#[tokio::test]
+async fn revision_compress_revision_test() {
+    let test = RevisionTest::new_with_configuration(2).await;
+
+    test.run_scripts(vec![
+        AddLocalRevision2 {
+            content: "1".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AddLocalRevision2 {
+            content: "2".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AddLocalRevision2 {
+            content: "3".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AddLocalRevision2 {
+            content: "4".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AssertNumberOfSyncRevisions { num: 2 },
+    ])
+    .await;
+}
+#[tokio::test]
+async fn revision_compress_revision_while_recv_ack_test() {
+    let test = RevisionTest::new_with_configuration(2).await;
+    test.run_scripts(vec![
+        AddLocalRevision2 {
+            content: "1".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AckRevision { rev_id: 1 },
+        AddLocalRevision2 {
+            content: "2".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AckRevision { rev_id: 2 },
+        AddLocalRevision2 {
+            content: "3".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AckRevision { rev_id: 3 },
+        AddLocalRevision2 {
+            content: "4".to_string(),
+            pair_rev_id: test.next_rev_id_pair(),
+        },
+        AssertNumberOfSyncRevisions { num: 4 },
+    ])
+    .await;
+}

+ 1 - 0
frontend/rust-lib/flowy-revision/tests/revision_test/mod.rs

@@ -1,2 +1,3 @@
 mod local_revision_test;
 mod local_revision_test;
+mod revision_disk_test;
 mod script;
 mod script;

+ 103 - 0
frontend/rust-lib/flowy-revision/tests/revision_test/revision_disk_test.rs

@@ -0,0 +1,103 @@
+use crate::revision_test::script::RevisionScript::*;
+use crate::revision_test::script::{InvalidRevisionObject, RevisionTest};
+use flowy_revision::REVISION_WRITE_INTERVAL_IN_MILLIS;
+
+#[tokio::test]
+async fn revision_write_to_disk_test() {
+    let test = RevisionTest::new_with_configuration(2).await;
+    let (base_rev_id, rev_id) = test.next_rev_id_pair();
+
+    test.run_script(AddLocalRevision {
+        content: "123".to_string(),
+        base_rev_id,
+        rev_id,
+    })
+    .await;
+
+    test.run_scripts(vec![
+        AssertNumberOfRevisionsInDisk { num: 0 },
+        WaitWhenWriteToDisk,
+        AssertNumberOfRevisionsInDisk { num: 1 },
+    ])
+    .await;
+}
+
+#[tokio::test]
+async fn revision_write_to_disk_with_merge_test() {
+    let test = RevisionTest::new_with_configuration(100).await;
+    for i in 0..1000 {
+        let (base_rev_id, rev_id) = test.next_rev_id_pair();
+        test.run_script(AddLocalRevision {
+            content: format!("{}", i),
+            base_rev_id,
+            rev_id,
+        })
+        .await;
+    }
+
+    test.run_scripts(vec![
+        AssertNumberOfRevisionsInDisk { num: 0 },
+        AssertNumberOfSyncRevisions { num: 10 },
+        WaitWhenWriteToDisk,
+        AssertNumberOfRevisionsInDisk { num: 10 },
+    ])
+    .await;
+}
+
+#[tokio::test]
+async fn revision_read_from_disk_test() {
+    let test = RevisionTest::new_with_configuration(2).await;
+    let (base_rev_id, rev_id) = test.next_rev_id_pair();
+    test.run_scripts(vec![
+        AddLocalRevision {
+            content: "123".to_string(),
+            base_rev_id,
+            rev_id,
+        },
+        AssertNumberOfRevisionsInDisk { num: 0 },
+        WaitWhenWriteToDisk,
+        AssertNumberOfRevisionsInDisk { num: 1 },
+    ])
+    .await;
+
+    let test = RevisionTest::new_with_other(test).await;
+    let (base_rev_id, rev_id) = test.next_rev_id_pair();
+    test.run_scripts(vec![
+        AssertNextSyncRevisionId { rev_id: Some(1) },
+        AddLocalRevision {
+            content: "456".to_string(),
+            base_rev_id,
+            rev_id: rev_id.clone(),
+        },
+        AckRevision { rev_id: 1 },
+        AssertNextSyncRevisionId { rev_id: Some(rev_id) },
+    ])
+    .await;
+}
+
+#[tokio::test]
+#[should_panic]
+async fn revision_read_from_disk_with_invalid_record_test() {
+    let test = RevisionTest::new_with_configuration(2).await;
+    let (base_rev_id, rev_id) = test.next_rev_id_pair();
+    test.run_script(AddLocalRevision {
+        content: "123".to_string(),
+        base_rev_id,
+        rev_id,
+    })
+    .await;
+
+    let (base_rev_id, rev_id) = test.next_rev_id_pair();
+    test.run_script(AddInvalidLocalRevision {
+        bytes: InvalidRevisionObject::new(),
+        base_rev_id,
+        rev_id,
+    })
+    .await;
+
+    let test = RevisionTest::new_with_other(test).await;
+    test.run_scripts(vec![AssertNextSyncRevisionContent {
+        expected: "123".to_string(),
+    }])
+    .await;
+}

+ 152 - 21
frontend/rust-lib/flowy-revision/tests/revision_test/script.rs

@@ -2,11 +2,13 @@ use bytes::Bytes;
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
 use flowy_revision::disk::{RevisionChangeset, RevisionDiskCache, SyncRecord};
 use flowy_revision::{
 use flowy_revision::{
-    RevisionManager, RevisionMergeable, RevisionPersistence, RevisionPersistenceConfiguration,
-    RevisionSnapshotDiskCache, RevisionSnapshotInfo,
+    RevisionManager, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence,
+    RevisionPersistenceConfiguration, RevisionSnapshotDiskCache, RevisionSnapshotInfo,
+    REVISION_WRITE_INTERVAL_IN_MILLIS,
 };
 };
+use flowy_sync::entities::document::DocumentPayloadPB;
 use flowy_sync::entities::revision::{Revision, RevisionRange};
 use flowy_sync::entities::revision::{Revision, RevisionRange};
-use flowy_sync::util::md5;
+use flowy_sync::util::{make_operations_from_revisions, md5};
 use nanoid::nanoid;
 use nanoid::nanoid;
 use parking_lot::RwLock;
 use parking_lot::RwLock;
 use serde::{Deserialize, Serialize};
 use serde::{Deserialize, Serialize};
@@ -19,6 +21,15 @@ pub enum RevisionScript {
         base_rev_id: i64,
         base_rev_id: i64,
         rev_id: i64,
         rev_id: i64,
     },
     },
+    AddLocalRevision2 {
+        content: String,
+        pair_rev_id: (i64, i64),
+    },
+    AddInvalidLocalRevision {
+        bytes: Vec<u8>,
+        base_rev_id: i64,
+        rev_id: i64,
+    },
     AckRevision {
     AckRevision {
         rev_id: i64,
         rev_id: i64,
     },
     },
@@ -28,15 +39,19 @@ pub enum RevisionScript {
     AssertNumberOfSyncRevisions {
     AssertNumberOfSyncRevisions {
         num: usize,
         num: usize,
     },
     },
+    AssertNumberOfRevisionsInDisk {
+        num: usize,
+    },
     AssertNextSyncRevisionContent {
     AssertNextSyncRevisionContent {
         expected: String,
         expected: String,
     },
     },
-    Wait {
-        milliseconds: u64,
-    },
+    WaitWhenWriteToDisk,
 }
 }
 
 
 pub struct RevisionTest {
 pub struct RevisionTest {
+    user_id: String,
+    object_id: String,
+    configuration: RevisionPersistenceConfiguration,
     rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
     rev_manager: Arc<RevisionManager<RevisionConnectionMock>>,
 }
 }
 
 
@@ -45,19 +60,47 @@ impl RevisionTest {
         Self::new_with_configuration(2).await
         Self::new_with_configuration(2).await
     }
     }
 
 
-    pub async fn new_with_configuration(merge_when_excess_number_of_version: i64) -> Self {
+    pub async fn new_with_configuration(merge_threshold: i64) -> Self {
         let user_id = nanoid!(10);
         let user_id = nanoid!(10);
         let object_id = nanoid!(6);
         let object_id = nanoid!(6);
-        let configuration = RevisionPersistenceConfiguration::new(merge_when_excess_number_of_version as usize);
-        let persistence = RevisionPersistence::new(&user_id, &object_id, RevisionDiskCacheMock::new(), configuration);
+        let configuration = RevisionPersistenceConfiguration::new(merge_threshold as usize);
+        let disk_cache = RevisionDiskCacheMock::new(vec![]);
+        let persistence = RevisionPersistence::new(&user_id, &object_id, disk_cache, configuration.clone());
         let compress = RevisionCompressMock {};
         let compress = RevisionCompressMock {};
         let snapshot = RevisionSnapshotMock {};
         let snapshot = RevisionSnapshotMock {};
-        let rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
+        let mut rev_manager = RevisionManager::new(&user_id, &object_id, persistence, compress, snapshot);
+        rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
         Self {
         Self {
+            user_id,
+            object_id,
+            configuration,
             rev_manager: Arc::new(rev_manager),
             rev_manager: Arc::new(rev_manager),
         }
         }
     }
     }
 
 
+    pub async fn new_with_other(old_test: RevisionTest) -> Self {
+        let records = old_test.rev_manager.get_all_revision_records().unwrap();
+        let disk_cache = RevisionDiskCacheMock::new(records);
+        let configuration = old_test.configuration;
+        let persistence = RevisionPersistence::new(
+            &old_test.user_id,
+            &old_test.object_id,
+            disk_cache,
+            configuration.clone(),
+        );
+
+        let compress = RevisionCompressMock {};
+        let snapshot = RevisionSnapshotMock {};
+        let mut rev_manager =
+            RevisionManager::new(&old_test.user_id, &old_test.object_id, persistence, compress, snapshot);
+        rev_manager.initialize::<RevisionObjectMockSerde>(None).await.unwrap();
+        Self {
+            user_id: old_test.user_id,
+            object_id: old_test.object_id,
+            configuration,
+            rev_manager: Arc::new(rev_manager),
+        }
+    }
     pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
     pub async fn run_scripts(&self, scripts: Vec<RevisionScript>) {
         for script in scripts {
         for script in scripts {
             self.run_script(script).await;
             self.run_script(script).await;
@@ -87,6 +130,34 @@ impl RevisionTest {
                 );
                 );
                 self.rev_manager.add_local_revision(&revision).await.unwrap();
                 self.rev_manager.add_local_revision(&revision).await.unwrap();
             }
             }
+            RevisionScript::AddLocalRevision2 { content, pair_rev_id } => {
+                let object = RevisionObjectMock::new(&content);
+                let bytes = object.to_bytes();
+                let md5 = md5(&bytes);
+                let revision = Revision::new(
+                    &self.rev_manager.object_id,
+                    pair_rev_id.0,
+                    pair_rev_id.1,
+                    Bytes::from(bytes),
+                    md5,
+                );
+                self.rev_manager.add_local_revision(&revision).await.unwrap();
+            }
+            RevisionScript::AddInvalidLocalRevision {
+                bytes,
+                base_rev_id,
+                rev_id,
+            } => {
+                let md5 = md5(&bytes);
+                let revision = Revision::new(
+                    &self.rev_manager.object_id,
+                    base_rev_id,
+                    rev_id,
+                    Bytes::from(bytes),
+                    md5,
+                );
+                self.rev_manager.add_local_revision(&revision).await.unwrap();
+            }
             RevisionScript::AckRevision { rev_id } => {
             RevisionScript::AckRevision { rev_id } => {
                 //
                 //
                 self.rev_manager.ack_revision(rev_id).await.unwrap()
                 self.rev_manager.ack_revision(rev_id).await.unwrap()
@@ -97,6 +168,9 @@ impl RevisionTest {
             RevisionScript::AssertNumberOfSyncRevisions { num } => {
             RevisionScript::AssertNumberOfSyncRevisions { num } => {
                 assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
                 assert_eq!(self.rev_manager.number_of_sync_revisions(), num)
             }
             }
+            RevisionScript::AssertNumberOfRevisionsInDisk { num } => {
+                assert_eq!(self.rev_manager.number_of_revisions_in_disk(), num)
+            }
             RevisionScript::AssertNextSyncRevisionContent { expected } => {
             RevisionScript::AssertNextSyncRevisionContent { expected } => {
                 //
                 //
                 let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
                 let rev_id = self.rev_manager.next_sync_rev_id().await.unwrap();
@@ -104,7 +178,8 @@ impl RevisionTest {
                 let object = RevisionObjectMock::from_bytes(&revision.bytes);
                 let object = RevisionObjectMock::from_bytes(&revision.bytes);
                 assert_eq!(object.content, expected);
                 assert_eq!(object.content, expected);
             }
             }
-            RevisionScript::Wait { milliseconds } => {
+            RevisionScript::WaitWhenWriteToDisk => {
+                let milliseconds = 2 * REVISION_WRITE_INTERVAL_IN_MILLIS;
                 tokio::time::sleep(Duration::from_millis(milliseconds)).await;
                 tokio::time::sleep(Duration::from_millis(milliseconds)).await;
             }
             }
         }
         }
@@ -116,9 +191,9 @@ pub struct RevisionDiskCacheMock {
 }
 }
 
 
 impl RevisionDiskCacheMock {
 impl RevisionDiskCacheMock {
-    pub fn new() -> Self {
+    pub fn new(records: Vec<SyncRecord>) -> Self {
         Self {
         Self {
-            records: RwLock::new(vec![]),
+            records: RwLock::new(records),
         }
         }
     }
     }
 }
 }
@@ -138,17 +213,36 @@ impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
     fn read_revision_records(
     fn read_revision_records(
         &self,
         &self,
         _object_id: &str,
         _object_id: &str,
-        _rev_ids: Option<Vec<i64>>,
+        rev_ids: Option<Vec<i64>>,
     ) -> Result<Vec<SyncRecord>, Self::Error> {
     ) -> Result<Vec<SyncRecord>, Self::Error> {
-        todo!()
+        match rev_ids {
+            None => Ok(self.records.read().clone()),
+            Some(rev_ids) => Ok(self
+                .records
+                .read()
+                .iter()
+                .filter(|record| rev_ids.contains(&record.revision.rev_id))
+                .cloned()
+                .collect::<Vec<SyncRecord>>()),
+        }
     }
     }
 
 
     fn read_revision_records_with_range(
     fn read_revision_records_with_range(
         &self,
         &self,
         _object_id: &str,
         _object_id: &str,
-        _range: &RevisionRange,
+        range: &RevisionRange,
     ) -> Result<Vec<SyncRecord>, Self::Error> {
     ) -> Result<Vec<SyncRecord>, Self::Error> {
-        todo!()
+        let read_guard = self.records.read();
+        let records = range
+            .iter()
+            .flat_map(|rev_id| {
+                read_guard
+                    .iter()
+                    .find(|record| record.revision.rev_id == rev_id)
+                    .cloned()
+            })
+            .collect::<Vec<SyncRecord>>();
+        Ok(records)
     }
     }
 
 
     fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
     fn update_revision_record(&self, changesets: Vec<RevisionChangeset>) -> FlowyResult<()> {
@@ -195,9 +289,7 @@ impl RevisionDiskCache<RevisionConnectionMock> for RevisionDiskCacheMock {
 }
 }
 
 
 pub struct RevisionConnectionMock {}
 pub struct RevisionConnectionMock {}
-
 pub struct RevisionSnapshotMock {}
 pub struct RevisionSnapshotMock {}
-
 impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
 impl RevisionSnapshotDiskCache for RevisionSnapshotMock {
     fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
     fn write_snapshot(&self, _object_id: &str, _rev_id: i64, _data: Vec<u8>) -> FlowyResult<()> {
         todo!()
         todo!()
@@ -215,12 +307,31 @@ impl RevisionMergeable for RevisionCompressMock {
         let mut object = RevisionObjectMock::new("");
         let mut object = RevisionObjectMock::new("");
         for revision in revisions {
         for revision in revisions {
             let other = RevisionObjectMock::from_bytes(&revision.bytes);
             let other = RevisionObjectMock::from_bytes(&revision.bytes);
-            object.compose(other);
+            let _ = object.compose(other)?;
         }
         }
         Ok(Bytes::from(object.to_bytes()))
         Ok(Bytes::from(object.to_bytes()))
     }
     }
 }
 }
 
 
+#[derive(Serialize, Deserialize)]
+pub struct InvalidRevisionObject {
+    data: String,
+}
+
+impl InvalidRevisionObject {
+    pub fn new() -> Vec<u8> {
+        let object = InvalidRevisionObject { data: "".to_string() };
+        object.to_bytes()
+    }
+    fn to_bytes(&self) -> Vec<u8> {
+        serde_json::to_vec(self).unwrap()
+    }
+
+    fn from_bytes(bytes: &[u8]) -> Self {
+        serde_json::from_slice(bytes).unwrap()
+    }
+}
+
 #[derive(Serialize, Deserialize)]
 #[derive(Serialize, Deserialize)]
 pub struct RevisionObjectMock {
 pub struct RevisionObjectMock {
     content: String,
     content: String,
@@ -231,8 +342,9 @@ impl RevisionObjectMock {
         Self { content: s.to_owned() }
         Self { content: s.to_owned() }
     }
     }
 
 
-    pub fn compose(&mut self, other: RevisionObjectMock) {
+    pub fn compose(&mut self, other: RevisionObjectMock) -> FlowyResult<()> {
         self.content.push_str(other.content.as_str());
         self.content.push_str(other.content.as_str());
+        Ok(())
     }
     }
 
 
     pub fn to_bytes(&self) -> Vec<u8> {
     pub fn to_bytes(&self) -> Vec<u8> {
@@ -243,3 +355,22 @@ impl RevisionObjectMock {
         serde_json::from_slice(bytes).unwrap()
         serde_json::from_slice(bytes).unwrap()
     }
     }
 }
 }
+
+pub struct RevisionObjectMockSerde();
+impl RevisionObjectDeserializer for RevisionObjectMockSerde {
+    type Output = RevisionObjectMock;
+
+    fn deserialize_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
+        let mut object = RevisionObjectMock::new("");
+        if revisions.is_empty() {
+            return Ok(object);
+        }
+
+        for revision in revisions {
+            let revision_object = RevisionObjectMock::from_bytes(&revision.bytes);
+            let _ = object.compose(revision_object)?;
+        }
+
+        Ok(object)
+    }
+}

+ 1 - 1
frontend/rust-lib/flowy-sdk/src/lib.rs

@@ -86,7 +86,7 @@ fn crate_log_filter(level: String) -> String {
     filters.push(format!("lib_ws={}", level));
     filters.push(format!("lib_ws={}", level));
     filters.push(format!("lib_infra={}", level));
     filters.push(format!("lib_infra={}", level));
     filters.push(format!("flowy_sync={}", level));
     filters.push(format!("flowy_sync={}", level));
-    // filters.push(format!("flowy_revision={}", level));
+    filters.push(format!("flowy_revision={}", level));
     // filters.push(format!("lib_dispatch={}", level));
     // filters.push(format!("lib_dispatch={}", level));
 
 
     filters.push(format!("dart_ffi={}", "info"));
     filters.push(format!("dart_ffi={}", "info"));