Browse Source

refactor: revision_cache with sync_seq

appflowy 3 years ago
parent
commit
dd8c26df02

+ 1 - 1
frontend/rust-lib/flowy-document/src/editor.rs

@@ -213,7 +213,7 @@ struct DocumentInfoBuilder();
 impl RevisionObjectBuilder for DocumentInfoBuilder {
 impl RevisionObjectBuilder for DocumentInfoBuilder {
     type Output = DocumentInfo;
     type Output = DocumentInfo;
 
 
-    fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
+    fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
         let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
         let (base_rev_id, rev_id) = revisions.last().unwrap().pair_rev_id();
         let mut delta = make_delta_from_revisions(revisions)?;
         let mut delta = make_delta_from_revisions(revisions)?;
         correct_delta(&mut delta);
         correct_delta(&mut delta);

+ 1 - 1
frontend/rust-lib/flowy-document/src/manager.rs

@@ -94,7 +94,7 @@ impl FlowyDocumentManager {
         })
         })
     }
     }
 
 
-    pub async fn receive_revisions<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
+    pub async fn reset_with_revisions<T: AsRef<str>>(&self, doc_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
         let doc_id = doc_id.as_ref().to_owned();
         let doc_id = doc_id.as_ref().to_owned();
         let db_pool = self.document_user.db_pool()?;
         let db_pool = self.document_user.db_pool()?;
         let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;
         let rev_manager = self.make_rev_manager(&doc_id, db_pool)?;

+ 1 - 1
frontend/rust-lib/flowy-folder/src/services/folder_editor.rs

@@ -103,7 +103,7 @@ struct FolderPadBuilder();
 impl RevisionObjectBuilder for FolderPadBuilder {
 impl RevisionObjectBuilder for FolderPadBuilder {
     type Output = FolderPad;
     type Output = FolderPad;
 
 
-    fn build_with_revisions(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
+    fn build_object(_object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
         let pad = FolderPad::from_revisions(revisions)?;
         let pad = FolderPad::from_revisions(revisions)?;
         Ok(pad)
         Ok(pad)
     }
     }

+ 2 - 2
frontend/rust-lib/flowy-folder/src/services/view/controller.rs

@@ -73,7 +73,7 @@ impl ViewController {
             Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
             Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
         let _ = self
         let _ = self
             .document_manager
             .document_manager
-            .receive_revisions(&params.view_id, repeated_revision)
+            .reset_with_revisions(&params.view_id, repeated_revision)
             .await?;
             .await?;
         let view = self.create_view_on_server(params).await?;
         let view = self.create_view_on_server(params).await?;
         let _ = self.create_view_on_local(view.clone()).await?;
         let _ = self.create_view_on_local(view.clone()).await?;
@@ -96,7 +96,7 @@ impl ViewController {
         let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, view_id, delta_data).into();
         let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, view_id, delta_data).into();
         let _ = self
         let _ = self
             .document_manager
             .document_manager
-            .receive_revisions(view_id, repeated_revision)
+            .reset_with_revisions(view_id, repeated_revision)
             .await?;
             .await?;
         Ok(())
         Ok(())
     }
     }

+ 169 - 42
frontend/rust-lib/flowy-sync/src/cache/mod.rs

@@ -10,42 +10,135 @@ use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionS
 use flowy_database::ConnectionPool;
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyError, FlowyResult};
 use flowy_error::{internal_error, FlowyError, FlowyResult};
 
 
