Bläddra i källkod

add revision cache layer

appflowy 3 år sedan
förälder
incheckning
23f4684d3f
35 ändrade filer med 549 tillägg och 417 borttagningar
  1. 5 0
      backend/Cargo.lock
  2. 2 1
      backend/src/services/doc/edit/edit_actor.rs
  3. 6 2
      backend/src/services/doc/edit/editor.rs
  4. 2 1
      backend/src/services/doc/manager.rs
  5. 2 1
      backend/src/services/doc/ws_actor.rs
  6. 0 1
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart
  7. 50 50
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart
  8. 1 1
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart
  9. 11 11
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart
  10. 2 2
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbserver.dart
  11. 2 0
      frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart
  12. 2 1
      frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs
  13. 2 1
      frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs
  14. 1 1
      frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs
  15. 6 5
      frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs
  16. 3 38
      frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs
  17. 14 8
      frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs
  18. 3 3
      frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs
  19. 22 14
      frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs
  20. 5 0
      shared-lib/Cargo.lock
  21. 4 4
      shared-lib/flowy-derive/src/derive_cache/derive_cache.rs
  22. 0 2
      shared-lib/flowy-document-infra/src/entities/doc/mod.rs
  23. 2 4
      shared-lib/flowy-document-infra/src/entities/ws/ws.rs
  24. 0 3
      shared-lib/flowy-document-infra/src/protobuf/model/mod.rs
  25. 6 0
      shared-lib/lib-ot/Cargo.toml
  26. 3 0
      shared-lib/lib-ot/Flowy.toml
  27. 25 1
      shared-lib/lib-ot/src/errors.rs
  28. 2 0
      shared-lib/lib-ot/src/lib.rs
  29. 4 0
      shared-lib/lib-ot/src/protobuf/mod.rs
  30. 5 0
      shared-lib/lib-ot/src/protobuf/model/mod.rs
  31. 199 200
      shared-lib/lib-ot/src/protobuf/model/model.rs
  32. 3 3
      shared-lib/lib-ot/src/protobuf/proto/model.proto
  33. 109 0
      shared-lib/lib-ot/src/revision/cache.rs
  34. 5 0
      shared-lib/lib-ot/src/revision/mod.rs
  35. 41 59
      shared-lib/lib-ot/src/revision/model.rs

+ 5 - 0
backend/Cargo.lock

@@ -1972,13 +1972,18 @@ version = "0.1.0"
 dependencies = [
  "bytecount",
  "bytes",
+ "dashmap",
  "derive_more",
+ "flowy-derive",
  "lazy_static",
  "log",
+ "md5",
+ "protobuf",
  "serde",
  "serde_json",
  "strum",
  "strum_macros",
+ "tokio",
  "tracing",
 ]
 

+ 2 - 1
backend/src/services/doc/edit/edit_actor.rs

