Browse Source

create op table

appflowy 3 năm trước cách đây
mục cha
commit
302631f808
39 tập tin đã thay đổi với 523 bổ sung400 xóa
  1. 0 1
      app_flowy/lib/workspace/infrastructure/deps_resolver.dart
  2. 0 1
      app_flowy/lib/workspace/infrastructure/i_doc_impl.dart
  3. 3 3
      app_flowy/lib/workspace/infrastructure/repos/doc_repo.dart
  4. 0 2
      app_flowy/lib/workspace/infrastructure/repos/view_repo.dart
  5. 4 4
      app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart
  6. 37 37
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/doc.pb.dart
  7. 9 9
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/doc.pbjson.dart
  8. 2 2
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/event.pbenum.dart
  9. 2 2
      app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/event.pbjson.dart
  10. 3 3
      backend/src/service/doc_service/doc.rs
  11. 2 2
      backend/src/service/doc_service/router.rs
  12. 8 1
      rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql
  13. 72 0
      rust-lib/flowy-database/src/macros.rs
  14. 11 0
      rust-lib/flowy-database/src/schema.rs
  15. 1 1
      rust-lib/flowy-derive/src/derive_cache/derive_cache.rs
  16. 4 4
      rust-lib/flowy-document/src/entities/doc/doc.rs
  17. 20 21
      rust-lib/flowy-document/src/module.rs
  18. 121 121
      rust-lib/flowy-document/src/protobuf/model/doc.rs
  19. 4 4
      rust-lib/flowy-document/src/protobuf/proto/doc.proto
  20. 17 11
      rust-lib/flowy-document/src/services/cache.rs
  21. 60 52
      rust-lib/flowy-document/src/services/doc/doc_controller.rs
  22. 10 7
      rust-lib/flowy-document/src/services/doc/edit_context.rs
  23. 6 4
      rust-lib/flowy-document/src/services/doc/mod.rs
  24. 0 0
      rust-lib/flowy-document/src/services/file/file.rs
  25. 0 0
      rust-lib/flowy-document/src/services/file/manager.rs
  26. 0 0
      rust-lib/flowy-document/src/services/file/mod.rs
  27. 0 2
      rust-lib/flowy-document/src/services/mod.rs
  28. 37 0
      rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs
  29. 54 0
      rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs
  30. 5 5
      rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs
  31. 4 0
      rust-lib/flowy-document/src/sql_tables/doc/mod.rs
  32. 5 8
      rust-lib/flowy-workspace/src/entities/view/view_update.rs
  33. 2 2
      rust-lib/flowy-workspace/src/event.rs
  34. 4 4
      rust-lib/flowy-workspace/src/handlers/view_handler.rs
  35. 0 71
      rust-lib/flowy-workspace/src/macros.rs
  36. 1 1
      rust-lib/flowy-workspace/src/module.rs
  37. 11 11
      rust-lib/flowy-workspace/src/protobuf/model/event.rs
  38. 1 1
      rust-lib/flowy-workspace/src/protobuf/proto/event.proto
  39. 3 3
      rust-lib/flowy-workspace/src/services/view_controller.rs

+ 0 - 1
app_flowy/lib/workspace/infrastructure/deps_resolver.dart

@@ -18,7 +18,6 @@ import 'package:app_flowy/workspace/infrastructure/repos/app_repo.dart';
 import 'package:app_flowy/workspace/infrastructure/repos/doc_repo.dart';
 import 'package:app_flowy/workspace/infrastructure/repos/view_repo.dart';
 import 'package:app_flowy/workspace/infrastructure/repos/workspace_repo.dart';
-import 'package:flowy_editor/flowy_editor.dart';
 import 'package:flowy_sdk/protobuf/flowy-user/user_profile.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-workspace/view_create.pb.dart';
 import 'package:get_it/get_it.dart';

+ 0 - 1
app_flowy/lib/workspace/infrastructure/i_doc_impl.dart

@@ -4,7 +4,6 @@ import 'dart:typed_data';
 import 'package:dartz/dartz.dart';
 import 'package:app_flowy/workspace/domain/i_doc.dart';
 import 'package:app_flowy/workspace/infrastructure/repos/doc_repo.dart';
-import 'package:flowy_editor/src/model/quill_delta.dart';
 import 'package:flowy_sdk/protobuf/flowy-document/doc.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
 

+ 3 - 3
app_flowy/lib/workspace/infrastructure/repos/doc_repo.dart

