浏览代码

refactor sync seq

appflowy 3 年之前
父节点
当前提交
61dd68199e

+ 6 - 18
frontend/rust-lib/flowy-sync/src/cache/mod.rs

@@ -45,12 +45,7 @@ impl RevisionCache {
         }
     }
 
-    pub async fn add(
-        &self,
-        revision: Revision,
-        state: RevisionState,
-        write_to_disk: bool,
-    ) -> FlowyResult<RevisionRecord> {
+    pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> {
         if self.memory_cache.contains(&revision.rev_id) {
             return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state)));
         }
@@ -62,12 +57,11 @@ impl RevisionCache {
             write_to_disk,
         };
 
-        self.memory_cache.add(Cow::Borrowed(&record)).await;
+        self.memory_cache.add(Cow::Owned(record)).await;
         self.set_latest_rev_id(rev_id);
-        Ok(record)
+        Ok(())
     }
 
-    #[allow(dead_code)]
     pub async fn ack(&self, rev_id: i64) {
         self.memory_cache.ack(&rev_id).await;
     }
@@ -79,10 +73,9 @@ impl RevisionCache {
                 .read_revision_records(&self.object_id, Some(vec![rev_id]))
             {
                 Ok(mut records) => {
-                    if !records.is_empty() {
-                        assert_eq!(records.len(), 1);
-                    }
-                    records.pop()
+                    let record = records.pop()?;
+                    assert!(records.is_empty());
+                    Some(record)
                 }
                 Err(e) => {
                     tracing::error!("{}", e);
@@ -97,11 +90,6 @@ impl RevisionCache {
         self.disk_cache.read_revision_records(doc_id, None)
     }
 
-    pub async fn latest_revision(&self) -> Revision {
-        let rev_id = self.latest_rev_id.load(SeqCst);
-        self.get(rev_id).await.unwrap().revision
-    }
-
     pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
         let mut records = self.memory_cache.get_with_range(&range).await?;
         let range_len = range.len() as usize;

+ 26 - 68
frontend/rust-lib/flowy-sync/src/rev_manager.rs

@@ -1,5 +1,5 @@
-use crate::{RevisionCache, RevisionRecord};
-use dashmap::DashMap;
+use crate::RevisionCache;
+
 use flowy_collaboration::{
     entities::revision::{RepeatedRevision, Revision, RevisionRange, RevisionState},
     util::{pair_rev_id_from_revisions, RevIdCounter},
@@ -23,7 +23,7 @@ pub struct RevisionManager {
     user_id: String,
     rev_id_counter: RevIdCounter,
     revision_cache: Arc<RevisionCache>,
-    revision_sync_seq: Arc<RevisionSyncSequence>,
+    sync_seq: Arc<RevisionSyncSequence>,
 
     #[cfg(feature = "flowy_unit_test")]
     revision_ack_notifier: tokio::sync::broadcast::Sender<i64>,
@@ -32,7 +32,7 @@ pub struct RevisionManager {
 impl RevisionManager {
     pub fn new(user_id: &str, object_id: &str, revision_cache: Arc<RevisionCache>) -> Self {
         let rev_id_counter = RevIdCounter::new(0);
-        let revision_sync_seq = Arc::new(RevisionSyncSequence::new());
+        let sync_seq = Arc::new(RevisionSyncSequence::new());
         #[cfg(feature = "flowy_unit_test")]
         let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
 
@@ -41,7 +41,7 @@ impl RevisionManager {
             user_id: user_id.to_owned(),
             rev_id_counter,
             revision_cache,
-            revision_sync_seq,
+            sync_seq,
 
             #[cfg(feature = "flowy_unit_test")]
             revision_ack_notifier,
@@ -57,7 +57,7 @@ impl RevisionManager {
             user_id: self.user_id.clone(),
             cloud,
             revision_cache: self.revision_cache.clone(),
-            revision_sync_seq: self.revision_sync_seq.clone(),
+            revision_sync_seq: self.sync_seq.clone(),
         }
         .load()
         .await?;
@@ -95,18 +95,17 @@ impl RevisionManager {
             return Err(FlowyError::internal().context("Delta data should be empty"));
         }
 
-        let record = self
-            .revision_cache
+        self.sync_seq.add_record(revision.rev_id).await?;
+        self.revision_cache
             .add(revision.clone(), RevisionState::Sync, true)
             .await?;
-        self.revision_sync_seq.add_revision_record(record).await?;
+
         Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self), err)]
     pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
-        #[cfg(feature = "flowy_unit_test")]
-        if self.revision_sync_seq.ack(&rev_id).await.is_ok() {
+        if self.sync_seq.ack(&rev_id).await.is_ok() {
             self.revision_cache.ack(rev_id).await;
 
             #[cfg(feature = "flowy_unit_test")]
@@ -132,40 +131,25 @@ impl RevisionManager {
     }
 
     pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
-        let revision_sync_seq = self.revision_sync_seq.clone();
+        let sync_seq = self.sync_seq.clone();
         let revision_cache = self.revision_cache.clone();
         FutureResult::new(async move {
-            match revision_sync_seq.next_sync_revision_record().await {
-                None => match revision_sync_seq.next_sync_rev_id().await {
-                    None => Ok(None),
-                    Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)),
-                },
-                Some((_, record)) => Ok(Some(record.revision)),
+            match sync_seq.next_rev_id().await {
+                None => Ok(None),
+                Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)),
             }
         })
     }
 
-    pub async fn latest_revision(&self) -> Revision {
-        self.revision_cache.latest_revision().await
-    }
-
     pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
         self.revision_cache.get(rev_id).await.map(|record| record.revision)
     }
 }
 
-struct RevisionSyncSequence {
-    revs_map: Arc<DashMap<i64, RevisionRecord>>,
-    local_revs: Arc<RwLock<VecDeque<i64>>>,
-}
-
+struct RevisionSyncSequence(Arc<RwLock<VecDeque<i64>>>);
 impl std::default::Default for RevisionSyncSequence {
     fn default() -> Self {
-        let local_revs = Arc::new(RwLock::new(VecDeque::new()));
-        RevisionSyncSequence {
-            revs_map: Arc::new(DashMap::new()),
-            local_revs,
-        }
+        RevisionSyncSequence(Arc::new(RwLock::new(VecDeque::new())))
     }
 }
 
@@ -174,27 +158,22 @@ impl RevisionSyncSequence {
         RevisionSyncSequence::default()
     }
 
-    async fn add_revision_record(&self, record: RevisionRecord) -> FlowyResult<()> {
-        if !record.state.is_need_sync() {
-            return Ok(());
-        }
-
+    async fn add_record(&self, new_rev_id: i64) -> FlowyResult<()> {
         // The last revision's rev_id must be greater than the new one.
-        if let Some(rev_id) = self.local_revs.read().await.back() {
-            if *rev_id >= record.revision.rev_id {
+        if let Some(rev_id) = self.0.read().await.back() {
+            if *rev_id >= new_rev_id {
                 return Err(
                     FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
                 );
             }
         }
-        self.local_revs.write().await.push_back(record.revision.rev_id);
-        self.revs_map.insert(record.revision.rev_id, record);
+        self.0.write().await.push_back(new_rev_id);
         Ok(())
     }
 
-    #[allow(dead_code)]
     async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
-        if let Some(pop_rev_id) = self.next_sync_rev_id().await {
+        let cur_rev_id = self.0.read().await.front().cloned();
+        if let Some(pop_rev_id) = cur_rev_id {
             if &pop_rev_id != rev_id {
                 let desc = format!(
                     "The ack rev_id:{} is not equal to the current rev_id:{}",
@@ -202,22 +181,13 @@ impl RevisionSyncSequence {
                 );
                 return Err(FlowyError::internal().context(desc));
             }
-
-            self.revs_map.remove(&pop_rev_id);
-            let _ = self.local_revs.write().await.pop_front();
+            let _ = self.0.write().await.pop_front();
         }
         Ok(())
     }
 
-    async fn next_sync_revision_record(&self) -> Option<(i64, RevisionRecord)> {
-        match self.local_revs.read().await.front() {
-            None => None,
-            Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
-        }
-    }
-
-    async fn next_sync_rev_id(&self) -> Option<i64> {
-        self.local_revs.read().await.front().copied()
+    async fn next_rev_id(&self) -> Option<i64> {
+        self.0.read().await.front().cloned()
     }
 }
 
@@ -250,7 +220,7 @@ impl RevisionLoader {
                     rev_id = record.revision.rev_id;
                     if record.state == RevisionState::Sync {
                         // Sync the records if their state is RevisionState::Sync.
-                        let _ = self.revision_sync_seq.add_revision_record(record.clone()).await?;
+                        let _ = self.revision_sync_seq.add_record(record.revision.rev_id).await?;
                         let _ = self.revision_cache.add(record.revision, record.state, false).await?;
                     }
                     Ok::<(), FlowyError>(())
@@ -272,18 +242,6 @@ impl RevisionLoader {
     }
 }
 
-#[cfg(feature = "flowy_unit_test")]
-impl RevisionSyncSequence {
-    #[allow(dead_code)]
-    pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> {
-        self.revs_map.clone()
-    }
-    #[allow(dead_code)]
-    pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> {
-        self.local_revs.clone()
-    }
-}
-
 #[cfg(feature = "flowy_unit_test")]
 impl RevisionManager {
     pub fn revision_cache(&self) -> Arc<RevisionCache> {

+ 12 - 8
frontend/rust-lib/flowy-sync/src/ws_manager.rs

@@ -287,15 +287,19 @@ impl RevisionWSSink {
     }
 
     async fn send_next_revision(&self) -> FlowyResult<()> {
-        match self.provider.next().await? {
-            None => {
-                tracing::trace!("[{}]: Finish synchronizing revisions", self);
-                Ok(())
-            }
-            Some(data) => {
-                tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty);
-                self.ws_sender.send(data).await
+        if cfg!(feature = "flowy_unit_test") {
+            match self.provider.next().await? {
+                None => {
+                    tracing::trace!("[{}]: Finish synchronizing revisions", self);
+                    Ok(())
+                }
+                Some(data) => {
+                    tracing::trace!("[{}]: send {}:{}-{:?}", self, data.object_id, data.id(), data.ty);
+                    self.ws_sender.send(data).await
+                }
             }
+        } else {
+            Ok(())
         }
     }
 }