Pārlūkot izejas kodu

Saving the revision in serial

appflowy 3 gadi atpakaļ
vecāks
revīzija
a488944d2c

+ 2 - 0
backend/src/services/document/persistence.rs

@@ -17,6 +17,7 @@ use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
 use protobuf::Message;
 
 use flowy_collaboration::sync::ServerDocumentManager;
+use lib_ot::core::trim;
 use std::sync::Arc;
 use uuid::Uuid;
 
@@ -204,6 +205,7 @@ fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevisionPB) -> R
         let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
         document_delta = document_delta.compose(&delta).map_err(internal_error)?;
     }
+
     let text = document_delta.to_json();
     let mut document_info = DocumentInfo::new();
     document_info.set_doc_id(doc_id.to_owned());

+ 1 - 1
frontend/rust-lib/flowy-core/src/services/view/controller.rs

@@ -216,7 +216,7 @@ impl ViewController {
     }
 
     pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
-        let doc = self.document_ctx.controller.apply_document_delta(params).await?;
+        let doc = self.document_ctx.controller.receive_local_delta(params).await?;
         Ok(doc)
     }
 

+ 2 - 3
frontend/rust-lib/flowy-document/src/controller.rs

@@ -79,7 +79,7 @@ impl DocumentController {
     }
 
     #[tracing::instrument(level = "debug", skip(self, delta), fields(doc_id = %delta.doc_id), err)]
-    pub async fn apply_document_delta(&self, delta: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
+    pub async fn receive_local_delta(&self, delta: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
         let editor = self.get_editor(&delta.doc_id).await?;
         let _ = editor.compose_local_delta(Bytes::from(delta.delta_json)).await?;
         let document_json = editor.document_json().await?;
@@ -121,8 +121,7 @@ impl DocumentController {
             token,
             server: self.server.clone(),
         });
-        let doc_editor =
-            ClientDocumentEditor::new(doc_id, user, pool, rev_manager, self.ws_sender.clone(), server).await?;
+        let doc_editor = ClientDocumentEditor::new(doc_id, user, rev_manager, self.ws_sender.clone(), server).await?;
         self.ws_receivers.add(doc_id, doc_editor.ws_handler());
         self.open_cache.insert(&doc_id, &doc_editor);
         Ok(doc_editor)

+ 28 - 47
frontend/rust-lib/flowy-document/src/core/edit/editor.rs

@@ -7,12 +7,7 @@ use crate::{
     errors::FlowyError,
 };
 use bytes::Bytes;
-use flowy_collaboration::{
-    document::history::UndoResult,
-    entities::revision::{RevId, Revision},
-    errors::CollaborateResult,
-};
-use flowy_database::ConnectionPool;
+use flowy_collaboration::errors::CollaborateResult;
 use flowy_error::{internal_error, FlowyResult};
 use lib_ot::{
     core::Interval,
@@ -26,24 +21,22 @@ pub struct ClientDocumentEditor {
     rev_manager: Arc<RevisionManager>,
     ws_manager: Arc<dyn DocumentWebSocketManager>,
     edit_queue: UnboundedSender<EditorCommand>,
-    user: Arc<dyn DocumentUser>,
 }
 
 impl ClientDocumentEditor {
     pub(crate) async fn new(
         doc_id: &str,
         user: Arc<dyn DocumentUser>,
-        pool: Arc<ConnectionPool>,
         mut rev_manager: RevisionManager,
         ws: Arc<dyn DocumentWebSocket>,
         server: Arc<dyn RevisionServer>,
     ) -> FlowyResult<Arc<Self>> {
         let delta = rev_manager.load_document(server).await?;
-        let edit_queue = spawn_edit_queue(doc_id, delta, pool.clone());
+        let rev_manager = Arc::new(rev_manager);
         let doc_id = doc_id.to_string();
         let user_id = user.user_id()?;
-        let rev_manager = Arc::new(rev_manager);
 
+        let edit_queue = spawn_edit_queue(user, rev_manager.clone(), delta);
         let ws_manager = make_document_ws_manager(
             doc_id.clone(),
             user_id.clone(),
@@ -57,56 +50,51 @@ impl ClientDocumentEditor {
             rev_manager,
             ws_manager,
             edit_queue,
-            user,
         });
         Ok(editor)
     }
 
     pub async fn insert<T: ToString>(&self, index: usize, data: T) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
         let msg = EditorCommand::Insert {
             index,
             data: data.to_string(),
             ret,
         };
         let _ = self.edit_queue.send(msg);
-        let (delta, md5) = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta, md5).await?;
+        let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
     pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
         let msg = EditorCommand::Delete { interval, ret };
         let _ = self.edit_queue.send(msg);
-        let (delta, md5) = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta, md5).await?;
+        let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
     pub async fn format(&self, interval: Interval, attribute: RichTextAttribute) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
         let msg = EditorCommand::Format {
             interval,
             attribute,
             ret,
         };
         let _ = self.edit_queue.send(msg);
-        let (delta, md5) = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta, md5).await?;
+        let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
     pub async fn replace<T: ToString>(&self, interval: Interval, data: T) -> Result<(), FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<NewDelta>>();
