Browse Source

md5 for consistency check

appflowy 3 years ago
parent
commit
511b6658a4
24 changed files with 194 additions and 233 deletions
  1. 2 2
      backend/Cargo.lock
  2. 0 0
      backend/tests/document/edit_script.rs
  3. 1 1
      backend/tests/document/edit_test.rs
  4. 2 2
      backend/tests/document/mod.rs
  5. 1 1
      frontend/rust-lib/flowy-document/Cargo.toml
  6. 41 30
      frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs
  7. 42 21
      frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs
  8. 8 5
      frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs
  9. 9 4
      frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs
  10. 0 1
      frontend/rust-lib/flowy-document/tests/editor/mod.rs
  11. 0 91
      frontend/rust-lib/flowy-document/tests/editor/revision_test.rs
  12. 3 2
      frontend/rust-lib/flowy-test/Cargo.toml
  13. 13 54
      frontend/rust-lib/flowy-test/src/doc_script.rs
  14. 1 1
      frontend/rust-lib/flowy-test/src/lib.rs
  15. 1 0
      frontend/rust-lib/flowy-test/tests/main.rs
  16. 44 0
      frontend/rust-lib/flowy-test/tests/revision_test.rs
  17. 6 0
      shared-lib/flowy-collaboration/src/core/document/document.rs
  18. 0 1
      shared-lib/flowy-collaboration/src/core/document/extensions/format/mod.rs
  19. 2 3
      shared-lib/flowy-collaboration/src/core/document/extensions/format/resolve_block_format.rs
  20. 1 2
      shared-lib/flowy-collaboration/src/core/document/extensions/format/resolve_inline_format.rs
  21. 1 1
      shared-lib/flowy-collaboration/src/core/document/extensions/mod.rs
  22. 1 1
      shared-lib/flowy-derive/src/proto_buf/deserialize.rs
  23. 14 10
      shared-lib/lib-ot/src/revision/model.rs
  24. 1 0
      shared-lib/lib-ot/tests/main.rs

+ 2 - 2
backend/Cargo.lock

@@ -671,9 +671,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
 
 [[package]]
 name = "bytes"
-version = "1.0.1"
+version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b700ce4376041dcd0a327fd0097c41095743c4c8af8887265942faf1100bd040"
+checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
 dependencies = [
  "serde",
 ]

+ 0 - 0
backend/tests/document/helper.rs → backend/tests/document/edit_script.rs


+ 1 - 1
backend/tests/document/edit.rs → backend/tests/document/edit_test.rs

@@ -1,4 +1,4 @@
-use crate::document::helper::{DocScript, DocumentTest};
+use crate::document::edit_script::{DocScript, DocumentTest};
 use flowy_collaboration::core::document::{Document, FlowyDoc};
 use lib_ot::{core::Interval, rich_text::RichTextAttribute};
 

+ 2 - 2
backend/tests/document/mod.rs

@@ -1,2 +1,2 @@
-mod edit;
-mod helper;
+mod edit_script;
+mod edit_test;

+ 1 - 1
frontend/rust-lib/flowy-document/Cargo.toml

@@ -28,7 +28,7 @@ lazy_static = "1.4.0"
 log = "0.4.14"
 tokio = {version = "1", features = ["sync"]}
 tracing = { version = "0.1", features = ["log"] }
-bytes = { version = "1.0" }
+bytes = { version = "1.1" }
 strum = "0.21"
 strum_macros = "0.21"
 dashmap = "4.0"

+ 41 - 30
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -2,7 +2,7 @@ use crate::{
     errors::FlowyError,
     module::DocumentUser,
     services::doc::{
-        edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
+        edit::{DocumentMD5, EditCommand, EditCommandQueue, NewDelta, OpenDocAction, TransformDeltas},
         revision::{RevisionDownStream, RevisionManager, SteamStopTx},
         DocumentWebSocket,
         WsDocumentHandler,
@@ -70,50 +70,50 @@ impl ClientDocEditor {
     }
 
     pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
         let msg = EditCommand::Insert {
             index,
             data: data.to_string(),
             ret,
         };
         let _ = self.edit_cmd_tx.send(msg);
-        let delta = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta).await?;
+        let (delta, md5) = rx.await.map_err(internal_error)??;
+        let _ = self.save_local_delta(delta, md5).await?;
         Ok(())
     }
 
     pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
         let msg = EditCommand::Delete { interval, ret };
         let _ = self.edit_cmd_tx.send(msg);