-use std::{
-    borrow::Cow,
-    sync::{
-        atomic::{AtomicI64, Ordering::SeqCst},
-        Arc,
-    },
-};
+use crate::RevisionCompact;
+use std::collections::VecDeque;
+use std::{borrow::Cow, sync::Arc};
+use tokio::sync::RwLock;
 use tokio::task::spawn_blocking;
 use tokio::task::spawn_blocking;
 
 
 pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 
 
 pub struct RevisionCache {
 pub struct RevisionCache {
+    user_id: String,
     object_id: String,
     object_id: String,
     disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
     disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
     memory_cache: Arc<RevisionMemoryCache>,
     memory_cache: Arc<RevisionMemoryCache>,
-    latest_rev_id: AtomicI64,
+    sync_seq: RwLock<SyncSequence>,
 }
 }
 impl RevisionCache {
 impl RevisionCache {
     pub fn new(user_id: &str, object_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
     pub fn new(user_id: &str, object_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
         let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
         let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
         let memory_cache = Arc::new(RevisionMemoryCache::new(object_id, Arc::new(disk_cache.clone())));
         let memory_cache = Arc::new(RevisionMemoryCache::new(object_id, Arc::new(disk_cache.clone())));
         let object_id = object_id.to_owned();
         let object_id = object_id.to_owned();
+        let user_id = user_id.to_owned();
+        let sync_seq = RwLock::new(SyncSequence::new());
         Self {
         Self {
+            user_id,
             object_id,
             object_id,
             disk_cache,
             disk_cache,
             memory_cache,
             memory_cache,
-            latest_rev_id: AtomicI64::new(0),
+            sync_seq,
+        }
+    }
+
+    /// Save the revision that comes from remote to disk.
+    #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, object_id=%self.object_id), err)]
+    pub(crate) async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> {
+        tracing::Span::current().record("rev_id", &revision.rev_id);
+        self.add(revision.clone(), RevisionState::Ack, true).await
+    }
+
+    /// Append the revision that already existed in the local DB state to sync sequence
+    #[tracing::instrument(level = "trace", skip(self), fields(rev_id, object_id=%self.object_id), err)]
+    pub(crate) async fn sync_revision(&self, revision: &Revision) -> FlowyResult<()> {
+        tracing::Span::current().record("rev_id", &revision.rev_id);
+        self.add(revision.clone(), RevisionState::Sync, false).await?;
+        self.sync_seq.write().await.add(revision.rev_id)?;
+        Ok(())
+    }
+
+    /// Save the revision to disk and append it to the end of the sync sequence.
+    #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range, object_id=%self.object_id), err)]
+    pub(crate) async fn add_sync_revision<C>(&self, revision: &Revision) -> FlowyResult<i64>
+    where
+        C: RevisionCompact,
+    {
+        let result = self.sync_seq.read().await.compact();
+        match result {
+            None => {
+                tracing::Span::current().record("rev_id", &revision.rev_id);
+                self.add(revision.clone(), RevisionState::Sync, true).await?;
+                self.sync_seq.write().await.add(revision.rev_id)?;
+                Ok(revision.rev_id)
+            }
+            Some((range, mut compact_seq)) => {
+                tracing::Span::current().record("compact_range", &format!("{}", range).as_str());
+                let mut revisions = self.revisions_in_range(&range).await?;
+                if range.to_rev_ids().len() != revisions.len() {
+                    debug_assert_eq!(range.to_rev_ids().len(), revisions.len());
+                }
+
+                // append the new revision
+                revisions.push(revision.clone());
+
+                // compact multiple revisions into one
+                let compact_revision = C::compact_revisions(&self.user_id, &self.object_id, revisions)?;
+                let rev_id = compact_revision.rev_id;
+                tracing::Span::current().record("rev_id", &rev_id);
+
+                // insert new revision
+                compact_seq.push_back(rev_id);
+
+                // replace the revisions in range with compact revision
+                self.compact(&range, compact_revision).await?;
+                debug_assert_eq!(self.sync_seq.read().await.len(), compact_seq.len());
+                self.sync_seq.write().await.reset(compact_seq);
+                Ok(rev_id)
+            }
         }
         }
     }
     }
 
 
-    pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> {
+    /// Remove the revision with rev_id from the sync sequence.
+    pub(crate) async fn ack_revision(&self, rev_id: i64) -> FlowyResult<()> {
+        if self.sync_seq.write().await.ack(&rev_id).is_ok() {
+            self.memory_cache.ack(&rev_id).await;
+        }
+        Ok(())
+    }
+
+    pub(crate) async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
+        match self.sync_seq.read().await.next_rev_id() {
+            None => Ok(None),
+            Some(rev_id) => Ok(self.get(rev_id).await.map(|record| record.revision)),
+        }
+    }
+
+    /// The cache gets reset while it conflicts with the remote revisions.
+    #[tracing::instrument(level = "trace", skip(self, revisions), err)]
+    pub(crate) async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
+        let records = revisions
+            .to_vec()
+            .into_iter()
+            .map(|revision| RevisionRecord {
+                revision,
+                state: RevisionState::Sync,
+                write_to_disk: false,
+            })
+            .collect::<Vec<_>>();
+
+        let _ = self
+            .disk_cache
+            .delete_and_insert_records(&self.object_id, None, records.clone())?;
+        let _ = self.memory_cache.reset_with_revisions(records).await;
+        self.sync_seq.write().await.clear();
+        Ok(())
+    }
+
+    async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> {
         if self.memory_cache.contains(&revision.rev_id) {
         if self.memory_cache.contains(&revision.rev_id) {
             tracing::warn!("Duplicate revision: {}:{}-{:?}", self.object_id, revision.rev_id, state);
             tracing::warn!("Duplicate revision: {}:{}-{:?}", self.object_id, revision.rev_id, state);
             return Ok(());
             return Ok(());
         }
         }
-        let rev_id = revision.rev_id;
         let record = RevisionRecord {
         let record = RevisionRecord {
             revision,
             revision,
             state,
             state,
@@ -53,11 +146,10 @@ impl RevisionCache {
         };
         };
 
 
         self.memory_cache.add(Cow::Owned(record)).await;
         self.memory_cache.add(Cow::Owned(record)).await;
-        self.set_latest_rev_id(rev_id);
         Ok(())
         Ok(())
     }
     }
 
 
-    pub async fn compact(&self, range: &RevisionRange, new_revision: Revision) -> FlowyResult<()> {
+    async fn compact(&self, range: &RevisionRange, new_revision: Revision) -> FlowyResult<()> {
         self.memory_cache.remove_with_range(range);
         self.memory_cache.remove_with_range(range);
         let rev_ids = range.to_rev_ids();
         let rev_ids = range.to_rev_ids();
         let _ = self
         let _ = self
@@ -68,10 +160,6 @@ impl RevisionCache {
         Ok(())
         Ok(())
     }
     }
 
 
-    pub async fn ack(&self, rev_id: i64) {
-        self.memory_cache.ack(&rev_id).await;
-    }
-
     pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
     pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
         match self.memory_cache.get(&rev_id).await {
         match self.memory_cache.get(&rev_id).await {
             None => match self
             None => match self
@@ -122,31 +210,6 @@ impl RevisionCache {
             .map(|record| record.revision)
             .map(|record| record.revision)
             .collect::<Vec<Revision>>())
             .collect::<Vec<Revision>>())
     }
     }
-
-    #[tracing::instrument(level = "debug", skip(self, revisions), err)]
-    pub async fn reset_with_revisions(&self, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
-        let records = revisions
-            .to_vec()
-            .into_iter()
-            .map(|revision| RevisionRecord {
-                revision,
-                state: RevisionState::Sync,
-                write_to_disk: false,
-            })
-            .collect::<Vec<_>>();
-
-        let _ = self
-            .disk_cache
-            .delete_and_insert_records(object_id, None, records.clone())?;
-        let _ = self.memory_cache.reset_with_revisions(records).await;
-
-        Ok(())
-    }
-
-    #[inline]
-    fn set_latest_rev_id(&self, rev_id: i64) {
-        let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
-    }
 }
 }
 
 
 pub fn mk_revision_disk_cache(
 pub fn mk_revision_disk_cache(
@@ -196,3 +259,67 @@ impl RevisionRecord {
         self.state = RevisionState::Ack;
         self.state = RevisionState::Ack;
     }
     }
 }
 }