+        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
         let msg = EditorCommand::Replace {
             interval,
             data: data.to_string(),
             ret,
         };
         let _ = self.edit_queue.send(msg);
-        let (delta, md5) = rx.await.map_err(internal_error)??;
-        let _ = self.save_local_delta(delta, md5).await?;
+        let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
@@ -124,20 +112,20 @@ impl ClientDocumentEditor {
         rx.await.unwrap_or(false)
     }
 
-    pub async fn undo(&self) -> Result<UndoResult, FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
+    pub async fn undo(&self) -> Result<(), FlowyError> {
+        let (ret, rx) = oneshot::channel();
         let msg = EditorCommand::Undo { ret };
         let _ = self.edit_queue.send(msg);
-        let r = rx.await.map_err(internal_error)??;
-        Ok(r)
+        let _ = rx.await.map_err(internal_error)??;
+        Ok(())
     }
 
-    pub async fn redo(&self) -> Result<UndoResult, FlowyError> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<UndoResult>>();
+    pub async fn redo(&self) -> Result<(), FlowyError> {
+        let (ret, rx) = oneshot::channel();
         let msg = EditorCommand::Redo { ret };
         let _ = self.edit_queue.send(msg);
-        let r = rx.await.map_err(internal_error)??;
-        Ok(r)
+        let _ = rx.await.map_err(internal_error)??;
+        Ok(())
     }
 
     pub async fn document_json(&self) -> FlowyResult<String> {
@@ -148,27 +136,16 @@ impl ClientDocumentEditor {
         Ok(json)
     }
 
-    async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
-        let delta_data = delta.to_bytes();
-        let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
-        let user_id = self.user.user_id()?;
-        let revision = Revision::new(&self.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5);
-        let _ = self.rev_manager.add_local_revision(&revision).await?;
-        Ok(rev_id.into())
-    }
-
     #[tracing::instrument(level = "debug", skip(self, data), err)]
     pub(crate) async fn compose_local_delta(&self, data: Bytes) -> Result<(), FlowyError> {
         let delta = RichTextDelta::from_bytes(&data)?;
-        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
-        let msg = EditorCommand::ComposeDelta {
+        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
+        let msg = EditorCommand::ComposeLocalDelta {
             delta: delta.clone(),
             ret,
         };
         let _ = self.edit_queue.send(msg);
-        let md5 = rx.await.map_err(internal_error)??;
-
-        let _ = self.save_local_delta(delta, md5).await?;
+        let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
@@ -178,9 +155,13 @@ impl ClientDocumentEditor {
     pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.receiver() }
 }
 
-fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditorCommand> {
+fn spawn_edit_queue(
+    user: Arc<dyn DocumentUser>,
+    rev_manager: Arc<RevisionManager>,
+    delta: RichTextDelta,
+) -> UnboundedSender<EditorCommand> {
     let (sender, receiver) = mpsc::unbounded_channel::<EditorCommand>();
-    let actor = EditorCommandQueue::new(doc_id, delta, receiver);
+    let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver);
     tokio::spawn(actor.run());
     sender
 }

+ 152 - 49
frontend/rust-lib/flowy-document/src/core/edit/queue.rs

@@ -1,8 +1,8 @@
+use crate::{context::DocumentUser, core::RevisionManager};
 use async_stream::stream;
-
 use flowy_collaboration::{
     document::{history::UndoResult, Document, NewlineDoc},
-    entities::revision::Revision,
+    entities::revision::{RepeatedRevision, RevId, Revision},
     errors::CollaborateError,
     util::make_delta_from_revisions,
 };
@@ -16,18 +16,24 @@ use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot, RwLock};
 
 pub(crate) struct EditorCommandQueue {
-    #[allow(dead_code)]
-    doc_id: String,
     document: Arc<RwLock<Document>>,
+    user: Arc<dyn DocumentUser>,
+    rev_manager: Arc<RevisionManager>,
     receiver: Option<mpsc::UnboundedReceiver<EditorCommand>>,
 }
 
 impl EditorCommandQueue {
-    pub(crate) fn new(doc_id: &str, delta: RichTextDelta, receiver: mpsc::UnboundedReceiver<EditorCommand>) -> Self {
+    pub(crate) fn new(
+        user: Arc<dyn DocumentUser>,
+        rev_manager: Arc<RevisionManager>,
+        delta: RichTextDelta,
+        receiver: mpsc::UnboundedReceiver<EditorCommand>,
+    ) -> Self {
         let document = Arc::new(RwLock::new(Document::from_delta(delta)));
         Self {
-            doc_id: doc_id.to_owned(),
             document,
+            user,
+            rev_manager,
             receiver: Some(receiver),
         }
     }
@@ -43,8 +49,8 @@ impl EditorCommandQueue {
             }
         };
         stream
-            .for_each(|msg| async {
-                match self.handle_message(msg).await {
+            .for_each(|command| async {
+                match self.handle_command(command).await {
                     Ok(_) => {},
                     Err(e) => tracing::debug!("[EditCommandQueue]: {}", e),
                 }
@@ -52,30 +58,55 @@ impl EditorCommandQueue {
             .await;
     }
 
-    async fn handle_message(&self, msg: EditorCommand) -> Result<(), FlowyError> {
-        match msg {
-            EditorCommand::ComposeDelta { delta, ret } => {
-                let fut = || async {
-                    let mut document = self.document.write().await;
-                    let _ = document.compose_delta(delta)?;
-                    let md5 = document.md5();
-                    drop(document);
-
-                    Ok::<String, CollaborateError>(md5)
-                };
+    #[tracing::instrument(level = "debug", skip(self), err)]
+    async fn handle_command(&self, command: EditorCommand) -> Result<(), FlowyError> {
+        match command {
+            EditorCommand::ComposeLocalDelta { delta, ret } => {
+                let mut document = self.document.write().await;
+                let _ = document.compose_delta(delta.clone())?;
+                let md5 = document.md5();
+                drop(document);
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
+            },
+            EditorCommand::ComposeRemoteDelta {
+                revisions,
+                client_delta,
+                server_delta,
+                ret,
+            } => {
+                let mut document = self.document.write().await;
+                let _ = document.compose_delta(client_delta.clone())?;
+                let md5 = document.md5();
+                for revision in &revisions {
+                    let _ = self.rev_manager.add_remote_revision(revision).await?;
+                }
 
-                let _ = ret.send(fut().await);
+                let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
+                let doc_id = self.rev_manager.doc_id.clone();
+                let user_id = self.user.user_id()?;
+                let (client_revision, server_revision) = make_client_and_server_revision(
+                    &doc_id,
+                    &user_id,
+                    base_rev_id,
+                    rev_id,
+                    client_delta,
+                    Some(server_delta),
+                    md5,
+                );
+                let _ = self.rev_manager.add_remote_revision(&client_revision).await?;
+                let _ = ret.send(Ok(server_revision));
             },
-            EditorCommand::OverrideDelta { delta, ret } => {
-                let fut = || async {
-                    let mut document = self.document.write().await;
-                    let _ = document.set_delta(delta);
-                    let md5 = document.md5();
-                    drop(document);
-                    Ok::<String, CollaborateError>(md5)
-                };
+            EditorCommand::OverrideDelta { revisions, delta, ret } => {
+                let mut document = self.document.write().await;
+                let _ = document.set_delta(delta);
+                let md5 = document.md5();
+                drop(document);
 
-                let _ = ret.send(fut().await);
+                let repeated_revision = RepeatedRevision::new(revisions);
+                assert_eq!(repeated_revision.last().unwrap().md5, md5);
+                let _ = self.rev_manager.reset_document(repeated_revision).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::TransformRevision { revisions, ret } => {
                 let f = || async {
@@ -104,13 +135,15 @@ impl EditorCommandQueue {
                 let mut write_guard = self.document.write().await;
                 let delta = write_guard.insert(index, data)?;
                 let md5 = write_guard.md5();
-                let _ = ret.send(Ok((delta, md5)));
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::Delete { interval, ret } => {
                 let mut write_guard = self.document.write().await;
                 let delta = write_guard.delete(interval)?;
                 let md5 = write_guard.md5();
-                let _ = ret.send(Ok((delta, md5)));
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::Format {
                 interval,
@@ -120,13 +153,15 @@ impl EditorCommandQueue {
                 let mut write_guard = self.document.write().await;
                 let delta = write_guard.format(interval, attribute)?;
                 let md5 = write_guard.md5();
-                let _ = ret.send(Ok((delta, md5)));
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::Replace { interval, data, ret } => {
                 let mut write_guard = self.document.write().await;
                 let delta = write_guard.replace(interval, data)?;
                 let md5 = write_guard.md5();
-                let _ = ret.send(Ok((delta, md5)));
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::CanUndo { ret } => {
                 let _ = ret.send(self.document.read().await.can_undo());
@@ -135,12 +170,18 @@ impl EditorCommandQueue {
                 let _ = ret.send(self.document.read().await.can_redo());
             },
             EditorCommand::Undo { ret } => {
-                let result = self.document.write().await.undo();
-                let _ = ret.send(result);
+                let mut write_guard = self.document.write().await;
+                let UndoResult { delta } = write_guard.undo()?;
+                let md5 = write_guard.md5();
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::Redo { ret } => {
-                let result = self.document.write().await.redo();
-                let _ = ret.send(result);
+                let mut write_guard = self.document.write().await;
+                let UndoResult { delta } = write_guard.redo()?;
+                let md5 = write_guard.md5();
+                let _ = self.save_local_delta(delta, md5).await?;
+                let _ = ret.send(Ok(()));
             },
             EditorCommand::ReadDoc { ret } => {
                 let data = self.document.read().await.to_json();
@@ -153,21 +194,62 @@ impl EditorCommandQueue {
         }
         Ok(())
     }
+
+    async fn save_local_delta(&self, delta: RichTextDelta, md5: String) -> Result<RevId, FlowyError> {
+        let delta_data = delta.to_bytes();
+        let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
+        let user_id = self.user.user_id()?;
+        let revision = Revision::new(&self.rev_manager.doc_id, base_rev_id, rev_id, delta_data, &user_id, md5);
+        let _ = self.rev_manager.add_local_revision(&revision).await?;
+        Ok(rev_id.into())
+    }
+}
+
+fn make_client_and_server_revision(
+    doc_id: &str,
+    user_id: &str,
+    base_rev_id: i64,
+    rev_id: i64,
+    client_delta: RichTextDelta,
+    server_delta: Option<RichTextDelta>,
+    md5: DocumentMD5,
+) -> (Revision, Option<Revision>) {
+    let client_revision = Revision::new(
+        &doc_id,
+        base_rev_id,
+        rev_id,
+        client_delta.to_bytes(),
+        &user_id,
+        md5.clone(),
+    );
+
+    match server_delta {
+        None => (client_revision, None),
+        Some(server_delta) => {
+            let server_revision = Revision::new(&doc_id, base_rev_id, rev_id, server_delta.to_bytes(), &user_id, md5);
+            (client_revision, Some(server_revision))
+        },
+    }
 }
 
 pub(crate) type Ret<T> = oneshot::Sender<Result<T, CollaborateError>>;
-pub(crate) type NewDelta = (RichTextDelta, String);
 pub(crate) type DocumentMD5 = String;
 
-#[allow(dead_code)]
 pub(crate) enum EditorCommand {
-    ComposeDelta {
+    ComposeLocalDelta {
         delta: RichTextDelta,
-        ret: Ret<DocumentMD5>,
+        ret: Ret<()>,
+    },
+    ComposeRemoteDelta {
+        revisions: Vec<Revision>,
+        client_delta: RichTextDelta,
+        server_delta: RichTextDelta,
+        ret: Ret<Option<Revision>>,
     },
     OverrideDelta {
+        revisions: Vec<Revision>,
         delta: RichTextDelta,
-        ret: Ret<DocumentMD5>,
+        ret: Ret<()>,
     },
     TransformRevision {
         revisions: Vec<Revision>,
@@ -176,22 +258,21 @@ pub(crate) enum EditorCommand {
     Insert {
         index: usize,
         data: String,
-        ret: Ret<NewDelta>,
+        ret: Ret<()>,
     },
     Delete {
         interval: Interval,
-        ret: Ret<NewDelta>,
+        ret: Ret<()>,
     },
     Format {
         interval: Interval,
         attribute: RichTextAttribute,
-        ret: Ret<NewDelta>,
+        ret: Ret<()>,
     },
-
     Replace {
         interval: Interval,
         data: String,
-        ret: Ret<NewDelta>,
+        ret: Ret<()>,
     },
     CanUndo {
         ret: oneshot::Sender<bool>,
@@ -200,10 +281,10 @@ pub(crate) enum EditorCommand {
         ret: oneshot::Sender<bool>,
     },
     Undo {
-        ret: Ret<UndoResult>,
+        ret: Ret<()>,
     },
     Redo {
-        ret: Ret<UndoResult>,
+        ret: Ret<()>,
     },
     ReadDoc {
         ret: Ret<String>,
@@ -213,6 +294,28 @@ pub(crate) enum EditorCommand {
     },
 }
 
+impl std::fmt::Debug for EditorCommand {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        let s = match self {
+            EditorCommand::ComposeLocalDelta { .. } => "ComposeLocalDelta",
+            EditorCommand::ComposeRemoteDelta { .. } => "ComposeRemoteDelta",
+            EditorCommand::OverrideDelta { .. } => "OverrideDelta",
+            EditorCommand::TransformRevision { .. } => "TransformRevision",
+            EditorCommand::Insert { .. } => "Insert",
+            EditorCommand::Delete { .. } => "Delete",
+            EditorCommand::Format { .. } => "Format",
+            EditorCommand::Replace { .. } => "Replace",
+            EditorCommand::CanUndo { .. } => "CanUndo",
+            EditorCommand::CanRedo { .. } => "CanRedo",
+            EditorCommand::Undo { .. } => "Undo",
+            EditorCommand::Redo { .. } => "Redo",
+            EditorCommand::ReadDoc { .. } => "ReadDoc",
+            EditorCommand::ReadDocDelta { .. } => "ReadDocDelta",
+        };
+        f.write_str(s)
+    }
+}
+
 pub(crate) struct TransformDeltas {
     pub client_prime: RichTextDelta,
     pub server_prime: Option<RichTextDelta>,

+ 1 - 0
frontend/rust-lib/flowy-document/src/core/mod.rs

@@ -1,6 +1,7 @@
 pub mod edit;
 pub mod revision;
 mod web_socket;
+
 pub use crate::ws_receivers::*;
 pub use edit::*;
 pub use revision::*;

+ 1 - 0
frontend/rust-lib/flowy-document/src/core/revision/mod.rs

@@ -2,6 +2,7 @@ mod cache;
 mod disk;
 mod manager;
 mod memory;
+mod snapshot;
 
 pub use cache::*;
 pub use manager::*;

+ 1 - 0
frontend/rust-lib/flowy-document/src/core/revision/snapshot.rs

@@ -0,0 +1 @@
+

+ 24 - 114
frontend/rust-lib/flowy-document/src/core/web_socket/web_socket.rs

@@ -1,6 +1,5 @@
 use crate::core::{
     web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer, HttpWebSocketManager},
-    DocumentMD5,
     DocumentWSReceiver,
     DocumentWebSocket,
     EditorCommand,
@@ -20,7 +19,7 @@ use lib_infra::future::FutureResult;
 
 use crate::core::web_socket::local_ws_impl::LocalWebSocketManager;
 use flowy_collaboration::entities::ws::DocumentServerWSDataType;
-use lib_ot::rich_text::RichTextDelta;
+
 use lib_ws::WSConnectState;
 use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
 use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
@@ -33,7 +32,7 @@ pub(crate) trait DocumentWebSocketManager: Send + Sync {
 pub(crate) async fn make_document_ws_manager(
     doc_id: String,
     user_id: String,
-    editor_edit_queue: UnboundedSender<EditorCommand>,
+    edit_cmd_tx: UnboundedSender<EditorCommand>,
     rev_manager: Arc<RevisionManager>,
     ws: Arc<dyn DocumentWebSocket>,
 ) -> Arc<dyn DocumentWebSocketManager> {
@@ -42,52 +41,24 @@ pub(crate) async fn make_document_ws_manager(
         let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
             doc_id: doc_id.clone(),
             user_id: user_id.clone(),
-            editor_edit_queue: editor_edit_queue.clone(),
+            edit_cmd_tx,
             rev_manager: rev_manager.clone(),
             shared_sink: shared_sink.clone(),
         });
-        let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink.clone());
+        let ws_stream_provider = DocumentWSSinkDataProviderAdapter(shared_sink);
         let ws_manager = Arc::new(HttpWebSocketManager::new(
             &doc_id,
             ws.clone(),
             Arc::new(ws_stream_provider),
             ws_stream_consumer,
         ));
-        notify_user_has_connected(&user_id, &doc_id, rev_manager.clone(), shared_sink).await;
-        listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager.clone());
-
+        listen_document_ws_state(&user_id, &doc_id, ws_manager.scribe_state(), rev_manager);
         Arc::new(ws_manager)
     } else {
         Arc::new(Arc::new(LocalWebSocketManager {}))
     }
 }
 
-async fn notify_user_has_connected(
-    _user_id: &str,
-    _doc_id: &str,
-    _rev_manager: Arc<RevisionManager>,
-    _shared_sink: Arc<SharedWSSinkDataProvider>,
-) {
-    // let need_notify = match shared_sink.front().await {
-    //     None => true,
-    //     Some(data) => data.ty != DocumentClientWSDataType::UserConnect,
-    // };
-    //
-    // if need_notify {
-    //     let revision_data: Bytes =
-    // rev_manager.latest_revision().await.try_into().unwrap();
-    //     let new_connect = NewDocumentUser {
-    //         user_id: user_id.to_owned(),
-    //         doc_id: doc_id.to_owned(),
-    //         revision_data: revision_data.to_vec(),
-    //     };
-    //
-    //     let data =
-    // DocumentWSDataBuilder::build_new_document_user_message(doc_id,
-    // new_connect);     shared_sink.push_front(data).await;
-    // }
-}
-
 fn listen_document_ws_state(
     _user_id: &str,
     _doc_id: &str,
@@ -109,22 +80,19 @@ fn listen_document_ws_state(
 pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
     pub(crate) doc_id: String,
     pub(crate) user_id: String,
-    pub(crate) editor_edit_queue: UnboundedSender<EditorCommand>,
+    pub(crate) edit_cmd_tx: UnboundedSender<EditorCommand>,
     pub(crate) rev_manager: Arc<RevisionManager>,
     pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
 }
 
 impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
     fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
-        let user_id = self.user_id.clone();
         let rev_manager = self.rev_manager.clone();
-        let edit_cmd_tx = self.editor_edit_queue.clone();
+        let edit_cmd_tx = self.edit_cmd_tx.clone();
         let shared_sink = self.shared_sink.clone();
         let doc_id = self.doc_id.clone();
         FutureResult::new(async move {
-            if let Some(server_composed_revision) =
-                handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await?
-            {
+            if let Some(server_composed_revision) = handle_remote_revision(edit_cmd_tx, rev_manager, bytes).await? {
                 let data = DocumentClientWSData::from_revisions(&doc_id, vec![server_composed_revision]);
                 shared_sink.push_back(data).await;
             }
@@ -177,58 +145,8 @@ async fn transform_pushed_revisions(
     Ok(transformed_delta)
 }
 
-async fn compose_pushed_delta(
-    delta: RichTextDelta,
-    edit_cmd: &UnboundedSender<EditorCommand>,
-) -> FlowyResult<DocumentMD5> {
-    // compose delta
-    let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
-    let _ = edit_cmd.send(EditorCommand::ComposeDelta { delta, ret });
-    let md5 = rx.await.map_err(internal_error)??;
-    Ok(md5)
-}
-
-async fn override_client_delta(
-    delta: RichTextDelta,
-    edit_cmd: &UnboundedSender<EditorCommand>,
-) -> FlowyResult<DocumentMD5> {
-    let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
-    let _ = edit_cmd.send(EditorCommand::OverrideDelta { delta, ret });
-    let md5 = rx.await.map_err(internal_error)??;
-    Ok(md5)
-}
-
-async fn make_client_and_server_revision(
-    doc_id: &str,
-    user_id: &str,
-    base_rev_id: i64,
-    rev_id: i64,
-    client_delta: RichTextDelta,
-    server_delta: Option<RichTextDelta>,
-    md5: DocumentMD5,
-) -> (Revision, Option<Revision>) {
-    let client_revision = Revision::new(
-        &doc_id,
-        base_rev_id,
-        rev_id,
-        client_delta.to_bytes(),
-        &user_id,
-        md5.clone(),
-    );
-
-    match server_delta {
-        None => (client_revision, None),
-        Some(server_delta) => {
-            let server_revision = Revision::new(&doc_id, base_rev_id, rev_id, server_delta.to_bytes(), &user_id, md5);
-            (client_revision, Some(server_revision))
-        },
-    }
-}
-
 #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
-pub(crate) async fn handle_push_rev(
-    doc_id: &str,
-    user_id: &str,
+pub(crate) async fn handle_remote_revision(
     edit_cmd_tx: UnboundedSender<EditorCommand>,
     rev_manager: Arc<RevisionManager>,
     bytes: Bytes,
@@ -259,32 +177,24 @@ pub(crate) async fn handle_push_rev(
         None => {
             // The server_prime is None means the client local revisions conflict with the
             // server, and it needs to override the client delta.
-            let md5 = override_client_delta(client_prime.clone(), &edit_cmd_tx).await?;
-            let repeated_revision = RepeatedRevision::new(revisions);
-            assert_eq!(repeated_revision.last().unwrap().md5, md5);
-            let _ = rev_manager.reset_document(repeated_revision).await?;
+            let (ret, rx) = oneshot::channel();
+            let _ = edit_cmd_tx.send(EditorCommand::OverrideDelta {
+                revisions,
+                delta: client_prime,
+                ret,
+            });
+            let _ = rx.await.map_err(internal_error)??;
             Ok(None)
         },
         Some(server_prime) => {
-            let md5 = compose_pushed_delta(client_prime.clone(), &edit_cmd_tx).await?;
-            for revision in &revisions {
-                let _ = rev_manager.add_remote_revision(revision).await?;
-            }
-            let (base_rev_id, rev_id) = rev_manager.next_rev_id_pair();
-            let (client_revision, server_revision) = make_client_and_server_revision(
-                doc_id,
-                user_id,
-                base_rev_id,
-                rev_id,
-                client_prime,
-                Some(server_prime),
-                md5,
-            )
-            .await;
-
-            // save the client revision
-            let _ = rev_manager.add_remote_revision(&client_revision).await?;
-            Ok(server_revision)
+            let (ret, rx) = oneshot::channel();
+            let _ = edit_cmd_tx.send(EditorCommand::ComposeRemoteDelta {
+                revisions,
+                client_delta: client_prime,
+                server_delta: server_prime,
+                ret,
+            });
+            Ok(rx.await.map_err(internal_error)??)
         },
     }
 }

+ 10 - 20
shared-lib/flowy-collaboration/src/document/history.rs

@@ -1,27 +1,17 @@
 use lib_ot::rich_text::RichTextDelta;
 
-const MAX_UNDOS: usize = 20;
+const MAX_UNDOES: usize = 20;
 
 #[derive(Debug, Clone)]
 pub struct UndoResult {
-    #[allow(dead_code)]
-    success: bool,
-
-    #[allow(dead_code)]
-    len: usize,
-}
-
-impl UndoResult {
-    pub fn fail() -> Self { UndoResult { success: false, len: 0 } }
-
-    pub fn success(len: usize) -> Self { UndoResult { success: true, len } }
+    pub delta: RichTextDelta,
 }
 
 #[derive(Debug, Clone)]
 pub struct History {
     #[allow(dead_code)]
     cur_undo: usize,
-    undos: Vec<RichTextDelta>,
+    undoes: Vec<RichTextDelta>,
     redoes: Vec<RichTextDelta>,
     capacity: usize,
 }
@@ -30,9 +20,9 @@ impl std::default::Default for History {
     fn default() -> Self {
         History {
             cur_undo: 1,
-            undos: Vec::new(),
+            undoes: Vec::new(),
             redoes: Vec::new(),
-            capacity: MAX_UNDOS,
+            capacity: MAX_UNDOES,
         }
     }
 }
@@ -40,11 +30,11 @@ impl std::default::Default for History {
 impl History {
     pub fn new() -> Self { History::default() }
 
-    pub fn can_undo(&self) -> bool { !self.undos.is_empty() }
+    pub fn can_undo(&self) -> bool { !self.undoes.is_empty() }
 
     pub fn can_redo(&self) -> bool { !self.redoes.is_empty() }
 
-    pub fn add_undo(&mut self, delta: RichTextDelta) { self.undos.push(delta); }
+    pub fn add_undo(&mut self, delta: RichTextDelta) { self.undoes.push(delta); }
 
     pub fn add_redo(&mut self, delta: RichTextDelta) { self.redoes.push(delta); }
 
@@ -56,8 +46,8 @@ impl History {
         self.redoes.clear();
         self.add_undo(delta);
 
-        if self.undos.len() > self.capacity {
-            self.undos.remove(0);
+        if self.undoes.len() > self.capacity {
+            self.undoes.remove(0);
         }
     }
 
@@ -65,7 +55,7 @@ impl History {
         if !self.can_undo() {
             return None;
         }
-        let delta = self.undos.pop().unwrap();
+        let delta = self.undoes.pop().unwrap();
         Some(delta)
     }