@@ -20,10 +20,10 @@ class DocRepository {
 
   Future<Either<Doc, WorkspaceError>> applyChangeset(
       {required Uint8List data}) {
-    final request = ApplyChangesetRequest.create()
-      ..viewId = docId
+    final request = DocDelta.create()
+      ..docId = docId
       ..data = data;
-    return WorkspaceEventApplyChangeset(request).send();
+    return WorkspaceEventApplyDocDelta(request).send();
   }
 
   Future<Either<Unit, WorkspaceError>> closeDoc(

+ 0 - 2
app_flowy/lib/workspace/infrastructure/repos/view_repo.dart

@@ -1,8 +1,6 @@
 import 'dart:async';
 import 'dart:typed_data';
-
 import 'package:dartz/dartz.dart';
-import 'package:flowy_log/flowy_log.dart';
 import 'package:flowy_sdk/dispatch/dispatch.dart';
 import 'package:flowy_sdk/protobuf/flowy-observable/subject.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';

+ 4 - 4
app_flowy/packages/flowy_sdk/lib/dispatch/code_gen.dart

@@ -254,13 +254,13 @@ class WorkspaceEventOpenView {
     }
 }
 
-class WorkspaceEventApplyChangeset {
-     ApplyChangesetRequest request;
-     WorkspaceEventApplyChangeset(this.request);
+class WorkspaceEventApplyDocDelta {
+     DocDelta request;
+     WorkspaceEventApplyDocDelta(this.request);
 
     Future<Either<Doc, WorkspaceError>> send() {
     final request = FFIRequest.create()
-          ..event = WorkspaceEvent.ApplyChangeset.toString()
+          ..event = WorkspaceEvent.ApplyDocDelta.toString()
           ..payload = requestToBytes(this.request);
 
     return Dispatch.asyncRequest(request)

+ 37 - 37
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/doc.pb.dart

@@ -148,22 +148,22 @@ class Doc extends $pb.GeneratedMessage {
 
 class UpdateDocParams extends $pb.GeneratedMessage {
   static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'UpdateDocParams', createEmptyInstance: create)
-    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
-    ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docData', $pb.PbFieldType.OY)
+    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
+    ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
     ..hasRequiredFields = false
   ;
 
   UpdateDocParams._() : super();
   factory UpdateDocParams({
-    $core.String? id,
-    $core.List<$core.int>? docData,
+    $core.String? docId,
+    $core.List<$core.int>? data,
   }) {
     final _result = create();
-    if (id != null) {
-      _result.id = id;
+    if (docId != null) {
+      _result.docId = docId;
     }
-    if (docData != null) {
-      _result.docData = docData;
+    if (data != null) {
+      _result.data = data;
     }
     return _result;
   }
@@ -189,74 +189,74 @@ class UpdateDocParams extends $pb.GeneratedMessage {
   static UpdateDocParams? _defaultInstance;
 
   @$pb.TagNumber(1)
-  $core.String get id => $_getSZ(0);
+  $core.String get docId => $_getSZ(0);
   @$pb.TagNumber(1)
-  set id($core.String v) { $_setString(0, v); }
+  set docId($core.String v) { $_setString(0, v); }
   @$pb.TagNumber(1)
-  $core.bool hasId() => $_has(0);
+  $core.bool hasDocId() => $_has(0);
   @$pb.TagNumber(1)
-  void clearId() => clearField(1);
+  void clearDocId() => clearField(1);
 
   @$pb.TagNumber(2)
-  $core.List<$core.int> get docData => $_getN(1);
+  $core.List<$core.int> get data => $_getN(1);
   @$pb.TagNumber(2)
-  set docData($core.List<$core.int> v) { $_setBytes(1, v); }
+  set data($core.List<$core.int> v) { $_setBytes(1, v); }
   @$pb.TagNumber(2)
-  $core.bool hasDocData() => $_has(1);
+  $core.bool hasData() => $_has(1);
   @$pb.TagNumber(2)
-  void clearDocData() => clearField(2);
+  void clearData() => clearField(2);
 }
 
-class DocChangeset extends $pb.GeneratedMessage {
-  static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocChangeset', createEmptyInstance: create)
-    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'id')
+class DocDelta extends $pb.GeneratedMessage {
+  static final $pb.BuilderInfo _i = $pb.BuilderInfo(const $core.bool.fromEnvironment('protobuf.omit_message_names') ? '' : 'DocDelta', createEmptyInstance: create)
+    ..aOS(1, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'docId')
     ..a<$core.List<$core.int>>(2, const $core.bool.fromEnvironment('protobuf.omit_field_names') ? '' : 'data', $pb.PbFieldType.OY)
     ..hasRequiredFields = false
   ;
 
-  DocChangeset._() : super();
-  factory DocChangeset({
-    $core.String? id,
+  DocDelta._() : super();
+  factory DocDelta({
+    $core.String? docId,
     $core.List<$core.int>? data,
   }) {
     final _result = create();
-    if (id != null) {
-      _result.id = id;
+    if (docId != null) {
+      _result.docId = docId;
     }
     if (data != null) {
       _result.data = data;
     }
     return _result;
   }
-  factory DocChangeset.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
-  factory DocChangeset.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
+  factory DocDelta.fromBuffer($core.List<$core.int> i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromBuffer(i, r);
+  factory DocDelta.fromJson($core.String i, [$pb.ExtensionRegistry r = $pb.ExtensionRegistry.EMPTY]) => create()..mergeFromJson(i, r);
   @$core.Deprecated(
   'Using this can add significant overhead to your binary. '
   'Use [GeneratedMessageGenericExtensions.deepCopy] instead. '
   'Will be removed in next major version')
-  DocChangeset clone() => DocChangeset()..mergeFromMessage(this);
+  DocDelta clone() => DocDelta()..mergeFromMessage(this);
   @$core.Deprecated(
   'Using this can add significant overhead to your binary. '
   'Use [GeneratedMessageGenericExtensions.rebuild] instead. '
   'Will be removed in next major version')
-  DocChangeset copyWith(void Function(DocChangeset) updates) => super.copyWith((message) => updates(message as DocChangeset)) as DocChangeset; // ignore: deprecated_member_use
+  DocDelta copyWith(void Function(DocDelta) updates) => super.copyWith((message) => updates(message as DocDelta)) as DocDelta; // ignore: deprecated_member_use
   $pb.BuilderInfo get info_ => _i;
   @$core.pragma('dart2js:noInline')
-  static DocChangeset create() => DocChangeset._();
-  DocChangeset createEmptyInstance() => create();
-  static $pb.PbList<DocChangeset> createRepeated() => $pb.PbList<DocChangeset>();
+  static DocDelta create() => DocDelta._();
+  DocDelta createEmptyInstance() => create();
+  static $pb.PbList<DocDelta> createRepeated() => $pb.PbList<DocDelta>();
   @$core.pragma('dart2js:noInline')
-  static DocChangeset getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<DocChangeset>(create);
-  static DocChangeset? _defaultInstance;
+  static DocDelta getDefault() => _defaultInstance ??= $pb.GeneratedMessage.$_defaultFor<DocDelta>(create);
+  static DocDelta? _defaultInstance;
 
   @$pb.TagNumber(1)
-  $core.String get id => $_getSZ(0);
+  $core.String get docId => $_getSZ(0);
   @$pb.TagNumber(1)
-  set id($core.String v) { $_setString(0, v); }
+  set docId($core.String v) { $_setString(0, v); }
   @$pb.TagNumber(1)
-  $core.bool hasId() => $_has(0);
+  $core.bool hasDocId() => $_has(0);
   @$pb.TagNumber(1)
-  void clearId() => clearField(1);
+  void clearDocId() => clearField(1);
 
   @$pb.TagNumber(2)
   $core.List<$core.int> get data => $_getN(1);

+ 9 - 9
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-document/doc.pbjson.dart

@@ -35,24 +35,24 @@ final $typed_data.Uint8List docDescriptor = $convert.base64Decode('CgNEb2MSDgoCa
 const UpdateDocParams$json = const {
   '1': 'UpdateDocParams',
   '2': const [
-    const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
-    const {'1': 'doc_data', '3': 2, '4': 1, '5': 12, '10': 'docData'},
+    const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
+    const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
   ],
 };
 
 /// Descriptor for `UpdateDocParams`. Decode as a `google.protobuf.DescriptorProto`.
-final $typed_data.Uint8List updateDocParamsDescriptor = $convert.base64Decode('Cg9VcGRhdGVEb2NQYXJhbXMSDgoCaWQYASABKAlSAmlkEhkKCGRvY19kYXRhGAIgASgMUgdkb2NEYXRh');
-@$core.Deprecated('Use docChangesetDescriptor instead')
-const DocChangeset$json = const {
-  '1': 'DocChangeset',
+final $typed_data.Uint8List updateDocParamsDescriptor = $convert.base64Decode('Cg9VcGRhdGVEb2NQYXJhbXMSFQoGZG9jX2lkGAEgASgJUgVkb2NJZBISCgRkYXRhGAIgASgMUgRkYXRh');
+@$core.Deprecated('Use docDeltaDescriptor instead')
+const DocDelta$json = const {
+  '1': 'DocDelta',
   '2': const [
-    const {'1': 'id', '3': 1, '4': 1, '5': 9, '10': 'id'},
+    const {'1': 'doc_id', '3': 1, '4': 1, '5': 9, '10': 'docId'},
     const {'1': 'data', '3': 2, '4': 1, '5': 12, '10': 'data'},
   ],
 };
 
-/// Descriptor for `DocChangeset`. Decode as a `google.protobuf.DescriptorProto`.
-final $typed_data.Uint8List docChangesetDescriptor = $convert.base64Decode('CgxEb2NDaGFuZ2VzZXQSDgoCaWQYASABKAlSAmlkEhIKBGRhdGEYAiABKAxSBGRhdGE=');
+/// Descriptor for `DocDelta`. Decode as a `google.protobuf.DescriptorProto`.
+final $typed_data.Uint8List docDeltaDescriptor = $convert.base64Decode('CghEb2NEZWx0YRIVCgZkb2NfaWQYASABKAlSBWRvY0lkEhIKBGRhdGEYAiABKAxSBGRhdGE=');
 @$core.Deprecated('Use queryDocParamsDescriptor instead')
 const QueryDocParams$json = const {
   '1': 'QueryDocParams',

+ 2 - 2
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/event.pbenum.dart

@@ -25,7 +25,7 @@ class WorkspaceEvent extends $pb.ProtobufEnum {
   static const WorkspaceEvent UpdateView = WorkspaceEvent._(203, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'UpdateView');
   static const WorkspaceEvent DeleteView = WorkspaceEvent._(204, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'DeleteView');
   static const WorkspaceEvent OpenView = WorkspaceEvent._(205, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'OpenView');
-  static const WorkspaceEvent ApplyChangeset = WorkspaceEvent._(206, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ApplyChangeset');
+  static const WorkspaceEvent ApplyDocDelta = WorkspaceEvent._(206, const $core.bool.fromEnvironment('protobuf.omit_enum_names') ? '' : 'ApplyDocDelta');
 
   static const $core.List<WorkspaceEvent> values = <WorkspaceEvent> [
     CreateWorkspace,
@@ -43,7 +43,7 @@ class WorkspaceEvent extends $pb.ProtobufEnum {
     UpdateView,
     DeleteView,
     OpenView,
-    ApplyChangeset,
+    ApplyDocDelta,
   ];
 
   static final $core.Map<$core.int, WorkspaceEvent> _byValue = $pb.ProtobufEnum.initByValue(values);

+ 2 - 2
app_flowy/packages/flowy_sdk/lib/protobuf/flowy-workspace/event.pbjson.dart

@@ -27,9 +27,9 @@ const WorkspaceEvent$json = const {
     const {'1': 'UpdateView', '2': 203},
     const {'1': 'DeleteView', '2': 204},
     const {'1': 'OpenView', '2': 205},
-    const {'1': 'ApplyChangeset', '2': 206},
+    const {'1': 'ApplyDocDelta', '2': 206},
   ],
 };
 
 /// Descriptor for `WorkspaceEvent`. Decode as a `google.protobuf.EnumDescriptorProto`.
-final $typed_data.Uint8List workspaceEventDescriptor = $convert.base64Decode('Cg5Xb3Jrc3BhY2VFdmVudBITCg9DcmVhdGVXb3Jrc3BhY2UQABIUChBSZWFkQ3VyV29ya3NwYWNlEAESEgoOUmVhZFdvcmtzcGFjZXMQAhITCg9EZWxldGVXb3Jrc3BhY2UQAxIRCg1PcGVuV29ya3NwYWNlEAQSFQoRUmVhZFdvcmtzcGFjZUFwcHMQBRINCglDcmVhdGVBcHAQZRINCglEZWxldGVBcHAQZhILCgdSZWFkQXBwEGcSDQoJVXBkYXRlQXBwEGgSDwoKQ3JlYXRlVmlldxDJARINCghSZWFkVmlldxDKARIPCgpVcGRhdGVWaWV3EMsBEg8KCkRlbGV0ZVZpZXcQzAESDQoIT3BlblZpZXcQzQESEwoOQXBwbHlDaGFuZ2VzZXQQzgE=');
+final $typed_data.Uint8List workspaceEventDescriptor = $convert.base64Decode('Cg5Xb3Jrc3BhY2VFdmVudBITCg9DcmVhdGVXb3Jrc3BhY2UQABIUChBSZWFkQ3VyV29ya3NwYWNlEAESEgoOUmVhZFdvcmtzcGFjZXMQAhITCg9EZWxldGVXb3Jrc3BhY2UQAxIRCg1PcGVuV29ya3NwYWNlEAQSFQoRUmVhZFdvcmtzcGFjZUFwcHMQBRINCglDcmVhdGVBcHAQZRINCglEZWxldGVBcHAQZhILCgdSZWFkQXBwEGcSDQoJVXBkYXRlQXBwEGgSDwoKQ3JlYXRlVmlldxDJARINCghSZWFkVmlldxDKARIPCgpVcGRhdGVWaWV3EMsBEg8KCkRlbGV0ZVZpZXcQzAESDQoIT3BlblZpZXcQzQESEgoNQXBwbHlEb2NEZWx0YRDOAQ==');

+ 3 - 3
backend/src/service/doc_service/doc.rs

@@ -4,7 +4,7 @@ use crate::{
     sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
 };
 use anyhow::Context;
-use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, SaveDocParams};
+use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams};
 use flowy_net::{errors::ServerError, response::FlowyResponse};
 use sqlx::{postgres::PgArguments, PgPool, Postgres};
 use uuid::Uuid;
@@ -55,9 +55,9 @@ pub(crate) async fn read_doc(
 
 pub(crate) async fn update_doc(
     pool: &PgPool,
-    mut params: SaveDocParams,
+    mut params: UpdateDocParams,
 ) -> Result<FlowyResponse, ServerError> {
-    let doc_id = Uuid::parse_str(&params.id)?;
+    let doc_id = Uuid::parse_str(&params.doc_id)?;
     let mut transaction = pool
         .begin()
         .await

+ 2 - 2
backend/src/service/doc_service/router.rs

@@ -4,7 +4,7 @@ use actix_web::{
 };
 use sqlx::PgPool;
 
-use flowy_document::protobuf::{QueryDocParams, SaveDocParams};
+use flowy_document::protobuf::{QueryDocParams, UpdateDocParams};
 use flowy_net::errors::ServerError;
 
 use crate::service::{
@@ -25,7 +25,7 @@ pub async fn update_handler(
     payload: Payload,
     pool: Data<PgPool>,
 ) -> Result<HttpResponse, ServerError> {
-    let params: SaveDocParams = parse_from_payload(payload).await?;
+    let params: UpdateDocParams = parse_from_payload(payload).await?;
     let response = update_doc(pool.get_ref(), params).await?;
     Ok(response.into())
 }

+ 8 - 1
rust-lib/flowy-database/migrations/2021-09-22-074638_flowy-doc-op/up.sql

@@ -1 +1,8 @@
--- Your SQL goes here
+-- Your SQL goes here
+CREATE TABLE op_table (
+    base_rev BIGINT NOT NULL DEFAULT 0,
+    rev BIGINT NOT NULL PRIMARY KEY,
+    data BLOB NOT NULL DEFAULT (x''),
+    md5 TEXT NOT NULL DEFAULT '',
+    state INTEGER NOT NULL DEFAULT 0
+);

+ 72 - 0
rust-lib/flowy-database/src/macros.rs

@@ -86,3 +86,75 @@ macro_rules! diesel_delete_table {
         debug_assert_eq!(affected_row, 1);
     };
 }
+
+#[macro_export]
+macro_rules! impl_sql_binary_expression {
+    ($target:ident) => {
+        impl diesel::serialize::ToSql<diesel::sql_types::Binary, diesel::sqlite::Sqlite> for $target {
+            fn to_sql<W: std::io::Write>(
+                &self,
+                out: &mut diesel::serialize::Output<W, diesel::sqlite::Sqlite>,
+            ) -> diesel::serialize::Result {
+                let bytes: Vec<u8> = self.try_into().map_err(|e| format!("{:?}", e))?;
+                diesel::serialize::ToSql::<diesel::sql_types::Binary, diesel::sqlite::Sqlite>::to_sql(&bytes, out)
+            }
+        }
+        // https://docs.diesel.rs/src/diesel/sqlite/types/mod.rs.html#30-33
+        // impl FromSql<sql_types::Binary, Sqlite> for *const [u8] {
+        //     fn from_sql(bytes: Option<&SqliteValue>) -> deserialize::Result<Self> {
+        //         let bytes = not_none!(bytes).read_blob();
+        //         Ok(bytes as *const _)
+        //     }
+        // }
+        impl<DB> diesel::deserialize::FromSql<diesel::sql_types::Binary, DB> for $target
+        where
+            DB: diesel::backend::Backend,
+            *const [u8]: diesel::deserialize::FromSql<diesel::sql_types::Binary, DB>,
+        {
+            fn from_sql(bytes: Option<&DB::RawValue>) -> diesel::deserialize::Result<Self> {
+                let slice_ptr = <*const [u8] as diesel::deserialize::FromSql<diesel::sql_types::Binary, DB>>::from_sql(bytes)?;
+                let bytes = unsafe { &*slice_ptr };
+
+                match $target::try_from(bytes) {
+                    Ok(object) => Ok(object),
+                    Err(e) => {
+                        log::error!("{:?} deserialize from bytes fail. {:?}", std::any::type_name::<$target>(), e);
+                        panic!();
+                    },
+                }
+            }
+        }
+    };
+}
+
+#[macro_export]
+macro_rules! impl_sql_integer_expression {
+    ($target:ident) => {
+        use diesel::{
+            deserialize,
+            deserialize::FromSql,
+            serialize,
+            serialize::{Output, ToSql},
+        };
+        use std::io::Write;
+
+        impl<DB> ToSql<Integer, DB> for $target
+        where
+            DB: diesel::backend::Backend,
+            i32: ToSql<Integer, DB>,
+        {
+            fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> serialize::Result { (*self as i32).to_sql(out) }
+        }
+
+        impl<DB> FromSql<Integer, DB> for $target
+        where
+            DB: diesel::backend::Backend,
+            i32: FromSql<Integer, DB>,
+        {
+            fn from_sql(bytes: Option<&DB::RawValue>) -> deserialize::Result<Self> {
+                let smaill_int = i32::from_sql(bytes)?;
+                Ok($target::from(smaill_int))
+            }
+        }
+    };
+}

+ 11 - 0
rust-lib/flowy-database/src/schema.rs

@@ -21,6 +21,16 @@ table! {
     }
 }
 
+table! {
+    op_table (rev) {
+        base_rev -> BigInt,
+        rev -> BigInt,
+        data -> Binary,
+        md5 -> Text,
+        state -> Integer,
+    }
+}
+
 table! {
     user_table (id) {
         id -> Text,
@@ -61,6 +71,7 @@ table! {
 allow_tables_to_appear_in_same_query!(
     app_table,
     doc_table,
+    op_table,
     user_table,
     view_table,
     workspace_table,

+ 1 - 1
rust-lib/flowy-derive/src/derive_cache/derive_cache.rs

@@ -58,7 +58,7 @@ pub fn category_from_str(type_str: &str) -> TypeCategory {
         | "CreateDocParams"
         | "Doc"
         | "UpdateDocParams"
-        | "DocChangeset"
+        | "DocDelta"
         | "QueryDocParams"
         | "WsDocumentData"
         | "DocError"

+ 4 - 4
rust-lib/flowy-document/src/entities/doc/doc.rs

@@ -28,16 +28,16 @@ pub struct Doc {
 #[derive(ProtoBuf, Default, Debug, Clone)]
 pub struct UpdateDocParams {
     #[pb(index = 1)]
-    pub id: String,
+    pub doc_id: String,
 
     #[pb(index = 2)]
-    pub doc_data: Vec<u8>,
+    pub data: Vec<u8>,
 }
 
 #[derive(ProtoBuf, Default, Debug, Clone)]
-pub struct DocChangeset {
+pub struct DocDelta {
     #[pb(index = 1)]
-    pub id: String,
+    pub doc_id: String,
 
     #[pb(index = 2)]
     pub data: Vec<u8>, // Delta

+ 20 - 21
rust-lib/flowy-document/src/module.rs

@@ -1,13 +1,16 @@
-use crate::{
-    entities::doc::{CreateDocParams, Doc, DocChangeset, QueryDocParams},
-    errors::DocError,
-    services::{doc_controller::DocController, server::construct_doc_server, ws::WsManager},
-};
+use std::sync::Arc;
+
 use bytes::Bytes;
 use diesel::SqliteConnection;
-use flowy_database::ConnectionPool;
 use parking_lot::RwLock;
-use std::sync::Arc;
+
+use flowy_database::ConnectionPool;
+
+use crate::{
+    entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
+    errors::DocError,
+    services::{doc::doc_controller::DocController, server::construct_doc_server, ws::WsManager},
+};
 
 pub trait DocumentUser: Send + Sync {
     fn user_dir(&self) -> Result<String, DocError>;
@@ -16,39 +19,35 @@ pub trait DocumentUser: Send + Sync {
 }
 
 pub struct FlowyDocument {
-    controller: Arc<DocController>,
+    doc_ctrl: Arc<DocController>,
 }
 
 impl FlowyDocument {
     pub fn new(user: Arc<dyn DocumentUser>, ws_manager: Arc<RwLock<WsManager>>) -> FlowyDocument {
         let server = construct_doc_server();
         let controller = Arc::new(DocController::new(server.clone(), user.clone(), ws_manager.clone()));
-        Self { controller }
+        Self { doc_ctrl: controller }
     }
 
     pub fn create(&self, params: CreateDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
-        let _ = self.controller.create(params, conn)?;
+        let _ = self.doc_ctrl.create(params, conn)?;
         Ok(())
     }
 
     pub fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
-        let _ = self.controller.delete(params, conn)?;
+        let _ = self.doc_ctrl.delete(params, conn)?;
         Ok(())
     }
 
     pub async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
-        let open_doc = self.controller.open(params, pool).await?;
+        let open_doc = self.doc_ctrl.open(params, pool).await?;
         Ok(open_doc.doc())
     }
 
-    pub async fn apply_changeset(&self, params: DocChangeset, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
-        // let _ = self.doc_manager.apply_changeset(&params.id,
-        // Bytes::from(params.data), pool).await?;
-        //
-        // // workaround: compare the rust's delta with flutter's delta. Will be removed
-        // // very soon
-        // let doc = self.doc_manager.read_doc(&params.id)?;
-        // Ok(doc)
-        unimplemented!()
+    pub async fn apply_doc_delta(&self, params: DocDelta, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
+        // workaround: compare the rust's delta with flutter's delta. Will be removed
+        // very soon
+        let doc = self.doc_ctrl.edit_doc(params.clone(), pool)?;
+        Ok(doc)
     }
 }

+ 121 - 121
rust-lib/flowy-document/src/protobuf/model/doc.rs

@@ -463,8 +463,8 @@ impl ::protobuf::reflect::ProtobufValue for Doc {
 #[derive(PartialEq,Clone,Default)]
 pub struct UpdateDocParams {
     // message fields
-    pub id: ::std::string::String,
-    pub doc_data: ::std::vec::Vec<u8>,
+    pub doc_id: ::std::string::String,
+    pub data: ::std::vec::Vec<u8>,
     // special fields
     pub unknown_fields: ::protobuf::UnknownFields,
     pub cached_size: ::protobuf::CachedSize,
@@ -481,56 +481,56 @@ impl UpdateDocParams {
         ::std::default::Default::default()
     }
 
-    // string id = 1;
+    // string doc_id = 1;
 
 
-    pub fn get_id(&self) -> &str {
-        &self.id
+    pub fn get_doc_id(&self) -> &str {
+        &self.doc_id
     }
-    pub fn clear_id(&mut self) {
-        self.id.clear();
+    pub fn clear_doc_id(&mut self) {
+        self.doc_id.clear();
     }
 
     // Param is passed by value, moved
-    pub fn set_id(&mut self, v: ::std::string::String) {
-        self.id = v;
+    pub fn set_doc_id(&mut self, v: ::std::string::String) {
+        self.doc_id = v;
     }
 
     // Mutable pointer to the field.
     // If field is not initialized, it is initialized with default value first.
-    pub fn mut_id(&mut self) -> &mut ::std::string::String {
-        &mut self.id
+    pub fn mut_doc_id(&mut self) -> &mut ::std::string::String {
+        &mut self.doc_id
     }
 
     // Take field
-    pub fn take_id(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.id, ::std::string::String::new())
+    pub fn take_doc_id(&mut self) -> ::std::string::String {
+        ::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
     }
 
-    // bytes doc_data = 2;
+    // bytes data = 2;
 
 
-    pub fn get_doc_data(&self) -> &[u8] {
-        &self.doc_data
+    pub fn get_data(&self) -> &[u8] {
+        &self.data
     }
-    pub fn clear_doc_data(&mut self) {
-        self.doc_data.clear();
+    pub fn clear_data(&mut self) {
+        self.data.clear();
     }
 
     // Param is passed by value, moved
-    pub fn set_doc_data(&mut self, v: ::std::vec::Vec<u8>) {
-        self.doc_data = v;
+    pub fn set_data(&mut self, v: ::std::vec::Vec<u8>) {
+        self.data = v;
     }
 
     // Mutable pointer to the field.
     // If field is not initialized, it is initialized with default value first.
-    pub fn mut_doc_data(&mut self) -> &mut ::std::vec::Vec<u8> {
-        &mut self.doc_data
+    pub fn mut_data(&mut self) -> &mut ::std::vec::Vec<u8> {
+        &mut self.data
     }
 
     // Take field
-    pub fn take_doc_data(&mut self) -> ::std::vec::Vec<u8> {
-        ::std::mem::replace(&mut self.doc_data, ::std::vec::Vec::new())
+    pub fn take_data(&mut self) -> ::std::vec::Vec<u8> {
+        ::std::mem::replace(&mut self.data, ::std::vec::Vec::new())
     }
 }
 
@@ -544,10 +544,10 @@ impl ::protobuf::Message for UpdateDocParams {
             let (field_number, wire_type) = is.read_tag_unpack()?;
             match field_number {
                 1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?;
+                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?;
                 },
                 2 => {
-                    ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.doc_data)?;
+                    ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
                 },
                 _ => {
                     ::protobuf::rt::read_unknown_or_skip_group(field_number, wire_type, is, self.mut_unknown_fields())?;
@@ -561,11 +561,11 @@ impl ::protobuf::Message for UpdateDocParams {
     #[allow(unused_variables)]
     fn compute_size(&self) -> u32 {
         let mut my_size = 0;
-        if !self.id.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.id);
+        if !self.doc_id.is_empty() {
+            my_size += ::protobuf::rt::string_size(1, &self.doc_id);
         }
-        if !self.doc_data.is_empty() {
-            my_size += ::protobuf::rt::bytes_size(2, &self.doc_data);
+        if !self.data.is_empty() {
+            my_size += ::protobuf::rt::bytes_size(2, &self.data);
         }
         my_size += ::protobuf::rt::unknown_fields_size(self.get_unknown_fields());
         self.cached_size.set(my_size);
@@ -573,11 +573,11 @@ impl ::protobuf::Message for UpdateDocParams {
     }
 
     fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.id.is_empty() {
-            os.write_string(1, &self.id)?;
+        if !self.doc_id.is_empty() {
+            os.write_string(1, &self.doc_id)?;
         }
-        if !self.doc_data.is_empty() {
-            os.write_bytes(2, &self.doc_data)?;
+        if !self.data.is_empty() {
+            os.write_bytes(2, &self.data)?;
         }
         os.write_unknown_fields(self.get_unknown_fields())?;
         ::std::result::Result::Ok(())
@@ -618,14 +618,14 @@ impl ::protobuf::Message for UpdateDocParams {
         descriptor.get(|| {
             let mut fields = ::std::vec::Vec::new();
             fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "id",
-                |m: &UpdateDocParams| { &m.id },
-                |m: &mut UpdateDocParams| { &mut m.id },
+                "doc_id",
+                |m: &UpdateDocParams| { &m.doc_id },
+                |m: &mut UpdateDocParams| { &mut m.doc_id },
             ));
             fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
-                "doc_data",
-                |m: &UpdateDocParams| { &m.doc_data },
-                |m: &mut UpdateDocParams| { &mut m.doc_data },
+                "data",
+                |m: &UpdateDocParams| { &m.data },
+                |m: &mut UpdateDocParams| { &mut m.data },
             ));
             ::protobuf::reflect::MessageDescriptor::new_pb_name::<UpdateDocParams>(
                 "UpdateDocParams",
@@ -643,8 +643,8 @@ impl ::protobuf::Message for UpdateDocParams {
 
 impl ::protobuf::Clear for UpdateDocParams {
     fn clear(&mut self) {
-        self.id.clear();
-        self.doc_data.clear();
+        self.doc_id.clear();
+        self.data.clear();
         self.unknown_fields.clear();
     }
 }
@@ -662,50 +662,50 @@ impl ::protobuf::reflect::ProtobufValue for UpdateDocParams {
 }
 
 #[derive(PartialEq,Clone,Default)]
-pub struct DocChangeset {
+pub struct DocDelta {
     // message fields
-    pub id: ::std::string::String,
+    pub doc_id: ::std::string::String,
     pub data: ::std::vec::Vec<u8>,
     // special fields
     pub unknown_fields: ::protobuf::UnknownFields,
     pub cached_size: ::protobuf::CachedSize,
 }
 
-impl<'a> ::std::default::Default for &'a DocChangeset {
-    fn default() -> &'a DocChangeset {
-        <DocChangeset as ::protobuf::Message>::default_instance()
+impl<'a> ::std::default::Default for &'a DocDelta {
+    fn default() -> &'a DocDelta {
+        <DocDelta as ::protobuf::Message>::default_instance()
     }
 }
 
-impl DocChangeset {
-    pub fn new() -> DocChangeset {
+impl DocDelta {
+    pub fn new() -> DocDelta {
         ::std::default::Default::default()
     }
 
-    // string id = 1;
+    // string doc_id = 1;
 
 
-    pub fn get_id(&self) -> &str {
-        &self.id
+    pub fn get_doc_id(&self) -> &str {
+        &self.doc_id
     }
-    pub fn clear_id(&mut self) {
-        self.id.clear();
+    pub fn clear_doc_id(&mut self) {
+        self.doc_id.clear();
     }
 
     // Param is passed by value, moved
-    pub fn set_id(&mut self, v: ::std::string::String) {
-        self.id = v;
+    pub fn set_doc_id(&mut self, v: ::std::string::String) {
+        self.doc_id = v;
     }
 
     // Mutable pointer to the field.
     // If field is not initialized, it is initialized with default value first.
-    pub fn mut_id(&mut self) -> &mut ::std::string::String {
-        &mut self.id
+    pub fn mut_doc_id(&mut self) -> &mut ::std::string::String {
+        &mut self.doc_id
     }
 
     // Take field
-    pub fn take_id(&mut self) -> ::std::string::String {
-        ::std::mem::replace(&mut self.id, ::std::string::String::new())
+    pub fn take_doc_id(&mut self) -> ::std::string::String {
+        ::std::mem::replace(&mut self.doc_id, ::std::string::String::new())
     }
 
     // bytes data = 2;
@@ -735,7 +735,7 @@ impl DocChangeset {
     }
 }
 
-impl ::protobuf::Message for DocChangeset {
+impl ::protobuf::Message for DocDelta {
     fn is_initialized(&self) -> bool {
         true
     }
@@ -745,7 +745,7 @@ impl ::protobuf::Message for DocChangeset {
             let (field_number, wire_type) = is.read_tag_unpack()?;
             match field_number {
                 1 => {
-                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.id)?;
+                    ::protobuf::rt::read_singular_proto3_string_into(wire_type, is, &mut self.doc_id)?;
                 },
                 2 => {
                     ::protobuf::rt::read_singular_proto3_bytes_into(wire_type, is, &mut self.data)?;
@@ -762,8 +762,8 @@ impl ::protobuf::Message for DocChangeset {
     #[allow(unused_variables)]
     fn compute_size(&self) -> u32 {
         let mut my_size = 0;
-        if !self.id.is_empty() {
-            my_size += ::protobuf::rt::string_size(1, &self.id);
+        if !self.doc_id.is_empty() {
+            my_size += ::protobuf::rt::string_size(1, &self.doc_id);
         }
         if !self.data.is_empty() {
             my_size += ::protobuf::rt::bytes_size(2, &self.data);
@@ -774,8 +774,8 @@ impl ::protobuf::Message for DocChangeset {
     }
 
     fn write_to_with_cached_sizes(&self, os: &mut ::protobuf::CodedOutputStream<'_>) -> ::protobuf::ProtobufResult<()> {
-        if !self.id.is_empty() {
-            os.write_string(1, &self.id)?;
+        if !self.doc_id.is_empty() {
+            os.write_string(1, &self.doc_id)?;
         }
         if !self.data.is_empty() {
             os.write_bytes(2, &self.data)?;
@@ -810,8 +810,8 @@ impl ::protobuf::Message for DocChangeset {
         Self::descriptor_static()
     }
 
-    fn new() -> DocChangeset {
-        DocChangeset::new()
+    fn new() -> DocDelta {
+        DocDelta::new()
     }
 
     fn descriptor_static() -> &'static ::protobuf::reflect::MessageDescriptor {
@@ -819,44 +819,44 @@ impl ::protobuf::Message for DocChangeset {
         descriptor.get(|| {
             let mut fields = ::std::vec::Vec::new();
             fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeString>(
-                "id",
-                |m: &DocChangeset| { &m.id },
-                |m: &mut DocChangeset| { &mut m.id },
+                "doc_id",
+                |m: &DocDelta| { &m.doc_id },
+                |m: &mut DocDelta| { &mut m.doc_id },
             ));
             fields.push(::protobuf::reflect::accessor::make_simple_field_accessor::<_, ::protobuf::types::ProtobufTypeBytes>(
                 "data",
-                |m: &DocChangeset| { &m.data },
-                |m: &mut DocChangeset| { &mut m.data },
+                |m: &DocDelta| { &m.data },
+                |m: &mut DocDelta| { &mut m.data },
             ));
-            ::protobuf::reflect::MessageDescriptor::new_pb_name::<DocChangeset>(
-                "DocChangeset",
+            ::protobuf::reflect::MessageDescriptor::new_pb_name::<DocDelta>(
+                "DocDelta",
                 fields,
                 file_descriptor_proto()
             )
         })
     }
 
-    fn default_instance() -> &'static DocChangeset {
-        static instance: ::protobuf::rt::LazyV2<DocChangeset> = ::protobuf::rt::LazyV2::INIT;
-        instance.get(DocChangeset::new)
+    fn default_instance() -> &'static DocDelta {
+        static instance: ::protobuf::rt::LazyV2<DocDelta> = ::protobuf::rt::LazyV2::INIT;
+        instance.get(DocDelta::new)
     }
 }
 
-impl ::protobuf::Clear for DocChangeset {
+impl ::protobuf::Clear for DocDelta {
     fn clear(&mut self) {
-        self.id.clear();
+        self.doc_id.clear();
         self.data.clear();
         self.unknown_fields.clear();
     }
 }
 
-impl ::std::fmt::Debug for DocChangeset {
+impl ::std::fmt::Debug for DocDelta {
     fn fmt(&self, f: &mut ::std::fmt::Formatter<'_>) -> ::std::fmt::Result {
         ::protobuf::text_format::fmt(self, f)
     }
 }
 
-impl ::protobuf::reflect::ProtobufValue for DocChangeset {
+impl ::protobuf::reflect::ProtobufValue for DocDelta {
     fn as_ref(&self) -> ::protobuf::reflect::ReflectValueRef {
         ::protobuf::reflect::ReflectValueRef::Message(self)
     }
@@ -1026,45 +1026,45 @@ static file_descriptor_proto_data: &'static [u8] = b"\
     R\x02id\x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"E\n\x03Doc\x12\
     \x0e\n\x02id\x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04data\x18\x02\x20\x01\
     (\x0cR\x04data\x12\x1a\n\x08revision\x18\x03\x20\x01(\x03R\x08revision\"\
-    <\n\x0fUpdateDocParams\x12\x0e\n\x02id\x18\x01\x20\x01(\tR\x02id\x12\x19\
-    \n\x08doc_data\x18\x02\x20\x01(\x0cR\x07docData\"2\n\x0cDocChangeset\x12\
-    \x0e\n\x02id\x18\x01\x20\x01(\tR\x02id\x12\x12\n\x04data\x18\x02\x20\x01\
-    (\x0cR\x04data\"'\n\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\x01\x20\
-    \x01(\tR\x05docIdJ\xb0\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\x0c\x12\
-    \x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\0\x01\
-    \x12\x03\x02\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\x0c\n\
-    \x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\x12\
-    \x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\n\
-    \x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\x12\
-    \x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\n\
-    \x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x04\x01\x12\x04\x06\
-    \0\n\x01\n\n\n\x03\x04\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\x01\
-    \x02\0\x12\x03\x07\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\x04\
-    \n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\x01\
-    \x02\0\x03\x12\x03\x07\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\x08\
-    \x04\x13\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x08\x04\t\n\x0c\n\x05\
-    \x04\x01\x02\x01\x01\x12\x03\x08\n\x0e\n\x0c\n\x05\x04\x01\x02\x01\x03\
-    \x12\x03\x08\x11\x12\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x17\n\x0c\
-    \n\x05\x04\x01\x02\x02\x05\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\x02\
-    \x01\x12\x03\t\n\x12\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\t\x15\x16\n\
-    \n\n\x02\x04\x02\x12\x04\x0b\0\x0e\x01\n\n\n\x03\x04\x02\x01\x12\x03\x0b\
-    \x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0c\x04\x12\n\x0c\n\x05\x04\
-    \x02\x02\0\x05\x12\x03\x0c\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\x12\x03\
-    \x0c\x0b\r\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0c\x10\x11\n\x0b\n\x04\
-    \x04\x02\x02\x01\x12\x03\r\x04\x17\n\x0c\n\x05\x04\x02\x02\x01\x05\x12\
-    \x03\r\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\n\x12\n\x0c\n\x05\
-    \x04\x02\x02\x01\x03\x12\x03\r\x15\x16\n\n\n\x02\x04\x03\x12\x04\x0f\0\
-    \x12\x01\n\n\n\x03\x04\x03\x01\x12\x03\x0f\x08\x14\n\x0b\n\x04\x04\x03\
-    \x02\0\x12\x03\x10\x04\x12\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\x10\x04\
-    \n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x10\x0b\r\n\x0c\n\x05\x04\x03\
-    \x02\0\x03\x12\x03\x10\x10\x11\n\x0b\n\x04\x04\x03\x02\x01\x12\x03\x11\
-    \x04\x13\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x11\x04\t\n\x0c\n\x05\
-    \x04\x03\x02\x01\x01\x12\x03\x11\n\x0e\n\x0c\n\x05\x04\x03\x02\x01\x03\
-    \x12\x03\x11\x11\x12\n\n\n\x02\x04\x04\x12\x04\x13\0\x15\x01\n\n\n\x03\
-    \x04\x04\x01\x12\x03\x13\x08\x16\n\x0b\n\x04\x04\x04\x02\0\x12\x03\x14\
-    \x04\x16\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x14\x04\n\n\x0c\n\x05\x04\
-    \x04\x02\0\x01\x12\x03\x14\x0b\x11\n\x0c\n\x05\x04\x04\x02\0\x03\x12\x03\
-    \x14\x14\x15b\x06proto3\
+    <\n\x0fUpdateDocParams\x12\x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\
+    \x12\x12\n\x04data\x18\x02\x20\x01(\x0cR\x04data\"5\n\x08DocDelta\x12\
+    \x15\n\x06doc_id\x18\x01\x20\x01(\tR\x05docId\x12\x12\n\x04data\x18\x02\
+    \x20\x01(\x0cR\x04data\"'\n\x0eQueryDocParams\x12\x15\n\x06doc_id\x18\
+    \x01\x20\x01(\tR\x05docIdJ\xb0\x05\n\x06\x12\x04\0\0\x15\x01\n\x08\n\x01\
+    \x0c\x12\x03\0\0\x12\n\n\n\x02\x04\0\x12\x04\x02\0\x05\x01\n\n\n\x03\x04\
+    \0\x01\x12\x03\x02\x08\x17\n\x0b\n\x04\x04\0\x02\0\x12\x03\x03\x04\x12\n\
+    \x0c\n\x05\x04\0\x02\0\x05\x12\x03\x03\x04\n\n\x0c\n\x05\x04\0\x02\0\x01\
+    \x12\x03\x03\x0b\r\n\x0c\n\x05\x04\0\x02\0\x03\x12\x03\x03\x10\x11\n\x0b\
+    \n\x04\x04\0\x02\x01\x12\x03\x04\x04\x13\n\x0c\n\x05\x04\0\x02\x01\x05\
+    \x12\x03\x04\x04\t\n\x0c\n\x05\x04\0\x02\x01\x01\x12\x03\x04\n\x0e\n\x0c\
+    \n\x05\x04\0\x02\x01\x03\x12\x03\x04\x11\x12\n\n\n\x02\x04\x01\x12\x04\
+    \x06\0\n\x01\n\n\n\x03\x04\x01\x01\x12\x03\x06\x08\x0b\n\x0b\n\x04\x04\
+    \x01\x02\0\x12\x03\x07\x04\x12\n\x0c\n\x05\x04\x01\x02\0\x05\x12\x03\x07\
+    \x04\n\n\x0c\n\x05\x04\x01\x02\0\x01\x12\x03\x07\x0b\r\n\x0c\n\x05\x04\
+    \x01\x02\0\x03\x12\x03\x07\x10\x11\n\x0b\n\x04\x04\x01\x02\x01\x12\x03\
+    \x08\x04\x13\n\x0c\n\x05\x04\x01\x02\x01\x05\x12\x03\x08\x04\t\n\x0c\n\
+    \x05\x04\x01\x02\x01\x01\x12\x03\x08\n\x0e\n\x0c\n\x05\x04\x01\x02\x01\
+    \x03\x12\x03\x08\x11\x12\n\x0b\n\x04\x04\x01\x02\x02\x12\x03\t\x04\x17\n\
+    \x0c\n\x05\x04\x01\x02\x02\x05\x12\x03\t\x04\t\n\x0c\n\x05\x04\x01\x02\
+    \x02\x01\x12\x03\t\n\x12\n\x0c\n\x05\x04\x01\x02\x02\x03\x12\x03\t\x15\
+    \x16\n\n\n\x02\x04\x02\x12\x04\x0b\0\x0e\x01\n\n\n\x03\x04\x02\x01\x12\
+    \x03\x0b\x08\x17\n\x0b\n\x04\x04\x02\x02\0\x12\x03\x0c\x04\x16\n\x0c\n\
+    \x05\x04\x02\x02\0\x05\x12\x03\x0c\x04\n\n\x0c\n\x05\x04\x02\x02\0\x01\
+    \x12\x03\x0c\x0b\x11\n\x0c\n\x05\x04\x02\x02\0\x03\x12\x03\x0c\x14\x15\n\
+    \x0b\n\x04\x04\x02\x02\x01\x12\x03\r\x04\x13\n\x0c\n\x05\x04\x02\x02\x01\
+    \x05\x12\x03\r\x04\t\n\x0c\n\x05\x04\x02\x02\x01\x01\x12\x03\r\n\x0e\n\
+    \x0c\n\x05\x04\x02\x02\x01\x03\x12\x03\r\x11\x12\n\n\n\x02\x04\x03\x12\
+    \x04\x0f\0\x12\x01\n\n\n\x03\x04\x03\x01\x12\x03\x0f\x08\x10\n\x0b\n\x04\
+    \x04\x03\x02\0\x12\x03\x10\x04\x16\n\x0c\n\x05\x04\x03\x02\0\x05\x12\x03\
+    \x10\x04\n\n\x0c\n\x05\x04\x03\x02\0\x01\x12\x03\x10\x0b\x11\n\x0c\n\x05\
+    \x04\x03\x02\0\x03\x12\x03\x10\x14\x15\n\x0b\n\x04\x04\x03\x02\x01\x12\
+    \x03\x11\x04\x13\n\x0c\n\x05\x04\x03\x02\x01\x05\x12\x03\x11\x04\t\n\x0c\
+    \n\x05\x04\x03\x02\x01\x01\x12\x03\x11\n\x0e\n\x0c\n\x05\x04\x03\x02\x01\
+    \x03\x12\x03\x11\x11\x12\n\n\n\x02\x04\x04\x12\x04\x13\0\x15\x01\n\n\n\
+    \x03\x04\x04\x01\x12\x03\x13\x08\x16\n\x0b\n\x04\x04\x04\x02\0\x12\x03\
+    \x14\x04\x16\n\x0c\n\x05\x04\x04\x02\0\x05\x12\x03\x14\x04\n\n\x0c\n\x05\
+    \x04\x04\x02\0\x01\x12\x03\x14\x0b\x11\n\x0c\n\x05\x04\x04\x02\0\x03\x12\
+    \x03\x14\x14\x15b\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 4 - 4
rust-lib/flowy-document/src/protobuf/proto/doc.proto

@@ -10,11 +10,11 @@ message Doc {
     int64 revision = 3;
 }
 message UpdateDocParams {
-    string id = 1;
-    bytes doc_data = 2;
+    string doc_id = 1;
+    bytes data = 2;
 }
-message DocChangeset {
-    string id = 1;
+message DocDelta {
+    string doc_id = 1;
     bytes data = 2;
 }
 message QueryDocParams {

+ 17 - 11
rust-lib/flowy-document/src/services/cache.rs

@@ -1,28 +1,34 @@
+use std::{convert::TryInto, fmt::Debug, sync::Arc};
+
+use bytes::Bytes;
+use dashmap::DashMap;
+use parking_lot::RwLock;
+
+use flowy_database::ConnectionPool;
+use flowy_ot::{core::Delta, errors::OTError};
+
 use crate::{
     entities::doc::Doc,
     errors::DocError,
     services::{
-        open_doc::{DocId, OpenedDoc},
+        doc::edit_context::{DocId, EditDocContext},
         ws::WsManager,
     },
 };
-use bytes::Bytes;
-use dashmap::DashMap;
-use flowy_database::ConnectionPool;
-use flowy_ot::{core::Delta, errors::OTError};
-use parking_lot::RwLock;
-use std::{convert::TryInto, fmt::Debug, sync::Arc};
 
 pub(crate) struct DocCache {
-    doc_map: DashMap<DocId, Arc<OpenedDoc>>,
+    doc_map: DashMap<DocId, Arc<EditDocContext>>,
 }
 
 impl DocCache {
     pub(crate) fn new() -> Self { Self { doc_map: DashMap::new() } }
 
-    pub(crate) fn set(&self, doc: Arc<OpenedDoc>) -> Result<(), DocError> {
+    pub(crate) fn set(&self, doc: Arc<EditDocContext>) {
+        let doc_id = doc.id.clone();
+        if self.doc_map.contains_key(&doc_id) {
+            log::warn!("Doc:{} already exists in cache", doc_id.as_ref());
+        }
         self.doc_map.insert(doc.id.clone(), doc);
-        Ok(())
     }
 
     pub(crate) fn is_opened(&self, doc_id: &str) -> bool {
@@ -30,7 +36,7 @@ impl DocCache {
         self.doc_map.get(&doc_id).is_some()
     }
 
-    pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<OpenedDoc>, DocError> {
+    pub(crate) fn get(&self, doc_id: &str) -> Result<Arc<EditDocContext>, DocError> {
         if !self.is_opened(&doc_id) {
             return Err(doc_not_found());
         }

+ 60 - 52
rust-lib/flowy-document/src/services/doc_controller.rs → rust-lib/flowy-document/src/services/doc/doc_controller.rs

@@ -1,27 +1,25 @@
 use crate::{
-    entities::doc::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams},
-    errors::DocError,
+    entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams, UpdateDocParams},
+    errors::{internal_error, DocError},
     module::DocumentUser,
-    services::server::Server,
-    sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql},
-};
-use flowy_database::{ConnectionPool, SqliteConnection};
-
-use crate::{
-    errors::internal_error,
     services::{
         cache::DocCache,
-        open_doc::{DocId, OpenedDoc, OpenedDocPersistence},
+        doc::edit_context::{DocId, EditDocContext, EditDocPersistence},
+        server::Server,
         ws::WsManager,
     },
+    sql_tables::doc::{DocTable, DocTableChangeset, DocTableSql, OpTableSql},
 };
+use bytes::Bytes;
+use flowy_database::{ConnectionPool, SqliteConnection};
 use parking_lot::RwLock;
 use std::sync::Arc;
 use tokio::task::JoinHandle;
 
 pub(crate) struct DocController {
     server: Server,
-    sql: Arc<DocTableSql>,
+    doc_sql: Arc<DocTableSql>,
+    op_sql: Arc<OpTableSql>,
     ws: Arc<RwLock<WsManager>>,
     cache: Arc<DocCache>,
     user: Arc<dyn DocumentUser>,
@@ -29,11 +27,13 @@ pub(crate) struct DocController {
 
 impl DocController {
     pub(crate) fn new(server: Server, user: Arc<dyn DocumentUser>, ws: Arc<RwLock<WsManager>>) -> Self {
-        let sql = Arc::new(DocTableSql {});
+        let doc_sql = Arc::new(DocTableSql {});
+        let op_sql = Arc::new(OpTableSql {});
         let cache = Arc::new(DocCache::new());
         Self {
-            sql,
             server,
+            doc_sql,
+            op_sql,
             user,
             ws,
             cache,
@@ -47,21 +47,21 @@ impl DocController {
             data: params.data,
             revision: 0,
         };
-        let _ = self.sql.create_doc_table(DocTable::new(doc), conn)?;
+        let _ = self.doc_sql.create_doc_table(DocTable::new(doc), conn)?;
         Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self, pool), err)]
-    pub(crate) async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<OpenedDoc>, DocError> {
+    pub(crate) async fn open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
         if self.cache.is_opened(&params.doc_id) == false {
-            return match self._open(params.clone(), pool.clone()) {
+            return match self._open(params, pool).await {
                 Ok(doc) => Ok(doc),
                 Err(error) => Err(error),
             };
         }
 
-        let doc = self.cache.get(&params.doc_id)?;
-        Ok(doc)
+        let edit_doc_ctx = self.cache.get(&params.doc_id)?;
+        Ok(edit_doc_ctx)
     }
 
     pub(crate) fn close(&self, doc_id: &str) -> Result<(), DocError> {
@@ -70,31 +70,23 @@ impl DocController {
         Ok(())
     }
 
-    // #[tracing::instrument(level = "debug", skip(self, changeset, pool), err)]
-    // pub(crate) async fn apply_changeset<T>(&self, id: T, changeset: Bytes, pool:
-    // Arc<ConnectionPool>) -> Result<(), DocError>     where
-    //         T: Into<DocId> + Debug,
-    // {
-    //     let id = id.into();
-    //     match self.doc_map.get(&id) {
-    //         None => Err(doc_not_found()),
-    //         Some(doc) => {
-    //             let _ = doc.apply_delta(changeset, pool)?;
-    //             Ok(())
-    //         },
-    //     }
-    // }
-
     #[tracing::instrument(level = "debug", skip(self, conn), err)]
     pub(crate) fn delete(&self, params: QueryDocParams, conn: &SqliteConnection) -> Result<(), DocError> {
         let doc_id = &params.doc_id;
-        let _ = self.sql.delete_doc(doc_id, &*conn)?;
+        let _ = self.doc_sql.delete_doc(doc_id, &*conn)?;
 
         self.cache.remove(doc_id);
         self.ws.write().remove_handler(doc_id);
         let _ = self.delete_doc_on_server(params)?;
         Ok(())
     }
+
+    #[tracing::instrument(level = "debug", skip(self, delta, pool), err)]
+    pub(crate) fn edit_doc(&self, delta: DocDelta, pool: Arc<ConnectionPool>) -> Result<Doc, DocError> {
+        let edit_doc_ctx = self.cache.get(&delta.doc_id)?;
+        let _ = edit_doc_ctx.apply_delta(Bytes::from(delta.data), pool)?;
+        Ok(edit_doc_ctx.doc())
+    }
 }
 
 impl DocController {
@@ -119,19 +111,22 @@ impl DocController {
         &self,
         params: QueryDocParams,
         pool: Arc<ConnectionPool>,
-    ) -> Result<JoinHandle<Result<Doc, DocError>>, DocError> {
+    ) -> Result<JoinHandle<Result<Arc<EditDocContext>, DocError>>, DocError> {
         let token = self.user.token()?;
         let server = self.server.clone();
-        let sql = self.sql.clone();
+        let doc_sql = self.doc_sql.clone();
+        let op_sql = self.op_sql.clone();
+        let ws = self.ws.clone();
+        let cache = self.cache.clone();
 
         Ok(tokio::spawn(async move {
             match server.read_doc(&token, params).await? {
                 None => Err(DocError::not_found()),
                 Some(doc) => {
                     let doc_table = DocTable::new(doc.clone());
-                    let _ = sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?;
-                    // TODO: notify
-                    Ok(doc)
+                    let _ = doc_sql.create_doc_table(doc_table, &*(pool.get().map_err(internal_error)?))?;
+                    let edit_doc_ctx = make_edit_context(ws, cache, op_sql, doc)?;
+                    Ok(edit_doc_ctx)
                 },
             }
         }))
@@ -153,32 +148,45 @@ impl DocController {
         Ok(())
     }
 
-    fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<OpenedDoc>, DocError> {
-        match self.sql.read_doc_table(&params.doc_id, &*(pool.get().map_err(internal_error)?)) {
+    async fn _open(&self, params: QueryDocParams, pool: Arc<ConnectionPool>) -> Result<Arc<EditDocContext>, DocError> {
+        match self.doc_sql.read_doc_table(&params.doc_id, &*(pool.get().map_err(internal_error)?)) {
             Ok(doc_table) => {
-                let doc = Arc::new(OpenedDoc::new(doc_table.into(), self.ws.read().sender.clone())?);
-                self.ws.write().register_handler(doc.id.as_ref(), doc.clone());
-                self.cache.set(doc.clone());
-
-                Ok(doc)
+                let edit_doc_ctx = make_edit_context(self.ws.clone(), self.cache.clone(), self.op_sql.clone(), doc_table.into())?;
+                Ok(edit_doc_ctx)
             },
             Err(error) => {
                 if error.is_record_not_found() {
                     log::debug!("Doc:{} don't exist, reading from server", params.doc_id);
-                    // TODO: notify doc update
-                    let _ = self.read_doc_from_server(params, pool);
+                    match self.read_doc_from_server(params, pool)?.await.map_err(internal_error)? {
+                        Ok(edit_doc_ctx) => Ok(edit_doc_ctx),
+                        Err(error) => Err(error),
+                    }
+                } else {
+                    return Err(error);
                 }
-
-                return Err(error);
             },
         }
     }
 }
 
-impl OpenedDocPersistence for DocController {
+fn make_edit_context(
+    ws: Arc<RwLock<WsManager>>,
+    cache: Arc<DocCache>,
+    op_sql: Arc<OpTableSql>,
+    doc: Doc,
+) -> Result<Arc<EditDocContext>, DocError> {
+    // Opti: require upgradable_read lock and then upgrade to write lock using
+    // RwLockUpgradableReadGuard::upgrade(xx)
+    let edit_doc_ctx = Arc::new(EditDocContext::new(doc, ws.read().sender.clone(), op_sql)?);
+    ws.write().register_handler(edit_doc_ctx.id.as_ref(), edit_doc_ctx.clone());
+    cache.set(edit_doc_ctx.clone());
+    Ok(edit_doc_ctx)
+}
+
+impl EditDocPersistence for DocController {
     fn save(&self, params: UpdateDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError> {
         let changeset = DocTableChangeset::new(params.clone());
-        let _ = self.sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?;
+        let _ = self.doc_sql.update_doc_table(changeset, &*(pool.get().map_err(internal_error)?))?;
         Ok(())
     }
 }

+ 10 - 7
rust-lib/flowy-document/src/services/open_doc.rs → rust-lib/flowy-document/src/services/doc/edit_context.rs

@@ -8,6 +8,7 @@ use crate::{
         doc::Document,
         ws::{WsHandler, WsSender},
     },
+    sql_tables::doc::OpTableSql,
 };
 use bytes::Bytes;
 use flowy_database::ConnectionPool;
@@ -27,19 +28,20 @@ where
     fn from(s: T) -> Self { DocId(s.to_string()) }
 }
 
-pub(crate) trait OpenedDocPersistence: Send + Sync {
+pub(crate) trait EditDocPersistence: Send + Sync {
     fn save(&self, params: UpdateDocParams, pool: Arc<ConnectionPool>) -> Result<(), DocError>;
 }
 
-pub(crate) struct OpenedDoc {
+pub(crate) struct EditDocContext {
     pub(crate) id: DocId,
     pub(crate) revision: i64,
     document: RwLock<Document>,
     ws_sender: Arc<dyn WsSender>,
+    op_sql: Arc<OpTableSql>,
 }
 
-impl OpenedDoc {
-    pub(crate) fn new(doc: Doc, ws_sender: Arc<dyn WsSender>) -> Result<Self, DocError> {
+impl EditDocContext {
+    pub(crate) fn new(doc: Doc, ws_sender: Arc<dyn WsSender>, op_sql: Arc<OpTableSql>) -> Result<Self, DocError> {
         let id: DocId = doc.id.into();
         let revision = doc.revision;
         let delta: Delta = doc.data.try_into()?;
@@ -50,6 +52,7 @@ impl OpenedDoc {
             revision,
             document,
             ws_sender,
+            op_sql,
         })
     }
 
@@ -75,8 +78,8 @@ impl OpenedDoc {
 
         // Opti: strategy to save the document
         let save = UpdateDocParams {
-            id: self.id.0.clone(),
-            doc_data: write_guard.to_bytes(),
+            doc_id: self.id.0.clone(),
+            data: write_guard.to_bytes(),
         };
         // let _ = self.persistence.save(save, pool)?;
 
@@ -84,7 +87,7 @@ impl OpenedDoc {
     }
 }
 
-impl WsHandler for OpenedDoc {
+impl WsHandler for EditDocContext {
     fn receive(&self, data: WsDocumentData) {
         match data.source {
             WsSource::Delta => {},

+ 6 - 4
rust-lib/flowy-document/src/services/doc/mod.rs

@@ -1,10 +1,12 @@
+pub use document::*;
+pub use history::*;
+pub use view::*;
+
 mod document;
 mod history;
 mod view;
 
+pub(crate) mod doc_controller;
+pub mod edit_context;
 pub mod extensions;
 mod util;
-
-pub use document::*;
-pub use history::*;
-pub use view::*;

+ 0 - 0
rust-lib/flowy-document/src/services/file_manager/file.rs → rust-lib/flowy-document/src/services/file/file.rs


+ 0 - 0
rust-lib/flowy-document/src/services/file_manager/manager.rs → rust-lib/flowy-document/src/services/file/manager.rs


+ 0 - 0
rust-lib/flowy-document/src/services/file_manager/mod.rs → rust-lib/flowy-document/src/services/file/mod.rs


+ 0 - 2
rust-lib/flowy-document/src/services/mod.rs

@@ -1,6 +1,4 @@
 mod cache;
 pub mod doc;
-pub(crate) mod doc_controller;
-mod open_doc;
 pub mod server;
 pub mod ws;

+ 37 - 0
rust-lib/flowy-document/src/sql_tables/doc/doc_op_sql.rs

@@ -0,0 +1,37 @@
+use crate::{
+    errors::DocError,
+    sql_tables::doc::{OpChangeset, OpTable},
+};
+use flowy_database::{
+    prelude::*,
+    schema::{op_table, op_table::dsl},
+    SqliteConnection,
+};
+
+pub struct OpTableSql {}
+
+impl OpTableSql {
+    pub(crate) fn create_op_table(&self, op_table: OpTable, conn: &SqliteConnection) -> Result<(), DocError> {
+        let _ = diesel::insert_into(op_table::table).values(op_table).execute(conn)?;
+        Ok(())
+    }
+
+    pub(crate) fn update_op_table(&self, changeset: OpChangeset, conn: &SqliteConnection) -> Result<(), DocError> {
+        let filter = dsl::op_table.filter(op_table::dsl::rev.eq(changeset.rev));
+        let affected_row = diesel::update(filter).set(changeset).execute(conn)?;
+        debug_assert_eq!(affected_row, 1);
+        Ok(())
+    }
+
+    pub(crate) fn read_op_table(&self, conn: &SqliteConnection) -> Result<Vec<OpTable>, DocError> {
+        let ops = dsl::op_table.load::<OpTable>(conn)?;
+        Ok(ops)
+    }
+
+    pub(crate) fn delete_op_table(&self, rev: i64, conn: &SqliteConnection) -> Result<(), DocError> {
+        let filter = dsl::op_table.filter(op_table::dsl::rev.eq(rev));
+        let affected_row = diesel::delete(filter).execute(conn)?;
+        debug_assert_eq!(affected_row, 1);
+        Ok(())
+    }
+}

+ 54 - 0
rust-lib/flowy-document/src/sql_tables/doc/doc_op_table.rs

@@ -0,0 +1,54 @@
+use diesel::sql_types::Integer;
+use flowy_database::schema::op_table;
+
+#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
+#[table_name = "op_table"]
+#[primary_key(rev)]
+pub(crate) struct OpTable {
+    pub(crate) base_rev: i64,
+    pub(crate) rev: i64,
+    pub(crate) data: Vec<u8>,
+    pub(crate) md5: String,
+    pub(crate) state: OpState,
+}
+
+#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash, FromSqlRow, AsExpression)]
+#[repr(i32)]
+#[sql_type = "Integer"]
+pub enum OpState {
+    Local   = 0,
+    Sending = 1,
+    Acked   = 2,
+}
+
+impl std::default::Default for OpState {
+    fn default() -> Self { OpState::Local }
+}
+
+impl std::convert::From<i32> for OpState {
+    fn from(value: i32) -> Self {
+        match value {
+            0 => OpState::Local,
+            1 => OpState::Sending,
+            2 => OpState::Acked,
+            o => {
+                log::error!("Unsupported view type {}, fallback to ViewType::Docs", o);
+                OpState::Local
+            },
+        }
+    }
+}
+
+impl OpState {
+    pub fn value(&self) -> i32 { *self as i32 }
+}
+
+impl_sql_integer_expression!(OpState);
+
+#[derive(AsChangeset, Identifiable, Default, Debug)]
+#[table_name = "op_table"]
+#[primary_key(rev)]
+pub(crate) struct OpChangeset {
+    pub(crate) rev: i64,
+    pub(crate) state: Option<OpState>,
+}

+ 5 - 5
rust-lib/flowy-document/src/sql_tables/doc/doc_table.rs

@@ -4,9 +4,9 @@ use flowy_database::schema::doc_table;
 #[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
 #[table_name = "doc_table"]
 pub(crate) struct DocTable {
-    pub id: String,
-    pub data: Vec<u8>,
-    pub revision: i64,
+    pub(crate) id: String,
+    pub(crate) data: Vec<u8>,
+    pub(crate) revision: i64,
 }
 
 impl DocTable {
@@ -29,8 +29,8 @@ pub(crate) struct DocTableChangeset {
 impl DocTableChangeset {
     pub(crate) fn new(params: UpdateDocParams) -> Self {
         Self {
-            id: params.id,
-            data: params.doc_data,
+            id: params.doc_id,
+            data: params.data,
         }
     }
 }

+ 4 - 0
rust-lib/flowy-document/src/sql_tables/doc/mod.rs

@@ -1,5 +1,9 @@
+mod doc_op_sql;
+mod doc_op_table;
 mod doc_sql;
 mod doc_table;
 
+pub(crate) use doc_op_sql::*;
+pub(crate) use doc_op_table::*;
 pub(crate) use doc_sql::*;
 pub(crate) use doc_table::*;

+ 5 - 8
rust-lib/flowy-workspace/src/entities/view/view_update.rs

@@ -3,7 +3,7 @@ use crate::{
     errors::WorkspaceError,
 };
 use flowy_derive::ProtoBuf;
-use flowy_document::entities::doc::{DocChangeset, UpdateDocParams};
+use flowy_document::entities::doc::{DocDelta, UpdateDocParams};
 use std::convert::TryInto;
 
 #[derive(Default, ProtoBuf)]
@@ -118,10 +118,7 @@ impl TryInto<UpdateDocParams> for SaveViewDataRequest {
         // Opti: Vec<u8> -> Delta -> Vec<u8>
         let data = DeltaData::parse(self.data).map_err(|e| WorkspaceError::view_data().context(e))?.0;
 
-        Ok(UpdateDocParams {
-            id: view_id,
-            doc_data: data,
-        })
+        Ok(UpdateDocParams { doc_id: view_id, data })
     }
 }
 
@@ -134,15 +131,15 @@ pub struct ApplyChangesetRequest {
     pub data: Vec<u8>,
 }
 
-impl TryInto<DocChangeset> for ApplyChangesetRequest {
+impl TryInto<DocDelta> for ApplyChangesetRequest {
     type Error = WorkspaceError;
 
-    fn try_into(self) -> Result<DocChangeset, Self::Error> {
+    fn try_into(self) -> Result<DocDelta, Self::Error> {
         let view_id = ViewId::parse(self.view_id).map_err(|e| WorkspaceError::view_id().context(e))?.0;
 
         // Opti: Vec<u8> -> Delta -> Vec<u8>
         let data = DeltaData::parse(self.data).map_err(|e| WorkspaceError::view_data().context(e))?.0;
 
-        Ok(DocChangeset { id: view_id, data })
+        Ok(DocDelta { doc_id: view_id, data })
     }
 }

+ 2 - 2
rust-lib/flowy-workspace/src/event.rs

@@ -49,6 +49,6 @@ pub enum WorkspaceEvent {
     #[event(input = "OpenViewRequest", output = "Doc")]
     OpenView          = 205,
 
-    #[event(input = "DocChangeset", output = "Doc")]
-    ApplyChangeset    = 206,
+    #[event(input = "DocDelta", output = "Doc")]
+    ApplyDocDelta     = 206,
 }

+ 4 - 4
rust-lib/flowy-workspace/src/handlers/view_handler.rs

@@ -16,7 +16,7 @@ use crate::{
     services::ViewController,
 };
 use flowy_dispatch::prelude::{data_result, Data, DataResult, Unit};
-use flowy_document::entities::doc::{Doc, DocChangeset, QueryDocParams};
+use flowy_document::entities::doc::{Doc, DocDelta, QueryDocParams};
 use std::{convert::TryInto, sync::Arc};
 
 #[tracing::instrument(skip(data, controller), err)]
@@ -56,12 +56,12 @@ pub(crate) async fn update_view_handler(
 }
 
 #[tracing::instrument(skip(data, controller), err)]
-pub(crate) async fn apply_changeset_handler(
+pub(crate) async fn apply_doc_delta_handler(
     data: Data<ApplyChangesetRequest>,
     controller: Unit<Arc<ViewController>>,
 ) -> DataResult<Doc, WorkspaceError> {
-    let params: DocChangeset = data.into_inner().try_into()?;
-    let doc = controller.apply_changeset(params).await?;
+    let params: DocDelta = data.into_inner().try_into()?;
+    let doc = controller.apply_doc_delta(params).await?;
     data_result(doc)
 }
 

+ 0 - 71
rust-lib/flowy-workspace/src/macros.rs

@@ -1,74 +1,3 @@
-#[macro_export]
-macro_rules! impl_sql_binary_expression {
-    ($target:ident) => {
-        impl diesel::serialize::ToSql<diesel::sql_types::Binary, diesel::sqlite::Sqlite> for $target {
-            fn to_sql<W: std::io::Write>(
-                &self,
-                out: &mut diesel::serialize::Output<W, diesel::sqlite::Sqlite>,
-            ) -> diesel::serialize::Result {
-                let bytes: Vec<u8> = self.try_into().map_err(|e| format!("{:?}", e))?;
-                diesel::serialize::ToSql::<diesel::sql_types::Binary, diesel::sqlite::Sqlite>::to_sql(&bytes, out)
-            }
-        }
-        // https://docs.diesel.rs/src/diesel/sqlite/types/mod.rs.html#30-33
-        // impl FromSql<sql_types::Binary, Sqlite> for *const [u8] {
-        //     fn from_sql(bytes: Option<&SqliteValue>) -> deserialize::Result<Self> {
-        //         let bytes = not_none!(bytes).read_blob();
-        //         Ok(bytes as *const _)
-        //     }
-        // }
-        impl<DB> diesel::deserialize::FromSql<diesel::sql_types::Binary, DB> for $target
-        where
-            DB: diesel::backend::Backend,
-            *const [u8]: diesel::deserialize::FromSql<diesel::sql_types::Binary, DB>,
-        {
-            fn from_sql(bytes: Option<&DB::RawValue>) -> diesel::deserialize::Result<Self> {
-                let slice_ptr = <*const [u8] as diesel::deserialize::FromSql<diesel::sql_types::Binary, DB>>::from_sql(bytes)?;
-                let bytes = unsafe { &*slice_ptr };
-
-                match $target::try_from(bytes) {
-                    Ok(object) => Ok(object),
-                    Err(e) => {
-                        log::error!("{:?} deserialize from bytes fail. {:?}", std::any::type_name::<$target>(), e);
-                        panic!();
-                    },
-                }
-            }
-        }
-    };
-}
-
-#[macro_export]
-macro_rules! impl_sql_integer_expression {
-    ($target:ident) => {
-        use diesel::{
-            deserialize,
-            deserialize::FromSql,
-            serialize,
-            serialize::{Output, ToSql},
-        };
-        use std::io::Write;
-
-        impl<DB> ToSql<Integer, DB> for $target
-        where
-            DB: diesel::backend::Backend,
-            i32: ToSql<Integer, DB>,
-        {
-            fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> serialize::Result { (*self as i32).to_sql(out) }
-        }
-
-        impl<DB> FromSql<Integer, DB> for $target
-        where
-            DB: diesel::backend::Backend,
-            i32: FromSql<Integer, DB>,
-        {
-            fn from_sql(bytes: Option<&DB::RawValue>) -> deserialize::Result<Self> {
-                let smaill_int = i32::from_sql(bytes)?;
-                Ok($target::from(smaill_int))
-            }
-        }
-    };
-}
 // #[macro_export]
 // macro_rules! impl_save_func {
 //     ($func_name:ident, $target:ident, $table_name:expr, $conn:ident) => {

+ 1 - 1
rust-lib/flowy-workspace/src/module.rs

@@ -66,7 +66,7 @@ pub fn create(user: Arc<dyn WorkspaceUser>, database: Arc<dyn WorkspaceDatabase>
         .event(WorkspaceEvent::UpdateView, update_view_handler)
         .event(WorkspaceEvent::DeleteView, delete_view_handler)
         .event(WorkspaceEvent::OpenView, open_view_handler)
-        .event(WorkspaceEvent::ApplyChangeset, apply_changeset_handler);
+        .event(WorkspaceEvent::ApplyDocDelta, apply_doc_delta_handler);
 
     module
 }

+ 11 - 11
rust-lib/flowy-workspace/src/protobuf/model/event.rs

@@ -40,7 +40,7 @@ pub enum WorkspaceEvent {
     UpdateView = 203,
     DeleteView = 204,
     OpenView = 205,
-    ApplyChangeset = 206,
+    ApplyDocDelta = 206,
 }
 
 impl ::protobuf::ProtobufEnum for WorkspaceEvent {
@@ -65,7 +65,7 @@ impl ::protobuf::ProtobufEnum for WorkspaceEvent {
             203 => ::std::option::Option::Some(WorkspaceEvent::UpdateView),
             204 => ::std::option::Option::Some(WorkspaceEvent::DeleteView),
             205 => ::std::option::Option::Some(WorkspaceEvent::OpenView),
-            206 => ::std::option::Option::Some(WorkspaceEvent::ApplyChangeset),
+            206 => ::std::option::Option::Some(WorkspaceEvent::ApplyDocDelta),
             _ => ::std::option::Option::None
         }
     }
@@ -87,7 +87,7 @@ impl ::protobuf::ProtobufEnum for WorkspaceEvent {
             WorkspaceEvent::UpdateView,
             WorkspaceEvent::DeleteView,
             WorkspaceEvent::OpenView,
-            WorkspaceEvent::ApplyChangeset,
+            WorkspaceEvent::ApplyDocDelta,
         ];
         values
     }
@@ -116,17 +116,17 @@ impl ::protobuf::reflect::ProtobufValue for WorkspaceEvent {
 }
 
 static file_descriptor_proto_data: &'static [u8] = b"\
-    \n\x0bevent.proto*\xae\x02\n\x0eWorkspaceEvent\x12\x13\n\x0fCreateWorksp\
+    \n\x0bevent.proto*\xad\x02\n\x0eWorkspaceEvent\x12\x13\n\x0fCreateWorksp\
     ace\x10\0\x12\x14\n\x10ReadCurWorkspace\x10\x01\x12\x12\n\x0eReadWorkspa\
     ces\x10\x02\x12\x13\n\x0fDeleteWorkspace\x10\x03\x12\x11\n\rOpenWorkspac\
     e\x10\x04\x12\x15\n\x11ReadWorkspaceApps\x10\x05\x12\r\n\tCreateApp\x10e\
     \x12\r\n\tDeleteApp\x10f\x12\x0b\n\x07ReadApp\x10g\x12\r\n\tUpdateApp\
     \x10h\x12\x0f\n\nCreateView\x10\xc9\x01\x12\r\n\x08ReadView\x10\xca\x01\
     \x12\x0f\n\nUpdateView\x10\xcb\x01\x12\x0f\n\nDeleteView\x10\xcc\x01\x12\
-    \r\n\x08OpenView\x10\xcd\x01\x12\x13\n\x0eApplyChangeset\x10\xce\x01J\
-    \xba\x05\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\
-    \x02\x05\0\x12\x04\x02\0\x13\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x13\
-    \n\x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x05\0\x02\0\x01\
+    \r\n\x08OpenView\x10\xcd\x01\x12\x12\n\rApplyDocDelta\x10\xce\x01J\xba\
+    \x05\n\x06\x12\x04\0\0\x13\x01\n\x08\n\x01\x0c\x12\x03\0\0\x12\n\n\n\x02\
+    \x05\0\x12\x04\x02\0\x13\x01\n\n\n\x03\x05\0\x01\x12\x03\x02\x05\x13\n\
+    \x0b\n\x04\x05\0\x02\0\x12\x03\x03\x04\x18\n\x0c\n\x05\x05\0\x02\0\x01\
     \x12\x03\x03\x04\x13\n\x0c\n\x05\x05\0\x02\0\x02\x12\x03\x03\x16\x17\n\
     \x0b\n\x04\x05\0\x02\x01\x12\x03\x04\x04\x19\n\x0c\n\x05\x05\0\x02\x01\
     \x01\x12\x03\x04\x04\x14\n\x0c\n\x05\x05\0\x02\x01\x02\x12\x03\x04\x17\
@@ -156,9 +156,9 @@ static file_descriptor_proto_data: &'static [u8] = b"\
     \x02\r\x01\x12\x03\x10\x04\x0e\n\x0c\n\x05\x05\0\x02\r\x02\x12\x03\x10\
     \x11\x14\n\x0b\n\x04\x05\0\x02\x0e\x12\x03\x11\x04\x13\n\x0c\n\x05\x05\0\
     \x02\x0e\x01\x12\x03\x11\x04\x0c\n\x0c\n\x05\x05\0\x02\x0e\x02\x12\x03\
-    \x11\x0f\x12\n\x0b\n\x04\x05\0\x02\x0f\x12\x03\x12\x04\x19\n\x0c\n\x05\
-    \x05\0\x02\x0f\x01\x12\x03\x12\x04\x12\n\x0c\n\x05\x05\0\x02\x0f\x02\x12\
-    \x03\x12\x15\x18b\x06proto3\
+    \x11\x0f\x12\n\x0b\n\x04\x05\0\x02\x0f\x12\x03\x12\x04\x18\n\x0c\n\x05\
+    \x05\0\x02\x0f\x01\x12\x03\x12\x04\x11\n\x0c\n\x05\x05\0\x02\x0f\x02\x12\
+    \x03\x12\x14\x17b\x06proto3\
 ";
 
 static file_descriptor_proto_lazy: ::protobuf::rt::LazyV2<::protobuf::descriptor::FileDescriptorProto> = ::protobuf::rt::LazyV2::INIT;

+ 1 - 1
rust-lib/flowy-workspace/src/protobuf/proto/event.proto

@@ -16,5 +16,5 @@ enum WorkspaceEvent {
     UpdateView = 203;
     DeleteView = 204;
     OpenView = 205;
-    ApplyChangeset = 206;
+    ApplyDocDelta = 206;
 }

+ 3 - 3
rust-lib/flowy-workspace/src/services/view_controller.rs

@@ -14,7 +14,7 @@ use crate::{
 };
 use flowy_database::SqliteConnection;
 use flowy_document::{
-    entities::doc::{CreateDocParams, Doc, DocChangeset, QueryDocParams},
+    entities::doc::{CreateDocParams, Doc, DocDelta, QueryDocParams},
     module::FlowyDocument,
 };
 use std::sync::Arc;
@@ -125,9 +125,9 @@ impl ViewController {
         Ok(())
     }
 
-    pub(crate) async fn apply_changeset(&self, params: DocChangeset) -> Result<Doc, WorkspaceError> {
+    pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<Doc, WorkspaceError> {
         let pool = self.database.db_pool()?;
-        let doc = self.document.apply_changeset(params, pool).await?;
+        let doc = self.document.apply_doc_delta(params, pool).await?;
         Ok(doc)
     }
 }