+
+#[derive(Default)]
+struct SyncSequence(VecDeque<i64>);
+impl SyncSequence {
+    fn new() -> Self {
+        SyncSequence::default()
+    }
+
+    fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> {
+        // The last revision's rev_id must be greater than the new one.
+        if let Some(rev_id) = self.0.back() {
+            if *rev_id >= new_rev_id {
+                return Err(
+                    FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
+                );
+            }
+        }
+        self.0.push_back(new_rev_id);
+        Ok(())
+    }
+
+    fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
+        let cur_rev_id = self.0.front().cloned();
+        if let Some(pop_rev_id) = cur_rev_id {
+            if &pop_rev_id != rev_id {
+                let desc = format!(
+                    "The ack rev_id:{} is not equal to the current rev_id:{}",
+                    rev_id, pop_rev_id
+                );
+                return Err(FlowyError::internal().context(desc));
+            }
+            let _ = self.0.pop_front();
+        }
+        Ok(())
+    }
+
+    fn next_rev_id(&self) -> Option<i64> {
+        self.0.front().cloned()
+    }
+
+    fn reset(&mut self, new_seq: VecDeque<i64>) {
+        self.0 = new_seq;
+    }
+
+    fn clear(&mut self) {
+        self.0.clear();
+    }
+
+    fn len(&self) -> usize {
+        self.0.len()
+    }
+
+    // Compact the rev_ids into one except the current synchronizing rev_id.
+    fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
+        self.next_rev_id()?;
+
+        let mut new_seq = self.0.clone();
+        let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
+
+        let start = drained.pop_front()?;
+        let end = drained.pop_back().unwrap_or(start);
+        Some((RevisionRange { start, end }, new_seq))
+    }
+}