@@ -5,8 +5,9 @@ use crate::{
 use actix_web::web::Data;
 use async_stream::stream;
 use backend_service::errors::{internal_error, Result as DocResult, ServerError};
-use flowy_document_infra::protobuf::{Doc, Revision};
+use flowy_document_infra::protobuf::Doc;
 use futures::stream::StreamExt;
+use lib_ot::protobuf::Revision;
 use sqlx::PgPool;
 use std::sync::{atomic::Ordering::SeqCst, Arc};
 use tokio::{

+ 6 - 2
backend/src/services/doc/edit/editor.rs

@@ -11,9 +11,13 @@ use dashmap::DashMap;
 use flowy_document_infra::{
     core::Document,
     entities::ws::{WsDataType, WsDocumentData},
-    protobuf::{Doc, RevId, RevType, Revision, RevisionRange, UpdateDocParams},
+    protobuf::{Doc, UpdateDocParams},
+};
+use lib_ot::{
+    core::OperationTransformable,
+    protobuf::{RevId, RevType, Revision, RevisionRange},
+    rich_text::RichTextDelta,
 };
-use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
 use parking_lot::RwLock;
 use protobuf::Message;
 use sqlx::PgPool;

+ 2 - 1
backend/src/services/doc/manager.rs

@@ -9,7 +9,8 @@ use crate::{
 use actix_web::web::Data;
 use backend_service::errors::{internal_error, Result as DocResult, ServerError};
 use dashmap::DashMap;
-use flowy_document_infra::protobuf::{Doc, DocIdentifier, Revision};
+use flowy_document_infra::protobuf::{Doc, DocIdentifier};
+use lib_ot::protobuf::Revision;
 use sqlx::PgPool;
 use std::sync::Arc;
 use tokio::{

+ 2 - 1
backend/src/services/doc/ws_actor.rs

@@ -9,8 +9,9 @@ use actix_rt::task::spawn_blocking;
 use actix_web::web::Data;
 use async_stream::stream;
 use backend_service::errors::{internal_error, Result as DocResult, ServerError};
-use flowy_document_infra::protobuf::{NewDocUser, Revision, WsDataType, WsDocumentData};
+use flowy_document_infra::protobuf::{NewDocUser, WsDataType, WsDocumentData};
 use futures::stream::StreamExt;
+use lib_ot::protobuf::Revision;
 use sqlx::PgPool;
 use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};

+ 0 - 1
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/protobuf.dart

@@ -1,4 +1,3 @@
 // Auto-generated, do not edit 
 export './ws.pb.dart';
-export './revision.pb.dart';
 export './doc.pb.dart';

+ 50 - 50
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pb.dart → frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pb.dart

@@ -1,6 +1,6 @@
 ///
 //  Generated code. Do not modify.
-//  source: revision.proto
+//  source: model.proto
 //
 // @dart = 2.12
 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields
@@ -10,56 +10,9 @@ import 'dart:core' as $core;
 import 'package:fixnum/fixnum.dart' as $fixnum;
 import 'package:protobuf/protobuf.dart' as $pb;
 
-import 'revision.pbenum.dart';
+import 'model.pbenum.dart';
 
-export 'revision.pbenum.dart';
-
-class RevId extends $pb.GeneratedMessage {
-  static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevId', createEmptyInstance: create)
-    ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'value')
-    ..hasRequiredFields = false
-  ;
-
-  RevId._() : super();
-  factory RevId({
-    $fixnum.Int64? value,
-  }) {
-    final _result = create();
-    if (value != null) {
-      _result.value = value;
-    }
-    return _result;
-  }
-  factory RevId.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
-  factory RevId.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
-  @$core.Deprecated(
-  'Using this can add significant overhead to your binary. '
-  'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
-  'Will be removed in next major version')
-  RevId clone() => RevId()..mergeFromMessage(this);
-  @$core.Deprecated(
-  'Using this can add significant overhead to your binary. '
-  'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
-  'Will be removed in next major version')
-  RevId copyWith(void Function(RevId) updates) => super.copyWith((message) => updates(message as RevId)) as RevId; // ignore: deprecated_member_use
-  $pb.BuilderInfo get info_ => _i;
-  @$core.pragma('dart2js:noInline')
-  static RevId create() => RevId._();
-  RevId createEmptyInstance() => create();
-  static $pb.PbList<RevId> createRepeated() => $pb.PbList<RevId>();
-  @$core.pragma('dart2js:noInline')
-  static RevId getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<RevId>(create);
-  static RevId? _defaultInstance;
-
-  @$pb.TagNumber(1)
-  $fixnum.Int64 get value => $_getI64(0);
-  @$pb.TagNumber(1)
-  set value($fixnum.Int64 v) { $_setInt64(0, v); }
-  @$pb.TagNumber(1)
-  $core.bool hasValue() => $_has(0);
-  @$pb.TagNumber(1)
-  void clearValue() => clearField(1);
-}
+export 'model.pbenum.dart';
 
 class Revision extends $pb.GeneratedMessage {
   static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'Revision', createEmptyInstance: create)
@@ -178,6 +131,53 @@ class Revision extends $pb.GeneratedMessage {
   void clearTy() => clearField(6);
 }
 
+class RevId extends $pb.GeneratedMessage {
+  static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevId', createEmptyInstance: create)
+    ..aInt64(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'value')
+    ..hasRequiredFields = false
+  ;
+
+  RevId._() : super();
+  factory RevId({
+    $fixnum.Int64? value,
+  }) {
+    final _result = create();
+    if (value != null) {
+      _result.value = value;
+    }
+    return _result;
+  }
+  factory RevId.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
+  factory RevId.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
+  @$core.Deprecated(
+  'Using this can add significant overhead to your binary. '
+  'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
+  'Will be removed in next major version')
+  RevId clone() => RevId()..mergeFromMessage(this);
+  @$core.Deprecated(
+  'Using this can add significant overhead to your binary. '
+  'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
+  'Will be removed in next major version')
+  RevId copyWith(void Function(RevId) updates) => super.copyWith((message) => updates(message as RevId)) as RevId; // ignore: deprecated_member_use
+  $pb.BuilderInfo get info_ => _i;
+  @$core.pragma('dart2js:noInline')
+  static RevId create() => RevId._();
+  RevId createEmptyInstance() => create();
+  static $pb.PbList<RevId> createRepeated() => $pb.PbList<RevId>();
+  @$core.pragma('dart2js:noInline')
+  static RevId getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<RevId>(create);
+  static RevId? _defaultInstance;
+
+  @$pb.TagNumber(1)
+  $fixnum.Int64 get value => $_getI64(0);
+  @$pb.TagNumber(1)
+  set value($fixnum.Int64 v) { $_setInt64(0, v); }
+  @$pb.TagNumber(1)
+  $core.bool hasValue() => $_has(0);
+  @$pb.TagNumber(1)
+  void clearValue() => clearField(1);
+}
+
 class RevisionRange extends $pb.GeneratedMessage {
   static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'RevisionRange', createEmptyInstance: create)
     ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')

+ 1 - 1
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbenum.dart → frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbenum.dart

@@ -1,6 +1,6 @@
 ///
 //  Generated code. Do not modify.
-//  source: revision.proto
+//  source: model.proto
 //
 // @dart = 2.12
 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields

+ 11 - 11
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbjson.dart → frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbjson.dart

@@ -1,6 +1,6 @@
 ///
 //  Generated code. Do not modify.
-//  source: revision.proto
+//  source: model.proto
 //
 // @dart = 2.12
 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
@@ -19,16 +19,6 @@ const RevType$json = const {
 
 /// Descriptor for `RevType`. Decode as a `google.protobuf.EnumDescriptorProto`.
 final $typed_data.Uint8List revTypeDescriptor = $convert.base64Decode('CgdSZXZUeXBlEgkKBUxvY2FsEAASCgoGUmVtb3RlEAE=');
-@$core.Deprecated('Use revIdDescriptor instead')
-const RevId$json = const {
-  '1': 'RevId',
-  '2': const [
-    const {'1': 'value', '3': 1, '4': 1, '5': 3, '10': 'value'},
-  ],
-};
-
-/// Descriptor for `RevId`. Decode as a `google.protobuf.DescriptorProto`.
-final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBIUCgV2YWx1ZRgBIAEoA1IFdmFsdWU=');
 @$core.Deprecated('Use revisionDescriptor instead')
 const Revision$json = const {
   '1': 'Revision',
@@ -44,6 +34,16 @@ const Revision$json = const {
 
 /// Descriptor for `Revision`. Decode as a `google.protobuf.DescriptorProto`.
 final $typed_data.Uint8List revisionDescriptor = $convert.base64Decode('CghSZXZpc2lvbhIeCgtiYXNlX3Jldl9pZBgBIAEoA1IJYmFzZVJldklkEhUKBnJldl9pZBgCIAEoA1IFcmV2SWQSHQoKZGVsdGFfZGF0YRgDIAEoDFIJZGVsdGFEYXRhEhAKA21kNRgEIAEoCVIDbWQ1EhUKBmRvY19pZBgFIAEoCVIFZG9jSWQSGAoCdHkYBiABKA4yCC5SZXZUeXBlUgJ0eQ==');
+@$core.Deprecated('Use revIdDescriptor instead')
+const RevId$json = const {
+  '1': 'RevId',
+  '2': const [
+    const {'1': 'value', '3': 1, '4': 1, '5': 3, '10': 'value'},
+  ],
+};
+
+/// Descriptor for `RevId`. Decode as a `google.protobuf.DescriptorProto`.
+final $typed_data.Uint8List revIdDescriptor = $convert.base64Decode('CgVSZXZJZBIUCgV2YWx1ZRgBIAEoA1IFdmFsdWU=');
 @$core.Deprecated('Use revisionRangeDescriptor instead')
 const RevisionRange$json = const {
   '1': 'RevisionRange',

+ 2 - 2
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document-infra/revision.pbserver.dart → frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/model.pbserver.dart

@@ -1,9 +1,9 @@
 ///
 //  Generated code. Do not modify.
-//  source: revision.proto
+//  source: model.proto
 //
 // @dart = 2.12
 // ignore_for_file: annotate_overrides,camel_case_types,unnecessary_const,non_constant_identifier_names,library_prefixes,unused_import,unused_shown_name,return_of_invalid_type,unnecessary_this,prefer_final_fields,deprecated_member_use_from_same_package
 
-export 'revision.pb.dart';
+export 'model.pb.dart';
 

+ 2 - 0
frontend/app_flowy/packages/flowy_sdk/lib/protobuf/lib-ot/protobuf.dart

@@ -0,0 +1,2 @@
+// Auto-generated, do not edit 
+export './model.pb.dart';

+ 2 - 1
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -11,7 +11,7 @@ use flowy_database::ConnectionPool;
 use flowy_document_infra::{
     core::history::UndoResult,
     entities::{
-        doc::{DocDelta, RevId, RevType, Revision, RevisionRange},
+        doc::DocDelta,
         ws::{WsDataType, WsDocumentData},
     },
     errors::DocumentResult,
@@ -19,6 +19,7 @@ use flowy_document_infra::{
 use lib_infra::retry::{ExponentialBackoff, Retry};
 use lib_ot::{
     core::Interval,
+    revision::{RevId, RevType, Revision, RevisionRange},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
 use lib_ws::WsConnectState;

+ 2 - 1
frontend/rust-lib/flowy-document/src/services/doc/edit/model.rs

@@ -1,7 +1,8 @@
 use crate::{errors::DocError, services::ws::DocumentWebSocket};
-use flowy_document_infra::entities::doc::{NewDocUser, RevId};
+use flowy_document_infra::entities::doc::NewDocUser;
 use futures::future::BoxFuture;
 use lib_infra::retry::Action;
+use lib_ot::revision::RevId;
 use std::{future, sync::Arc};
 
 pub(crate) struct OpenDocAction {

+ 1 - 1
frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs

@@ -2,12 +2,12 @@ use async_stream::stream;
 use bytes::Bytes;
 use flowy_document_infra::{
     core::{history::UndoResult, Document},
-    entities::doc::{RevId, Revision},
     errors::DocumentError,
 };
 use futures::stream::StreamExt;
 use lib_ot::{
     core::{Interval, OperationTransformable},
+    revision::{RevId, Revision},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
 use std::{convert::TryFrom, sync::Arc};

+ 6 - 5
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -3,12 +3,13 @@ use crate::{
     services::doc::revision::RevisionStore,
 };
 use flowy_database::ConnectionPool;
-use flowy_document_infra::{
-    entities::doc::{Doc, RevId, RevType, Revision, RevisionRange},
-    util::RevIdCounter,
-};
+use flowy_document_infra::{entities::doc::Doc, util::RevIdCounter};
 use lib_infra::future::ResultFuture;
-use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
+use lib_ot::{
+    core::OperationTransformable,
+    revision::{RevId, RevType, Revision, RevisionRange},
+    rich_text::RichTextDelta,
+};
 use std::sync::Arc;
 use tokio::sync::mpsc;
 

+ 3 - 38
frontend/rust-lib/flowy-document/src/services/doc/revision/model.rs

@@ -1,48 +1,13 @@
 use crate::{
     errors::{internal_error, DocError, DocResult},
-    sql_tables::{RevState, RevTableSql},
+    sql_tables::{RevTableSql, SqlRevState},
 };
 use flowy_database::ConnectionPool;
-use flowy_document_infra::entities::doc::{Revision, RevisionRange};
 use lib_infra::future::ResultFuture;
+use lib_ot::revision::{Revision, RevisionRange};
 use std::sync::Arc;
 use tokio::sync::broadcast;
 
-pub type RevIdReceiver = broadcast::Receiver<i64>;
-pub type RevIdSender = broadcast::Sender<i64>;
-
-pub struct RevisionRecord {
-    pub revision: Revision,
-    pub state: RevState,
-}
-
-impl RevisionRecord {
-    pub fn new(revision: Revision) -> Self {
-        Self {
-            revision,
-            state: RevState::Local,
-        }
-    }
-}
-
-pub(crate) struct PendingRevId {
-    pub rev_id: i64,
-    pub sender: RevIdSender,
-}
-
-impl PendingRevId {
-    pub(crate) fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } }
-
-    pub(crate) fn finish(&self, rev_id: i64) -> bool {
-        if self.rev_id > rev_id {
-            false
-        } else {
-            let _ = self.sender.send(self.rev_id);
-            true
-        }
-    }
-}
-
 pub(crate) struct Persistence {
     pub(crate) rev_sql: Arc<RevTableSql>,
     pub(crate) pool: Arc<ConnectionPool>,
@@ -54,7 +19,7 @@ impl Persistence {
         Self { rev_sql, pool }
     }
 
-    pub(crate) fn create_revs(&self, revisions: Vec<(Revision, RevState)>) -> DocResult<()> {
+    pub(crate) fn create_revs(&self, revisions: Vec<(Revision, SqlRevState)>) -> DocResult<()> {
         let conn = &*self.pool.get().map_err(internal_error)?;
         conn.immediate_transaction::<_, DocError, _>(|| {
             let _ = self.rev_sql.create_rev_table(revisions, conn)?;

+ 14 - 8
frontend/rust-lib/flowy-document/src/services/doc/revision/persistence.rs

@@ -1,16 +1,17 @@
 use crate::{
     errors::{internal_error, DocError, DocResult},
     services::doc::revision::{model::*, RevisionServer},
-    sql_tables::RevState,
+    sql_tables::SqlRevState,
 };
 use async_stream::stream;
 use dashmap::DashMap;
 use flowy_database::{ConnectionPool, SqliteConnection};
-use flowy_document_infra::entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange};
+use flowy_document_infra::entities::doc::Doc;
 use futures::stream::StreamExt;
 use lib_infra::future::ResultFuture;
 use lib_ot::{
     core::{Operation, OperationTransformable},
+    revision::{PendingRevId, RevId, RevIdReceiver, RevType, Revision, RevisionRange, RevisionRecord},
     rich_text::RichTextDelta,
 };
 use std::{collections::VecDeque, sync::Arc, time::Duration};
@@ -52,7 +53,7 @@ impl RevisionStore {
             server,
         });
 
-        tokio::spawn(RevisionStream::new(store.clone(), pending_rx, ws_revision_sender).run());
+        tokio::spawn(RevisionUploadStream::new(store.clone(), pending_rx, ws_revision_sender).run());
 
         store
     }
@@ -70,7 +71,7 @@ impl RevisionStore {
             if let Ok(rev_id) = rx.recv().await {
                 match revs_map.get_mut(&rev_id) {
                     None => {},
-                    Some(mut rev) => rev.value_mut().state = RevState::Acked,
+                    Some(mut rev) => rev.value_mut().state = SqlRevState::Acked.into(),
                 }
             }
         });
@@ -113,7 +114,7 @@ impl RevisionStore {
             let revisions_state = revs_map
                 .iter()
                 .map(|kv| (kv.revision.clone(), kv.state))
-                .collect::<Vec<(Revision, RevState)>>();
+                .collect::<Vec<(Revision, SqlRevState)>>();
 
             match persistence.create_revs(revisions_state.clone()) {
                 Ok(_) => {
@@ -157,11 +158,16 @@ impl RevisionStore {
 
         let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
         let revision = revision_from_doc(doc.clone(), RevType::Remote);
-        let _ = self.persistence.create_revs(vec![(revision, RevState::Acked)])?;
+        let _ = self.persistence.create_revs(vec![(revision, SqlRevState::Acked)])?;
         Ok(doc)
     }
 }
 
+pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision {
+    let delta_data = doc.data.as_bytes();
+    Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty)
+}
+
 impl RevisionIterator for RevisionStore {
     fn next(&self) -> ResultFuture<Option<Revision>, DocError> {
         let pending_revs = self.pending_revs.clone();
@@ -316,13 +322,13 @@ pub(crate) enum PendingMsg {
 pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
 pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
 
-pub(crate) struct RevisionStream {
+pub(crate) struct RevisionUploadStream {
     revisions: Arc<dyn RevisionIterator>,
     receiver: Option<PendingReceiver>,
     ws_revision_sender: mpsc::UnboundedSender<Revision>,
 }
 
-impl RevisionStream {
+impl RevisionUploadStream {
     pub(crate) fn new(
         revisions: Arc<dyn RevisionIterator>,
         pending_rx: PendingReceiver,

+ 3 - 3
frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_sql.rs

@@ -1,17 +1,17 @@
 use crate::{
     errors::DocError,
-    sql_tables::{doc::RevTable, RevChangeset, RevState, RevTableType},
+    sql_tables::{doc::RevTable, RevChangeset, RevTableType, SqlRevState},
 };
 use diesel::update;
 use flowy_database::{insert_or_ignore_into, prelude::*, schema::rev_table::dsl, SqliteConnection};
-use flowy_document_infra::entities::doc::{Revision, RevisionRange};
+use lib_ot::revision::{Revision, RevisionRange};
 
 pub struct RevTableSql {}
 
 impl RevTableSql {
     pub(crate) fn create_rev_table(
         &self,
-        revisions: Vec<(Revision, RevState)>,
+        revisions: Vec<(Revision, SqlRevState)>,
         conn: &SqliteConnection,
     ) -> Result<(), DocError> {
         // Batch insert: https://diesel.rs/guides/all-about-inserts.html

+ 22 - 14
frontend/rust-lib/flowy-document/src/sql_tables/doc/rev_table.rs

@@ -1,9 +1,7 @@
 use diesel::sql_types::Integer;
 use flowy_database::schema::rev_table;
-use flowy_document_infra::{
-    entities::doc::{RevId, RevType, Revision},
-    util::md5,
-};
+use flowy_document_infra::util::md5;
+use lib_ot::revision::{RevId, RevState, RevType, Revision};
 
 #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
 #[table_name = "rev_table"]
@@ -13,39 +11,49 @@ pub(crate) struct RevTable {
     pub(crate) base_rev_id: i64,
     pub(crate) rev_id: i64,
     pub(crate) data: Vec<u8>,
-    pub(crate) state: RevState,
+    pub(crate) state: SqlRevState,
     pub(crate) ty: RevTableType,
 }
 
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
 #[repr(i32)]
 #[sql_type = "Integer"]
-pub enum RevState {
+pub enum SqlRevState {
     Local = 0,
     Acked = 1,
 }
 
-impl std::default::Default for RevState {
-    fn default() -> Self { RevState::Local }
+impl std::default::Default for SqlRevState {
+    fn default() -> Self { SqlRevState::Local }
 }
 
-impl std::convert::From<i32> for RevState {
+impl std::convert::From<i32> for SqlRevState {
     fn from(value: i32) -> Self {
         match value {
-            0 => RevState::Local,
-            1 => RevState::Acked,
+            0 => SqlRevState::Local,
+            1 => SqlRevState::Acked,
             o => {
                 log::error!("Unsupported rev state {}, fallback to RevState::Local", o);
-                RevState::Local
+                SqlRevState::Local
             },
         }
     }
 }
-impl RevState {
+
+impl SqlRevState {
     pub fn value(&self) -> i32 { *self as i32 }
 }
 impl_sql_integer_expression!(RevState);
 
+impl std::convert::From<SqlRevState> for RevState {
+    fn from(s: SqlRevState) -> Self {
+        match s {
+            SqlRevState::Local => RevState.Local,
+            SqlRevState::Acked => RevState.Acked,
+        }
+    }
+}
+
 impl std::convert::From<RevTable> for Revision {
     fn from(table: RevTable) -> Self {
         let md5 = md5(&table.data);
@@ -111,5 +119,5 @@ impl_sql_integer_expression!(RevTableType);
 pub(crate) struct RevChangeset {
     pub(crate) doc_id: String,
     pub(crate) rev_id: RevId,
-    pub(crate) state: RevState,
+    pub(crate) state: SqlRevState,
 }

+ 5 - 0
shared-lib/Cargo.lock

@@ -1128,13 +1128,18 @@ version = "0.1.0"
 dependencies = [
  "bytecount",
  "bytes",
+ "dashmap",
  "derive_more",
+ "flowy-derive",
  "lazy_static",
  "log",
+ "md5",
+ "protobuf",
  "serde",
  "serde_json",
  "strum",
  "strum_macros",
+ "tokio",
  "tracing",
 ]
 

+ 4 - 4
shared-lib/flowy-derive/src/derive_cache/derive_cache.rs

@@ -63,12 +63,12 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
         | "DocDelta"
         | "NewDocUser"
         | "DocIdentifier"
-        | "RevId"
-        | "Revision"
-        | "RevisionRange"
         | "WsDocumentData"
         | "WsError"
         | "WsMessage"
+        | "Revision"
+        | "RevId"
+        | "RevisionRange"
         | "SignInRequest"
         | "SignInParams"
         | "SignInResponse"
@@ -91,9 +91,9 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
         | "TrashType"
         | "ViewType"
         | "ExportType"
-        | "RevType"
         | "WsDataType"
         | "WsModule"
+        | "RevType"
         => TypeCategory::Enum,
 
         "Option" => TypeCategory::Opt,

+ 0 - 2
shared-lib/flowy-document-infra/src/entities/doc/mod.rs

@@ -1,7 +1,5 @@
 #![allow(clippy::module_inception)]
 mod doc;
 pub mod parser;
-mod revision;
 
 pub use doc::*;
-pub use revision::*;

+ 2 - 4
shared-lib/flowy-document-infra/src/entities/ws/ws.rs

@@ -1,9 +1,7 @@
-use crate::{
-    entities::doc::{NewDocUser, Revision},
-    errors::DocumentError,
-};
+use crate::{entities::doc::NewDocUser, errors::DocumentError};
 use bytes::Bytes;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
+use lib_ot::revision::Revision;
 use std::convert::{TryFrom, TryInto};
 
 #[derive(Debug, Clone, ProtoBuf_Enum, Eq, PartialEq, Hash)]

+ 0 - 3
shared-lib/flowy-document-infra/src/protobuf/model/mod.rs

@@ -4,8 +4,5 @@
 mod ws;
 pub use ws::*;
 
-mod revision;
-pub use revision::*;
-
 mod doc;
 pub use doc::*;

+ 6 - 0
shared-lib/lib-ot/Cargo.toml

@@ -8,6 +8,12 @@ edition = "2018"
 [dependencies]
 bytecount = "0.6.0"
 serde = { version = "1.0", features = ["derive"] }
+protobuf = {version = "2.18.0"}
+flowy-derive = { path = "../flowy-derive" }
+tokio = {version = "1", features = ["sync"]}
+dashmap = "4.0"
+md5 = "0.7.0"
+
 serde_json = {version = "1.0"}
 derive_more = {version = "0.99", features = ["display"]}
 log = "0.4"

+ 3 - 0
shared-lib/lib-ot/Flowy.toml

@@ -0,0 +1,3 @@
+
+proto_crates = ["src/revision"]
+event_files = []

+ 25 - 1
shared-lib/lib-ot/src/errors.rs

@@ -1,4 +1,4 @@
-use std::{error::Error, fmt, str::Utf8Error};
+use std::{error::Error, fmt, fmt::Debug, str::Utf8Error};
 
 #[derive(Clone, Debug)]
 pub struct OTError {
@@ -6,6 +6,22 @@ pub struct OTError {
     pub msg: String,
 }
 
+macro_rules! static_ot_error {
+    ($name:ident, $code:expr) => {
+        #[allow(non_snake_case, missing_docs)]
+        pub fn $name() -> OTError { $code.into() }
+    };
+}
+
+impl std::convert::From<OTErrorCode> for OTError {
+    fn from(code: OTErrorCode) -> Self {
+        OTError {
+            code: code.clone(),
+            msg: format!("{:?}", code),
+        }
+    }
+}
+
 impl OTError {
     pub fn new(code: OTErrorCode, msg: &str) -> OTError {
         Self {
@@ -13,6 +29,13 @@ impl OTError {
             msg: msg.to_owned(),
         }
     }
+
+    pub fn context<T: Debug>(mut self, error: T) -> Self {
+        self.msg = format!("{:?}", error);
+        self
+    }
+
+    static_ot_error!(duplicate_revision, OTErrorCode::DuplicatedRevision);
 }
 
 impl fmt::Display for OTError {
@@ -42,6 +65,7 @@ pub enum OTErrorCode {
     UndoFail,
     RedoFail,
     SerdeError,
+    DuplicatedRevision,
 }
 
 pub struct ErrorBuilder {

+ 2 - 0
shared-lib/lib-ot/src/lib.rs

@@ -1,3 +1,5 @@
 pub mod core;
 pub mod errors;
+pub mod protobuf;
+pub mod revision;
 pub mod rich_text;

+ 4 - 0
shared-lib/lib-ot/src/protobuf/mod.rs

@@ -0,0 +1,4 @@
+#![cfg_attr(rustfmt, rustfmt::skip)]
+// Auto-generated, do not edit
+mod model;
+pub use model::*;

+ 5 - 0
shared-lib/lib-ot/src/protobuf/model/mod.rs

@@ -0,0 +1,5 @@
+#![cfg_attr(rustfmt, rustfmt::skip)]
+// Auto-generated, do not edit
+
+mod model;
+pub use model::*;

+ 199 - 200
shared-lib/flowy-document-infra/src/protobuf/model/revision.rs → shared-lib/lib-ot/src/protobuf/model/model.rs

@@ -17,164 +17,12 @@
 #![allow(trivial_casts)]
 #![allow(unused_imports)]
 #![allow(unused_results)]
-//! Generated file from `revision.proto`
+//! Generated file from `model.proto`
 
 /// Generated files are compatible only with the same version
 /// of protobuf runtime.
 // const _PROTOBUF_VERSION_CHECK: () = ::protobuf::VERSION_2_22_1;
 
-#[derive(PartialEq,Clone,Default)]
-pub struct RevId {
-    // message fields
-    pub value: i64,
-    // special fields
-    pub unknown_fields: ::protobuf::UnknownFields,
-    pub cached_size: ::protobuf::CachedSize,
-}
-
-impl<'a> ::std::default::Default for &'a RevId {
-    fn default() -> &'a RevId {
-        <RevId as ::protobuf::Message>::default_instance()
-    }
-}
-
-impl RevId {
-    pub fn new() -> RevId {
-        ::std::default::Default::default()
-    }
-
-    // int64 value = 1;
-
-
-    pub fn get_value(&self) -> i64 {
-        self.value
-    }
-    pub fn clear_value(&mut self) {
-        self.value = 0;
-    }
-
-    // Param is passed by value, moved
-    pub fn set_value(&mut self, v: i64) {
-        self.value = v;
-    }
-}
-
-impl ::protobuf::Message for RevId {
-    fn is_initialized(&self) -> bool {
-        true
-    }
-
-    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        while !is.eof()? {
-            let (field_number, wire_type) = is.read_tag_unpack()?;
-            match field_number {
-                1 => {
-                    if wire_type != ::protobuf::wire_format::WireTypeVarint {
-                        return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
-                    }
-                    let tmp = is.read_int64()?;
-                    self.value = tmp;
-                },
-                _ => {
-                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
-                },
-            };
-        }
-        ::std::result::Result::Ok(())
-    }
-
-    // Compute sizes of nested messages
-    #[allow(unused_variables)]
-    fn compute_size(&self) -> u32 {
-        let mut my_size = 0;
-        if self.value != 0 {
-            my_size += ::protobuf::rt::value_size(1, self.value, ::protobuf::wire_format::WireTypeVarint);
-        }
-        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
-        self.cached_size.set(my_size);
-        my_size
-    }
-
-    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if self.value != 0 {
-            os.write_int64(1, self.value)?;
-        }
-        os.write_unknown_fields(self.get_unknown_fields())?;
-        ::std::result::Result::Ok(())
-    }
-
-    fn get_cached_size(&self) -> u32 {
-        self.cached_size.get()
-    }
-
-    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
-        &self.unknown_fields
-    }
-
-    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
-        &mut self.unknown_fields
-    }
-
-    fn as_any(&self) -> &dyn (::std::any::Any) {
-        self as &dyn (::std::any::Any)
-    }
-    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
-        self as &mut dyn (::std::any::Any)
-    }
-    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
-        self
-    }
-
-    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
-        Self::descriptor_static()
-    }
-
-    fn new() -> RevId {
-        RevId::new()
-    }
-
-    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
-        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
-        descriptor.get(|| {
-            let mut fields = ::std::vec::Vec::new();
-            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
-                "value",
-                |m: &RevId| { &m.value },
-                |m: &mut RevId| { &mut m.value },
-            ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<RevId>(
-                "RevId",
-                fields,
-                file_descriptor_proto()
-            )
-        })
-    }
-
-    fn default_instance() -> &'static RevId {
-        static instance: ::protobuf::rt::LazyV2<RevId> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(RevId::new)
-    }
-}
-
-impl ::protobuf::Clear for RevId {
-    fn clear(&mut self) {
-        self.value = 0;
-        self.unknown_fields.clear();
-    }
-}
-
-impl ::std::fmt::Debug for RevId {
-    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
-        ::protobuf::text_format::fmt(self, f)
-    }
-}
-
-impl ::protobuf::reflect::ProtobufValue for RevId {
-    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
-        ::protobuf::reflect::ReflectValueRef::Message(self)
-    }
-}
-
 #[derive(PartialEq,Clone,Default)]
 pub struct Revision {
     // message fields
@@ -519,6 +367,158 @@ impl ::protobuf::reflect::ProtobufValue for Revision {
     }
 }
 
+#[derive(PartialEq,Clone,Default)]
+pub struct RevId {
+    // message fields
+    pub value: i64,
+    // special fields
+    pub unknown_fields: ::protobuf::UnknownFields,
+    pub cached_size: ::protobuf::CachedSize,
+}
+
+impl<'a> ::std::default::Default for &'a RevId {
+    fn default() -> &'a RevId {
+        <RevId as ::protobuf::Message>::default_instance()
+    }
+}
+
+impl RevId {
+    pub fn new() -> RevId {
+        ::std::default::Default::default()
+    }
+
+    // int64 value = 1;
+
+
+    pub fn get_value(&self) -> i64 {
+        self.value
+    }
+    pub fn clear_value(&mut self) {
+        self.value = 0;
+    }
+
+    // Param is passed by value, moved
+    pub fn set_value(&mut self, v: i64) {
+        self.value = v;
+    }
+}
+
+impl ::protobuf::Message for RevId {
+    fn is_initialized(&self) -> bool {
+        true
+    }
+
+    fn merge_from(&mut self, is: &mut ::protobuf::CodedInputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        while !is.eof()? {
+            let (field_number, wire_type) = is.read_tag_unpack()?;
+            match field_number {
+                1 => {
+                    if wire_type != ::protobuf::wire_format::WireTypeVarint {
+                        return ::std::result::Result::Err(::protobuf::rt::unexpected_wire_type(wire_type));
+                    }
+                    let tmp = is.read_int64()?;
+                    self.value = tmp;
+                },
+                _ => {
+                    ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
+                },
+            };
+        }
+        ::std::result::Result::Ok(())
+    }
+
+    // Compute sizes of nested messages
+    #[allow(unused_variables)]
+    fn compute_size(&self) -> u32 {
+        let mut my_size = 0;
+        if self.value != 0 {
+            my_size += ::protobuf::rt::value_size(1, self.value, ::protobuf::wire_format::WireTypeVarint);
+        }
+        my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
+        self.cached_size.set(my_size);
+        my_size
+    }
+
+    fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
+        if self.value != 0 {
+            os.write_int64(1, self.value)?;
+        }
+        os.write_unknown_fields(self.get_unknown_fields())?;
+        ::std::result::Result::Ok(())
+    }
+
+    fn get_cached_size(&self) -> u32 {
+        self.cached_size.get()
+    }
+
+    fn get_unknown_fields(&self) -> &::protobuf::UnknownFields {
+        &self.unknown_fields
+    }
+
+    fn mut_unknown_fields(&mut self) -> &mut ::protobuf::UnknownFields {
+        &mut self.unknown_fields
+    }
+
+    fn as_any(&self) -> &dyn (::std::any::Any) {
+        self as &dyn (::std::any::Any)
+    }
+    fn as_any_mut(&mut self) -> &mut dyn (::std::any::Any) {
+        self as &mut dyn (::std::any::Any)
+    }
+    fn into_any(self: ::std::boxed::Box<Self>) -> ::std::boxed::Box<dyn (::std::any::Any)> {
+        self
+    }
+
+    fn descriptor(&self) -> &'static ::protobuf::reflect::MessageDescriptor {
+        Self::descriptor_static()
+    }
+
+    fn new() -> RevId {
+        RevId::new()
+    }
+
+    fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
+        static descriptor: ::protobuf::rt::LazyV2<::protobuf::reflect::MessageDescriptor> = ::protobuf::rt::LazyV2::INIT;
+        descriptor.get(|| {
+            let mut fields = ::std::vec::Vec::new();
+            fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeInt64>(
+                "value",
+                |m: &RevId| { &m.value },
+                |m: &mut RevId| { &mut m.value },
+            ));
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<RevId>(
+                "RevId",
+                fields,
+                file_descriptor_proto()
+            )
+        })
+    }
+
+    fn default_instance() -> &'static RevId {
+        static instance: ::protobuf::rt::LazyV2<RevId> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(RevId::new)
+    }
+}
+
+impl ::protobuf::Clear for RevId {
+    fn clear(&mut self) {
+        self.value = 0;
+        self.unknown_fields.clear();
+    }
+}
+
+impl ::std::fmt::Debug for RevId {
+    fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
+        ::protobuf::text_format::fmt(self, f)
+    }
+}
+
+impl ::protobuf::reflect::ProtobufValue for RevId {
+    fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
+        ::protobuf::reflect::ReflectValueRef::Message(self)
+    }
+}
+
 #[derive(PartialEq,Clone,Default)]
 pub struct RevisionRange {
     // message fields
@@ -799,53 +799,52 @@ impl ::protobuf::reflect::ProtobufValue for RevType {
 }
 
 static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x0erevision.proto\"\x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01\
-    (\x03R\x05value\"\xa3\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\x01\
-    \x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\x05r\
-    evId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\n\
-    \x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01(\
-    \tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"N\
-    \n\rRevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\
-    \x14\n\x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\
-    \x20\x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\
-    \x06Remote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\
-    \x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x04\x01\n\n\n\x03\x04\0\
-    \x01\x12\x03\x02\x08\r\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x14\n\x0c\
-    \n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\
-    \x03\x03\n\x0f\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x12\x13\n\n\n\x02\
-    \x04\x01\x12\x04\x05\0\x0c\x01\n\n\n\x03\x04\x01\x01\x12\x03\x05\x08\x10\
-    \n\x0b\n\x04\x04\x01\x02\0\x12\x03\x06\x04\x1a\n\x0c\n\x05\x04\x01\x02\0\
-    \x05\x12\x03\x06\x04\t\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x06\n\x15\n\
-    \x0c\n\x05\x04\x01\x02\0\x03\x12\x03\x06\x18\x19\n\x0b\n\x04\x04\x01\x02\
-    \x01\x12\x03\x07\x04\x15\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x07\x04\
-    \t\n\x0c\n\x05\x04\x01\x02\x01\x01\x12\x03\x07\n\x10\n\x0c\n\x05\x04\x01\
-    \x02\x01\x03\x12\x03\x07\x13\x14\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\x08\
-    \x04\x19\n\x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\x08\x04\t\n\x0c\n\x05\
-    \x04\x01\x02\x02\x01\x12\x03\x08\n\x14\n\x0c\n\x05\x04\x01\x02\x02\x03\
-    \x12\x03\x08\x17\x18\n\x0b\n\x04\x04\x01\x02\x03\x12\x03\t\x04\x13\n\x0c\
-    \n\x05\x04\x01\x02\x03\x05\x12\x03\t\x04\n\n\x0c\n\x05\x04\x01\x02\x03\
-    \x01\x12\x03\t\x0b\x0e\n\x0c\n\x05\x04\x01\x02\x03\x03\x12\x03\t\x11\x12\
-    \n\x0b\n\x04\x04\x01\x02\x04\x12\x03\n\x04\x16\n\x0c\n\x05\x04\x01\x02\
-    \x04\x05\x12\x03\n\x04\n\n\x0c\n\x05\x04\x01\x02\x04\x01\x12\x03\n\x0b\
-    \x11\n\x0c\n\x05\x04\x01\x02\x04\x03\x12\x03\n\x14\x15\n\x0b\n\x04\x04\
-    \x01\x02\x05\x12\x03\x0b\x04\x13\n\x0c\n\x05\x04\x01\x02\x05\x06\x12\x03\
-    \x0b\x04\x0b\n\x0c\n\x05\x04\x01\x02\x05\x01\x12\x03\x0b\x0c\x0e\n\x0c\n\
-    \x05\x04\x01\x02\x05\x03\x12\x03\x0b\x11\x12\n\n\n\x02\x04\x02\x12\x04\r\
-    \0\x11\x01\n\n\n\x03\x04\x02\x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\
-    \x02\0\x12\x03\x0e\x04\x16\n\x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\
-    \n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\
-    \x02\0\x03\x12\x03\x0e\x14\x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\
-    \x04\x14\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\
-    \x04\x02\x02\x01\x01\x12\x03\x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\
-    \x12\x03\x0f\x12\x13\n\x0b\n\x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\
-    \x0c\n\x05\x04\x02\x02\x02\x05\x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\
-    \x02\x01\x12\x03\x10\n\r\n\x0c\n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\
-    \x11\n\n\n\x02\x05\0\x12\x04\x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\
-    \x12\x05\x0c\n\x0b\n\x04\x05\0\x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\
-    \0\x02\0\x01\x12\x03\x13\x04\t\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\
-    \x0c\r\n\x0b\n\x04\x05\0\x02\x01\x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\
-    \x02\x01\x01\x12\x03\x14\x04\n\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\
-    \r\x0eb\x06proto3\
+    \n\x0bmodel.proto\"\xa3\x01\n\x08Revision\x12\x1e\n\x0bbase_rev_id\x18\
+    \x01\x20\x01(\x03R\tbaseRevId\x12\x15\n\x06rev_id\x18\x02\x20\x01(\x03R\
+    \x05revId\x12\x1d\n\ndelta_data\x18\x03\x20\x01(\x0cR\tdeltaData\x12\x10\
+    \n\x03md5\x18\x04\x20\x01(\tR\x03md5\x12\x15\n\x06doc_id\x18\x05\x20\x01\
+    (\tR\x05docId\x12\x18\n\x02ty\x18\x06\x20\x01(\x0e2\x08.RevTypeR\x02ty\"\
+    \x1d\n\x05RevId\x12\x14\n\x05value\x18\x01\x20\x01(\x03R\x05value\"N\n\r\
+    RevisionRange\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x14\n\
+    \x05start\x18\x02\x20\x01(\x03R\x05start\x12\x10\n\x03end\x18\x03\x20\
+    \x01(\x03R\x03end*\x20\n\x07RevType\x12\t\n\x05Local\x10\0\x12\n\n\x06Re\
+    mote\x10\x01J\xea\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\x03\
+    \0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\t\x01\n\n\n\x03\x04\0\x01\x12\x03\
+    \x02\x08\x10\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x1a\n\x0c\n\x05\x04\
+    \0\x02\0\x05\x12\x03\x03\x04\t\n\x0c\n\x05\x04\0\x02\0\x01\x12\x03\x03\n\
+    \x15\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x18\x19\n\x0b\n\x04\x04\0\
+    \x02\x01\x12\x03\x04\x04\x15\n\x0c\n\x05\x04\0\x02\x01\x05\x12\x03\x04\
+    \x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x10\n\x0c\n\x05\x04\0\
+    \x02\x01\x03\x12\x03\x04\x13\x14\n\x0b\n\x04\x04\0\x02\x02\x12\x03\x05\
+    \x04\x19\n\x0c\n\x05\x04\0\x02\x02\x05\x12\x03\x05\x04\t\n\x0c\n\x05\x04\
+    \0\x02\x02\x01\x12\x03\x05\n\x14\n\x0c\n\x05\x04\0\x02\x02\x03\x12\x03\
+    \x05\x17\x18\n\x0b\n\x04\x04\0\x02\x03\x12\x03\x06\x04\x13\n\x0c\n\x05\
+    \x04\0\x02\x03\x05\x12\x03\x06\x04\n\n\x0c\n\x05\x04\0\x02\x03\x01\x12\
+    \x03\x06\x0b\x0e\n\x0c\n\x05\x04\0\x02\x03\x03\x12\x03\x06\x11\x12\n\x0b\
+    \n\x04\x04\0\x02\x04\x12\x03\x07\x04\x16\n\x0c\n\x05\x04\0\x02\x04\x05\
+    \x12\x03\x07\x04\n\n\x0c\n\x05\x04\0\x02\x04\x01\x12\x03\x07\x0b\x11\n\
+    \x0c\n\x05\x04\0\x02\x04\x03\x12\x03\x07\x14\x15\n\x0b\n\x04\x04\0\x02\
+    \x05\x12\x03\x08\x04\x13\n\x0c\n\x05\x04\0\x02\x05\x06\x12\x03\x08\x04\
+    \x0b\n\x0c\n\x05\x04\0\x02\x05\x01\x12\x03\x08\x0c\x0e\n\x0c\n\x05\x04\0\
+    \x02\x05\x03\x12\x03\x08\x11\x12\n\n\n\x02\x04\x01\x12\x04\n\0\x0c\x01\n\
+    \n\n\x03\x04\x01\x01\x12\x03\n\x08\r\n\x0b\n\x04\x04\x01\x02\0\x12\x03\
+    \x0b\x04\x14\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x0b\x04\t\n\x0c\n\x05\
+    \x04\x01\x02\0\x01\x12\x03\x0b\n\x0f\n\x0c\n\x05\x04\x01\x02\0\x03\x12\
+    \x03\x0b\x12\x13\n\n\n\x02\x04\x02\x12\x04\r\0\x11\x01\n\n\n\x03\x04\x02\
+    \x01\x12\x03\r\x08\x15\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0e\x04\x16\n\
+    \x0c\n\x05\x04\x02\x02\0\x05\x12\x03\x0e\x04\n\n\x0c\n\x05\x04\x02\x02\0\
+    \x01\x12\x03\x0e\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0e\x14\
+    \x15\n\x0b\n\x04\x04\x02\x02\x01\x12\x03\x0f\x04\x14\n\x0c\n\x05\x04\x02\
+    \x02\x01\x05\x12\x03\x0f\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\
+    \x0f\n\x0f\n\x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\x0f\x12\x13\n\x0b\n\
+    \x04\x04\x02\x02\x02\x12\x03\x10\x04\x12\n\x0c\n\x05\x04\x02\x02\x02\x05\
+    \x12\x03\x10\x04\t\n\x0c\n\x05\x04\x02\x02\x02\x01\x12\x03\x10\n\r\n\x0c\
+    \n\x05\x04\x02\x02\x02\x03\x12\x03\x10\x10\x11\n\n\n\x02\x05\0\x12\x04\
+    \x12\0\x15\x01\n\n\n\x03\x05\0\x01\x12\x03\x12\x05\x0c\n\x0b\n\x04\x05\0\
+    \x02\0\x12\x03\x13\x04\x0e\n\x0c\n\x05\x05\0\x02\0\x01\x12\x03\x13\x04\t\
+    \n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x13\x0c\r\n\x0b\n\x04\x05\0\x02\x01\
+    \x12\x03\x14\x04\x0f\n\x0c\n\x05\x05\0\x02\x01\x01\x12\x03\x14\x04\n\n\
+    \x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x14\r\x0eb\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 3 - 3
shared-lib/flowy-document-infra/src/protobuf/proto/revision.proto → shared-lib/lib-ot/src/protobuf/proto/model.proto

@@ -1,8 +1,5 @@
 syntax = "proto3";
 
-message RevId {
-    int64 value = 1;
-}
 message Revision {
     int64 base_rev_id = 1;
     int64 rev_id = 2;
@@ -11,6 +8,9 @@ message Revision {
     string doc_id = 5;
     RevType ty = 6;
 }
+message RevId {
+    int64 value = 1;
+}
 message RevisionRange {
     string doc_id = 1;
     int64 start = 2;

+ 109 - 0
shared-lib/lib-ot/src/revision/cache.rs

@@ -0,0 +1,109 @@
+use crate::{
+    errors::OTError,
+    revision::{RevId, Revision, RevisionRange},
+};
+use dashmap::{mapref::one::RefMut, DashMap};
+use std::{collections::VecDeque, sync::Arc};
+use tokio::sync::{broadcast, RwLock};
+
+pub trait RevisionDiskCache {
+    fn create_revision(&self, revision: &Revision) -> Result<(), OTError>;
+    fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError>;
+    fn read_revision(&self, rev_id: i64) -> Result<Option<Revision>, OTError>;
+}
+
+pub struct RevisionMemoryCache {
+    revs_map: Arc<DashMap<i64, RevisionRecord>>,
+    pending_revs: Arc<RwLock<VecDeque<i64>>>,
+}
+
+impl std::default::Default for RevisionMemoryCache {
+    fn default() -> Self {
+        let pending_revs = Arc::new(RwLock::new(VecDeque::new()));
+        RevisionMemoryCache {
+            revs_map: Arc::new(DashMap::new()),
+            pending_revs,
+        }
+    }
+}
+
+impl RevisionMemoryCache {
+    pub fn new() -> Self { RevisionMemoryCache::default() }
+
+    pub async fn add_revision(&self, revision: Revision) -> Result<(), OTError> {
+        if self.revs_map.contains_key(&revision.rev_id) {
+            return Err(OTError::duplicate_revision().context(format!("Duplicate revision id: {}", revision.rev_id)));
+        }
+
+        self.pending_revs.write().await.push_back(revision.rev_id);
+        self.revs_map.insert(revision.rev_id, RevisionRecord::new(revision));
+        Ok(())
+    }
+
+    pub async fn mut_revision<F>(&self, rev_id: i64, f: F)
+    where
+        F: Fn(RefMut<i64, RevisionRecord>),
+    {
+        if let Some(m_revision) = self.revs_map.get_mut(&rev_id) {
+            f(m_revision)
+        } else {
+            log::error!("Can't find revision with id {}", rev_id);
+        }
+    }
+
+    pub async fn revisions_in_range(&self, range: RevisionRange) -> Result<Option<Vec<Revision>>, OTError> {
+        let revs = range
+            .iter()
+            .flat_map(|rev_id| match self.revs_map.get(&rev_id) {
+                None => None,
+                Some(rev) => Some(rev.revision.clone()),
+            })
+            .collect::<Vec<Revision>>();
+
+        if revs.len() == range.len() as usize {
+            Ok(Some(revs))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+pub type RevIdReceiver = broadcast::Receiver<i64>;
+pub type RevIdSender = broadcast::Sender<i64>;
+
+pub enum RevState {
+    Local = 0,
+    Acked = 1,
+}
+
+pub struct RevisionRecord {
+    pub revision: Revision,
+    pub state: RevState,
+}
+
+impl RevisionRecord {
+    pub fn new(revision: Revision) -> Self {
+        Self {
+            revision,
+            state: RevState::Local,
+        }
+    }
+}
+
+pub struct PendingRevId {
+    pub rev_id: i64,
+    pub sender: RevIdSender,
+}
+
+impl PendingRevId {
+    pub fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } }
+
+    pub fn finish(&self, rev_id: i64) -> bool {
+        if self.rev_id > rev_id {
+            false
+        } else {
+            let _ = self.sender.send(self.rev_id);
+            true
+        }
+    }
+}

+ 5 - 0
shared-lib/lib-ot/src/revision/mod.rs

@@ -0,0 +1,5 @@
+mod cache;
+mod model;
+
+pub use cache::*;
+pub use model::*;

+ 41 - 59
shared-lib/flowy-document-infra/src/entities/doc/revision.rs → shared-lib/lib-ot/src/revision/model.rs

@@ -1,62 +1,7 @@
-use crate::{entities::doc::Doc, util::md5};
+use crate::rich_text::RichTextDelta;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
-use lib_ot::rich_text::RichTextDelta;
 use std::{fmt::Formatter, ops::RangeInclusive};
 
-#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
-pub enum RevType {
-    Local  = 0,
-    Remote = 1,
-}
-
-impl RevType {
-    pub fn is_local(&self) -> bool { self == &RevType::Local }
-}
-
-impl std::default::Default for RevType {
-    fn default() -> Self { RevType::Local }
-}
-
-// [[i64 to bytes]]
-// use byteorder::{BigEndian, ReadBytesExt};
-// use std::{io::Cursor};
-// impl std::convert::TryFrom<Bytes> for RevId {
-//     type Error = DocError;
-//
-//     fn try_from(bytes: Bytes) -> Result<Self, Self::Error> {
-//         // let mut wtr = vec![];
-//         // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
-//
-//         let mut rdr = Cursor::new(bytes);
-//         match rdr.read_i64::<BigEndian>() {
-//             Ok(rev_id) => Ok(RevId(rev_id)),
-//             Err(e) => Err(DocError::internal().context(e)),
-//         }
-//     }
-// }
-
-#[derive(Clone, Debug, ProtoBuf, Default)]
-pub struct RevId {
-    #[pb(index = 1)]
-    pub value: i64,
-}
-
-impl AsRef<i64> for RevId {
-    fn as_ref(&self) -> &i64 { &self.value }
-}
-
-impl std::convert::From<RevId> for i64 {
-    fn from(rev_id: RevId) -> Self { rev_id.value }
-}
-
-impl std::convert::From<i64> for RevId {
-    fn from(value: i64) -> Self { RevId { value } }
-}
-
-impl std::fmt::Display for RevId {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) }
-}
-
 #[derive(PartialEq, Eq, Clone, Default, ProtoBuf)]
 pub struct Revision {
     #[pb(index = 1)]
@@ -127,9 +72,40 @@ impl Revision {
     }
 }
 
-pub fn revision_from_doc(doc: Doc, ty: RevType) -> Revision {
-    let delta_data = doc.data.as_bytes();
-    Revision::new(doc.base_rev_id, doc.rev_id, delta_data.to_owned(), &doc.id, ty)
+#[derive(Clone, Debug, ProtoBuf, Default)]
+pub struct RevId {
+    #[pb(index = 1)]
+    pub value: i64,
+}
+
+impl AsRef<i64> for RevId {
+    fn as_ref(&self) -> &i64 { &self.value }
+}
+
+impl std::convert::From<RevId> for i64 {
+    fn from(rev_id: RevId) -> Self { rev_id.value }
+}
+
+impl std::convert::From<i64> for RevId {
+    fn from(value: i64) -> Self { RevId { value } }
+}
+
+impl std::fmt::Display for RevId {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_fmt(format_args!("{}", self.value)) }
+}
+
+#[derive(Debug, ProtoBuf_Enum, Clone, Eq, PartialEq)]
+pub enum RevType {
+    Local  = 0,
+    Remote = 1,
+}
+
+impl RevType {
+    pub fn is_local(&self) -> bool { self == &RevType::Local }
+}
+
+impl std::default::Default for RevType {
+    fn default() -> Self { RevType::Local }
 }
 
 #[derive(Debug, Clone, Default, ProtoBuf)]
@@ -161,3 +137,9 @@ impl RevisionRange {
         RangeInclusive::new(self.start, self.end)
     }
 }
+
+#[inline]
+pub fn md5<T: AsRef<[u8]>>(data: T) -> String {
+    let md5 = format!("{:x}", md5::compute(data));
+    md5
+}