-        let delta = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta).await?;
+        let (delta, md5) = rx.await.map_err(internal_error)??;
+        let _ = self.save_local_delta(delta, md5).await?;
         Ok(())
     }
 
     pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
         let msg = EditCommand::Format {
             interval,
             attribute,
             ret,
         };
         let _ = self.edit_cmd_tx.send(msg);
-        let delta = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta).await?;
+        let (delta, md5) = rx.await.map_err(internal_error)??;
+        let _ = self.save_local_delta(delta, md5).await?;
         Ok(())
     }
 
     pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
         let msg = EditCommand::Replace {
             interval,
             data: data.to_string(),
             ret,
         };
         let _ = self.edit_cmd_tx.send(msg);
-        let delta = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta).await?;
+        let (delta, md5) = rx.await.map_err(internal_error)??;
+        let _ = self.save_local_delta(delta, md5).await?;
         Ok(())
     }
 
@@ -148,7 +148,7 @@ impl ClientDocEditor {
     }
 
     pub async fn delta(&self) -> FlowyResult<DocDelta> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
         let msg = EditCommand::ReadDoc { ret };
         let _ = self.edit_cmd_tx.send(msg);
         let data = rx.await.map_err(internal_error)??;
@@ -159,12 +159,19 @@ impl ClientDocEditor {
         })
     }
 
-    async fn save_local_delta(&self, delta: RichTextDelta) -> Result<RevId, FlowyError> {
+    async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
         let delta_data = delta.to_bytes();
         let (base_rev_id, rev_id) = self.rev_manager.next_rev_id();
-        let delta_data = delta_data.to_vec();
         let user_id = self.user.user_id()?;
-        let revision = Revision::new(base_rev_id, rev_id, delta_data, &self.doc_id, RevType::Local, user_id);
+        let revision = Revision::new(
+            &self.doc_id,
+            base_rev_id,
+            rev_id,
+            delta_data,
+            RevType::Local,
+            &user_id,
+            md5,
+        );
         let _ = self.rev_manager.add_local_revision(&revision).await?;
         Ok(rev_id.into())
     }
@@ -172,15 +179,15 @@ impl ClientDocEditor {
     #[tracing::instrument(level = "debug", skip(self, data), err)]
     pub(crate) async fn composing_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
         let delta = RichTextDelta::from_bytes(&data)?;
-        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
         let msg = EditCommand::ComposeDelta {
             delta: delta.clone(),
             ret,
         };
         let _ = self.edit_cmd_tx.send(msg);
-        let _ = rx.await.map_err(internal_error)??;
+        let md5 = rx.await.map_err(internal_error)??;
 
-        let _ = self.save_local_delta(delta).await?;
+        let _ = self.save_local_delta(delta, md5).await?;
         Ok(())
     }
 
@@ -223,40 +230,44 @@ impl ClientDocEditor {
         }
 
         // compose delta
-        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
         let msg = EditCommand::ComposeDelta {
             delta: client_prime.clone(),
             ret,
         };
         let _ = self.edit_cmd_tx.send(msg);
-        let _ = rx.await.map_err(internal_error)??;
+        let md5 = rx.await.map_err(internal_error)??;
 
         // update rev id
         self.rev_manager
             .update_rev_id_counter_value(server_rev_id.clone().into());
         let (local_base_rev_id, local_rev_id) = self.rev_manager.next_rev_id();
-
+        let delta_data = client_prime.to_bytes();
         // save the revision
         let user_id = self.user.user_id()?;
         let revision = Revision::new(
+            &self.doc_id,
             local_base_rev_id,
             local_rev_id,
-            client_prime.to_bytes().to_vec(),
-            &self.doc_id,
+            delta_data,
             RevType::Remote,
-            user_id,
+            &user_id,
+            md5.clone(),
         );