+ 21 - 209
frontend/rust-lib/flowy-sync/src/rev_manager.rs

@@ -6,8 +6,7 @@ use flowy_collaboration::{
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_error::{FlowyError, FlowyResult};
 use lib_infra::future::FutureResult;
 use lib_infra::future::FutureResult;
 
 
-use std::{collections::VecDeque, sync::Arc};
-use tokio::sync::RwLock;
+use std::sync::Arc;
 
 
 pub trait RevisionCloudService: Send + Sync {
 pub trait RevisionCloudService: Send + Sync {
     fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
     fn fetch_object(&self, user_id: &str, object_id: &str) -> FutureResult<Vec<Revision>, FlowyError>;
@@ -15,7 +14,7 @@ pub trait RevisionCloudService: Send + Sync {
 
 
 pub trait RevisionObjectBuilder: Send + Sync {
 pub trait RevisionObjectBuilder: Send + Sync {
     type Output;
     type Output;
-    fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
+    fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
 }
 }
 
 
 pub trait RevisionCompact: Send + Sync {
 pub trait RevisionCompact: Send + Sync {
@@ -26,16 +25,15 @@ pub struct RevisionManager {
     pub object_id: String,
     pub object_id: String,
     user_id: String,
     user_id: String,
     rev_id_counter: RevIdCounter,
     rev_id_counter: RevIdCounter,
-    rev_compressor: Arc<RwLock<RevisionCompressor>>,
+    rev_cache: Arc<RevisionCache>,
 
 
     #[cfg(feature = "flowy_unit_test")]
     #[cfg(feature = "flowy_unit_test")]
     rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
     rev_ack_notifier: tokio::sync::broadcast::Sender<i64>,
 }
 }
 
 
 impl RevisionManager {
 impl RevisionManager {
-    pub fn new(user_id: &str, object_id: &str, revision_cache: Arc<RevisionCache>) -> Self {
+    pub fn new(user_id: &str, object_id: &str, rev_cache: Arc<RevisionCache>) -> Self {
         let rev_id_counter = RevIdCounter::new(0);
         let rev_id_counter = RevIdCounter::new(0);
-        let rev_compressor = Arc::new(RwLock::new(RevisionCompressor::new(object_id, user_id, revision_cache)));
         #[cfg(feature = "flowy_unit_test")]
         #[cfg(feature = "flowy_unit_test")]
         let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
         let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
 
 
@@ -43,7 +41,7 @@ impl RevisionManager {
             object_id: object_id.to_string(),
             object_id: object_id.to_string(),
             user_id: user_id.to_owned(),
             user_id: user_id.to_owned(),
             rev_id_counter,
             rev_id_counter,
-            rev_compressor,
+            rev_cache,
 
 
             #[cfg(feature = "flowy_unit_test")]
             #[cfg(feature = "flowy_unit_test")]
             rev_ack_notifier: revision_ack_notifier,
             rev_ack_notifier: revision_ack_notifier,
@@ -59,20 +57,18 @@ impl RevisionManager {
             object_id: self.object_id.clone(),
             object_id: self.object_id.clone(),
             user_id: self.user_id.clone(),
             user_id: self.user_id.clone(),
             cloud,
             cloud,
-            rev_compressor: self.rev_compressor.clone(),
+            rev_cache: self.rev_cache.clone(),
         }
         }
-        .load::<C>()
+        .load()
         .await?;
         .await?;
         self.rev_id_counter.set(rev_id);
         self.rev_id_counter.set(rev_id);
-        B::build_with_revisions(&self.object_id, revisions)
+        B::build_object(&self.object_id, revisions)
     }
     }
 
 
     #[tracing::instrument(level = "debug", skip(self, revisions), err)]
     #[tracing::instrument(level = "debug", skip(self, revisions), err)]
     pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
     pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
         let rev_id = pair_rev_id_from_revisions(&revisions).1;
         let rev_id = pair_rev_id_from_revisions(&revisions).1;
-
-        let write_guard = self.rev_compressor.write().await;
-        let _ = write_guard.reset(revisions.into_inner()).await?;
+        let _ = self.rev_cache.reset(revisions.into_inner()).await?;
         self.rev_id_counter.set(rev_id);
         self.rev_id_counter.set(rev_id);
         Ok(())
         Ok(())
     }
     }
@@ -83,8 +79,7 @@ impl RevisionManager {
             return Err(FlowyError::internal().context("Delta data should be empty"));
             return Err(FlowyError::internal().context("Delta data should be empty"));
         }
         }
 
 
-        let write_guard = self.rev_compressor.write().await;
-        let _ = write_guard.add_ack_revision(revision).await?;
+        let _ = self.rev_cache.add_ack_revision(revision).await?;
         self.rev_id_counter.set(revision.rev_id);
         self.rev_id_counter.set(revision.rev_id);
         Ok(())
         Ok(())
     }
     }
@@ -97,16 +92,14 @@ impl RevisionManager {
         if revision.delta_data.is_empty() {
         if revision.delta_data.is_empty() {
             return Err(FlowyError::internal().context("Delta data should be empty"));
             return Err(FlowyError::internal().context("Delta data should be empty"));
         }
         }
-        let mut write_guard = self.rev_compressor.write().await;
-        let rev_id = write_guard.write_sync_revision::<C>(revision).await?;
-
+        let rev_id = self.rev_cache.add_sync_revision::<C>(revision).await?;
         self.rev_id_counter.set(rev_id);
         self.rev_id_counter.set(rev_id);
         Ok(())
         Ok(())
     }
     }
 
 
     #[tracing::instrument(level = "debug", skip(self), err)]
     #[tracing::instrument(level = "debug", skip(self), err)]
     pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
     pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
-        if self.rev_compressor.write().await.ack_revision(rev_id).await.is_ok() {
+        if self.rev_cache.ack_revision(rev_id).await.is_ok() {
             #[cfg(feature = "flowy_unit_test")]
             #[cfg(feature = "flowy_unit_test")]
             let _ = self.rev_ack_notifier.send(rev_id);
             let _ = self.rev_ack_notifier.send(rev_id);
         }
         }
@@ -124,222 +117,46 @@ impl RevisionManager {
     }
     }
 
 
     pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
     pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
