瀏覽代碼

save rev to db and update rev state to acked if server ack client rev

appflowy 3 年之前
父節點
當前提交
11bc536df8
共有 23 個文件被更改,包括 727 次插入192 次删除
  1. 75 0
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart
  2. 12 0
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart
  3. 5 3
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart
  4. 4 3
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart
  5. 41 18
      backend/src/service/doc/edit_doc_context.rs
  6. 11 4
      backend/src/service/doc/ws_handler.rs
  7. 1 1
      rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql
  8. 2 2
      rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql
  9. 3 3
      rust-lib/flowy-database/src/schema.rs
  10. 1 0
      rust-lib/flowy-derive/src/derive_cache/derive_cache.rs
  11. 12 0
      rust-lib/flowy-document/src/entities/doc/revision.rs
  12. 4 3
      rust-lib/flowy-document/src/entities/ws/ws.rs
  13. 267 26
      rust-lib/flowy-document/src/protobuf/model/revision.rs
  14. 30 25
      rust-lib/flowy-document/src/protobuf/model/ws.rs
  15. 5 0
      rust-lib/flowy-document/src/protobuf/proto/revision.proto
  16. 3 2
      rust-lib/flowy-document/src/protobuf/proto/ws.proto
  17. 6 6
      rust-lib/flowy-document/src/services/doc/doc_controller.rs
  18. 18 15
      rust-lib/flowy-document/src/services/doc/edit_doc_context.rs
  19. 104 34
      rust-lib/flowy-document/src/services/doc/rev_manager.rs
  20. 1 1
      rust-lib/flowy-document/src/services/mod.rs
  21. 79 13
      rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs
  22. 38 28
      rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs
  23. 5 5
      rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs

+ 75 - 0
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pb.dart

@@ -131,3 +131,78 @@ class Revision extends $pb.GeneratedMessage {
   void clearTy() => clearField(6);
 }
 
+class RevisionRange extends $pb.GeneratedMessage {
+  static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create)
+    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
+    ..aInt64(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'fromRevId')
+    ..aInt64(3, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'toRevId')
+    ..hasRequiredFields = false
+  ;
+
+  RevisionRange._() : super();
+  factory RevisionRange({
+    $core.String? docId,
+    $fixnum.Int64? fromRevId,
+    $fixnum.Int64? toRevId,
+  }) {
+    final _result = create();
+    if (docId != null) {
+      _result.docId = docId;
+    }
+    if (fromRevId != null) {
+      _result.fromRevId = fromRevId;
+    }
+    if (toRevId != null) {
+      _result.toRevId = toRevId;
+    }
+    return _result;
+  }
+  factory RevisionRange.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
+  factory RevisionRange.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
+  @$core.Deprecated(
+  'Using this can add significant overhead to your binary. '
+  'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
+  'Will be removed in next major version')
+  RevisionRange clone() => RevisionRange()..mergeFromMessage(this);
+  @$core.Deprecated(
+  'Using this can add significant overhead to your binary. '
+  'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
+  'Will be removed in next major version')
+  RevisionRange copyWith(void Function(RevisionRange) updates) => super.copyWith((message) => updates(message as RevisionRange)) as RevisionRange; // ignore: deprecated_member_use
+  $pb.BuilderInfo get info_ => _i;
+  @$core.pragma('dart2js:noInline')
+  static RevisionRange create() => RevisionRange._();
+  RevisionRange createEmptyInstance() => create();
+  static $pb.PbList<RevisionRange> createRepeated() => $pb.PbList<RevisionRange>();
+  @$core.pragma('dart2js:noInline')
+  static RevisionRange getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<RevisionRange>(create);
+  static RevisionRange? _defaultInstance;
+
+  @$pb.TagNumber(1)
+  $core.String get docId => $_getSZ(0);
+  @$pb.TagNumber(1)
+  set docId($core.String v) { $_setString(0, v); }
+  @$pb.TagNumber(1)
+  $core.bool hasDocId() => $_has(0);
+  @$pb.TagNumber(1)
+  void clearDocId() => clearField(1);
+
+  @$pb.TagNumber(2)
+  $fixnum.Int64 get fromRevId => $_getI64(1);
+  @$pb.TagNumber(2)
+  set fromRevId($fixnum.Int64 v) { $_setInt64(1, v); }
+  @$pb.TagNumber(2)
+  $core.bool hasFromRevId() => $_has(1);
+  @$pb.TagNumber(2)
+  void clearFromRevId() => clearField(2);
+
+  @$pb.TagNumber(3)
+  $fixnum.Int64 get toRevId => $_getI64(2);
+  @$pb.TagNumber(3)
+  set toRevId($fixnum.Int64 v) { $_setInt64(2, v); }
+  @$pb.TagNumber(3)
+  $core.bool hasToRevId() => $_has(2);
+  @$pb.TagNumber(3)
+  void clearToRevId() => clearField(3);
+}
+

+ 12 - 0
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/revision.pbjson.dart

