Przeglądaj źródła

add some code documentation

appflowy 3 lat temu
rodzic
commit
60e9071685

+ 1 - 0
backend/Cargo.lock

@@ -1325,6 +1325,7 @@ name = "flowy-document"
 version = "0.1.0"
 dependencies = [
  "async-stream",
+ "async-trait",
  "bytecount",
  "byteorder",
  "bytes",

+ 1 - 0
frontend/rust-lib/flowy-document/Cargo.toml

@@ -39,6 +39,7 @@ chrono = "0.4.19"
 futures-util = "0.3.15"
 byteorder = {version = "1.3.4"}
 async-stream = "0.3.2"
+async-trait = "0.1.52"
 futures = "0.3.15"
 pin-project = "1.0.0"
 

+ 1 - 1
frontend/rust-lib/flowy-document/src/controller.rs

@@ -194,7 +194,7 @@ impl OpenDocCache {
 fn listen_ws_state_changed(mut state_receiver: WSStateReceiver, receivers: Arc<DocumentWSReceivers>) {
     tokio::spawn(async move {
         while let Ok(state) = state_receiver.recv().await {
-            receivers.ws_connect_state_changed(&state);
+            receivers.ws_connect_state_changed(&state).await;
         }
     });
 }

+ 25 - 25
frontend/rust-lib/flowy-document/src/core/edit/editor.rs

@@ -1,8 +1,7 @@
 use crate::{
     context::DocumentUser,
     core::{
-        web_socket::{make_document_ws_manager, DocumentWebSocketManager},
-        DocumentMD5,
+        web_socket::{make_document_ws_manager, DocumentWebSocketManager, EditorCommandSender},
         DocumentRevisionManager,
         DocumentWSReceiver,
         DocumentWebSocket,
@@ -20,14 +19,14 @@ use lib_ot::{
     rich_text::{RichTextAttribute, RichTextDelta},
 };
 use std::sync::Arc;
-use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
+use tokio::sync::{mpsc, oneshot};
 
 pub struct ClientDocumentEditor {
     pub doc_id: String,
     #[allow(dead_code)]
     rev_manager: Arc<DocumentRevisionManager>,
     ws_manager: Arc<DocumentWebSocketManager>,
-    edit_queue: UnboundedSender<EditorCommand>,
+    edit_cmd_tx: EditorCommandSender,
 }
 
 impl ClientDocumentEditor {
@@ -43,11 +42,11 @@ impl ClientDocumentEditor {
         let doc_id = doc_id.to_string();
         let user_id = user.user_id()?;
 
-        let edit_queue = spawn_edit_queue(user, rev_manager.clone(), delta);
+        let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), delta);
         let ws_manager = make_document_ws_manager(
             doc_id.clone(),
             user_id.clone(),
-            edit_queue.clone(),
+            edit_cmd_tx.clone(),
             rev_manager.clone(),
             ws,
         )
@@ -56,7 +55,7 @@ impl ClientDocumentEditor {
             doc_id,
             rev_manager,
             ws_manager,
-            edit_queue,
+            edit_cmd_tx,
         });
         Ok(editor)
     }
@@ -68,7 +67,7 @@ impl ClientDocumentEditor {
             data: data.to_string(),
             ret,
         };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -76,7 +75,7 @@ impl ClientDocumentEditor {
     pub async fn delete(&self, interval: Interval) -> Result<(), FlowyError> {
         let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
         let msg = EditorCommand::Delete { interval, ret };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -88,7 +87,7 @@ impl ClientDocumentEditor {
             attribute,
             ret,
         };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -100,7 +99,7 @@ impl ClientDocumentEditor {
             data: data.to_string(),
             ret,
         };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -108,21 +107,21 @@ impl ClientDocumentEditor {
     pub async fn can_undo(&self) -> bool {
         let (ret, rx) = oneshot::channel::<bool>();
         let msg = EditorCommand::CanUndo { ret };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         rx.await.unwrap_or(false)
     }
 
     pub async fn can_redo(&self) -> bool {
         let (ret, rx) = oneshot::channel::<bool>();
         let msg = EditorCommand::CanRedo { ret };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         rx.await.unwrap_or(false)
     }
 
     pub async fn undo(&self) -> Result<(), FlowyError> {
         let (ret, rx) = oneshot::channel();
         let msg = EditorCommand::Undo { ret };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -130,15 +129,15 @@ impl ClientDocumentEditor {
     pub async fn redo(&self) -> Result<(), FlowyError> {
         let (ret, rx) = oneshot::channel();
         let msg = EditorCommand::Redo { ret };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
 
     pub async fn document_json(&self) -> FlowyResult<String> {
         let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
-        let msg = EditorCommand::ReadDoc { ret };
-        let _ = self.edit_queue.send(msg);
+        let msg = EditorCommand::ReadDocumentAsJson { ret };
+        let _ = self.edit_cmd_tx.send(msg).await;
         let json = rx.await.map_err(internal_error)??;
         Ok(json)
     }
@@ -151,7 +150,7 @@ impl ClientDocumentEditor {
             delta: delta.clone(),
             ret,
         };
-        let _ = self.edit_queue.send(msg);
+        let _ = self.edit_cmd_tx.send(msg).await;
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
@@ -162,12 +161,13 @@ impl ClientDocumentEditor {
     pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWSReceiver> { self.ws_manager.clone() }
 }
 
+// The edit queue will exit after the EditorCommandSender was dropped.
 fn spawn_edit_queue(
     user: Arc<dyn DocumentUser>,
     rev_manager: Arc<DocumentRevisionManager>,
     delta: RichTextDelta,
-) -> UnboundedSender<EditorCommand> {
-    let (sender, receiver) = mpsc::unbounded_channel::<EditorCommand>();
+) -> EditorCommandSender {
+    let (sender, receiver) = mpsc::channel(1000);
     let actor = EditorCommandQueue::new(user, rev_manager, delta, receiver);
     tokio::spawn(actor.run());
     sender
@@ -176,17 +176,17 @@ fn spawn_edit_queue(
 #[cfg(feature = "flowy_unit_test")]
 impl ClientDocumentEditor {
     pub async fn doc_json(&self) -> FlowyResult<String> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
-        let msg = EditorCommand::ReadDoc { ret };
-        let _ = self.edit_queue.send(msg);
+        let (ret, rx) = oneshot::channel::<CollaborateResult<crate::core::DocumentMD5>>();
+        let msg = EditorCommand::ReadDocumentAsJson { ret };
+        let _ = self.edit_cmd_tx.send(msg).await;
         let s = rx.await.map_err(internal_error)??;
         Ok(s)
     }
 
     pub async fn doc_delta(&self) -> FlowyResult<RichTextDelta> {
         let (ret, rx) = oneshot::channel::<CollaborateResult<RichTextDelta>>();
-        let msg = EditorCommand::ReadDocDelta { ret };
-        let _ = self.edit_queue.send(msg);
+        let msg = EditorCommand::ReadDocumentAsDelta { ret };
+        let _ = self.edit_cmd_tx.send(msg).await;
         let delta = rx.await.map_err(internal_error)??;
         Ok(delta)
     }

+ 16 - 11
frontend/rust-lib/flowy-document/src/core/edit/queue.rs

@@ -1,4 +1,7 @@
-use crate::{context::DocumentUser, core::DocumentRevisionManager};
+use crate::{
+    context::DocumentUser,
+    core::{web_socket::EditorCommandReceiver, DocumentRevisionManager},
+};
 use async_stream::stream;
 use flowy_collaboration::{
     document::{history::UndoResult, Document, NewlineDoc},
@@ -12,14 +15,16 @@ use lib_ot::{
     core::{Interval, OperationTransformable},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
-use std::sync::Arc;
-use tokio::sync::{mpsc, oneshot, RwLock};
+use std::{cell::Cell, sync::Arc};
+use tokio::sync::{oneshot, RwLock};
 
+// The EditorCommandQueue executes each command that will alter the document in
+// serial.
 pub(crate) struct EditorCommandQueue {
     document: Arc<RwLock<Document>>,
     user: Arc<dyn DocumentUser>,
     rev_manager: Arc<DocumentRevisionManager>,
-    receiver: Option<mpsc::UnboundedReceiver<EditorCommand>>,
+    receiver: Option<EditorCommandReceiver>,
 }
 
 impl EditorCommandQueue {
@@ -27,7 +32,7 @@ impl EditorCommandQueue {
         user: Arc<dyn DocumentUser>,
         rev_manager: Arc<DocumentRevisionManager>,
         delta: RichTextDelta,
-        receiver: mpsc::UnboundedReceiver<EditorCommand>,
+        receiver: EditorCommandReceiver,
     ) -> Self {
         let document = Arc::new(RwLock::new(Document::from_delta(delta)));
         Self {
@@ -183,11 +188,11 @@ impl EditorCommandQueue {
                 let _ = self.save_local_delta(delta, md5).await?;
                 let _ = ret.send(Ok(()));
             },
-            EditorCommand::ReadDoc { ret } => {
+            EditorCommand::ReadDocumentAsJson { ret } => {
                 let data = self.document.read().await.to_json();
                 let _ = ret.send(Ok(data));
             },
-            EditorCommand::ReadDocDelta { ret } => {
+            EditorCommand::ReadDocumentAsDelta { ret } => {
                 let delta = self.document.read().await.delta().clone();
                 let _ = ret.send(Ok(delta));
             },
@@ -286,11 +291,11 @@ pub(crate) enum EditorCommand {
     Redo {
         ret: Ret<()>,
     },
-    ReadDoc {
+    ReadDocumentAsJson {
         ret: Ret<String>,
     },
     #[allow(dead_code)]
-    ReadDocDelta {
+    ReadDocumentAsDelta {
         ret: Ret<RichTextDelta>,
     },
 }
@@ -310,8 +315,8 @@ impl std::fmt::Debug for EditorCommand {
             EditorCommand::CanRedo { .. } => "CanRedo",
             EditorCommand::Undo { .. } => "Undo",
             EditorCommand::Redo { .. } => "Redo",
-            EditorCommand::ReadDoc { .. } => "ReadDoc",
-            EditorCommand::ReadDocDelta { .. } => "ReadDocDelta",
+            EditorCommand::ReadDocumentAsJson { .. } => "ReadDocumentAsJson",
+            EditorCommand::ReadDocumentAsDelta { .. } => "ReadDocumentAsDelta",
         };
         f.write_str(s)
     }

+ 14 - 7
frontend/rust-lib/flowy-document/src/core/web_socket/mod.rs

@@ -4,7 +4,6 @@ pub use ws_manager::*;
 use crate::core::{
     web_socket::{DocumentWSSinkDataProvider, DocumentWSSteamConsumer},
     DocumentRevisionManager,
-    DocumentWSReceiver,
     DocumentWebSocket,
     EditorCommand,
     TransformDeltas,
@@ -21,12 +20,20 @@ use flowy_error::{internal_error, FlowyError, FlowyResult};
 use lib_infra::future::FutureResult;
 use lib_ws::WSConnectState;
 use std::{collections::VecDeque, convert::TryFrom, sync::Arc};
-use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
+use tokio::sync::{
+    broadcast,
+    mpsc::{Receiver, Sender},
+    oneshot,
+    RwLock,
+};
+
+pub(crate) type EditorCommandSender = Sender<EditorCommand>;
+pub(crate) type EditorCommandReceiver = Receiver<EditorCommand>;
 
 pub(crate) async fn make_document_ws_manager(
     doc_id: String,
     user_id: String,
-    edit_cmd_tx: UnboundedSender<EditorCommand>,
+    edit_cmd_tx: EditorCommandSender,
     rev_manager: Arc<DocumentRevisionManager>,
     ws_conn: Arc<dyn DocumentWebSocket>,
 ) -> Arc<DocumentWebSocketManager> {
@@ -68,7 +75,7 @@ fn listen_document_ws_state(
 
 pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
     pub(crate) doc_id: String,
-    pub(crate) edit_cmd_tx: UnboundedSender<EditorCommand>,
+    pub(crate) edit_cmd_tx: EditorCommandSender,
     pub(crate) rev_manager: Arc<DocumentRevisionManager>,
     pub(crate) shared_sink: Arc<SharedWSSinkDataProvider>,
 }
@@ -94,7 +101,7 @@ impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
     }
 
     fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
-        // the _new_user will be used later
+        // Do nothing by now, just a placeholder for future extension.
         FutureResult::new(async move { Ok(()) })
     }
 
@@ -121,7 +128,7 @@ impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
 
 async fn transform_pushed_revisions(
     revisions: Vec<Revision>,
-    edit_cmd: &UnboundedSender<EditorCommand>,
+    edit_cmd: &EditorCommandSender,
 ) -> FlowyResult<TransformDeltas> {
     let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
     let _ = edit_cmd.send(EditorCommand::TransformRevision { revisions, ret });
@@ -130,7 +137,7 @@ async fn transform_pushed_revisions(
 
 #[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
 pub(crate) async fn handle_remote_revision(
-    edit_cmd_tx: UnboundedSender<EditorCommand>,
+    edit_cmd_tx: EditorCommandSender,
     rev_manager: Arc<DocumentRevisionManager>,
     bytes: Bytes,
 ) -> FlowyResult<Option<Revision>> {

+ 48 - 41
frontend/rust-lib/flowy-document/src/core/web_socket/ws_manager.rs

@@ -3,6 +3,7 @@ use crate::{
     ws_receivers::{DocumentWSReceiver, DocumentWebSocket},
 };
 use async_stream::stream;
+use async_trait::async_trait;
 use bytes::Bytes;
 use flowy_collaboration::entities::{
     revision::{RevId, RevisionRange},
@@ -17,21 +18,35 @@ use tokio::{
     sync::{
         broadcast,
         mpsc,
-        mpsc::{UnboundedReceiver, UnboundedSender},
+        mpsc::{Receiver, Sender},
     },
     task::spawn_blocking,
     time::{interval, Duration},
 };
 
+// The consumer consumes the messages pushed by the web socket.
+pub trait DocumentWSSteamConsumer: Send + Sync {
+    fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
+    fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>;
+    fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
+    fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
+}
+
+// The sink provides the data that will be sent through the web socket to the
+// backend.
+pub trait DocumentWSSinkDataProvider: Send + Sync {
+    fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError>;
+}
+
 pub struct DocumentWebSocketManager {
     doc_id: String,
     data_provider: Arc<dyn DocumentWSSinkDataProvider>,
     stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
     ws_conn: Arc<dyn DocumentWebSocket>,
-    ws_msg_tx: UnboundedSender<DocumentServerWSData>,
-    ws_msg_rx: Option<UnboundedReceiver<DocumentServerWSData>>,
+    ws_passthrough_tx: Sender<DocumentServerWSData>,
+    ws_passthrough_rx: Option<Receiver<DocumentServerWSData>>,
+    state_passthrough_tx: broadcast::Sender<WSConnectState>,
     stop_sync_tx: SinkStopTx,
-    state: broadcast::Sender<WSConnectState>,
 }
 
 impl DocumentWebSocketManager {
@@ -41,26 +56,26 @@ impl DocumentWebSocketManager {
         data_provider: Arc<dyn DocumentWSSinkDataProvider>,
         stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
     ) -> Self {
-        let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
+        let (ws_passthrough_tx, ws_passthrough_rx) = mpsc::channel(1000);
         let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
         let doc_id = doc_id.to_string();
-        let (state, _) = broadcast::channel(2);
+        let (state_passthrough_tx, _) = broadcast::channel(2);
         let mut manager = DocumentWebSocketManager {
             doc_id,
             data_provider,
             stream_consumer,
             ws_conn,
-            ws_msg_tx,
-            ws_msg_rx: Some(ws_msg_rx),
+            ws_passthrough_tx,
+            ws_passthrough_rx: Some(ws_passthrough_rx),
+            state_passthrough_tx,
             stop_sync_tx,
-            state,
         };
         manager.run();
         manager
     }
 
     fn run(&mut self) {
-        let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once");
+        let ws_msg_rx = self.ws_passthrough_rx.take().expect("Only take once");
         let sink = DocumentWSSink::new(
             &self.doc_id,
             self.data_provider.clone(),
@@ -77,7 +92,7 @@ impl DocumentWebSocketManager {
         tokio::spawn(stream.run());
     }
 
-    pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
+    pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state_passthrough_tx.subscribe() }
 
     pub(crate) fn stop(&self) {
         if self.stop_sync_tx.send(()).is_ok() {
@@ -86,16 +101,22 @@ impl DocumentWebSocketManager {
     }
 }
 
+//  DocumentWebSocketManager registers itself as a DocumentWSReceiver for each
+//  opened document. It will receive the web socket message and parser it into
+//  DocumentServerWSData.
+#[async_trait]
 impl DocumentWSReceiver for DocumentWebSocketManager {
-    fn receive_ws_data(&self, doc_data: DocumentServerWSData) {
-        match self.ws_msg_tx.send(doc_data) {
-            Ok(_) => {},
-            Err(e) => tracing::error!("❌ Propagate ws message failed. {}", e),
-        }
+    #[tracing::instrument(level = "debug", skip(self, doc_data), err)]
+    async fn receive_ws_data(&self, doc_data: DocumentServerWSData) -> Result<(), FlowyError> {
+        let _ = self.ws_passthrough_tx.send(doc_data).await.map_err(|e| {
+            let err_msg = format!("{} passthrough error: {}", self.doc_id, e);
+            FlowyError::internal().context(err_msg)
+        })?;
+        Ok(())
     }
 
-    fn connect_state_changed(&self, state: &WSConnectState) {
-        match self.state.send(state.clone()) {
+    fn connect_state_changed(&self, state: WSConnectState) {
+        match self.state_passthrough_tx.send(state) {
             Ok(_) => {},
             Err(e) => tracing::error!("{}", e),
         }
@@ -103,20 +124,13 @@ impl DocumentWSReceiver for DocumentWebSocketManager {
 }
 
 impl std::ops::Drop for DocumentWebSocketManager {
-    fn drop(&mut self) { tracing::debug!("{} HttpWebSocketManager was drop", self.doc_id) }
-}
-
-pub trait DocumentWSSteamConsumer: Send + Sync {
-    fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError>;
-    fn receive_ack(&self, id: String, ty: DocumentServerWSDataType) -> FutureResult<(), FlowyError>;
-    fn receive_new_user_connect(&self, new_user: NewDocumentUser) -> FutureResult<(), FlowyError>;
-    fn pull_revisions_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
+    fn drop(&mut self) { tracing::trace!("{} DocumentWebSocketManager was dropped", self.doc_id) }
 }
 
 pub struct DocumentWSStream {
     doc_id: String,
     consumer: Arc<dyn DocumentWSSteamConsumer>,
-    ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentServerWSData>>,
+    ws_msg_rx: Option<mpsc::Receiver<DocumentServerWSData>>,
     stop_rx: Option<SinkStopRx>,
 }
 
@@ -124,7 +138,7 @@ impl DocumentWSStream {
     pub fn new(
         doc_id: &str,
         consumer: Arc<dyn DocumentWSSteamConsumer>,
-        ws_msg_rx: mpsc::UnboundedReceiver<DocumentServerWSData>,
+        ws_msg_rx: mpsc::Receiver<DocumentServerWSData>,
         stop_rx: SinkStopRx,
     ) -> Self {
         DocumentWSStream {
@@ -193,21 +207,14 @@ impl DocumentWSStream {
             DocumentServerWSDataType::UserConnect => {
                 let new_user = NewDocumentUser::try_from(bytes)?;
                 let _ = self.consumer.receive_new_user_connect(new_user).await;
-                // Notify the user that someone has connected to this document
             },
         }
         Ok(())
     }
 }
 
-pub type Tick = ();
-pub type SinkStopRx = broadcast::Receiver<()>;
-pub type SinkStopTx = broadcast::Sender<()>;
-
-pub trait DocumentWSSinkDataProvider: Send + Sync {
-    fn next(&self) -> FutureResult<Option<DocumentClientWSData>, FlowyError>;
-}
-
+type SinkStopRx = broadcast::Receiver<()>;
+type SinkStopTx = broadcast::Sender<()>;
 pub struct DocumentWSSink {
     provider: Arc<dyn DocumentWSSinkDataProvider>,
     ws_sender: Arc<dyn DocumentWebSocket>,
@@ -231,7 +238,7 @@ impl DocumentWSSink {
     }
 
     pub async fn run(mut self) {
-        let (tx, mut rx) = mpsc::unbounded_channel();
+        let (tx, mut rx) = mpsc::channel(1);
         let mut stop_rx = self.stop_rx.take().expect("Only take once");
         let doc_id = self.doc_id.clone();
         tokio::spawn(tick(tx));
@@ -245,7 +252,7 @@ impl DocumentWSSink {
                         }
                     },
                     _ = stop_rx.recv() => {
-                        tracing::debug!("[DocumentSink:{}] loop exit", doc_id);
+                        tracing::trace!("[DocumentSink:{}] loop exit", doc_id);
                         break
                     },
                 };
@@ -275,9 +282,9 @@ impl DocumentWSSink {
     }
 }
 
-async fn tick(sender: mpsc::UnboundedSender<Tick>) {
+async fn tick(sender: mpsc::Sender<()>) {
     let mut interval = interval(Duration::from_millis(SYNC_INTERVAL_IN_MILLIS));
-    while sender.send(()).is_ok() {
+    while sender.send(()).await.is_ok() {
         interval.tick().await;
     }
 }

+ 14 - 12
frontend/rust-lib/flowy-document/src/ws_receivers.rs

@@ -1,13 +1,15 @@
 use crate::errors::FlowyError;
+use async_trait::async_trait;
 use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_collaboration::entities::ws::{DocumentClientWSData, DocumentServerWSData};
 use lib_ws::WSConnectState;
 use std::{convert::TryInto, sync::Arc};
 
+#[async_trait]
 pub(crate) trait DocumentWSReceiver: Send + Sync {
-    fn receive_ws_data(&self, data: DocumentServerWSData);
-    fn connect_state_changed(&self, state: &WSConnectState);
+    async fn receive_ws_data(&self, data: DocumentServerWSData) -> Result<(), FlowyError>;
+    fn connect_state_changed(&self, state: WSConnectState);
 }
 
 pub type WSStateReceiver = tokio::sync::broadcast::Receiver<WSConnectState>;
@@ -18,6 +20,7 @@ pub trait DocumentWebSocket: Send + Sync {
 
 pub struct DocumentWSReceivers {
     // key: the document id
+    // value: DocumentWSReceiver
     receivers: Arc<DashMap<String, Arc<dyn DocumentWSReceiver>>>,
 }
 
@@ -40,21 +43,20 @@ impl DocumentWSReceivers {
 
     pub(crate) fn remove(&self, id: &str) { self.receivers.remove(id); }
 
-    pub fn did_receive_data(&self, data: Bytes) {
+    pub async fn did_receive_data(&self, data: Bytes) {
         let data: DocumentServerWSData = data.try_into().unwrap();
         match self.receivers.get(&data.doc_id) {
-            None => {
-                log::error!("Can't find any source handler for {:?}", data.doc_id);
-            },
-            Some(handler) => {
-                handler.receive_ws_data(data);
+            None => tracing::error!("Can't find any source handler for {:?}", data.doc_id),
+            Some(handler) => match handler.receive_ws_data(data).await {
+                Ok(_) => {},
+                Err(e) => tracing::error!("{}", e),
             },
         }
     }
 
-    pub fn ws_connect_state_changed(&self, state: &WSConnectState) {
-        self.receivers.iter().for_each(|receiver| {
-            receiver.value().connect_state_changed(&state);
-        });
+    pub async fn ws_connect_state_changed(&self, state: &WSConnectState) {
+        for receiver in self.receivers.iter() {
+            receiver.value().connect_state_changed(state.clone());
+        }
     }
 }

+ 2 - 7
frontend/rust-lib/flowy-net/src/ws/local/local_server.rs

@@ -13,18 +13,13 @@ use tokio::sync::{mpsc, mpsc::UnboundedSender};
 pub struct LocalDocumentServer {
     pub doc_manager: Arc<ServerDocumentManager>,
     sender: mpsc::UnboundedSender<WebSocketRawMessage>,
-    persistence: Arc<LocalDocumentCloudPersistence>,
 }
 
 impl LocalDocumentServer {
     pub fn new(sender: mpsc::UnboundedSender<WebSocketRawMessage>) -> Self {
         let persistence = Arc::new(LocalDocumentCloudPersistence::default());
-        let doc_manager = Arc::new(ServerDocumentManager::new(persistence.clone()));
-        LocalDocumentServer {
-            doc_manager,
-            sender,
-            persistence,
-        }
+        let doc_manager = Arc::new(ServerDocumentManager::new(persistence));
+        LocalDocumentServer { doc_manager, sender }
     }
 
     pub async fn handle_client_data(

+ 1 - 1
frontend/rust-lib/flowy-net/src/ws/local/persistence.rs

@@ -1,5 +1,5 @@
 use crate::ws::local::DocumentCloudStorage;
-use dashmap::DashMap;
+
 use flowy_collaboration::{
     entities::doc::DocumentInfo,
     errors::CollaborateError,

+ 6 - 1
frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs

@@ -87,7 +87,12 @@ impl DocumentWebSocket for DocumentWebSocketImpl {
 struct WSMessageReceiverImpl(Arc<DocumentWSReceivers>);
 impl WSMessageReceiver for WSMessageReceiverImpl {
     fn source(&self) -> WSModule { WSModule::Doc }
-    fn receive_message(&self, msg: WebSocketRawMessage) { self.0.did_receive_data(Bytes::from(msg.data)); }
+    fn receive_message(&self, msg: WebSocketRawMessage) {
+        let receivers = self.0.clone();
+        tokio::spawn(async move {
+            receivers.did_receive_data(Bytes::from(msg.data)).await;
+        });
+    }
 }
 
 fn make_document_cloud_service(server_config: &ClientServerConfiguration) -> Arc<dyn DocumentCloudService> {

+ 3 - 3
shared-lib/flowy-collaboration/src/sync/server.rs

@@ -166,7 +166,7 @@ impl ServerDocumentManager {
 
 impl std::ops::Drop for ServerDocumentManager {
     fn drop(&mut self) {
-        log::debug!("ServerDocumentManager was drop");
+        log::trace!("ServerDocumentManager was dropped");
     }
 }
 
@@ -241,7 +241,7 @@ impl OpenDocHandle {
 
 impl std::ops::Drop for OpenDocHandle {
     fn drop(&mut self) {
-        tracing::debug!("{} OpenDocHandle was drop", self.doc_id);
+        tracing::trace!("{} OpenDocHandle was dropped", self.doc_id);
     }
 }
 
@@ -327,7 +327,7 @@ impl DocumentCommandQueue {
 
 impl std::ops::Drop for DocumentCommandQueue {
     fn drop(&mut self) {
-        tracing::debug!("{} DocumentCommandQueue was drop", self.doc_id);
+        tracing::trace!("{} DocumentCommandQueue was dropped", self.doc_id);
     }
 }
 

+ 0 - 1
shared-lib/lib-ws/src/ws.rs

@@ -160,7 +160,6 @@ async fn spawn_stream_and_handlers(
 pub struct WSHandlerFuture {
     #[pin]
     msg_rx: MsgReceiver,
-    // Opti: Hashmap would be better
     handlers: Handlers,
 }