Forráskód Böngészése

compact the revsions

appflowy 3 éve
szülő
commit
4cdf3e3e3e
21 módosított fájl, 301 hozzáadás és 721 törlés
  1. 10 24
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart
  2. 3 4
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart
  3. 35 3
      frontend/rust-lib/flowy-core/src/services/folder_editor.rs
  4. 1 1
      frontend/rust-lib/flowy-core/tests/workspace/script.rs
  5. 4 1
      frontend/rust-lib/flowy-document/src/core/editor.rs
  6. 29 3
      frontend/rust-lib/flowy-document/src/core/queue.rs
  7. 1 1
      frontend/rust-lib/flowy-document/tests/document/edit_script.rs
  8. 1 0
      frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs
  9. 8 5
      frontend/rust-lib/flowy-sync/src/cache/mod.rs
  10. 140 85
      frontend/rust-lib/flowy-sync/src/rev_manager.rs
  11. 4 10
      shared-lib/flowy-collaboration/src/client_folder/builder.rs
  12. 1 4
      shared-lib/flowy-collaboration/src/entities/revision.rs
  13. 58 103
      shared-lib/flowy-collaboration/src/protobuf/model/revision.rs
  14. 2 3
      shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto
  15. 0 1
      shared-lib/flowy-collaboration/src/synchronizer.rs
  16. 4 0
      shared-lib/flowy-collaboration/src/util.rs
  17. 0 113
      shared-lib/flowy-core-data-model/src/entities/app/app_create.rs
  18. 0 177
      shared-lib/flowy-core-data-model/src/entities/view/view_create.rs
  19. 0 67
      shared-lib/flowy-core-data-model/src/entities/view/view_query.rs
  20. 0 74
      shared-lib/flowy-core-data-model/src/entities/workspace/workspace_create.rs
  21. 0 42
      shared-lib/flowy-core-data-model/src/entities/workspace/workspace_query.rs

+ 10 - 24
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pb.dart

@@ -235,22 +235,17 @@ class RevId extends $pb.GeneratedMessage {
 
 class RevisionRange extends $pb.GeneratedMessage {
   static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create)
-    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'objectId')
-    ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start')
-    ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end')
+    ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'start')
+    ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'end')
     ..hasRequiredFields = false
   ;
 
   RevisionRange._() : super();
   factory RevisionRange({
-    $core.String? objectId,
     $fixnum.Int64? start,
     $fixnum.Int64? end,
   }) {
     final _result = create();
-    if (objectId != null) {
-      _result.objectId = objectId;
-    }
     if (start != null) {
       _result.start = start;
     }
@@ -281,30 +276,21 @@ class RevisionRange extends $pb.GeneratedMessage {
   static RevisionRange? _defaultInstance;
 
   @$pb.TagNumber(1)
-  $core.String get objectId => $_getSZ(0);
+  $fixnum.Int64 get start => $_getI64(0);
   @$pb.TagNumber(1)
-  set objectId($core.String v) { $_setString(0, v); }
+  set start($fixnum.Int64 v) { $_setInt64(0, v); }
   @$pb.TagNumber(1)
-  $core.bool hasObjectId() => $_has(0);
+  $core.bool hasStart() => $_has(0);
   @$pb.TagNumber(1)
-  void clearObjectId() => clearField(1);
+  void clearStart() => clearField(1);
 
   @$pb.TagNumber(2)
-  $fixnum.Int64 get start => $_getI64(1);
+  $fixnum.Int64 get end => $_getI64(1);
   @$pb.TagNumber(2)
-  set start($fixnum.Int64 v) { $_setInt64(1, v); }
+  set end($fixnum.Int64 v) { $_setInt64(1, v); }
   @$pb.TagNumber(2)
-  $core.bool hasStart() => $_has(1);
+  $core.bool hasEnd() => $_has(1);
   @$pb.TagNumber(2)
-  void clearStart() => clearField(2);
-
-  @$pb.TagNumber(3)
-  $fixnum.Int64 get end => $_getI64(2);
-  @$pb.TagNumber(3)
-  set end($fixnum.Int64 v) { $_setInt64(2, v); }
-  @$pb.TagNumber(3)
-  $core.bool hasEnd() => $_has(2);
-  @$pb.TagNumber(3)
-  void clearEnd() => clearField(3);
+  void clearEnd() => clearField(2);
 }
 

+ 3 - 4
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-collaboration/revision.pbjson.dart

@@ -70,11 +70,10 @@ final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBI
 const RevisionRange$json = const {
   '1': 'RevisionRange',
   '2': const [
-    const {'1': 'object_id', '3': 1, '4': 1, '5': 9, '10': 'objectId'},
-    const {'1': 'start', '3': 2, '4': 1, '5': 3, '10': 'start'},
-    const {'1': 'end', '3': 3, '4': 1, '5': 3, '10': 'end'},
+    const {'1': 'start', '3': 1, '4': 1, '5': 3, '10': 'start'},
+    const {'1': 'end', '3': 2, '4': 1, '5': 3, '10': 'end'},
   ],
 };
 
 /// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`.
-final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhsKCW9iamVjdF9pZBgBIAEoCVIIb2JqZWN0SWQSFAoFc3RhcnQYAiABKANSBXN0YXJ0EhAKA2VuZBgDIAEoA1IDZW5k');
+final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhQKBXN0YXJ0GAEgASgDUgVzdGFydBIQCgNlbmQYAiABKANSA2VuZA==');

+ 35 - 3
frontend/rust-lib/flowy-core/src/services/folder_editor.rs

@@ -5,12 +5,14 @@ use flowy_collaboration::{
 };
 
 use crate::controller::FolderId;
+use flowy_collaboration::util::make_delta_from_revisions;
 use flowy_error::{FlowyError, FlowyResult};
 use flowy_sync::{
-    RevisionCache, RevisionCloudService, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
+    RevisionCache, RevisionCloudService, RevisionCompact, RevisionManager, RevisionObjectBuilder, RevisionWebSocket,
     RevisionWebSocketManager,
 };
 use lib_infra::future::FutureResult;
+use lib_ot::core::PlainTextAttributes;
 use lib_sqlite::ConnectionPool;
 use parking_lot::RwLock;
 use std::sync::Arc;
@@ -36,7 +38,11 @@ impl FolderEditor {
         let cloud = Arc::new(FolderRevisionCloudServiceImpl {
             token: token.to_string(),
         });
-        let folder = Arc::new(RwLock::new(rev_manager.load::<FolderPadBuilder>(cloud).await?));
+        let folder = Arc::new(RwLock::new(
+            rev_manager
+                .load::<FolderPadBuilder, FolderRevisionCompact>(cloud)
+                .await?,
+        ));
         let rev_manager = Arc::new(rev_manager);
         let ws_manager = make_folder_ws_manager(
             user_id,
@@ -78,7 +84,11 @@ impl FolderEditor {
             &self.user_id,
             md5,
         );
-        let _ = futures::executor::block_on(async { self.rev_manager.add_local_revision(&revision).await })?;
+        let _ = futures::executor::block_on(async {
+            self.rev_manager
+                .add_local_revision::<FolderRevisionCompact>(&revision)
+                .await
+        })?;
         Ok(())
     }
 }
@@ -112,3 +122,25 @@ impl FolderEditor {
         self.rev_manager.clone()
     }
 }
