Przeglądaj źródła

fix: require write lock to make sure exclusive accest to sync_seq

appflowy 2 lat temu
rodzic
commit
eb13a252ac

+ 12 - 5
frontend/rust-lib/flowy-revision/src/rev_persistence.rs

@@ -73,12 +73,13 @@ impl RevisionPersistence {
         revision: &'a Revision,
         compactor: &Arc<dyn RevisionCompactor + 'a>,
     ) -> FlowyResult<i64> {
-        let result = self.sync_seq.read().await.compact();
+        let mut sync_seq_write_guard = self.sync_seq.write().await;
+        let result = sync_seq_write_guard.compact();
         match result {
             None => {
                 tracing::Span::current().record("rev_id", &revision.rev_id);
                 self.add(revision.clone(), RevisionState::Sync, true).await?;
-                self.sync_seq.write().await.add(revision.rev_id)?;
+                sync_seq_write_guard.add(revision.rev_id)?;
                 Ok(revision.rev_id)
             }
             Some((range, mut compact_seq)) => {
@@ -101,8 +102,10 @@ impl RevisionPersistence {
 
                 // replace the revisions in range with compact revision
                 self.compact(&range, compact_revision).await?;
-                debug_assert_eq!(self.sync_seq.read().await.len(), compact_seq.len());
-                self.sync_seq.write().await.reset(compact_seq);
+                //
+                debug_assert_eq!(compact_seq.len(), 2);
+                debug_assert_eq!(sync_seq_write_guard.len(), compact_seq.len());
+                sync_seq_write_guard.reset(compact_seq);
                 Ok(rev_id)
             }
         }
@@ -315,7 +318,11 @@ impl RevisionSyncSequence {
 
     // Compact the rev_ids into one except the current synchronizing rev_id.
     fn compact(&self) -> Option<(RevisionRange, VecDeque<i64>)> {
-        self.next_rev_id()?;
+        // Make sure there are two rev_id going to sync. No need to compact if there is only
+        // one rev_id in queue.
+        if self.next_rev_id().is_none() {
+            return None;
+        }
 
         let mut new_seq = self.0.clone();
         let mut drained = new_seq.drain(1..).collect::<VecDeque<_>>();