-        let revisions = self.rev_compressor.read().await.revisions_in_range(&range).await?;
+        let revisions = self.rev_cache.revisions_in_range(&range).await?;
         Ok(revisions)
         Ok(revisions)
     }
     }
 
 
     pub async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
     pub async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
-        Ok(self.rev_compressor.read().await.next_sync_revision().await?)
+        Ok(self.rev_cache.next_sync_revision().await?)
     }
     }
 
 
     pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
     pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
-        self.rev_compressor
-            .read()
-            .await
-            .get(rev_id)
-            .await
-            .map(|record| record.revision)
+        self.rev_cache.get(rev_id).await.map(|record| record.revision)
     }
     }
 }
 }
 
 
 #[cfg(feature = "flowy_unit_test")]
 #[cfg(feature = "flowy_unit_test")]
 impl RevisionManager {
 impl RevisionManager {
     pub async fn revision_cache(&self) -> Arc<RevisionCache> {
     pub async fn revision_cache(&self) -> Arc<RevisionCache> {
-        self.rev_compressor.read().await.inner.clone()
+        self.rev_cache.clone()
     }
     }
     pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
     pub fn ack_notify(&self) -> tokio::sync::broadcast::Receiver<i64> {
         self.rev_ack_notifier.subscribe()
         self.rev_ack_notifier.subscribe()
     }
     }
 }
 }
 
 