+
+struct FolderRevisionCompact();
+impl RevisionCompact for FolderRevisionCompact {
+    fn compact_revisions(user_id: &str, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Revision> {
+        match revisions.last() {
+            None => Err(FlowyError::internal().context("compact revisions is empty")),
+            Some(last_revision) => {
+                let (base_rev_id, rev_id) = last_revision.pair_rev_id();
+                let md5 = last_revision.md5.clone();
+                let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
+                Ok(Revision::new(
+                    object_id,
+                    base_rev_id,
+                    rev_id,
+                    delta.to_bytes(),
+                    user_id,
+                    md5,
+                ))
+            }
+        }
+    }
+}

+ 1 - 1
frontend/rust-lib/flowy-core/tests/workspace/script.rs

@@ -97,7 +97,7 @@ impl FolderTest {
         let sdk = &self.sdk;
         let folder_editor: Arc<FolderEditor> = sdk.folder_manager.folder_editor().await;
         let rev_manager = folder_editor.rev_manager();
-        let cache = rev_manager.revision_cache();
+        let cache = rev_manager.revision_cache().await;
 
         match script {
             FolderScript::ReadAllWorkspaces => {

+ 4 - 1
frontend/rust-lib/flowy-document/src/core/editor.rs

@@ -1,3 +1,4 @@
+use crate::core::DocumentRevisionCompact;
 use crate::{
     core::{make_document_ws_manager, EditorCommand, EditorCommandQueue, EditorCommandSender},
     errors::FlowyError,
@@ -36,7 +37,9 @@ impl ClientDocumentEditor {
         web_socket: Arc<dyn RevisionWebSocket>,
         server: Arc<dyn RevisionCloudService>,
     ) -> FlowyResult<Arc<Self>> {
-        let document_info = rev_manager.load::<DocumentInfoBuilder>(server).await?;
+        let document_info = rev_manager
+            .load::<DocumentInfoBuilder, DocumentRevisionCompact>(server)
+            .await?;
         let delta = document_info.delta()?;
         let rev_manager = Arc::new(rev_manager);
         let doc_id = doc_id.to_string();

+ 29 - 3
frontend/rust-lib/flowy-document/src/core/queue.rs

@@ -1,12 +1,13 @@
 use crate::{core::web_socket::EditorCommandReceiver, DocumentUser};
 use async_stream::stream;
+use flowy_collaboration::util::make_delta_from_revisions;
 use flowy_collaboration::{
     client_document::{history::UndoResult, ClientDocument},
     entities::revision::{RevId, Revision},
     errors::CollaborateError,
 };
-use flowy_error::FlowyError;
-use flowy_sync::{DeltaMD5, RevisionManager, TransformDeltas};
+use flowy_error::{FlowyError, FlowyResult};
+use flowy_sync::{DeltaMD5, RevisionCompact, RevisionManager, TransformDeltas};
 use futures::stream::StreamExt;
 use lib_ot::{
     core::{Interval, OperationTransformable},
@@ -183,11 +184,36 @@ impl EditorCommandQueue {
             &user_id,
             md5,
         );
-        let _ = self.rev_manager.add_local_revision(&revision).await?;
+        let _ = self
+            .rev_manager
+            .add_local_revision::<DocumentRevisionCompact>(&revision)
+            .await?;
         Ok(rev_id.into())
     }
 }
 
+pub(crate) struct DocumentRevisionCompact();
+impl RevisionCompact for DocumentRevisionCompact {
+    fn compact_revisions(user_id: &str, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Revision> {
+        match revisions.last() {
+            None => Err(FlowyError::internal().context("compact revisions is empty")),
+            Some(last_revision) => {
+                let (base_rev_id, rev_id) = last_revision.pair_rev_id();
+                let md5 = last_revision.md5.clone();
+                let delta = make_delta_from_revisions::<RichTextAttributes>(revisions)?;
+                Ok(Revision::new(
+                    object_id,
+                    base_rev_id,
+                    rev_id,
+                    delta.to_bytes(),
+                    user_id,
+                    md5,
+                ))
+            }
+        }
+    }
+}
+
 pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
 
 pub(crate) enum EditorCommand {

+ 1 - 1
frontend/rust-lib/flowy-document/tests/document/edit_script.rs

@@ -38,7 +38,7 @@ impl EditorTest {
 
     async fn run_script(&mut self, script: EditorScript) {
         let rev_manager = self.editor.rev_manager();
-        let cache = rev_manager.revision_cache();
+        let cache = rev_manager.revision_cache().await;
         let _user_id = self.sdk.user_session.user_id().unwrap();
         // let ws_manager = self.sdk.ws_conn.clone();
         // let token = self.sdk.user_session.token().unwrap();

+ 1 - 0
frontend/rust-lib/flowy-sync/src/cache/disk/mod.rs

@@ -22,6 +22,7 @@ pub trait RevisionDiskCache: Sync + Send {
         rev_ids: Option<Vec<i64>>,
     ) -> Result<Vec<RevisionRecord>, Self::Error>;
 
+    // Read the revision which rev_id >= range.start && rev_id <= range.end
     fn read_revision_records_with_range(
         &self,
         object_id: &str,

+ 8 - 5
frontend/rust-lib/flowy-sync/src/cache/mod.rs

@@ -5,6 +5,7 @@ use crate::cache::{
     disk::{RevisionChangeset, RevisionDiskCache, RevisionTableState, SQLitePersistence},
     memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
 };
+use crate::RevisionCompact;
 use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyError, FlowyResult};
@@ -16,6 +17,7 @@ use std::{
     },
 };
 use tokio::task::spawn_blocking;
+
 pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
 
 pub struct RevisionCache {
@@ -90,13 +92,14 @@ impl RevisionCache {
         self.disk_cache.read_revision_records(doc_id, None)
     }
 
+    // Read the revision which rev_id >= range.start && rev_id <= range.end
     pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
         let mut records = self.memory_cache.get_with_range(&range).await?;
         let range_len = range.len() as usize;
         if records.len() != range_len {
             let disk_cache = self.disk_cache.clone();
-            let doc_id = self.object_id.clone();
-            records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&doc_id, &range))
+            let object_id = self.object_id.clone();
+            records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&object_id, &range))
                 .await
                 .map_err(internal_error)??;
 
@@ -110,8 +113,8 @@ impl RevisionCache {
             .collect::<Vec<Revision>>())
     }
 
-    #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
-    pub async fn reset_with_revisions(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
+    #[tracing::instrument(level = "debug", skip(self, revisions))]
+    pub async fn reset_with_revisions(&self, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
         let revision_records = revisions
             .to_vec()
             .into_iter()
@@ -123,7 +126,7 @@ impl RevisionCache {
             .collect::<Vec<_>>();
 
         let _ = self.memory_cache.reset_with_revisions(&revision_records).await?;
-        let _ = self.disk_cache.reset_object(doc_id, revision_records)?;
+        let _ = self.disk_cache.reset_object(object_id, revision_records)?;
         Ok(())
     }
 

+ 140 - 85
frontend/rust-lib/flowy-sync/src/rev_manager.rs

@@ -6,6 +6,7 @@ use flowy_collaboration::{
 };
 use flowy_error::{FlowyError, FlowyResult};
 use lib_infra::future::FutureResult;
+use lib_ot::core::Attributes;
 use std::{collections::VecDeque, sync::Arc};
 use tokio::sync::RwLock;
 
@@ -18,12 +19,15 @@ pub trait RevisionObjectBuilder: Send + Sync {
     fn build_with_revisions(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output>;
 }
 
+pub trait RevisionCompact: Send + Sync {
+    fn compact_revisions(user_id: &str, object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Revision>;
+}
+
 pub struct RevisionManager {
     pub object_id: String,
     user_id: String,
     rev_id_counter: RevIdCounter,
-    revision_cache: Arc<RevisionCache>,
-    sync_seq: Arc<RevisionSyncSequence>,
+    cache: Arc<RwLock<RevisionCacheCompact>>,
 
     #[cfg(feature = "flowy_unit_test")]
     revision_ack_notifier: tokio::sync::broadcast::Sender<i64>,
@@ -32,7 +36,7 @@ pub struct RevisionManager {
 impl RevisionManager {
     pub fn new(user_id: &str, object_id: &str, revision_cache: Arc<RevisionCache>) -> Self {
         let rev_id_counter = RevIdCounter::new(0);
-        let sync_seq = Arc::new(RevisionSyncSequence::new());
+        let cache = Arc::new(RwLock::new(RevisionCacheCompact::new(object_id, revision_cache)));
         #[cfg(feature = "flowy_unit_test")]
         let (revision_ack_notifier, _) = tokio::sync::broadcast::channel(1);
 
@@ -40,38 +44,34 @@ impl RevisionManager {
             object_id: object_id.to_string(),
             user_id: user_id.to_owned(),
             rev_id_counter,
-            revision_cache,
-            sync_seq,
+            cache,
 
             #[cfg(feature = "flowy_unit_test")]
             revision_ack_notifier,
         }
     }
 
-    pub async fn load<Builder>(&mut self, cloud: Arc<dyn RevisionCloudService>) -> FlowyResult<Builder::Output>
+    pub async fn load<B, C>(&mut self, cloud: Arc<dyn RevisionCloudService>) -> FlowyResult<B::Output>
     where
-        Builder: RevisionObjectBuilder,
+        B: RevisionObjectBuilder,
+        C: RevisionCompact,
     {
         let (revisions, rev_id) = RevisionLoader {
             object_id: self.object_id.clone(),
             user_id: self.user_id.clone(),
             cloud,
-            revision_cache: self.revision_cache.clone(),
-            revision_sync_seq: self.sync_seq.clone(),
+            cache: self.cache.clone(),
         }
-        .load()
+        .load::<C>()
         .await?;
         self.rev_id_counter.set(rev_id);
-        Builder::build_with_revisions(&self.object_id, revisions)
+        B::build_with_revisions(&self.object_id, revisions)
     }
 
     #[tracing::instrument(level = "debug", skip(self, revisions), err)]
     pub async fn reset_object(&self, revisions: RepeatedRevision) -> FlowyResult<()> {
         let rev_id = pair_rev_id_from_revisions(&revisions).1;
-        let _ = self
-            .revision_cache
-            .reset_with_revisions(&self.object_id, revisions.into_inner())
-            .await?;
+        let _ = self.cache.write().await.reset(revisions.into_inner()).await?;
         self.rev_id_counter.set(rev_id);
         Ok(())
     }
@@ -81,33 +81,26 @@ impl RevisionManager {
         if revision.delta_data.is_empty() {
             return Err(FlowyError::internal().context("Delta data should be empty"));
         }
-        let _ = self
-            .revision_cache
-            .add(revision.clone(), RevisionState::Ack, true)
-            .await?;
+        self.cache.read().await.add_ack_revision(revision).await?;
         self.rev_id_counter.set(revision.rev_id);
         Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self, revision))]
-    pub async fn add_local_revision(&self, revision: &Revision) -> Result<(), FlowyError> {
+    pub async fn add_local_revision<C>(&self, revision: &Revision) -> Result<(), FlowyError>
+    where
+        C: RevisionCompact,
+    {
         if revision.delta_data.is_empty() {
             return Err(FlowyError::internal().context("Delta data should be empty"));
         }
-
-        self.sync_seq.add_record(revision.rev_id).await?;
-        self.revision_cache
-            .add(revision.clone(), RevisionState::Sync, true)
-            .await?;
-
+        self.cache.write().await.add_sync_revision::<C>(revision, true).await?;
         Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self), err)]
     pub async fn ack_revision(&self, rev_id: i64) -> Result<(), FlowyError> {
-        if self.sync_seq.ack(&rev_id).await.is_ok() {
-            self.revision_cache.ack(rev_id).await;
-
+        if self.cache.write().await.ack_revision(rev_id).await.is_ok() {
             #[cfg(feature = "flowy_unit_test")]
             let _ = self.revision_ack_notifier.send(rev_id);
         }
@@ -125,54 +118,119 @@ impl RevisionManager {
     }
 
     pub async fn get_revisions_in_range(&self, range: RevisionRange) -> Result<Vec<Revision>, FlowyError> {
-        debug_assert!(range.object_id == self.object_id);
-        let revisions = self.revision_cache.revisions_in_range(range.clone()).await?;
+        let revisions = self.cache.read().await.revisions_in_range(range.clone()).await?;
         Ok(revisions)
     }
 
-    pub fn next_sync_revision(&self) -> FutureResult<Option<Revision>, FlowyError> {
-        let sync_seq = self.sync_seq.clone();
-        let revision_cache = self.revision_cache.clone();
-        FutureResult::new(async move {
-            match sync_seq.next_rev_id().await {
-                None => Ok(None),
-                Some(rev_id) => Ok(revision_cache.get(rev_id).await.map(|record| record.revision)),
-            }
-        })
+    pub async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
+        Ok(self.cache.read().await.next_sync_revision().await?)
     }
 
     pub async fn get_revision(&self, rev_id: i64) -> Option<Revision> {
-        self.revision_cache.get(rev_id).await.map(|record| record.revision)
+        self.cache.read().await.get(rev_id).await.map(|record| record.revision)
     }
 }
 
-struct RevisionSyncSequence(Arc<RwLock<VecDeque<i64>>>);
-impl std::default::Default for RevisionSyncSequence {
-    fn default() -> Self {
-        RevisionSyncSequence(Arc::new(RwLock::new(VecDeque::new())))
+#[cfg(feature = "flowy_unit_test")]
+impl RevisionManager {
+    pub async fn revision_cache(&self) -> Arc<RevisionCache> {
+        self.cache.read().await.inner.clone()
+    }
+    pub fn revision_ack_receiver(&self) -> tokio::sync::broadcast::Receiver<i64> {
+        self.revision_ack_notifier.subscribe()
     }
 }
 
+struct RevisionCacheCompact {
+    object_id: String,
+    inner: Arc<RevisionCache>,
+    sync_seq: RevisionSyncSequence,
+}
+
+impl RevisionCacheCompact {
+    fn new(object_id: &str, inner: Arc<RevisionCache>) -> Self {
+        let sync_seq = RevisionSyncSequence::new();
+        let object_id = object_id.to_owned();
+        Self {
+            object_id,
+            inner,
+            sync_seq,
+        }
+    }
+
+    async fn add_ack_revision(&self, revision: &Revision) -> FlowyResult<()> {
+        self.inner.add(revision.clone(), RevisionState::Ack, true).await
+    }
+
+    async fn add_sync_revision<C>(&mut self, revision: &Revision, write_to_disk: bool) -> FlowyResult<()>
+    where
+        C: RevisionCompact,
+    {
+        // match self.sync_seq.remaining_rev_ids() {
+        //     None => {}
+        //     Some(range) => {
+        //         let revisions = self.inner.revisions_in_range(range).await?;
+        //         let compact_revision = C::compact_revisions("", "", revisions)?;
+        //     }
+        // }
+
+        self.inner
+            .add(revision.clone(), RevisionState::Sync, write_to_disk)
+            .await?;
+        self.sync_seq.add_record(revision.rev_id)?;
+        Ok(())
+    }
+
+    async fn ack_revision(&mut self, rev_id: i64) -> FlowyResult<()> {
+        if self.sync_seq.ack(&rev_id).is_ok() {
+            self.inner.ack(rev_id).await;
+        }
+        Ok(())
+    }
+
+    async fn next_sync_revision(&self) -> FlowyResult<Option<Revision>> {
+        match self.sync_seq.next_rev_id() {
+            None => Ok(None),
+            Some(rev_id) => Ok(self.inner.get(rev_id).await.map(|record| record.revision)),
+        }
+    }
+
+    async fn reset(&self, revisions: Vec<Revision>) -> FlowyResult<()> {
+        self.inner.reset_with_revisions(&self.object_id, revisions).await?;
+        Ok(())
+    }
+}
+
+impl std::ops::Deref for RevisionCacheCompact {
+    type Target = Arc<RevisionCache>;
+
+    fn deref(&self) -> &Self::Target {
+        &self.inner
+    }
+}
+
+#[derive(Default)]
+struct RevisionSyncSequence(VecDeque<i64>);
 impl RevisionSyncSequence {
     fn new() -> Self {
         RevisionSyncSequence::default()
     }
 
-    async fn add_record(&self, new_rev_id: i64) -> FlowyResult<()> {
+    fn add_record(&mut self, new_rev_id: i64) -> FlowyResult<()> {
         // The last revision's rev_id must be greater than the new one.
-        if let Some(rev_id) = self.0.read().await.back() {
+        if let Some(rev_id) = self.0.back() {
             if *rev_id >= new_rev_id {
                 return Err(
                     FlowyError::internal().context(format!("The new revision's id must be greater than {}", rev_id))
                 );
             }
         }
-        self.0.write().await.push_back(new_rev_id);
+        self.0.push_back(new_rev_id);
         Ok(())
     }
 
-    async fn ack(&self, rev_id: &i64) -> FlowyResult<()> {
-        let cur_rev_id = self.0.read().await.front().cloned();
+    fn ack(&mut self, rev_id: &i64) -> FlowyResult<()> {
+        let cur_rev_id = self.0.front().cloned();
         if let Some(pop_rev_id) = cur_rev_id {
             if &pop_rev_id != rev_id {
                 let desc = format!(
@@ -181,13 +239,25 @@ impl RevisionSyncSequence {
                 );
                 return Err(FlowyError::internal().context(desc));
             }
-            let _ = self.0.write().await.pop_front();
+            let _ = self.0.pop_front();
         }
         Ok(())
     }
 
-    async fn next_rev_id(&self) -> Option<i64> {
-        self.0.read().await.front().cloned()
+    fn next_rev_id(&self) -> Option<i64> {
+        self.0.front().cloned()
+    }
+
+    fn remaining_rev_ids(&self) -> Option<RevisionRange> {
+        if self.next_rev_id().is_some() {
+            let mut seq = self.0.clone();
+            let mut drained = seq.drain(1..).collect::<VecDeque<_>>();
+            let start = drained.pop_front()?;
+            let end = drained.pop_back().unwrap_or_else(|| start);
+            Some(RevisionRange { start, end })
+        } else {
+            None
+        }
     }
 }
 
@@ -195,42 +265,37 @@ struct RevisionLoader {
     object_id: String,
     user_id: String,
     cloud: Arc<dyn RevisionCloudService>,
-    revision_cache: Arc<RevisionCache>,
-    revision_sync_seq: Arc<RevisionSyncSequence>,
+    cache: Arc<RwLock<RevisionCacheCompact>>,
 }
 
 impl RevisionLoader {
-    async fn load(&self) -> Result<(Vec<Revision>, i64), FlowyError> {
-        let records = self.revision_cache.batch_get(&self.object_id)?;
+    async fn load<C>(&self) -> Result<(Vec<Revision>, i64), FlowyError>
+    where
+        C: RevisionCompact,
+    {
+        let records = self.cache.read().await.batch_get(&self.object_id)?;
         let revisions: Vec<Revision>;
         let mut rev_id = 0;
         if records.is_empty() {
             let remote_revisions = self.cloud.fetch_object(&self.user_id, &self.object_id).await?;
             for revision in &remote_revisions {
                 rev_id = revision.rev_id;
-                let _ = self
-                    .revision_cache
-                    .add(revision.clone(), RevisionState::Ack, true)
-                    .await?;
+                let _ = self.cache.read().await.add_ack_revision(revision).await?;
             }
             revisions = remote_revisions;
         } else {
-            for record in records.clone() {
-                let f = || async {
-                    rev_id = record.revision.rev_id;
-                    if record.state == RevisionState::Sync {
-                        // Sync the records if their state is RevisionState::Sync.
-                        let _ = self.revision_sync_seq.add_record(record.revision.rev_id).await?;
-                        let _ = self.revision_cache.add(record.revision, record.state, false).await?;
-                    }
-                    Ok::<(), FlowyError>(())
-                };
-                match f().await {
-                    Ok(_) => {}
-                    Err(e) => tracing::error!("[RevisionLoader]: {}", e),
+            for record in &records {
+                rev_id = record.revision.rev_id;
+                if record.state == RevisionState::Sync {
+                    // Sync the records if their state is RevisionState::Sync.
+                    let _ = self
+                        .cache
+                        .write()
+                        .await
+                        .add_sync_revision::<C>(&record.revision, false)
+                        .await?;
                 }
             }
-
             revisions = records.into_iter().map(|record| record.revision).collect::<_>();
         }
 
@@ -241,13 +306,3 @@ impl RevisionLoader {
         Ok((revisions, rev_id))
     }
 }
-
-#[cfg(feature = "flowy_unit_test")]
-impl RevisionManager {
-    pub fn revision_cache(&self) -> Arc<RevisionCache> {
-        self.revision_cache.clone()
-    }
-    pub fn revision_ack_receiver(&self) -> tokio::sync::broadcast::Receiver<i64> {
-        self.revision_ack_notifier.subscribe()
-    }
-}

+ 4 - 10
shared-lib/flowy-collaboration/src/client_folder/builder.rs

@@ -1,10 +1,12 @@
+use crate::entities::folder_info::FolderDelta;
+use crate::util::make_delta_from_revisions;
 use crate::{
     client_folder::{default_folder_delta, FolderPad},
     entities::revision::Revision,
     errors::{CollaborateError, CollaborateResult},
 };
 use flowy_core_data_model::entities::{trash::Trash, workspace::Workspace};
-use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder};
+use lib_ot::core::{OperationTransformable, PlainDelta, PlainDeltaBuilder, PlainTextAttributes};
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 
@@ -45,15 +47,7 @@ impl FolderPadBuilder {
     }
 
     pub(crate) fn build_with_revisions(self, revisions: Vec<Revision>) -> CollaborateResult<FolderPad> {
-        let mut folder_delta = PlainDelta::new();
-        for revision in revisions {
-            if revision.delta_data.is_empty() {
-                tracing::warn!("revision delta_data is empty");
-            }
-
-            let delta = PlainDelta::from_bytes(revision.delta_data)?;
-            folder_delta = folder_delta.compose(&delta)?;
-        }
+        let folder_delta: FolderDelta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
         self.build_with_delta(folder_delta)
     }
 

+ 1 - 4
shared-lib/flowy-collaboration/src/entities/revision.rs

@@ -173,12 +173,9 @@ impl std::fmt::Display for RevId {
 #[derive(Debug, Clone, Default, ProtoBuf)]
 pub struct RevisionRange {
     #[pb(index = 1)]
-    pub object_id: String,
-
-    #[pb(index = 2)]
     pub start: i64,
 
-    #[pb(index = 3)]
+    #[pb(index = 2)]
     pub end: i64,
 }
 

+ 58 - 103
shared-lib/flowy-collaboration/src/protobuf/model/revision.rs

@@ -730,7 +730,6 @@ impl ::protobuf::reflect::ProtobufValue for RevId {
 #[derive(PartialEq,Clone,Default)]
 pub struct RevisionRange {
     // message fields
-    pub object_id: ::std::string::String,
     pub start: i64,
     pub end: i64,
     // special fields
@@ -749,33 +748,7 @@ impl RevisionRange {
         ::std::default::Default::default()
     }
 
-    // string object_id = 1;
-
-
-    pub fn get_object_id(&self) -> &str {
-        &self.object_id
-    }
-    pub fn clear_object_id(&mut self) {
-        self.object_id.clear();
-    }
-
-    // Param is passed by value, moved
-    pub fn set_object_id(&mut self, v: ::std::string::String) {
-        self.object_id = v;
-    }
-
-    // Mutable pointer to the field.
-    // If field is not initialized, it is initialized with default value first.
-    pub fn mut_object_id(&mut self) -> &mut ::std::string::String {
-        &mut self.object_id
-    }
-
-    // Take field
-    pub fn take_object_id(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.object_id, ::std::string::String::new())
-    }
-
-    // int64 start = 2;
+    // int64 start = 1;
 
 
     pub fn get_start(&self) -> i64 {
@@ -790,7 +763,7 @@ impl RevisionRange {
         self.start = v;
     }
 
-    // int64 end = 3;
+    // int64 end = 2;
 
 
     pub fn get_end(&self) -> i64 {
@@ -816,16 +789,13 @@ impl ::protobuf::Message for RevisionRange {
             let (field_number, wire_type) = is.read_tag_unpack()?;
             match field_number {
                 1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.object_id)?;
-                },
-                2 => {
                     if wire_type != ::protobuf::wire_format::WireTypeVarint {
                         return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
                     }
                     let tmp = is.read_int64()?;
                     self.start = tmp;
                 },
-                3 => {
+                2 => {
                     if wire_type != ::protobuf::wire_format::WireTypeVarint {
                         return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
                     }
@@ -844,14 +814,11 @@ impl ::protobuf::Message for RevisionRange {
     #[allow(unused_variables)]
     fn compute_size(&self) -> u32 {
         let mut my_size = 0;
-        if !self.object_id.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.object_id);
-        }
         if self.start != 0 {
-            my_size += ::protobuf::rt::value_size(2, self.start, ::protobuf::wire_format::WireTypeVarint);
+            my_size += ::protobuf::rt::value_size(1, self.start, ::protobuf::wire_format::WireTypeVarint);
         }
         if self.end != 0 {
-            my_size += ::protobuf::rt::value_size(3, self.end, ::protobuf::wire_format::WireTypeVarint);
+            my_size += ::protobuf::rt::value_size(2, self.end, ::protobuf::wire_format::WireTypeVarint);
         }
         my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
         self.cached_size.set(my_size);
@@ -859,14 +826,11 @@ impl ::protobuf::Message for RevisionRange {
     }
 
     fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.object_id.is_empty() {
-            os.write_string(1, &self.object_id)?;
-        }
         if self.start != 0 {
-            os.write_int64(2, self.start)?;
+            os.write_int64(1, self.start)?;
         }
         if self.end != 0 {
-            os.write_int64(3, self.end)?;
+            os.write_int64(2, self.end)?;
         }
         os.write_unknown_fields(self.get_unknown_fields())?;
         ::std::result::Result::Ok(())
@@ -906,11 +870,6 @@ impl ::protobuf::Message for RevisionRange {
         static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
         descriptor.get(|| {
             let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "object_id",
-                |m: &RevisionRange| { &m.object_id },
-                |m: &mut RevisionRange| { &mut m.object_id },
-            ));
             fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
                 "start",
                 |m: &RevisionRange| { &m.start },
@@ -937,7 +896,6 @@ impl ::protobuf::Message for RevisionRange {
 
 impl ::protobuf::Clear for RevisionRange {
     fn clear(&mut self) {
-        self.object_id.clear();
         self.start = 0;
         self.end = 0;
         self.unknown_fields.clear();
@@ -1065,60 +1023,57 @@ static file_descriptor_proto_data: &'static [u8] = b"\
     evTypeR\x02ty\x12\x17\n\x07user_id\x18\x07\x20\x01(\tR\x06userId\"3\n\
     \x10RepeatedRevision\x12\x1f\n\x05items\x18\x01\x20\x03(\x0b2\t.Revision\
     R\x05items\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05\
-    value\"T\n\rRevisionRange\x12\x1b\n\tobject_id\x18\x01\x20\x01(\tR\x08ob\
-    jectId\x12\x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03en\
-    d\x18\x03\x20\x01(\x03R\x03end*\"\n\rRevisionState\x12\x08\n\x04Sync\x10\
-    \0\x12\x07\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\n\x0fDeprecatedLocal\
-    \x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01J\xe8\x07\n\x06\x12\x04\0\0\
-    \x1d\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\n\
-    \x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\
-    \x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\
-    \x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\
-    \x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\
-    \x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\
-    \x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\
-    \x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\x05\x04\0\x02\x02\
-    \x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x14\n\
-    \x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\x0b\n\x04\x04\0\x02\
-    \x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\
-    \n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\
-    \x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\
-    \x19\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\
-    \x02\x04\x01\x12\x03\x07\x0b\x14\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\
-    \x07\x17\x18\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\
-    \x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\
-    \x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\x0b\
-    \n\x04\x04\0\x02\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\0\x02\x06\x05\x12\
-    \x03\t\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\x0b\x12\n\x0c\n\x05\
-    \x04\0\x02\x06\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\x01\x12\x04\x0b\0\r\
-    \x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x18\n\x0b\n\x04\x04\x01\x02\0\
-    \x12\x03\x0c\x04\x20\n\x0c\n\x05\x04\x01\x02\0\x04\x12\x03\x0c\x04\x0c\n\
-    \x0c\n\x05\x04\x01\x02\0\x06\x12\x03\x0c\r\x15\n\x0c\n\x05\x04\x01\x02\0\
-    \x01\x12\x03\x0c\x16\x1b\n\x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x0c\x1e\
-    \x1f\n\n\n\x02\x04\x02\x12\x04\x0e\0\x10\x01\n\n\n\x03\x04\x02\x01\x12\
-    \x03\x0e\x08\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\x04\x14\n\x0c\n\x05\
-    \x04\x02\x02\0\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\0\x01\x12\
-    \x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0f\x12\x13\n\n\n\
-    \x02\x04\x03\x12\x04\x11\0\x15\x01\n\n\n\x03\x04\x03\x01\x12\x03\x11\x08\
-    \x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x19\n\x0c\n\x05\x04\x03\
-    \x02\0\x05\x12\x03\x12\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x12\
-    \x0b\x14\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x17\x18\n\x0b\n\x04\
-    \x04\x03\x02\x01\x12\x03\x13\x04\x14\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\
-    \x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\n\x0f\n\x0c\n\
-    \x05\x04\x03\x02\x01\x03\x12\x03\x13\x12\x13\n\x0b\n\x04\x04\x03\x02\x02\
-    \x12\x03\x14\x04\x12\n\x0c\n\x05\x04\x03\x02\x02\x05\x12\x03\x14\x04\t\n\
-    \x0c\n\x05\x04\x03\x02\x02\x01\x12\x03\x14\n\r\n\x0c\n\x05\x04\x03\x02\
-    \x02\x03\x12\x03\x14\x10\x11\n\n\n\x02\x05\0\x12\x04\x16\0\x19\x01\n\n\n\
-    \x03\x05\0\x01\x12\x03\x16\x05\x12\n\x0b\n\x04\x05\0\x02\0\x12\x03\x17\
-    \x04\r\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x17\x04\x08\n\x0c\n\x05\x05\0\
-    \x02\0\x02\x12\x03\x17\x0b\x0c\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x18\x04\
-    \x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x18\x04\x07\n\x0c\n\x05\x05\0\
-    \x02\x01\x02\x12\x03\x18\n\x0b\n\n\n\x02\x05\x01\x12\x04\x1a\0\x1d\x01\n\
-    \n\n\x03\x05\x01\x01\x12\x03\x1a\x05\x0c\n\x0b\n\x04\x05\x01\x02\0\x12\
-    \x03\x1b\x04\x18\n\x0c\n\x05\x05\x01\x02\0\x01\x12\x03\x1b\x04\x13\n\x0c\
-    \n\x05\x05\x01\x02\0\x02\x12\x03\x1b\x16\x17\n\x0b\n\x04\x05\x01\x02\x01\
-    \x12\x03\x1c\x04\x19\n\x0c\n\x05\x05\x01\x02\x01\x01\x12\x03\x1c\x04\x14\
-    \n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\x1c\x17\x18b\x06proto3\
+    value\"7\n\rRevisionRange\x12\x14\n\x05start\x18\x01\x20\x01(\x03R\x05st\
+    art\x12\x10\n\x03end\x18\x02\x20\x01(\x03R\x03end*\"\n\rRevisionState\
+    \x12\x08\n\x04Sync\x10\0\x12\x07\n\x03Ack\x10\x01*4\n\x07RevType\x12\x13\
+    \n\x0fDeprecatedLocal\x10\0\x12\x14\n\x10DeprecatedRemote\x10\x01J\xb1\
+    \x07\n\x06\x12\x04\0\0\x1c\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
+    \x04\0\x12\x04\x02\0\n\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\
+    \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\
+    \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\
+    \x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\
+    \x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\
+    \x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\
+    \x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x19\n\x0c\n\
+    \x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\
+    \x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x17\x18\n\
+    \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\
+    \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\
+    \n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\
+    \x04\x12\x03\x07\x04\x19\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\
+    \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x14\n\x0c\n\x05\x04\0\x02\
+    \x04\x03\x12\x03\x07\x17\x18\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\
+    \x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\
+    \x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\
+    \x08\x11\x12\n\x0b\n\x04\x04\0\x02\x06\x12\x03\t\x04\x17\n\x0c\n\x05\x04\
+    \0\x02\x06\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\0\x02\x06\x01\x12\x03\t\
+    \x0b\x12\n\x0c\n\x05\x04\0\x02\x06\x03\x12\x03\t\x15\x16\n\n\n\x02\x04\
+    \x01\x12\x04\x0b\0\r\x01\n\n\n\x03\x04\x01\x01\x12\x03\x0b\x08\x18\n\x0b\
+    \n\x04\x04\x01\x02\0\x12\x03\x0c\x04\x20\n\x0c\n\x05\x04\x01\x02\0\x04\
+    \x12\x03\x0c\x04\x0c\n\x0c\n\x05\x04\x01\x02\0\x06\x12\x03\x0c\r\x15\n\
+    \x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0c\x16\x1b\n\x0c\n\x05\x04\x01\x02\
+    \0\x03\x12\x03\x0c\x1e\x1f\n\n\n\x02\x04\x02\x12\x04\x0e\0\x10\x01\n\n\n\
+    \x03\x04\x02\x01\x12\x03\x0e\x08\r\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0f\
+    \x04\x14\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\
+    \x02\x02\0\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\
+    \x0f\x12\x13\n\n\n\x02\x04\x03\x12\x04\x11\0\x14\x01\n\n\n\x03\x04\x03\
+    \x01\x12\x03\x11\x08\x15\n\x0b\n\x04\x04\x03\x02\0\x12\x03\x12\x04\x14\n\
+    \x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x12\x04\t\n\x0c\n\x05\x04\x03\x02\0\
+    \x01\x12\x03\x12\n\x0f\n\x0c\n\x05\x04\x03\x02\0\x03\x12\x03\x12\x12\x13\
+    \n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x13\x04\x12\n\x0c\n\x05\x04\x03\x02\
+    \x01\x05\x12\x03\x13\x04\t\n\x0c\n\x05\x04\x03\x02\x01\x01\x12\x03\x13\n\
+    \r\n\x0c\n\x05\x04\x03\x02\x01\x03\x12\x03\x13\x10\x11\n\n\n\x02\x05\0\
+    \x12\x04\x15\0\x18\x01\n\n\n\x03\x05\0\x01\x12\x03\x15\x05\x12\n\x0b\n\
+    \x04\x05\0\x02\0\x12\x03\x16\x04\r\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\
+    \x16\x04\x08\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x16\x0b\x0c\n\x0b\n\x04\
+    \x05\0\x02\x01\x12\x03\x17\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\
+    \x17\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x17\n\x0b\n\n\n\x02\
+    \x05\x01\x12\x04\x19\0\x1c\x01\n\n\n\x03\x05\x01\x01\x12\x03\x19\x05\x0c\
+    \n\x0b\n\x04\x05\x01\x02\0\x12\x03\x1a\x04\x18\n\x0c\n\x05\x05\x01\x02\0\
+    \x01\x12\x03\x1a\x04\x13\n\x0c\n\x05\x05\x01\x02\0\x02\x12\x03\x1a\x16\
+    \x17\n\x0b\n\x04\x05\x01\x02\x01\x12\x03\x1b\x04\x19\n\x0c\n\x05\x05\x01\
+    \x02\x01\x01\x12\x03\x1b\x04\x14\n\x0c\n\x05\x05\x01\x02\x01\x02\x12\x03\
+    \x1b\x17\x18b\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 2 - 3
shared-lib/flowy-collaboration/src/protobuf/proto/revision.proto

@@ -16,9 +16,8 @@ message RevId {
     int64 value = 1;
 }
 message RevisionRange {
-    string object_id = 1;
-    int64 start = 2;
-    int64 end = 3;
+    int64 start = 1;
+    int64 end = 2;
 }
 enum RevisionState {
     Sync = 0;

+ 0 - 1
shared-lib/flowy-collaboration/src/synchronizer.rs

@@ -118,7 +118,6 @@ where
                 } else {
                     // The server delta is outdated, pull the missing revision from the client.
                     let range = RevisionRange {
-                        object_id: self.object_id.clone(),
                         start: server_rev_id,
                         end: first_revision.rev_id,
                     };

+ 4 - 0
shared-lib/flowy-collaboration/src/util.rs

@@ -72,6 +72,10 @@ where
 {
     let mut delta = Delta::<T>::new();
     for revision in revisions {
+        if revision.delta_data.is_empty() {
+            tracing::warn!("revision delta_data is empty");
+        }
+
         let revision_delta = Delta::<T>::from_bytes(revision.delta_data).map_err(|e| {
             let err_msg = format!("Deserialize remote revision failed: {:?}", e);
             CollaborateError::internal().context(err_msg)

+ 0 - 113
shared-lib/flowy-core-data-model/src/entities/app/app_create.rs

@@ -1,113 +0,0 @@
-use crate::{
-    entities::view::RepeatedView,
-    errors::*,
-    impl_def_and_def_mut,
-    parser::{
-        app::{AppColorStyle, AppName},
-        workspace::WorkspaceIdentify,
-    },
-};
-use flowy_derive::ProtoBuf;
-use std::convert::TryInto;
-
-#[derive(ProtoBuf, Default)]
-pub struct CreateAppRequest {
-    #[pb(index = 1)]
-    pub workspace_id: String,
-
-    #[pb(index = 2)]
-    pub name: String,
-
-    #[pb(index = 3)]
-    pub desc: String,
-
-    #[pb(index = 4)]
-    pub color_style: ColorStyle,
-}
-
-#[derive(ProtoBuf, Default, Debug, Clone)]
-pub struct ColorStyle {
-    #[pb(index = 1)]
-    pub theme_color: String,
-}
-
-#[derive(ProtoBuf, Default, Debug)]
-pub struct CreateAppParams {
-    #[pb(index = 1)]
-    pub workspace_id: String,
-
-    #[pb(index = 2)]
-    pub name: String,
-
-    #[pb(index = 3)]
-    pub desc: String,
-
-    #[pb(index = 4)]
-    pub color_style: ColorStyle,
-}
-
-impl TryInto<CreateAppParams> for CreateAppRequest {
-    type Error = ErrorCode;
-
-    fn try_into(self) -> Result<CreateAppParams, Self::Error> {
-        let name = AppName::parse(self.name)?;
-        let id = WorkspaceIdentify::parse(self.workspace_id)?;
-        let color_style = AppColorStyle::parse(self.color_style.theme_color.clone())?;
-
-        Ok(CreateAppParams {
-            workspace_id: id.0,
-            name: name.0,
-            desc: self.desc,
-            color_style: color_style.into(),
-        })
-    }
-}
-
-impl std::convert::From<AppColorStyle> for ColorStyle {
-    fn from(data: AppColorStyle) -> Self {
-        ColorStyle {
-            theme_color: data.theme_color,
-        }
-    }
-}
-
-#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
-pub struct App {
-    #[pb(index = 1)]
-    pub id: String,
-
-    #[pb(index = 2)]
-    pub workspace_id: String,
-
-    #[pb(index = 3)]
-    pub name: String,
-
-    #[pb(index = 4)]
-    pub desc: String,
-
-    #[pb(index = 5)]
-    pub belongings: RepeatedView,
-
-    #[pb(index = 6)]
-    pub version: i64,
-
-    #[pb(index = 7)]
-    pub modified_time: i64,
-
-    #[pb(index = 8)]
-    pub create_time: i64,
-}
-
-impl App {
-    pub fn take_belongings(&mut self) -> RepeatedView {
-        std::mem::take(&mut self.belongings)
-    }
-}
-
-#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
-pub struct RepeatedApp {
-    #[pb(index = 1)]
-    pub items: Vec<App>,
-}
-
-impl_def_and_def_mut!(RepeatedApp, App);

+ 0 - 177
shared-lib/flowy-core-data-model/src/entities/view/view_create.rs

@@ -1,177 +0,0 @@
-use crate::{
-    entities::trash::{Trash, TrashType},
-    errors::ErrorCode,
-    impl_def_and_def_mut,
-    parser::{
-        app::AppIdentify,
-        view::{ViewName, ViewThumbnail},
-    },
-};
-use flowy_collaboration::document::default::initial_delta_string;
-use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
-use std::convert::TryInto;
-
-#[derive(PartialEq, Debug, ProtoBuf_Enum, Clone)]
-pub enum ViewType {
-    Blank = 0,
-    Doc = 1,
-}
-
-impl std::default::Default for ViewType {
-    fn default() -> Self {
-        ViewType::Blank
-    }
-}
-
-impl std::convert::From<i32> for ViewType {
-    fn from(val: i32) -> Self {
-        match val {
-            1 => ViewType::Doc,
-            0 => ViewType::Blank,
-            _ => {
-                log::error!("Invalid view type: {}", val);
-                ViewType::Blank
-            }
-        }
-    }
-}
-
-#[derive(Default, ProtoBuf)]
-pub struct CreateViewRequest {
-    #[pb(index = 1)]
-    pub belong_to_id: String,
-
-    #[pb(index = 2)]
-    pub name: String,
-
-    #[pb(index = 3)]
-    pub desc: String,
-
-    #[pb(index = 4, one_of)]
-    pub thumbnail: Option<String>,
-
-    #[pb(index = 5)]
-    pub view_type: ViewType,
-}
-
-#[derive(Default, ProtoBuf, Debug, Clone)]
-pub struct CreateViewParams {
-    #[pb(index = 1)]
-    pub belong_to_id: String,
-
-    #[pb(index = 2)]
-    pub name: String,
-
-    #[pb(index = 3)]
-    pub desc: String,
-
-    #[pb(index = 4)]
-    pub thumbnail: String,
-
-    #[pb(index = 5)]
-    pub view_type: ViewType,
-
-    // ViewType::Doc -> Delta string
-    #[pb(index = 6)]
-    pub view_data: String,
-
-    #[pb(index = 7)]
-    pub view_id: String,
-}
-
-impl CreateViewParams {
-    pub fn new(
-        belong_to_id: String,
-        name: String,
-        desc: String,
-        view_type: ViewType,
-        thumbnail: String,
-        view_data: String,
-        view_id: String,
-    ) -> Self {
-        Self {
-            belong_to_id,
-            name,
-            desc,
-            thumbnail,
-            view_type,
-            view_data,
-            view_id,
-        }
-    }
-}
-
-impl TryInto<CreateViewParams> for CreateViewRequest {
-    type Error = ErrorCode;
-
-    fn try_into(self) -> Result<CreateViewParams, Self::Error> {
-        let name = ViewName::parse(self.name)?.0;
-        let belong_to_id = AppIdentify::parse(self.belong_to_id)?.0;
-        let view_data = initial_delta_string();
-        let view_id = uuid::Uuid::new_v4().to_string();
-        let thumbnail = match self.thumbnail {
-            None => "".to_string(),
-            Some(thumbnail) => ViewThumbnail::parse(thumbnail)?.0,
-        };
-
-        Ok(CreateViewParams::new(
-            belong_to_id,
-            name,
-            self.desc,
-            self.view_type,
-            thumbnail,
-            view_data,
-            view_id,
-        ))
-    }
-}
-
-#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
-pub struct View {
-    #[pb(index = 1)]
-    pub id: String,
-
-    #[pb(index = 2)]
-    pub belong_to_id: String,
-
-    #[pb(index = 3)]
-    pub name: String,
-
-    #[pb(index = 4)]
-    pub desc: String,
-
-    #[pb(index = 5)]
-    pub view_type: ViewType,
-
-    #[pb(index = 6)]
-    pub version: i64,
-
-    #[pb(index = 7)]
-    pub belongings: RepeatedView,
-
-    #[pb(index = 8)]
-    pub modified_time: i64,
-
-    #[pb(index = 9)]
-    pub create_time: i64,
-}
-
-#[derive(PartialEq, Debug, Default, ProtoBuf, Clone)]
-pub struct RepeatedView {
-    #[pb(index = 1)]
-    pub items: Vec<View>,
-}
-
-impl_def_and_def_mut!(RepeatedView, View);
-
-impl std::convert::From<View> for Trash {
-    fn from(view: View) -> Self {
-        Trash {
-            id: view.id,
-            name: view.name,
-            modified_time: view.modified_time,
-            create_time: view.create_time,
-            ty: TrashType::View,
-        }
-    }
-}

+ 0 - 67
shared-lib/flowy-core-data-model/src/entities/view/view_query.rs

@@ -1,67 +0,0 @@
-use crate::{errors::ErrorCode, parser::view::ViewIdentify};
-use flowy_collaboration::entities::doc::DocumentId;
-use flowy_derive::ProtoBuf;
-use std::convert::TryInto;
-
-#[derive(Default, ProtoBuf)]
-pub struct QueryViewRequest {
-    #[pb(index = 1)]
-    pub view_ids: Vec<String>,
-}
-
-#[derive(Default, ProtoBuf, Clone, Debug)]
-pub struct ViewId {
-    #[pb(index = 1)]
-    pub view_id: String,
-}
-
-impl std::convert::From<String> for ViewId {
-    fn from(view_id: String) -> Self {
-        ViewId { view_id }
-    }
-}
-
-impl std::convert::From<ViewId> for DocumentId {
-    fn from(identifier: ViewId) -> Self {
-        DocumentId {
-            doc_id: identifier.view_id,
-        }
-    }
-}
-
-impl TryInto<ViewId> for QueryViewRequest {
-    type Error = ErrorCode;
-    fn try_into(self) -> Result<ViewId, Self::Error> {
-        debug_assert!(self.view_ids.len() == 1);
-        if self.view_ids.len() != 1 {
-            log::error!("The len of view_ids should be equal to 1");
-            return Err(ErrorCode::ViewIdInvalid);
-        }
-
-        let view_id = self.view_ids.first().unwrap().clone();
-        let view_id = ViewIdentify::parse(view_id)?.0;
-
-        Ok(ViewId { view_id })
-    }
-}
-
-#[derive(Default, ProtoBuf)]
-pub struct RepeatedViewId {
-    #[pb(index = 1)]
-    pub items: Vec<String>,
-}
-
-impl TryInto<RepeatedViewId> for QueryViewRequest {
-    type Error = ErrorCode;
-
-    fn try_into(self) -> Result<RepeatedViewId, Self::Error> {
-        let mut view_ids = vec![];
-        for view_id in self.view_ids {
-            let view_id = ViewIdentify::parse(view_id)?.0;
-
-            view_ids.push(view_id);
-        }
-
-        Ok(RepeatedViewId { items: view_ids })
-    }
-}

+ 0 - 74
shared-lib/flowy-core-data-model/src/entities/workspace/workspace_create.rs

@@ -1,74 +0,0 @@
-use crate::{
-    entities::app::RepeatedApp,
-    errors::*,
-    impl_def_and_def_mut,
-    parser::workspace::{WorkspaceDesc, WorkspaceName},
-};
-use flowy_derive::ProtoBuf;
-use std::convert::TryInto;
-
-#[derive(ProtoBuf, Default)]
-pub struct CreateWorkspaceRequest {
-    #[pb(index = 1)]
-    pub name: String,
-
-    #[pb(index = 2)]
-    pub desc: String,
-}
-
-#[derive(Clone, ProtoBuf, Default, Debug)]
-pub struct CreateWorkspaceParams {
-    #[pb(index = 1)]
-    pub name: String,
-
-    #[pb(index = 2)]
-    pub desc: String,
-}
-
-impl TryInto<CreateWorkspaceParams> for CreateWorkspaceRequest {
-    type Error = ErrorCode;
-
-    fn try_into(self) -> Result<CreateWorkspaceParams, Self::Error> {
-        let name = WorkspaceName::parse(self.name)?;
-        let desc = WorkspaceDesc::parse(self.desc)?;
-
-        Ok(CreateWorkspaceParams {
-            name: name.0,
-            desc: desc.0,
-        })
-    }
-}
-
-#[derive(PartialEq, ProtoBuf, Default, Debug, Clone)]
-pub struct Workspace {
-    #[pb(index = 1)]
-    pub id: String,
-
-    #[pb(index = 2)]
-    pub name: String,
-
-    #[pb(index = 3)]
-    pub desc: String,
-
-    #[pb(index = 4)]
-    pub apps: RepeatedApp,
-
-    #[pb(index = 5)]
-    pub modified_time: i64,
-
-    #[pb(index = 6)]
-    pub create_time: i64,
-}
-
-impl Workspace {
-    pub fn take_apps(&mut self) -> RepeatedApp {
-        std::mem::take(&mut self.apps)
-    }
-}
-#[derive(PartialEq, Debug, Default, ProtoBuf)]
-pub struct RepeatedWorkspace {
-    #[pb(index = 1)]
-    pub items: Vec<Workspace>,
-}
-
-impl_def_and_def_mut!(RepeatedWorkspace, Workspace);

+ 0 - 42
shared-lib/flowy-core-data-model/src/entities/workspace/workspace_query.rs

@@ -1,42 +0,0 @@
-use crate::{errors::*, parser::workspace::WorkspaceIdentify};
-use flowy_derive::ProtoBuf;
-use std::convert::TryInto;
-
-#[derive(Default, ProtoBuf, Clone)]
-pub struct QueryWorkspaceRequest {
-    // return all workspace if workspace_id is None
-    #[pb(index = 1, one_of)]
-    pub workspace_id: Option<String>,
-}
-
-impl QueryWorkspaceRequest {
-    pub fn new(workspace_id: Option<String>) -> Self {
-        Self { workspace_id }
-    }
-}
-
-// Read all workspaces if the workspace_id is None
-#[derive(Clone, ProtoBuf, Default, Debug)]
-pub struct WorkspaceId {
-    #[pb(index = 1, one_of)]
-    pub workspace_id: Option<String>,
-}
-
-impl WorkspaceId {
-    pub fn new(workspace_id: Option<String>) -> Self {
-        Self { workspace_id }
-    }
-}
-
-impl TryInto<WorkspaceId> for QueryWorkspaceRequest {
-    type Error = ErrorCode;
-
-    fn try_into(self) -> Result<WorkspaceId, Self::Error> {
-        let workspace_id = match self.workspace_id {
-            None => None,
-            Some(workspace_id) => Some(WorkspaceIdentify::parse(workspace_id)?.0),
-        };
-
-        Ok(WorkspaceId { workspace_id })
-    }
-}