+
         let _ = self.rev_manager.add_remote_revision(&revision).await?;
 
         // send the server_prime delta
         let user_id = self.user.user_id()?;
+        let delta_data = server_prime.to_bytes();
         let revision = Revision::new(
+            &self.doc_id,
             local_base_rev_id,
             local_rev_id,
-            server_prime.to_bytes().to_vec(),
-            &self.doc_id,
+            delta_data,
             RevType::Remote,
-            user_id,
+            &user_id,
+            md5,
         );
         let _ = self.ws_sender.send(revision.into());
         Ok(())
@@ -324,7 +335,7 @@ fn start_sync(
 #[cfg(feature = "flowy_unit_test")]
 impl ClientDocEditor {
     pub async fn doc_json(&self) -> FlowyResult<String> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
         let msg = EditCommand::ReadDoc { ret };
         let _ = self.edit_cmd_tx.send(msg);
         let s = rx.await.map_err(internal_error)??;

+ 42 - 21
frontend/rust-lib/flowy-document/src/services/doc/edit/queue.rs

@@ -4,6 +4,7 @@ use flowy_collaboration::{
     core::document::{history::UndoResult, Document},
     errors::CollaborateError,
 };
+use flowy_error::FlowyError;
 use futures::stream::StreamExt;
 use lib_ot::{
     core::{Interval, OperationTransformable},
@@ -41,12 +42,15 @@ impl EditCommandQueue {
         };
         stream
             .for_each(|msg| async {
-                self.handle_message(msg).await;
+                match self.handle_message(msg).await {
+                    Ok(_) => {},
+                    Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
+                }
             })
             .await;
     }
 
-    async fn handle_message(&self, msg: EditCommand) {
+    async fn handle_message(&self, msg: EditCommand) -> Result<(), FlowyError> {
         match msg {
             EditCommand::ComposeDelta { delta, ret } => {
                 let result = self.composed_delta(delta).await;
@@ -56,36 +60,48 @@ impl EditCommandQueue {
                 let f = || async {
                     let revision = Revision::try_from(bytes)?;
                     let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
-                    let rev_id: RevId = revision.rev_id.into();
-                    let (server_prime, client_prime) = self.document.read().await.delta().transform(&delta)?;
+                    let server_rev_id: RevId = revision.rev_id.into();
+                    let read_guard = self.document.read().await;
+                    let (server_prime, client_prime) = read_guard.delta().transform(&delta)?;
+                    drop(read_guard);
+
                     let transform_delta = TransformDeltas {
                         client_prime,
                         server_prime,
-                        server_rev_id: rev_id,
+                        server_rev_id,
                     };
+
                     Ok::<TransformDeltas, CollaborateError>(transform_delta)
                 };
                 let _ = ret.send(f().await);
             },
             EditCommand::Insert { index, data, ret } => {
-                let delta = self.document.write().await.insert(index, data);
-                let _ = ret.send(delta);
+                let mut write_guard = self.document.write().await;
+                let delta = write_guard.insert(index, data)?;
+                let md5 = write_guard.md5();
+                let _ = ret.send(Ok((delta, md5)));
             },
             EditCommand::Delete { interval, ret } => {
-                let result = self.document.write().await.delete(interval);
-                let _ = ret.send(result);
+                let mut write_guard = self.document.write().await;
+                let delta = write_guard.delete(interval)?;
+                let md5 = write_guard.md5();
+                let _ = ret.send(Ok((delta, md5)));
             },
             EditCommand::Format {
                 interval,
                 attribute,
                 ret,
             } => {
-                let result = self.document.write().await.format(interval, attribute);
-                let _ = ret.send(result);
+                let mut write_guard = self.document.write().await;
+                let delta = write_guard.format(interval, attribute)?;
+                let md5 = write_guard.md5();
+                let _ = ret.send(Ok((delta, md5)));
             },
             EditCommand::Replace { interval, data, ret } => {
-                let result = self.document.write().await.replace(interval, data);
-                let _ = ret.send(result);
+                let mut write_guard = self.document.write().await;
+                let delta = write_guard.replace(interval, data)?;
+                let md5 = write_guard.md5();
+                let _ = ret.send(Ok((delta, md5)));
             },
             EditCommand::CanUndo { ret } => {
                 let _ = ret.send(self.document.read().await.can_undo());
@@ -110,10 +126,11 @@ impl EditCommandQueue {
                 let _ = ret.send(Ok(delta));
             },
         }
+        Ok(())
     }
 
     #[tracing::instrument(level = "debug", skip(self, delta), fields(compose_result), err)]
-    async fn composed_delta(&self, delta: RichTextDelta) -> Result<(), CollaborateError> {
+    async fn composed_delta(&self, delta: RichTextDelta) -> Result<String, CollaborateError> {
         // tracing::debug!("{:?} thread handle_message", thread::current(),);
         let mut document = self.document.write().await;
         tracing::Span::current().record(
@@ -121,19 +138,23 @@ impl EditCommandQueue {
             &format!("doc_id:{} - {}", &self.doc_id, delta.to_json()).as_str(),
         );
 
-        let result = document.compose_delta(delta);
+        let _ = document.compose_delta(delta)?;
+        let md5 = document.md5();
         drop(document);
 
-        result
+        Ok(md5)
     }
 }
 
 pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
+pub(crate) type NewDelta = (RichTextDelta, String);
+pub(crate) type DocumentMD5 = String;
+
 #[allow(dead_code)]
 pub(crate) enum EditCommand {
     ComposeDelta {
         delta: RichTextDelta,
-        ret: Ret<()>,
+        ret: Ret<DocumentMD5>,
     },
     ProcessRemoteRevision {
         bytes: Bytes,
@@ -142,22 +163,22 @@ pub(crate) enum EditCommand {
     Insert {
         index: usize,
         data: String,
-        ret: Ret<RichTextDelta>,
+        ret: Ret<NewDelta>,
     },
     Delete {
         interval: Interval,
-        ret: Ret<RichTextDelta>,
+        ret: Ret<NewDelta>,
     },
     Format {
         interval: Interval,
         attribute: RichTextAttribute,
-        ret: Ret<RichTextDelta>,
+        ret: Ret<NewDelta>,
     },
 
     Replace {
         interval: Interval,
         data: String,
-        ret: Ret<RichTextDelta>,
+        ret: Ret<NewDelta>,
     },
     CanUndo {
         ret: oneshot::Sender<bool>,

+ 8 - 5
frontend/rust-lib/flowy-document/src/services/doc/revision/cache/cache.rs

@@ -7,7 +7,8 @@ use crate::{
     },
     sql_tables::RevTableSql,
 };
-use flowy_collaboration::entities::doc::Doc;
+use bytes::Bytes;
+use flowy_collaboration::{entities::doc::Doc, util::md5};
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyResult};
 use lib_infra::future::FutureResult;
@@ -155,14 +156,16 @@ impl RevisionCache {
 
         // The document doesn't exist in local. Try load from server
         let doc = self.server.fetch_document(&self.doc_id).await?;
-        let delta_data = doc.data.as_bytes();
+        let delta_data = Bytes::from(doc.data.clone());
+        let doc_md5 = md5(&delta_data);
         let revision = Revision::new(
+            &doc.id,
             doc.base_rev_id,
             doc.rev_id,
-            delta_data.to_owned(),
-            &doc.id,
+            delta_data,
             RevType::Remote,
-            self.user_id.clone(),
+            &self.user_id,
+            doc_md5,
         );
 
         self.add_remote_revision(revision).await?;

+ 9 - 4
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -5,7 +5,10 @@ use crate::{
         DocumentWebSocket,
     },
 };
-use flowy_collaboration::{entities::doc::Doc, util::RevIdCounter};
+use flowy_collaboration::{
+    entities::doc::Doc,
+    util::{md5, RevIdCounter},
+};
 use flowy_error::FlowyResult;
 use lib_infra::future::FutureResult;
 use lib_ot::{
@@ -84,13 +87,15 @@ impl RevisionManager {
         }
 
         let delta_data = new_delta.to_bytes();
+        let md5 = md5(&delta_data);
         let revision = Revision::new(
+            &self.doc_id,
             range.start,
             range.end,
-            delta_data.to_vec(),
-            &self.doc_id,
+            delta_data,
             RevType::Remote,
-            self.user_id.clone(),
+            &self.user_id,
+            md5,
         );
 
         Ok(revision)

+ 0 - 1
frontend/rust-lib/flowy-document/tests/editor/mod.rs

@@ -1,7 +1,6 @@
 #![allow(clippy::module_inception)]
 mod attribute_test;
 mod op_test;
-mod revision_test;
 mod serde_test;
 mod undo_redo_test;
 

+ 0 - 91
frontend/rust-lib/flowy-document/tests/editor/revision_test.rs

@@ -1,91 +0,0 @@
-use flowy_test::editor::{EditorScript::*, *};
-use lib_ot::revision::RevState;
-
-#[tokio::test]
-async fn doc_rev_state_test1() {
-    let scripts = vec![
-        InsertText("123", 0),
-        AssertCurrentRevId(1),
-        AssertRevisionState(1, RevState::StateLocal),
-        SimulateAckedMessage(1),
-        AssertRevisionState(1, RevState::Acked),
-        AssertNextRevId(None),
-        AssertJson(r#"[{"insert":"123\n"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}
-
-#[tokio::test]
-async fn doc_rev_state_test2() {
-    let scripts = vec![
-        InsertText("1", 0),
-        InsertText("2", 1),
-        InsertText("3", 2),
-        AssertCurrentRevId(3),
-        AssertRevisionState(1, RevState::StateLocal),
-        AssertRevisionState(2, RevState::StateLocal),
-        AssertRevisionState(3, RevState::StateLocal),
-        SimulateAckedMessage(1),
-        AssertRevisionState(1, RevState::Acked),
-        AssertNextRevId(Some(2)),
-        SimulateAckedMessage(2),
-        AssertRevisionState(2, RevState::Acked),
-        //
-        AssertNextRevId(Some(3)),
-        AssertRevisionState(3, RevState::StateLocal),
-        AssertJson(r#"[{"insert":"123\n"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}
-
-#[tokio::test]
-async fn doc_push_test() {
-    // let delta = RichTextDeltaBuilder::new().insert("abc\n").build();
-    let scripts = vec![
-        InsertText("1", 0),
-        InsertText("2", 1),
-        InsertText("3", 2),
-        AssertJson(r#"[{"insert":"123\nabc\n"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}
-
-#[tokio::test]
-async fn doc_sync_test() {
-    let scripts = vec![
-        InsertText("1", 0),
-        InsertText("2", 1),
-        InsertText("3", 2),
-        AssertJson(r#"[{"insert":"123\n"}]"#),
-        AssertNextRevId(None),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}
-
-#[tokio::test]
-async fn doc_sync_lost_ws_conn() {
-    let scripts = vec![
-        InsertText("1", 0),
-        StopWs,
-        InsertText("2", 1),
-        AssertNextRevId(Some(2)),
-        InsertText("3", 2),
-        AssertJson(r#"[{"insert":"123\n"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}
-
-#[tokio::test]
-async fn doc_sync_retry_ws_conn() {
-    let scripts = vec![
-        InsertText("1", 0),
-        StopWs,
-        InsertText("2", 1),
-        InsertText("3", 2),
-        StartWs,
-        WaitSyncFinished,
-        AssertNextRevId(None),
-        AssertJson(r#"[{"insert":"123\n"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}

+ 3 - 2
frontend/rust-lib/flowy-test/Cargo.toml

@@ -10,7 +10,7 @@ flowy-sdk = { path = "../flowy-sdk"}
 flowy-user = { path = "../flowy-user"}
 flowy-net = { path = "../flowy-net"}
 flowy-core = { path = "../flowy-core", default-features = false}
-flowy-document = { path = "../flowy-document"}
+flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
 lib-dispatch = { path = "../lib-dispatch" }
 
 flowy-collaboration = { path = "../../../shared-lib/flowy-collaboration" }
@@ -35,4 +35,5 @@ quickcheck_macros = "0.9.1"
 fake = "~2.3.0"
 claim = "0.4.0"
 futures = "0.3.15"
-serial_test = "0.5.1"
+serial_test = "0.5.1"
+flowy-net = { path = "../flowy-net", features = ["ws_mock"] }

+ 13 - 54
frontend/rust-lib/flowy-test/src/editor.rs → frontend/rust-lib/flowy-test/src/doc_script.rs

@@ -1,15 +1,7 @@
 use crate::{helper::ViewTest, FlowySDKTest};
-use flowy_collaboration::entities::{
-    doc::DocIdentifier,
-    ws::{WsDocumentData, WsDocumentDataBuilder},
-};
+use flowy_collaboration::entities::{doc::DocIdentifier, ws::WsDocumentData};
 use flowy_document::services::doc::{edit::ClientDocEditor, revision::RevisionIterator, SYNC_INTERVAL_IN_MILLIS};
-
-use lib_ot::{
-    core::Interval,
-    revision::{RevState, RevType, Revision, RevisionRange},
-    rich_text::RichTextDelta,
-};
+use lib_ot::{core::Interval, revision::RevState, rich_text::RichTextDelta};
 use std::sync::Arc;
 use tokio::time::{sleep, Duration};
 
@@ -19,16 +11,13 @@ pub enum EditorScript {
     InsertText(&'static str, usize),
     Delete(Interval),
     Replace(Interval, &'static str),
-    Undo(),
-    Redo(),
-    WaitSyncFinished,
-    SimulatePushRevisionMessageWithDelta(RichTextDelta),
-    SimulatePullRevisionMessage(RevisionRange),
-    SimulateAckedMessage(i64),
+
     AssertRevisionState(i64, RevState),
     AssertNextRevId(Option<i64>),
     AssertCurrentRevId(i64),
     AssertJson(&'static str),
+
+    WaitSyncFinished,
 }
 
 pub struct EditorTest {
@@ -51,7 +40,7 @@ impl EditorTest {
             self.run_script(script).await;
         }
 
-        sleep(Duration::from_secs(5)).await;
+        sleep(Duration::from_secs(3)).await;
     }
 
     async fn run_script(&mut self, script: EditorScript) {
@@ -60,21 +49,20 @@ impl EditorTest {
         let _memory_cache = cache.memory_cache();
         let _disk_cache = cache.dish_cache();
         let doc_id = self.editor.doc_id.clone();
-        let user_id = self.sdk.user_session.user_id().unwrap();
+        let _user_id = self.sdk.user_session.user_id().unwrap();
         let ws_manager = self.sdk.ws_manager.clone();
         let token = self.sdk.user_session.token().unwrap();
+        let wait_millis = 2 * SYNC_INTERVAL_IN_MILLIS;
 
         match script {
             EditorScript::StartWs => {
                 ws_manager.start(token.clone()).await.unwrap();
             },
             EditorScript::StopWs => {
-                sleep(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)).await;
                 ws_manager.stop().await;
             },
             EditorScript::InsertText(s, offset) => {
                 self.editor.insert(offset, s).await.unwrap();
-                sleep(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS)).await;
             },
             EditorScript::Delete(interval) => {
                 self.editor.delete(interval).await.unwrap();
@@ -82,15 +70,6 @@ impl EditorTest {
             EditorScript::Replace(interval, s) => {
                 self.editor.replace(interval, s).await.unwrap();
             },
-            EditorScript::Undo() => {
-                self.editor.undo().await.unwrap();
-            },
-            EditorScript::Redo() => {
-                self.editor.redo().await.unwrap();
-            },
-            EditorScript::WaitSyncFinished => {
-                sleep(Duration::from_millis(1000)).await;
-            },
             EditorScript::AssertRevisionState(rev_id, state) => {
                 let record = cache.query_revision(&doc_id, rev_id).await.unwrap();
                 assert_eq!(record.state, state);
@@ -108,40 +87,20 @@ impl EditorTest {
                 let next_revision = next_revision.unwrap();
                 assert_eq!(next_revision.revision.rev_id, rev_id.unwrap());
             },
-            EditorScript::SimulatePushRevisionMessageWithDelta(delta) => {
-                let local_base_rev_id = rev_manager.rev_id();
-                let local_rev_id = local_base_rev_id + 1;
-                let revision = Revision::new(
-                    local_base_rev_id,
-                    local_rev_id,
-                    delta.to_bytes().to_vec(),
-                    &doc_id,
-                    RevType::Remote,
-                    user_id,
-                );
-                let data = WsDocumentDataBuilder::build_push_rev_message(&doc_id, revision);
-                self.send_ws_message(data).await;
-            },
-            EditorScript::SimulatePullRevisionMessage(_range) => {},
-            EditorScript::SimulateAckedMessage(i64) => {
-                let data = WsDocumentDataBuilder::build_acked_message(&doc_id, i64);
-                self.send_ws_message(data).await;
-            },
             EditorScript::AssertJson(expected) => {
                 let expected_delta: RichTextDelta = serde_json::from_str(expected).unwrap();
                 let delta = self.editor.doc_delta().await.unwrap();
-
                 if expected_delta != delta {
                     eprintln!("✅ expect: {}", expected,);
                     eprintln!("❌ receive: {}", delta.to_json());
                 }
                 assert_eq!(expected_delta, delta);
             },
+            EditorScript::WaitSyncFinished => {
+                // Workaround: just wait two seconds
+                sleep(Duration::from_millis(2000)).await;
+            },
         }
-    }
-
-    async fn send_ws_message(&self, data: WsDocumentData) {
-        self.editor.handle_ws_message(data).await.unwrap();
-        sleep(Duration::from_millis(200)).await;
+        sleep(Duration::from_millis(wait_millis)).await;
     }
 }

+ 1 - 1
frontend/rust-lib/flowy-test/src/lib.rs

@@ -1,4 +1,4 @@
-pub mod editor;
+pub mod doc_script;
 pub mod event_builder;
 pub mod helper;
 

+ 1 - 0
frontend/rust-lib/flowy-test/tests/main.rs

@@ -0,0 +1 @@
+mod revision_test;

+ 44 - 0
frontend/rust-lib/flowy-test/tests/revision_test.rs

@@ -0,0 +1,44 @@
+use flowy_test::doc_script::{EditorScript::*, *};
+use lib_ot::revision::RevState;
+
+#[tokio::test]
+async fn doc_sync_test() {
+    let scripts = vec![
+        InsertText("1", 0),
+        InsertText("2", 1),
+        InsertText("3", 2),
+        AssertJson(r#"[{"insert":"123\n"}]"#),
+        AssertNextRevId(None),
+    ];
+    EditorTest::new().await.run_scripts(scripts).await;
+}
+
+#[tokio::test]
+async fn doc_sync_lost_ws_conn() {
+    let scripts = vec![
+        InsertText("1", 0),
+        StopWs,
+        InsertText("2", 1),
+        InsertText("3", 2),
+        AssertNextRevId(Some(2)),
+        AssertJson(r#"[{"insert":"123\n"}]"#),
+    ];
+    EditorTest::new().await.run_scripts(scripts).await;
+}
+
+#[tokio::test]
+async fn doc_sync_retry_ws_conn() {
+    let scripts = vec![
+        InsertText("1", 0),
+        StopWs,
+        InsertText("2", 1),
+        InsertText("3", 2),
+        StartWs,
+        WaitSyncFinished,
+        AssertRevisionState(2, RevState::Acked),
+        AssertRevisionState(3, RevState::Acked),
+        AssertNextRevId(None),
+        AssertJson(r#"[{"insert":"123\n"}]"#),
+    ];
+    EditorTest::new().await.run_scripts(scripts).await;
+}

+ 6 - 0
shared-lib/flowy-collaboration/src/core/document/document.rs

@@ -62,6 +62,12 @@ impl Document {
 
     pub fn delta(&self) -> &RichTextDelta { &self.delta }
 
+    pub fn md5(&self) -> String {
+        // Opti: calculate the md5 of delta would cause performance issues
+        let bytes = self.to_bytes();
+        format!("{:x}", md5::compute(bytes))
+    }
+
     pub fn set_notify(&mut self, notify: mpsc::UnboundedSender<()>) { self.notify = Some(notify); }
 
     pub fn set_delta(&mut self, data: RichTextDelta) {

+ 0 - 1
shared-lib/flowy-collaboration/src/core/document/extensions/format/mod.rs

@@ -5,4 +5,3 @@ pub use resolve_inline_format::*;
 mod format_at_position;
 mod resolve_block_format;
 mod resolve_inline_format;
-

+ 2 - 3
shared-lib/flowy-collaboration/src/core/document/extensions/format/resolve_block_format.rs

@@ -1,13 +1,12 @@
 use lib_ot::{
     core::{DeltaBuilder, DeltaIter, Interval},
-    rich_text::{AttributeScope, plain_attributes, RichTextAttribute, RichTextDelta},
+    rich_text::{plain_attributes, AttributeScope, RichTextAttribute, RichTextDelta},
 };
 
 use crate::{
-    core::document::FormatExt,
+    core::document::{extensions::helper::line_break, FormatExt},
     util::find_newline,
 };
-use crate::core::document::extensions::helper::line_break;
 
 pub struct ResolveBlockFormat {}
 impl FormatExt for ResolveBlockFormat {

+ 1 - 2
shared-lib/flowy-collaboration/src/core/document/extensions/format/resolve_inline_format.rs

@@ -4,10 +4,9 @@ use lib_ot::{
 };
 
 use crate::{
-    core::document::FormatExt,
+    core::document::{extensions::helper::line_break, FormatExt},
     util::find_newline,
 };
-use crate::core::document::extensions::helper::line_break;
 
 pub struct ResolveInlineFormat {}
 impl FormatExt for ResolveInlineFormat {

+ 1 - 1
shared-lib/flowy-collaboration/src/core/document/extensions/mod.rs

@@ -8,8 +8,8 @@ use lib_ot::{
 
 mod delete;
 mod format;
-mod insert;
 mod helper;
+mod insert;
 
 pub type InsertExtension = Box<dyn InsertExt + Send + Sync>;
 pub type FormatExtension = Box<dyn FormatExt + Send + Sync>;

+ 1 - 1
shared-lib/flowy-derive/src/proto_buf/deserialize.rs

@@ -52,7 +52,7 @@ fn token_stream_for_one_of(ctxt: &Ctxt, field: &ASTField) -> Option<TokenStream>
         Err(e) => {
             eprintln!("token_stream_for_one_of failed: {:?} with error: {}", member, e);
             panic!();
-        }
+        },
     }?;
     let bracketed_ty_info = ty_info.bracket_ty_info.as_ref().as_ref();
 

+ 14 - 10
shared-lib/lib-ot/src/revision/model.rs

@@ -1,4 +1,5 @@
 use crate::rich_text::RichTextDelta;
+use bytes::Bytes;
 use flowy_derive::{ProtoBuf, ProtoBuf_Enum};
 use std::{fmt::Formatter, ops::RangeInclusive};
 
@@ -52,17 +53,20 @@ impl std::fmt::Debug for Revision {
 }
 
 impl Revision {
-    pub fn new<T1, T2, D>(base_rev_id: T1, rev_id: T2, delta: D, doc_id: &str, ty: RevType, user_id: String) -> Revision
-    where
-        T1: Into<i64>,
-        T2: Into<i64>,
-        D: AsRef<[u8]>,
-    {
-        let md5 = md5(&delta);
+    pub fn new(
+        doc_id: &str,
+        base_rev_id: i64,
+        rev_id: i64,
+        delta_data: Bytes,
+        ty: RevType,
+        user_id: &str,
+        md5: String,
+    ) -> Revision {
         let doc_id = doc_id.to_owned();
-        let delta_data = delta.as_ref().to_vec();
-        let base_rev_id = base_rev_id.into();
-        let rev_id = rev_id.into();
+        let delta_data = delta_data.to_vec();
+        let base_rev_id = base_rev_id;
+        let rev_id = rev_id;
+        let user_id = user_id.to_owned();
 
         if base_rev_id != 0 {
             debug_assert!(base_rev_id != rev_id);

+ 1 - 0
shared-lib/lib-ot/tests/main.rs

@@ -0,0 +1 @@
+