-struct RevisionCompressor {
-    object_id: String,
-    user_id: String,
-    inner: Arc<RevisionCache>,
-    sync_seq: RevisionSyncSequence,
-}
-
-impl RevisionCompressor {
-    fn new(object_id: &str, user_id: &str, inner: Arc<RevisionCache>) -> Self {
-        let sync_seq = RevisionSyncSequence::new();
-        let object_id = object_id.to_owned();
-        let user_id = user_id.to_owned();
-        Self {
-            object_id,
-            user_id,
-            inner,
-            sync_seq,
-        }
-    }
-
-    // Call this method to write the revisions that fetch from server to disk.
-    #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, object_id=%self.object_id), err)]
-    async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> {
-        tracing::Span::current().record("rev_id", &revision.rev_id);
-        self.inner.add(revision.clone(), RevisionState::Ack, true).await
-    }
-
-    // Call this method to sync the revisions that already in local db.
-    #[tracing::instrument(level = "trace", skip(self), fields(rev_id, object_id=%self.object_id), err)]
-    async fn add_sync_revision(&mut self, revision: &Revision) -> FlowyResult<()> {
-        tracing::Span::current().record("rev_id", &revision.rev_id);
-        self.inner.add(revision.clone(), RevisionState::Sync, false).await?;
-        self.sync_seq.add(revision.rev_id)?;
-        Ok(())
-    }
-
-    // Call this method to save the new revisions generated by the user input.
-    #[tracing::instrument(level = "trace", skip(self, revision), fields(rev_id, compact_range, object_id=%self.object_id), err)]
-    async fn write_sync_revision<C>(&mut self, revision: &Revision) -> FlowyResult<i64>
-    where
-        C: RevisionCompact,
-    {
-        match self.sync_seq.compact() {
-            None => {
-                tracing::Span::current().record("rev_id", &revision.rev_id);
-                self.inner.add(revision.clone(), RevisionState::Sync, true).await?;
-                self.sync_seq.add(revision.rev_id)?;
-                Ok(revision.rev_id)
-            }
-            Some((range, mut compact_seq)) => {
-                tracing::Span::current().record("compact_range", &format!("{}", range).as_str());
-                let mut revisions = self.inner.revisions_in_range(&range).await?;
-                if range.to_rev_ids().len() != revisions.len() {
-                    debug_assert_eq!(range.to_rev_ids().len(), revisions.len());
-                }
-
-                // append the new revision
-                revisions.push(revision.clone());
-
-                // compact multiple revisions into one
-                let compact_revision = C::compact_revisions(&self.user_id, &self.object_id, revisions)?;
-                let rev_id = compact_revision.rev_id;
-                tracing::Span::current().record("rev_id", &rev_id);
-
-                // insert new revision
-                compact_seq.push_back(rev_id);
-
-                // replace the revisions in range with compact revision
-                self.inner.compact(&range, compact_revision).await?;
-                debug_assert_eq!(self.sync_seq.len(), compact_seq.len());
-                self.sync_seq.reset(compact_seq);
-                Ok(rev_id)
-            }
-        }
-    }
-
-    async fn ack_revision(&mut self, rev_id: i64) -> FlowyResult<()> {
-        if self.sync_seq.ack(&rev_id).is_ok() {
-            self.inner.ack(rev_id).await;
-        }
-        Ok(())
-    }
-
-    async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
-        if cfg!(feature = "flowy_unit_test") {
-            match self.sync_seq.next_rev_id() {
-                None => Ok(None),
-                Some(rev_id) => Ok(self.inner.get(rev_id).await.map(|record| record.revision)),
-            }
-        } else {
-            Ok(None)
-        }
-    }
-
-    async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
-        self.inner.reset_with_revisions(&self.object_id, revisions).await?;
-        Ok(())
-    }
-}
-
-impl std::ops::Deref for RevisionCompressor {
-    type Target = Arc<RevisionCache>;
-
-    fn deref(&self) -> &Self::Target {
-        &self.inner
-    }
-}
-
-#[derive(Default)]
-struct RevisionSyncSequence(VecDeque<i64>);
-impl RevisionSyncSequence {
-    fn new() -> Self {
-        RevisionSyncSequence::default()
-    }
-
-    fn add(&mut self, new_rev_id: i64) -> FlowyResult<()> {
-        // The last revision's rev_id must be greater than the new one.
-        if let Some(rev_id) = self.0.back() {
-            if *rev_id >= new_rev_id {
-                return Err(
-                    FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
-                );
-            }
-        }
-        self.0.push_back(new_rev_id);
-        Ok(())
-    }
-
-    fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
-        let cur_rev_id = self.0.front().cloned();
-        if let Some(pop_rev_id) = cur_rev_id {
-            if &pop_rev_id != rev_id {
-                let desc = format!(
-                    "The ack rev_id:{} is not equal to the current rev_id:{}",
-                    rev_id, pop_rev_id
-                );
-                return Err(FlowyError::internal().context(desc));
-            }
-            let _ = self.0.pop_front();
-        }
-        Ok(())
-    }
-
-    fn next_rev_id(&self) -> Option<i64> {
-        self.0.front().cloned()
-    }
-
-    fn reset(&mut self, new_seq: VecDeque<i64>) {
-        self.0 = new_seq;
-    }
-
-    fn len(&self) -> usize {
-        self.0.len()
-    }
-
-    // Compact the rev_ids into one except the current synchronizing rev_id.
-    fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
-        self.next_rev_id()?;
-
-        let mut new_seq = self.0.clone();
-        let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();
-
-        let start = drained.pop_front()?;
-        let end = drained.pop_back().unwrap_or(start);
-        Some((RevisionRange { start, end }, new_seq))
-    }
-}
-
 struct RevisionLoader {
 struct RevisionLoader {
     object_id: String,
     object_id: String,
     user_id: String,
     user_id: String,
     cloud: Arc<dyn RevisionCloudService>,
     cloud: Arc<dyn RevisionCloudService>,
-    rev_compressor: Arc<RwLock<RevisionCompressor>>,
+    rev_cache: Arc<RevisionCache>,
 }
 }
 
 
 impl RevisionLoader {
 impl RevisionLoader {
-    async fn load<C>(&self) -> Result<(Vec<Revision>, i64), FlowyError>
-    where
-        C: RevisionCompact,
-    {
-        let records = self.rev_compressor.read().await.batch_get(&self.object_id)?;
+    async fn load(&self) -> Result<(Vec<Revision>, i64), FlowyError> {
+        let records = self.rev_cache.batch_get(&self.object_id)?;
         let revisions: Vec<Revision>;
         let revisions: Vec<Revision>;
         let mut rev_id = 0;
         let mut rev_id = 0;
         if records.is_empty() {
         if records.is_empty() {
             let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?;
             let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?;
             for revision in &remote_revisions {
             for revision in &remote_revisions {
                 rev_id = revision.rev_id;
                 rev_id = revision.rev_id;
-                let _ = self.rev_compressor.read().await.add_ack_revision(revision).await?;
+                let _ = self.rev_cache.add_ack_revision(revision).await?;
             }
             }
             revisions = remote_revisions;
             revisions = remote_revisions;
         } else {
         } else {
@@ -347,12 +164,7 @@ impl RevisionLoader {
                 rev_id = record.revision.rev_id;
                 rev_id = record.revision.rev_id;
                 if record.state == RevisionState::Sync {
                 if record.state == RevisionState::Sync {
                     // Sync the records if their state is RevisionState::Sync.
                     // Sync the records if their state is RevisionState::Sync.
-                    let _ = self
-                        .rev_compressor
-                        .write()
-                        .await
-                        .add_sync_revision(&record.revision)
-                        .await?;
+                    let _ = self.rev_cache.sync_revision(&record.revision).await?;
                 }
                 }
             }
             }
             revisions = records.into_iter().map(|record| record.revision).collect::<_>();
             revisions = records.into_iter().map(|record| record.revision).collect::<_>();