@@ -34,3 +34,15 @@ const Revision$json = const {
 
 /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`.
 final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSFAoFZGVsdGEYAyABKAxSBWRlbHRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ==');
+@$core.Deprecated('Use revisionRangeDescriptor instead')
+const RevisionRange$json = const {
+  '1': 'RevisionRange',
+  '2': const [
+    const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
+    const {'1': 'from_rev_id', '3': 2, '4': 1, '5': 3, '10': 'fromRevId'},
+    const {'1': 'to_rev_id', '3': 3, '4': 1, '5': 3, '10': 'toRevId'},
+  ],
+};
+
+/// Descriptor for `RevisionRange`. Decode as a `google.protobuf.DescriptorProto`.
+final $typed_data.Uint8List revisionRangeDescriptor = $convert.base64Decode('Cg1SZXZpc2lvblJhbmdlEhUKBmRvY19pZBgBIAEoCVIFZG9jSWQSHgoLZnJvbV9yZXZfaWQYAiABKANSCWZyb21SZXZJZBIaCgl0b19yZXZfaWQYAyABKANSB3RvUmV2SWQ=');

+ 5 - 3
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbenum.dart

@@ -11,12 +11,14 @@ import 'package:protobuf/protobuf.dart' as $pb;
 
 class WsDataType extends $pb.ProtobufEnum {
   static const WsDataType Acked = WsDataType._(0, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Acked');
-  static const WsDataType Rev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Rev');
-  static const WsDataType Conflict = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict');
+  static const WsDataType PushRev = WsDataType._(1, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PushRev');
+  static const WsDataType PullRev = WsDataType._(2, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'PullRev');
+  static const WsDataType Conflict = WsDataType._(3, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'Conflict');
 
   static const $core.List<WsDataType> values = <WsDataType> [
     Acked,
-    Rev,
+    PushRev,
+    PullRev,
     Conflict,
   ];
 

+ 4 - 3
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/ws.pbjson.dart

@@ -13,13 +13,14 @@ const WsDataType$json = const {
   '1': 'WsDataType',
   '2': const [
     const {'1': 'Acked', '2': 0},
-    const {'1': 'Rev', '2': 1},
-    const {'1': 'Conflict', '2': 2},
+    const {'1': 'PushRev', '2': 1},
+    const {'1': 'PullRev', '2': 2},
+    const {'1': 'Conflict', '2': 3},
   ],
 };
 
 /// Descriptor for `WsDataType`. Decode as a `google.protobuf.EnumDescriptorProto`.
-final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASBwoDUmV2EAESDAoIQ29uZmxpY3QQAg==');
+final $typed_data.Uint8List wsDataTypeDescriptor = $convert.base64Decode('CgpXc0RhdGFUeXBlEgkKBUFja2VkEAASCwoHUHVzaFJldhABEgsKB1B1bGxSZXYQAhIMCghDb25mbGljdBAD');
 @$core.Deprecated('Use wsDocumentDataDescriptor instead')
 const WsDocumentData$json = const {
   '1': 'WsDocumentData',

+ 41 - 18
backend/src/service/doc/edit_doc_context.rs

@@ -9,7 +9,7 @@ use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_document::{
     entities::ws::{WsDataType, WsDocumentData},
-    protobuf::{Doc, RevType, Revision, UpdateDocParams},
+    protobuf::{Doc, RevType, Revision, RevisionRange, UpdateDocParams},
     services::doc::Document,
 };
 use flowy_net::errors::{internal_error, ServerError};
@@ -22,6 +22,7 @@ use parking_lot::RwLock;
 use protobuf::Message;
 use sqlx::PgPool;
 use std::{
+    cmp::min,
     convert::TryInto,
     sync::{
         atomic::{AtomicI64, Ordering::SeqCst},
@@ -64,7 +65,7 @@ impl EditDocContext {
         revision: Revision,
     ) -> Result<(), ServerError> {
         let _ = self.verify_md5(&revision)?;
-        // Rest EditUser for each client websocket message to keep the socket available.
+        // Opti: find out another way to keep the user socket available.
         let user = EditUser {
             user: client_data.user.clone(),
             socket: client_data.socket.clone(),
@@ -80,34 +81,40 @@ impl EditDocContext {
 
         let cli_socket = client_data.socket;
         let cur_rev_id = self.rev_id.load(SeqCst);
-        // Transform the revision if client rev_id less than server rev_id. Sending the
-        // prime delta to client.
         if cur_rev_id > revision.rev_id {
+            // The client document is outdated. Transform the client revision delta and then
+            // send the prime delta to the client. Client should compose the this prime
+            // delta.
+
             let (cli_prime, server_prime) = self.transform(&revision.delta).map_err(internal_error)?;
             let _ = self.update_document_delta(server_prime)?;
 
             log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
             let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
-            let ws_cli_revision = mk_rev_ws_message(&self.doc_id, cli_revision);
+            let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
             cli_socket.do_send(ws_cli_revision).map_err(internal_error)?;
             Ok(())
         } else if cur_rev_id < revision.rev_id {
             if cur_rev_id != revision.base_rev_id {
-                let missing_rev_range = revision.rev_id - cur_rev_id;
-                // TODO: pull the missing revs from client
+                // The server document is outdated, try to get the missing revision from the
+                // client.
+                cli_socket
+                    .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
+                    .map_err(internal_error)?;
             } else {
                 let delta = Delta::from_bytes(&revision.delta).map_err(internal_error)?;
                 let _ = self.update_document_delta(delta)?;
-                cli_socket.do_send(mk_acked_ws_message(&revision));
+                cli_socket
+                    .do_send(mk_acked_ws_message(&revision))
+                    .map_err(internal_error)?;
                 self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
-
-                // Opti: save with multiple revisions
                 let _ = self.save_revision(&revision).await?;
             }
 
             Ok(())
         } else {
             log::error!("Client rev_id should not equal to server rev_id");
+            Ok(())
         }
     }
 
@@ -162,29 +169,41 @@ impl EditDocContext {
 
     #[tracing::instrument(level = "debug", skip(self, revision))]
     async fn save_revision(&self, revision: &Revision) -> Result<(), ServerError> {
+        // Opti: save with multiple revisions
         let mut params = UpdateDocParams::new();
         params.set_doc_id(self.doc_id.clone());
         params.set_data(self.document.read().to_json());
         params.set_rev_id(revision.rev_id);
-
         let _ = update_doc(self.pg_pool.get_ref(), params).await?;
-
         Ok(())
     }
 }
 
-fn mk_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
+fn mk_push_rev_ws_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
     let bytes = revision.write_to_bytes().unwrap();
-
     let data = WsDocumentData {
         id: doc_id.to_string(),
-        ty: WsDataType::Rev,
+        ty: WsDataType::PushRev,
         data: bytes,
     };
+    mk_ws_message(data)
+}
 
-    let msg: WsMessage = data.into();
-    let bytes: Bytes = msg.try_into().unwrap();
-    WsMessageAdaptor(bytes)
+fn mk_pull_rev_ws_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
+    let range = RevisionRange {
+        doc_id: doc_id.to_string(),
+        from_rev_id,
+        to_rev_id,
+        ..Default::default()
+    };
+
+    let bytes = range.write_to_bytes().unwrap();
+    let data = WsDocumentData {
+        id: doc_id.to_string(),
+        ty: WsDataType::PullRev,
+        data: bytes,
+    };
+    mk_ws_message(data)
 }
 
 fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
@@ -197,6 +216,10 @@ fn mk_acked_ws_message(revision: &Revision) -> WsMessageAdaptor {
         data: wtr,
     };
 
+    mk_ws_message(data)
+}
+
+fn mk_ws_message<T: Into<WsMessage>>(data: T) -> WsMessageAdaptor {
     let msg: WsMessage = data.into();
     let bytes: Bytes = msg.try_into().unwrap();
     WsMessageAdaptor(bytes)

+ 11 - 4
backend/src/service/doc/ws_handler.rs

@@ -7,7 +7,7 @@ use crate::service::{
 use actix_web::web::Data;
 
 use crate::service::ws::WsUser;
-use flowy_document::protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData};
+use flowy_document::protobuf::{QueryDocParams, Revision, RevisionRange, WsDataType, WsDocumentData};
 use flowy_net::errors::ServerError;
 use parking_lot::{RwLock, RwLockUpgradableReadGuard};
 use protobuf::Message;
@@ -55,8 +55,10 @@ impl EditDocManager {
     async fn handle(&self, client_data: WsClientData) -> Result<(), ServerError> {
         let document_data: WsDocumentData = parse_from_bytes(&client_data.data)?;
         match document_data.ty {
-            WsDataType::Acked => {},
-            WsDataType::Rev => {
+            WsDataType::Acked => {
+                // Do nothing,
+            },
+            WsDataType::PushRev => {
                 let revision: Revision = parse_from_bytes(&document_data.data)?;
                 let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
                 tokio::spawn(async move {
@@ -66,7 +68,12 @@ impl EditDocManager {
                     }
                 });
             },
-            _ => {},
+            WsDataType::PullRev => {
+                // Do nothing
+            },
+            WsDataType::Conflict => {
+                unimplemented!()
+            },
         }
 
         Ok(())

+ 1 - 1
rust-lib/flowy-database/migrations/2021-07-22-234458_flowy-editor/up.sql

@@ -3,5 +3,5 @@ CREATE TABLE doc_table (
     id TEXT NOT NULL PRIMARY KEY,
 --     data BLOB NOT NULL DEFAULT (x''),
     data TEXT NOT NULL DEFAULT '',
-    revision BIGINT NOT NULL DEFAULT 0
+    rev_id BIGINT NOT NULL DEFAULT 0
 );

+ 2 - 2
rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql

@@ -1,10 +1,10 @@
 -- Your SQL goes here
 CREATE TABLE rev_table (
-    doc_id TEXT NOT NULL PRIMARY KEY,
+    id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
+    doc_id TEXT NOT NULL DEFAULT '',
     base_rev_id BIGINT NOT NULL DEFAULT 0,
     rev_id BIGINT NOT NULL DEFAULT 0,
     data BLOB NOT NULL DEFAULT (x''),
-    md5 TEXT NOT NULL DEFAULT '',
     state INTEGER NOT NULL DEFAULT 0,
     ty INTEGER NOT NULL DEFAULT 0
 );

+ 3 - 3
rust-lib/flowy-database/src/schema.rs

@@ -17,17 +17,17 @@ table! {
     doc_table (id) {
         id -> Text,
         data -> Text,
-        revision -> BigInt,
+        rev_id -> BigInt,
     }
 }
 
 table! {
-    rev_table (doc_id) {
+    rev_table (id) {
+        id -> Integer,
         doc_id -> Text,
         base_rev_id -> BigInt,
         rev_id -> BigInt,
         data -> Binary,
-        md5 -> Text,
         state -> Integer,
         ty -> Integer,
     }

+ 1 - 0
rust-lib/flowy-derive/src/derive_cache/derive_cache.rs

@@ -59,6 +59,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
         | "DocDelta"
         | "QueryDocParams"
         | "Revision"
+        | "RevisionRange"
         | "WsDocumentData"
         | "DocError"
         | "FFIRequest"

+ 12 - 0
rust-lib/flowy-document/src/entities/doc/revision.rs

@@ -43,3 +43,15 @@ impl Revision {
         }
     }
 }
+
+#[derive(Debug, Clone, Default, ProtoBuf)]
+pub struct RevisionRange {
+    #[pb(index = 1)]
+    pub doc_id: String,
+
+    #[pb(index = 2)]
+    pub from_rev_id: i64,
+
+    #[pb(index = 3)]
+    pub to_rev_id: i64,
+}

+ 4 - 3
rust-lib/flowy-document/src/entities/ws/ws.rs

@@ -7,8 +7,9 @@ use std::convert::TryInto;
 #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]
 pub enum WsDataType {
     Acked    = 0,
-    Rev      = 1,
-    Conflict = 2,
+    PushRev  = 1,
+    PullRev  = 2,
+    Conflict = 3,
 }
 
 impl std::default::Default for WsDataType {
@@ -34,7 +35,7 @@ impl std::convert::From<Revision> for WsDocumentData {
         let data = bytes.to_vec();
         Self {
             id,
-            ty: WsDataType::Rev,
+            ty: WsDataType::PushRev,
             data,
         }
     }

+ 267 - 26
rust-lib/flowy-document/src/protobuf/model/revision.rs

@@ -367,6 +367,235 @@ impl ::protobuf::reflect::ProtobufValue for Revision {
     }
 }
 
+#[derive(PartialEq,Clone,Default)]
+pub struct RevisionRange {
+    // message fields
+    pub doc_id: ::std::string::String,
+    pub from_rev_id: i64,
+    pub to_rev_id: i64,
+    // special fields
+    pub unknown_fields: ::protobuf::UnknownFields,
+    pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a RevisionRange {
+    fn default() -> &'a RevisionRange {
+        <RevisionRange as ::protobuf::Message>::default_instance()
+    }
+}
+
+impl RevisionRange {
+    pub fn new() -> RevisionRange {
+        ::std::default::Default::default()
+    }
+
+    // string doc_id = 1;
+
+
+    pub fn get_doc_id(&self) -> &str {
+        &self.doc_id
+    }
+    pub fn clear_doc_id(&mut self) {
+        self.doc_id.clear();
+    }
+
+    // Param is passed by value, moved
+    pub fn set_doc_id(&mut self, v: ::std::string::String) {
+        self.doc_id = v;
+    }
+
+    // Mutable pointer to the field.
+    // If field is not initialized, it is initialized with default value first.
+    pub fn mut_doc_id(&mut self) -> &mut ::std::string::String {
+        &mut self.doc_id
+    }
+
+    // Take field
+    pub fn take_doc_id(&mut self) -> ::std::string::String {
+        ::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
+    }
+
+    // int64 from_rev_id = 2;
+
+
+    pub fn get_from_rev_id(&self) -> i64 {
+        self.from_rev_id
+    }
+    pub fn clear_from_rev_id(&mut self) {
+        self.from_rev_id = 0;
+    }
+
+    // Param is passed by value, moved
+    pub fn set_from_rev_id(&mut self, v: i64) {
+        self.from_rev_id = v;
+    }
+
+    // int64 to_rev_id = 3;
+
+
+    pub fn get_to_rev_id(&self) -> i64 {
+        self.to_rev_id
+    }
+    pub fn clear_to_rev_id(&mut self) {
+        self.to_rev_id = 0;
+    }
+
+    // Param is passed by value, moved
+    pub fn set_to_rev_id(&mut self, v: i64) {
+        self.to_rev_id = v;
+    }
+}
+
+impl ::protobuf::Message for RevisionRange {
+    fn is_initialized(&self) -> bool {
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?;
+                },
+                2 => {
+                    if wire_type != ::protobuf::wire_format::WireTypeVarint {
+                        return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
+                    }
+                    let tmp = is.read_int64()?;
+                    self.from_rev_id = tmp;
+                },
+                3 => {
+                    if wire_type != ::protobuf::wire_format::WireTypeVarint {
+                        return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
+                    }
+                    let tmp = is.read_int64()?;
+                    self.to_rev_id = tmp;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if !self.doc_id.is_empty() {
+            my_size += ::protobuf::rt::string_size(1, &self.doc_id);
+        }
+        if self.from_rev_id != 0 {
+            my_size += ::protobuf::rt::value_size(2, self.from_rev_id, ::protobuf::wire_format::WireTypeVarint);
+        }
+        if self.to_rev_id != 0 {
+            my_size += ::protobuf::rt::value_size(3, self.to_rev_id, ::protobuf::wire_format::WireTypeVarint);
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        if !self.doc_id.is_empty() {
+            os.write_string(1, &self.doc_id)?;
+        }
+        if self.from_rev_id != 0 {
+            os.write_int64(2, self.from_rev_id)?;
+        }
+        if self.to_rev_id != 0 {
+            os.write_int64(3, self.to_rev_id)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &dyn (::std::any::Any) {
+        self as &dyn (::std::any::Any)
+    }
+    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+        self as &mut dyn (::std::any::Any)
+    }
+    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        Self::descriptor_static()
+    }
+
+    fn new() -> RevisionRange {
+        RevisionRange::new()
+    }
+
+    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+        descriptor.get(|| {
+            let mut fields = ::std::vec::Vec::new();
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
+                "doc_id",
+                |m: &RevisionRange| { &m.doc_id },
+                |m: &mut RevisionRange| { &mut m.doc_id },
+            ));
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
+                "from_rev_id",
+                |m: &RevisionRange| { &m.from_rev_id },
+                |m: &mut RevisionRange| { &mut m.from_rev_id },
+            ));
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
+                "to_rev_id",
+                |m: &RevisionRange| { &m.to_rev_id },
+                |m: &mut RevisionRange| { &mut m.to_rev_id },
+            ));
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<RevisionRange>(
+                "RevisionRange",
+                fields,
+                file_descriptor_proto()
+            )
+        })
+    }
+
+    fn default_instance() -> &'static RevisionRange {
+        static instance: ::protobuf::rt::LazyV2<RevisionRange> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(RevisionRange::new)
+    }
+}
+
+impl ::protobuf::Clear for RevisionRange {
+    fn clear(&mut self) {
+        self.doc_id.clear();
+        self.from_rev_id = 0;
+        self.to_rev_id = 0;
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for RevisionRange {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for RevisionRange {
+    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+        ::protobuf::reflect::ReflectValueRef::Message(self)
+    }
+}
+
 #[derive(Clone,PartialEq,Eq,Debug,Hash)]
 pub enum RevType {
     Local = 0,
@@ -422,32 +651,44 @@ static file_descriptor_proto_data: &'static [u8] = b"\
     \x18\x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\
     \x03R\x05revId\x12\x14\n\x05delta\x18\x03\x20\x01(\x0cR\x05delta\x12\x10\
     \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\
-    (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty*\
-    \x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Remote\x10\x01J\xde\
-    \x03\n\x06\x12\x04\0\0\r\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
-    \x04\0\x12\x04\x02\0\t\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\
-    \n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\
-    \x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\
-    \x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\
-    \x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\
-    \x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\
-    \x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x14\n\x0c\n\
-    \x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\
-    \x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x12\x13\n\
-    \x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\x04\0\x02\x03\
-    \x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\x03\x06\x0b\x0e\
-    \n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\n\x04\x04\0\x02\
-    \x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\x12\x03\x07\x04\n\
-    \n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\
-    \x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\x05\x12\x03\x08\x04\
-    \x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\
-    \x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\x02\x05\x03\x12\x03\
-    \x08\x11\x12\n\n\n\x02\x05\0\x12\x04\n\0\r\x01\n\n\n\x03\x05\0\x01\x12\
-    \x03\n\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x0b\x04\x0e\n\x0c\n\x05\
-    \x05\0\x02\0\x01\x12\x03\x0b\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\
-    \x0b\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x0c\x04\x0f\n\x0c\n\x05\x05\
-    \0\x02\x01\x01\x12\x03\x0c\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\
-    \x0c\r\x0eb\x06proto3\
+    (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"\
+    b\n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\
+    \x1e\n\x0bfrom_rev_id\x18\x02\x20\x01(\x03R\tfromRevId\x12\x1a\n\tto_rev\
+    _id\x18\x03\x20\x01(\x03R\x07toRevId*\x20\n\x07RevType\x12\t\n\x05Local\
+    \x10\0\x12\n\n\x06Remote\x10\x01J\x9b\x05\n\x06\x12\x04\0\0\x12\x01\n\
+    \x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\n\n\n\
+    \x03\x04\0\x01\x12\x03\x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
+    \x04\x1a\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\
+    \x02\0\x01\x12\x03\x03\n\x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\
+    \x19\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\
+    \x01\x05\x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\
+    \x10\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\
+    \x02\x02\x12\x03\x05\x04\x14\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\
+    \x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0f\n\x0c\n\x05\x04\0\
+    \x02\x02\x03\x12\x03\x05\x12\x13\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\
+    \x04\x13\n\x0c\n\x05\x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\
+    \0\x02\x03\x01\x12\x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\
+    \x06\x11\x12\n\x0b\n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\
+    \x04\0\x02\x04\x05\x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\
+    \x03\x07\x0b\x11\n\x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\
+    \n\x04\x04\0\x02\x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\
+    \x12\x03\x08\x04\x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\
+    \x0c\n\x05\x04\0\x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\x12\
+    \x04\n\0\x0e\x01\n\n\n\x03\x04\x01\x01\x12\x03\n\x08\x15\n\x0b\n\x04\x04\
+    \x01\x02\0\x12\x03\x0b\x04\x16\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x0b\
+    \x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x0b\x0b\x11\n\x0c\n\x05\x04\
+    \x01\x02\0\x03\x12\x03\x0b\x14\x15\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\
+    \x0c\x04\x1a\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x0c\x04\t\n\x0c\n\
+    \x05\x04\x01\x02\x01\x01\x12\x03\x0c\n\x15\n\x0c\n\x05\x04\x01\x02\x01\
+    \x03\x12\x03\x0c\x18\x19\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\r\x04\x18\n\
+    \x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x01\x02\
+    \x02\x01\x12\x03\r\n\x13\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\r\x16\
+    \x17\n\n\n\x02\x05\0\x12\x04\x0f\0\x12\x01\n\n\n\x03\x05\0\x01\x12\x03\
+    \x0f\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x10\x04\x0e\n\x0c\n\x05\x05\
+    \0\x02\0\x01\x12\x03\x10\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x10\
+    \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x11\x04\x0f\n\x0c\n\x05\x05\0\
+    \x02\x01\x01\x12\x03\x11\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x11\
+    \r\x0eb\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 30 - 25
rust-lib/flowy-document/src/protobuf/model/ws.rs

@@ -258,8 +258,9 @@ impl ::protobuf::reflect::ProtobufValue for WsDocumentData {
 #[derive(Clone,PartialEq,Eq,Debug,Hash)]
 pub enum WsDataType {
     Acked = 0,
-    Rev = 1,
-    Conflict = 2,
+    PushRev = 1,
+    PullRev = 2,
+    Conflict = 3,
 }
 
 impl ::protobuf::ProtobufEnum for WsDataType {
@@ -270,8 +271,9 @@ impl ::protobuf::ProtobufEnum for WsDataType {
     fn from_i32(value: i32) -> ::std::option::Option<WsDataType> {
         match value {
             0 => ::std::option::Option::Some(WsDataType::Acked),
-            1 => ::std::option::Option::Some(WsDataType::Rev),
-            2 => ::std::option::Option::Some(WsDataType::Conflict),
+            1 => ::std::option::Option::Some(WsDataType::PushRev),
+            2 => ::std::option::Option::Some(WsDataType::PullRev),
+            3 => ::std::option::Option::Some(WsDataType::Conflict),
             _ => ::std::option::Option::None
         }
     }
@@ -279,7 +281,8 @@ impl ::protobuf::ProtobufEnum for WsDataType {
     fn values() -> &'static [Self] {
         static values: &'static [WsDataType] = &[
             WsDataType::Acked,
-            WsDataType::Rev,
+            WsDataType::PushRev,
+            WsDataType::PullRev,
             WsDataType::Conflict,
         ];
         values
@@ -311,26 +314,28 @@ impl ::protobuf::reflect::ProtobufValue for WsDataType {
 static file_descriptor_proto_data: &'static [u8] = b"\
     \n\x08ws.proto\"Q\n\x0eWsDocumentData\x12\x0e\n\x02id\x18\x01\x20\x01(\t\
     R\x02id\x12\x1b\n\x02ty\x18\x02\x20\x01(\x0e2\x0b.WsDataTypeR\x02ty\x12\
-    \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*.\n\nWsDataType\x12\t\n\x05\
-    Acked\x10\0\x12\x07\n\x03Rev\x10\x01\x12\x0c\n\x08Conflict\x10\x02J\xe2\
-    \x02\n\x06\x12\x04\0\0\x0b\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
-    \x04\0\x12\x04\x02\0\x06\x01\n\n\n\x03\x04\0\x01\x12\x03\x02\x08\x16\n\
-    \x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\
-    \x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\
-    \x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\x04\x04\0\x02\x01\x12\
-    \x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\x01\x06\x12\x03\x04\x04\x0e\n\x0c\
-    \n\x05\x04\0\x02\x01\x01\x12\x03\x04\x0f\x11\n\x0c\n\x05\x04\0\x02\x01\
-    \x03\x12\x03\x04\x14\x15\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\x04\x13\n\
-    \x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\
-    \x01\x12\x03\x05\n\x0e\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\x05\x11\x12\
-    \n\n\n\x02\x05\0\x12\x04\x07\0\x0b\x01\n\n\n\x03\x05\0\x01\x12\x03\x07\
-    \x05\x0f\n\x0b\n\x04\x05\0\x02\0\x12\x03\x08\x04\x0e\n\x0c\n\x05\x05\0\
-    \x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x08\x0c\
-    \r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\t\x04\x0c\n\x0c\n\x05\x05\0\x02\x01\
-    \x01\x12\x03\t\x04\x07\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\t\n\x0b\n\
-    \x0b\n\x04\x05\0\x02\x02\x12\x03\n\x04\x11\n\x0c\n\x05\x05\0\x02\x02\x01\
-    \x12\x03\n\x04\x0c\n\x0c\n\x05\x05\0\x02\x02\x02\x12\x03\n\x0f\x10b\x06p\
-    roto3\
+    \x12\n\x04data\x18\x03\x20\x01(\x0cR\x04data*?\n\nWsDataType\x12\t\n\x05\
+    Acked\x10\0\x12\x0b\n\x07PushRev\x10\x01\x12\x0b\n\x07PullRev\x10\x02\
+    \x12\x0c\n\x08Conflict\x10\x03J\x8b\x03\n\x06\x12\x04\0\0\x0c\x01\n\x08\
+    \n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x06\x01\n\n\n\
+    \x03\x04\0\x01\x12\x03\x02\x08\x16\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\
+    \x04\x12\n\x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\
+    \x02\0\x01\x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\
+    \x11\n\x0b\n\x04\x04\0\x02\x01\x12\x03\x04\x04\x16\n\x0c\n\x05\x04\0\x02\
+    \x01\x06\x12\x03\x04\x04\x0e\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\
+    \x0f\x11\n\x0c\n\x05\x04\0\x02\x01\x03\x12\x03\x04\x14\x15\n\x0b\n\x04\
+    \x04\0\x02\x02\x12\x03\x05\x04\x13\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\
+    \x05\x04\t\n\x0c\n\x05\x04\0\x02\x02\x01\x12\x03\x05\n\x0e\n\x0c\n\x05\
+    \x04\0\x02\x02\x03\x12\x03\x05\x11\x12\n\n\n\x02\x05\0\x12\x04\x07\0\x0c\
+    \x01\n\n\n\x03\x05\0\x01\x12\x03\x07\x05\x0f\n\x0b\n\x04\x05\0\x02\0\x12\
+    \x03\x08\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x08\x04\t\n\x0c\n\
+    \x05\x05\0\x02\0\x02\x12\x03\x08\x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\
+    \t\x04\x10\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\t\x04\x0b\n\x0c\n\x05\
+    \x05\0\x02\x01\x02\x12\x03\t\x0e\x0f\n\x0b\n\x04\x05\0\x02\x02\x12\x03\n\
+    \x04\x10\n\x0c\n\x05\x05\0\x02\x02\x01\x12\x03\n\x04\x0b\n\x0c\n\x05\x05\
+    \0\x02\x02\x02\x12\x03\n\x0e\x0f\n\x0b\n\x04\x05\0\x02\x03\x12\x03\x0b\
+    \x04\x11\n\x0c\n\x05\x05\0\x02\x03\x01\x12\x03\x0b\x04\x0c\n\x0c\n\x05\
+    \x05\0\x02\x03\x02\x12\x03\x0b\x0f\x10b\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 5 - 0
rust-lib/flowy-document/src/protobuf/proto/revision.proto

@@ -8,6 +8,11 @@ message Revision {
     string doc_id = 5;
     RevType ty = 6;
 }
+message RevisionRange {
+    string doc_id = 1;
+    int64 from_rev_id = 2;
+    int64 to_rev_id = 3;
+}
 enum RevType {
     Local = 0;
     Remote = 1;

+ 3 - 2
rust-lib/flowy-document/src/protobuf/proto/ws.proto

@@ -7,6 +7,7 @@ message WsDocumentData {
 }
 enum WsDataType {
     Acked = 0;
-    Rev = 1;
-    Conflict = 2;
+    PushRev = 1;
+    PullRev = 2;
+    Conflict = 3;
 }

+ 6 - 6
rust-lib/flowy-document/src/services/doc/doc_controller.rs

@@ -115,18 +115,18 @@ impl DocController {
     ) -> Result<Arc<EditDocContext>, DocError> {
         // Opti: require upgradable_read lock and then upgrade to write lock using
         // RwLockUpgradableReadGuard::upgrade(xx) of ws
-        let delta = self.read_doc(doc_id, pool.clone()).await?;
+        let doc = self.read_doc(doc_id, pool.clone()).await?;
         let ws_sender = self.ws.read().sender();
-        let edit_ctx = Arc::new(EditDocContext::new(&doc_id, delta, pool, ws_sender).await?);
-        self.ws.write().register_handler(&doc_id, edit_ctx.clone());
+        let edit_ctx = Arc::new(EditDocContext::new(doc, pool, ws_sender).await?);
+        self.ws.write().register_handler(doc_id, edit_ctx.clone());
         self.cache.set(edit_ctx.clone());
         Ok(edit_ctx)
     }
 
     #[tracing::instrument(level = "debug", skip(self, pool), err)]
-    async fn read_doc(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Delta, DocError> {
+    async fn read_doc(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
         match self.doc_sql.read_doc_table(doc_id, pool.clone()) {
-            Ok(doc_table) => Ok(Delta::from_bytes(doc_table.data)?),
+            Ok(doc_table) => Ok(doc_table.into()),
             Err(error) => {
                 if error.is_record_not_found() {
                     let token = self.user.token()?;
@@ -138,7 +138,7 @@ impl DocController {
                         Some(doc) => {
                             let conn = &*pool.get().map_err(internal_error)?;
                             let _ = self.doc_sql.create_doc_table(doc.clone().into(), conn)?;
-                            Ok(Delta::from_bytes(doc.data)?)
+                            Ok(doc)
                         },
                     }
                 } else {

+ 18 - 15
rust-lib/flowy-document/src/services/doc/edit_doc_context.rs

@@ -13,7 +13,7 @@ use crate::{
 use bytes::Bytes;
 
 use crate::{
-    entities::doc::RevType,
+    entities::doc::{RevType, RevisionRange},
     services::ws::WsDocumentSender,
     sql_tables::{doc::DocTableSql, DocTableChangeset},
 };
@@ -33,16 +33,15 @@ pub(crate) struct EditDocContext {
 
 impl EditDocContext {
     pub(crate) async fn new(
-        doc_id: &str,
-        delta: Delta,
+        doc: Doc,
         pool: Arc<ConnectionPool>,
         ws_sender: Arc<dyn WsDocumentSender>,
     ) -> Result<Self, DocError> {
-        let doc_id = doc_id.to_owned();
-        let rev_manager = Arc::new(RevisionManager::new(&doc_id, 1, pool.clone(), ws_sender));
+        let delta = Delta::from_bytes(doc.data)?;
+        let rev_manager = Arc::new(RevisionManager::new(&doc.id, doc.rev_id, pool.clone(), ws_sender));
         let document = Arc::new(RwLock::new(Document::from_delta(delta)));
         let edit_context = Self {
-            doc_id,
+            doc_id: doc.id,
             document,
             rev_manager,
             pool,
@@ -71,7 +70,7 @@ impl EditDocContext {
         );
 
         let _ = self.update_document(&revision)?;
-        self.rev_manager.add_revision(revision);
+        let _ = self.rev_manager.add_revision(revision)?;
         Ok(())
     }
 
@@ -83,12 +82,12 @@ impl EditDocContext {
         let changeset = DocTableChangeset {
             id: self.doc_id.clone(),
             data,
-            revision: revision.rev_id,
+            rev_id: revision.rev_id,
         };
 
         let sql = DocTableSql {};
         let conn = self.pool.get().map_err(internal_error)?;
-        sql.update_doc_table(changeset, &*conn);
+        let _ = sql.update_doc_table(changeset, &*conn)?;
         Ok(())
     }
 
@@ -122,18 +121,22 @@ impl EditDocContext {
 impl WsDocumentHandler for EditDocContext {
     fn receive(&self, doc_data: WsDocumentData) {
         let f = |doc_data: WsDocumentData| {
+            let bytes = Bytes::from(doc_data.data);
             match doc_data.ty {
-                WsDataType::Rev => {
-                    let bytes = Bytes::from(doc_data.data);
+                WsDataType::PushRev => {
                     let revision = Revision::try_from(bytes)?;
-                    self.rev_manager.add_revision(revision);
+                    let _ = self.rev_manager.add_revision(revision)?;
                     let _ = self.compose_remote_delta()?;
                 },
+                WsDataType::PullRev => {
+                    let range = RevisionRange::try_from(bytes)?;
+                    let _ = self.rev_manager.send_rev_with_range(range)?;
+                },
                 WsDataType::Acked => {
-                    let rev_id = bytes_to_rev_id(doc_data.data)?;
-                    self.rev_manager.remove(rev_id);
+                    let rev_id = bytes_to_rev_id(bytes.to_vec())?;
+                    let _ = self.rev_manager.ack(rev_id);
                 },
-                _ => {},
+                WsDataType::Conflict => {},
             }
             Result::<(), DocError>::Ok(())
         };

+ 104 - 34
rust-lib/flowy-document/src/services/doc/rev_manager.rs

@@ -1,21 +1,20 @@
 use crate::{
-    entities::doc::{RevType, Revision},
+    entities::doc::{RevType, Revision, RevisionRange},
     errors::{internal_error, DocError},
     services::{
         util::RevIdCounter,
         ws::{WsDocumentHandler, WsDocumentSender},
     },
-    sql_tables::{OpTableSql, RevTable},
+    sql_tables::{OpTableSql, RevChangeset, RevState, RevTable},
 };
-
+use dashmap::{DashMap, DashSet};
 use flowy_database::ConnectionPool;
-
-use parking_lot::RwLock;
+use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock};
 use std::{
-    collections::{BTreeMap, VecDeque},
+    collections::{HashMap, VecDeque},
     sync::Arc,
 };
-use tokio::sync::{futures::Notified, Notify};
+use tokio::{task::JoinHandle, time::Duration};
 
 pub struct RevisionManager {
     doc_id: String,
@@ -23,26 +22,29 @@ pub struct RevisionManager {
     pool: Arc<ConnectionPool>,
     rev_id_counter: RevIdCounter,
     ws_sender: Arc<dyn WsDocumentSender>,
-    local_rev_cache: Arc<RwLock<BTreeMap<i64, Revision>>>,
+    rev_cache: Arc<RwLock<HashMap<i64, Revision>>>,
+    ack_rev_cache: Arc<DashSet<i64>>,
     remote_rev_cache: RwLock<VecDeque<Revision>>,
-    notify: Notify,
+    save_operation: RwLock<Option<JoinHandle<()>>>,
 }
 
 impl RevisionManager {
     pub fn new(doc_id: &str, rev_id: i64, pool: Arc<ConnectionPool>, ws_sender: Arc<dyn WsDocumentSender>) -> Self {
         let op_sql = Arc::new(OpTableSql {});
         let rev_id_counter = RevIdCounter::new(rev_id);
-        let local_rev_cache = Arc::new(RwLock::new(BTreeMap::new()));
+        let rev_cache = Arc::new(RwLock::new(HashMap::new()));
         let remote_rev_cache = RwLock::new(VecDeque::new());
+        let ack_rev_cache = Arc::new(DashSet::new());
         Self {
             doc_id: doc_id.to_owned(),
             op_sql,
             pool,
             rev_id_counter,
             ws_sender,
-            local_rev_cache,
+            rev_cache,
+            ack_rev_cache,
             remote_rev_cache,
-            notify: Notify::new(),
+            save_operation: RwLock::new(None),
         }
     }
 
@@ -63,34 +65,30 @@ impl RevisionManager {
 
     #[tracing::instrument(level = "debug", skip(self, revision))]
     pub fn add_revision(&self, revision: Revision) -> Result<(), DocError> {
+        self.rev_cache.write().insert(revision.rev_id, revision.clone());
+        self.save_revisions();
         match revision.ty {
-            RevType::Local => {
-                self.local_rev_cache.write().insert(revision.rev_id, revision.clone());
-                // self.save_revision(revision.clone());
-                match self.ws_sender.send(revision.into()) {
-                    Ok(_) => {},
-                    Err(e) => {
-                        log::error!("Send delta failed: {:?}", e);
-                    },
-                }
+            RevType::Local => match self.ws_sender.send(revision.into()) {
+                Ok(_) => {},
+                Err(e) => {
+                    log::error!("Send delta failed: {:?}", e);
+                },
             },
             RevType::Remote => {
                 self.remote_rev_cache.write().push_back(revision);
-                self.notify.notify_waiters();
             },
         }
 
         Ok(())
     }
 
-    pub fn remove(&self, rev_id: i64) -> Result<(), DocError> {
-        self.local_rev_cache.write().remove(&rev_id);
-        // self.delete_revision(rev_id);
+    pub fn ack(&self, rev_id: i64) -> Result<(), DocError> {
+        log::debug!("Receive {} acked", rev_id);
+        self.ack_rev_cache.insert(rev_id);
+        self.update_revisions();
         Ok(())
     }
 
-    pub fn rev_notified(&self) -> Notified { self.notify.notified() }
-
     pub fn next_rev_id(&self) -> (i64, i64) {
         let cur = self.rev_id_counter.value();
         let next = self.rev_id_counter.next();
@@ -99,31 +97,103 @@ impl RevisionManager {
 
     pub fn rev_id(&self) -> i64 { self.rev_id_counter.value() }
 
-    fn save_revision(&self, revision: Revision) {
+    pub fn send_rev_with_range(&self, range: RevisionRange) -> Result<(), DocError> {
+        debug_assert!(&range.doc_id == &self.doc_id);
+
+        unimplemented!()
+    }
+
+    fn save_revisions(&self) {
         let op_sql = self.op_sql.clone();
         let pool = self.pool.clone();
-        tokio::spawn(async move {
+        let mut write_guard = self.save_operation.write();
+        if let Some(handler) = write_guard.take() {
+            handler.abort();
+        }
+
+        let rev_cache = self.rev_cache.clone();
+        let ack_rev_cache = self.ack_rev_cache.clone();
+        let ids = self.rev_cache.read().keys().map(|v| v.clone()).collect::<Vec<i64>>();
+        *write_guard = Some(tokio::spawn(async move {
+            tokio::time::sleep(Duration::from_millis(300)).await;
+
+            let revisions = rev_cache
+                .read()
+                .values()
+                .map(|v| {
+                    let state = match ack_rev_cache.contains(&v.rev_id) {
+                        true => RevState::Acked,
+                        false => RevState::Local,
+                    };
+                    (v.clone(), state)
+                })
+                .collect::<Vec<(Revision, RevState)>>();
+
+            let mut rev_cache_write = rev_cache.write();
             let conn = &*pool.get().map_err(internal_error).unwrap();
             let result = conn.immediate_transaction::<_, DocError, _>(|| {
-                let op_table: RevTable = revision.into();
-                let _ = op_sql.create_op_table(op_table, conn).unwrap();
+                let _ = op_sql.create_rev_table(revisions, conn).unwrap();
                 Ok(())
             });
 
             match result {
-                Ok(_) => {},
+                Ok(_) => rev_cache_write.retain(|k, _| !ids.contains(k)),
                 Err(e) => log::error!("Save revision failed: {:?}", e),
             }
-        });
+        }));
+    }
+
+    fn update_revisions(&self) {
+        match self.rev_cache.try_read_for(Duration::from_millis(300)) {
+            None => log::warn!("try read rev_cache failed"),
+            Some(read_guard) => {
+                let rev_ids = self
+                    .ack_rev_cache
+                    .iter()
+                    .flat_map(|k| match read_guard.contains_key(&k) {
+                        true => None,
+                        false => Some(k.clone()),
+                    })
+                    .collect::<Vec<i64>>();
+
+                log::debug!("Try to update {:?} state", rev_ids);
+                if rev_ids.is_empty() {
+                    return;
+                }
+
+                let conn = &*self.pool.get().map_err(internal_error).unwrap();
+                let result = conn.immediate_transaction::<_, DocError, _>(|| {
+                    for rev_id in &rev_ids {
+                        let changeset = RevChangeset {
+                            doc_id: self.doc_id.clone(),
+                            rev_id: rev_id.clone(),
+                            state: RevState::Acked,
+                        };
+                        let _ = self.op_sql.update_rev_table(changeset, conn)?;
+                    }
+                    Ok(())
+                });
+
+                match result {
+                    Ok(_) => {
+                        rev_ids.iter().for_each(|rev_id| {
+                            self.ack_rev_cache.remove(rev_id);
+                        });
+                    },
+                    Err(e) => log::error!("Save revision failed: {:?}", e),
+                }
+            },
+        }
     }
 
     fn delete_revision(&self, rev_id: i64) {
         let op_sql = self.op_sql.clone();
         let pool = self.pool.clone();
+        let doc_id = self.doc_id.clone();
         tokio::spawn(async move {
             let conn = &*pool.get().map_err(internal_error).unwrap();
             let result = conn.immediate_transaction::<_, DocError, _>(|| {
-                let _ = op_sql.delete_op_table(rev_id, conn)?;
+                let _ = op_sql.delete_rev_table(&doc_id, rev_id, conn)?;
                 Ok(())
             });
 

+ 1 - 1
rust-lib/flowy-document/src/services/mod.rs

@@ -1,5 +1,5 @@
 mod cache;
 pub mod doc;
 pub mod server;
-mod util;
+pub(crate) mod util;
 pub mod ws;

+ 79 - 13
rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs

@@ -1,35 +1,101 @@
 use crate::{
+    entities::doc::{RevType, Revision},
     errors::DocError,
-    sql_tables::doc::{RevChangeset, RevTable},
+    sql_tables::{doc::RevTable, RevChangeset, RevState, RevTableType},
 };
+use diesel::{insert_into, select, update};
 use flowy_database::{
     prelude::*,
-    schema::{rev_table, rev_table::dsl},
+    schema::{
+        rev_table,
+        rev_table::{columns::*, dsl, dsl::doc_id},
+    },
     SqliteConnection,
 };
 
 pub struct OpTableSql {}
 
 impl OpTableSql {
-    pub(crate) fn create_op_table(&self, op_table: RevTable, conn: &SqliteConnection) -> Result<(), DocError> {
-        let _ = diesel::insert_into(rev_table::table).values(op_table).execute(conn)?;
+    pub(crate) fn create_rev_table(
+        &self,
+        revisions: Vec<(Revision, RevState)>,
+        conn: &SqliteConnection,
+    ) -> Result<(), DocError> {
+        // Batch insert: https://diesel.rs/guides/all-about-inserts.html
+        let records = revisions
+            .into_iter()
+            .map(|(revision, new_state)| {
+                log::debug!("Set {} to {:?}", revision.rev_id, new_state);
+                let rev_ty: RevTableType = revision.ty.into();
+                (
+                    doc_id.eq(revision.doc_id),
+                    base_rev_id.eq(revision.base_rev_id),
+                    rev_id.eq(revision.rev_id),
+                    data.eq(revision.delta),
+                    state.eq(new_state),
+                    ty.eq(rev_ty),
+                )
+            })
+            .collect::<Vec<_>>();
+
+        let _ = insert_into(dsl::rev_table).values(&records).execute(conn)?;
         Ok(())
     }
 
-    pub(crate) fn update_op_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
-        let filter = dsl::rev_table.filter(rev_table::dsl::rev_id.eq(changeset.rev_id));
-        let affected_row = diesel::update(filter).set(changeset).execute(conn)?;
-        debug_assert_eq!(affected_row, 1);
+    pub(crate) fn update_rev_table(&self, changeset: RevChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
+        let filter = dsl::rev_table
+            .filter(rev_id.eq(changeset.rev_id))
+            .filter(doc_id.eq(changeset.doc_id));
+        let _ = update(filter).set(state.eq(changeset.state)).execute(conn)?;
+        log::debug!("Set {} to {:?}", changeset.rev_id, changeset.state);
         Ok(())
     }
 
-    pub(crate) fn read_op_table(&self, conn: &SqliteConnection) -> Result<Vec<RevTable>, DocError> {
-        let ops = dsl::rev_table.load::<RevTable>(conn)?;
-        Ok(ops)
+    pub(crate) fn read_rev_table(
+        &self,
+        doc_id_s: &str,
+        rev_id_s: i64,
+        conn: &SqliteConnection,
+    ) -> Result<Vec<Revision>, DocError> {
+        let rev_tables: Vec<RevTable> = dsl::rev_table
+            .filter(rev_id.eq(rev_id_s))
+            .filter(doc_id.eq(doc_id_s))
+            .load::<RevTable>(conn)?;
+
+        let revisions = rev_tables
+            .into_iter()
+            .map(|table| table.into())
+            .collect::<Vec<Revision>>();
+        Ok(revisions)
+    }
+
+    pub(crate) fn read_revs_table(
+        &self,
+        doc_id_s: &str,
+        from_rev_id: i64,
+        to_rev_id: i64,
+        conn: &SqliteConnection,
+    ) -> Result<Vec<Revision>, DocError> {
+        let rev_tables = dsl::rev_table
+            .filter(rev_id.ge(from_rev_id))
+            .filter(rev_id.lt(to_rev_id))
+            .filter(doc_id.eq(doc_id_s))
+            .load::<RevTable>(conn)?;
+
+        let revisions = rev_tables
+            .into_iter()
+            .map(|table| table.into())
+            .collect::<Vec<Revision>>();
+        Ok(revisions)
     }
 
-    pub(crate) fn delete_op_table(&self, rev_id: i64, conn: &SqliteConnection) -> Result<(), DocError> {
-        let filter = dsl::rev_table.filter(rev_table::dsl::rev_id.eq(rev_id));
+    pub(crate) fn delete_rev_table(
+        &self,
+        doc_id_s: &str,
+        rev_id_s: i64,
+        conn: &SqliteConnection,
+    ) -> Result<(), DocError> {
+        let filter = dsl::rev_table.filter(rev_id.eq(rev_id_s)).filter(doc_id.eq(doc_id_s));
         let affected_row = diesel::delete(filter).execute(conn)?;
         debug_assert_eq!(affected_row, 1);
         Ok(())

+ 38 - 28
rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs

@@ -1,16 +1,18 @@
-use crate::entities::doc::{RevType, Revision};
+use crate::{
+    entities::doc::{RevType, Revision},
+    services::util::md5,
+};
 use diesel::sql_types::Integer;
 use flowy_database::schema::rev_table;
 
 #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
 #[table_name = "rev_table"]
-#[primary_key(doc_id)]
 pub(crate) struct RevTable {
+    id: i32,
     pub(crate) doc_id: String,
     pub(crate) base_rev_id: i64,
     pub(crate) rev_id: i64,
     pub(crate) data: Vec<u8>,
-    pub(crate) md5: String,
     pub(crate) state: RevState,
     pub(crate) ty: RevTableType,
 }
@@ -44,6 +46,38 @@ impl RevState {
 }
 impl_sql_integer_expression!(RevState);
 
+impl std::convert::Into<Revision> for RevTable {
+    fn into(self) -> Revision {
+        let md5 = md5(&self.data);
+        Revision {
+            base_rev_id: self.base_rev_id,
+            rev_id: self.rev_id,
+            delta: self.data,
+            md5,
+            doc_id: self.doc_id,
+            ty: self.ty.into(),
+        }
+    }
+}
+
+impl std::convert::Into<RevTableType> for RevType {
+    fn into(self) -> RevTableType {
+        match self {
+            RevType::Local => RevTableType::Local,
+            RevType::Remote => RevTableType::Remote,
+        }
+    }
+}
+
+impl std::convert::From<RevTableType> for RevType {
+    fn from(ty: RevTableType) -> Self {
+        match ty {
+            RevTableType::Local => RevType::Local,
+            RevTableType::Remote => RevType::Remote,
+        }
+    }
+}
+
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
 #[repr(i32)]
 #[sql_type = "Integer"]
@@ -73,32 +107,8 @@ impl RevTableType {
 }
 impl_sql_integer_expression!(RevTableType);
 
-#[derive(AsChangeset, Identifiable, Default, Debug)]
-#[table_name = "rev_table"]
-#[primary_key(doc_id)]
 pub(crate) struct RevChangeset {
     pub(crate) doc_id: String,
     pub(crate) rev_id: i64,
-    pub(crate) state: Option<RevState>,
-}
-
-impl std::convert::Into<RevTable> for Revision {
-    fn into(self) -> RevTable {
-        RevTable {
-            doc_id: self.doc_id,
-            base_rev_id: self.base_rev_id,
-            rev_id: self.rev_id,
-            data: self.delta,
-            md5: self.md5,
-            state: RevState::Local,
-            ty: rev_ty_to_rev_state(self.ty),
-        }
-    }
-}
-
-fn rev_ty_to_rev_state(ty: RevType) -> RevTableType {
-    match ty {
-        RevType::Local => RevTableType::Local,
-        RevType::Remote => RevTableType::Remote,
-    }
+    pub(crate) state: RevState,
 }

+ 5 - 5
rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs

@@ -6,7 +6,7 @@ use flowy_database::schema::doc_table;
 pub(crate) struct DocTable {
     pub(crate) id: String,
     pub(crate) data: String,
-    pub(crate) revision: i64,
+    pub(crate) rev_id: i64,
 }
 
 impl DocTable {
@@ -14,7 +14,7 @@ impl DocTable {
         Self {
             id: doc.id,
             data: doc.data,
-            revision: 0,
+            rev_id: 0,
         }
     }
 }
@@ -24,7 +24,7 @@ impl DocTable {
 pub(crate) struct DocTableChangeset {
     pub id: String,
     pub data: String,
-    pub revision: i64,
+    pub rev_id: i64,
 }
 
 impl std::convert::Into<Doc> for DocTable {
@@ -32,7 +32,7 @@ impl std::convert::Into<Doc> for DocTable {
         Doc {
             id: self.id,
             data: self.data,
-            rev_id: self.revision,
+            rev_id: self.rev_id,
         }
     }
 }
@@ -42,7 +42,7 @@ impl std::convert::From<Doc> for DocTable {
         Self {
             id: doc.id,
             data: doc.data,
-            revision: doc.rev_id,
+            rev_id: doc.rev_id,
         }
     }
 }