Browse Source

enable local web socket

appflowy 3 years ago
parent
commit
d3a00b14b0

+ 1 - 1
frontend/rust-lib/flowy-document/src/services/controller.rs

@@ -171,7 +171,7 @@ impl OpenDocCache {
     pub(crate) fn remove(&self, id: &str) {
         let doc_id = id.to_string();
         match self.get(id) {
-            Ok(editor) => editor.stop_sync(),
+            Ok(editor) => editor.stop(),
             Err(e) => log::error!("{}", e),
         }
         self.inner.remove(&doc_id);

+ 20 - 296
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -1,29 +1,27 @@
-use crate::{errors::FlowyError, module::DocumentUser, services::doc::*};
-use bytes::Bytes;
-use flowy_collaboration::{
-    core::document::history::UndoResult,
-    entities::{
-        doc::DocDelta,
-        ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser},
+use crate::{
+    errors::FlowyError,
+    module::DocumentUser,
+    services::doc::{
+        web_socket::{initialize_document_web_socket, DocumentWebSocketContext, EditorWebSocket},
+        *,
     },
-    errors::CollaborateResult,
 };
+use bytes::Bytes;
+use flowy_collaboration::{core::document::history::UndoResult, entities::doc::DocDelta, errors::CollaborateResult};
 use flowy_database::ConnectionPool;
 use flowy_error::{internal_error, FlowyResult};
-use lib_infra::future::FutureResult;
 use lib_ot::{
     core::Interval,
-    revision::{RevId, RevType, Revision, RevisionRange},
+    revision::{RevId, RevType, Revision},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
-use lib_ws::WSConnectState;
-use std::{collections::VecDeque, sync::Arc};
-use tokio::sync::{broadcast, mpsc, mpsc::UnboundedSender, oneshot, RwLock};
+use std::sync::Arc;
+use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
 
 pub struct ClientDocEditor {
     pub doc_id: String,
     rev_manager: Arc<RevisionManager>,
-    editor_ws: Arc<EditorWebSocket>,
+    editor_ws: Arc<dyn EditorWebSocket>,
     editor_cmd_sender: UnboundedSender<EditorCommand>,
     user: Arc<dyn DocumentUser>,
 }
@@ -42,34 +40,16 @@ impl ClientDocEditor {
         let doc_id = doc_id.to_string();
         let user_id = user.user_id()?;
         let rev_manager = Arc::new(rev_manager);
-        let combined_sink = Arc::new(CombinedSink::new(rev_manager.clone()));
-        let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
-            doc_id: doc_id.clone(),
+
+        let context = DocumentWebSocketContext {
+            doc_id: doc_id.to_owned(),
+            user_id: user_id.clone(),
             editor_cmd_sender: editor_cmd_sender.clone(),
             rev_manager: rev_manager.clone(),
-            user: user.clone(),
-            combined_sink: combined_sink.clone(),
-        });
-        let ws_stream_provider = Arc::new(DocumentWSSinkDataProviderAdapter(combined_sink.clone()));
-        let editor_ws = Arc::new(EditorWebSocket::new(
-            &doc_id,
             ws,
-            ws_stream_provider,
-            ws_stream_consumer,
-        ));
-
-        //
-        notify_user_conn(&user_id, &doc_id, rev_manager.clone(), combined_sink.clone()).await;
-
-        //
-        listen_document_ws_state(
-            &user_id,
-            &doc_id,
-            editor_ws.scribe_state(),
-            rev_manager.clone(),
-            combined_sink,
-        );
+        };
 
+        let editor_ws = initialize_document_web_socket(context).await;
         let editor = Arc::new(Self {
             doc_id,
             rev_manager,
@@ -203,9 +183,9 @@ impl ClientDocEditor {
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
-    pub fn stop_sync(&self) { self.editor_ws.stop(); }
+    pub fn stop(&self) { self.editor_ws.stop_web_socket(); }
 
-    pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.editor_ws.clone() }
+    pub(crate) fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.editor_ws.ws_handler() }
 }
 
 fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditorCommand> {
@@ -215,262 +195,6 @@ fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPoo
     sender
 }
 
-fn listen_document_ws_state(
-    user_id: &str,
-    doc_id: &str,
-    mut subscriber: broadcast::Receiver<WSConnectState>,
-    rev_manager: Arc<RevisionManager>,
-    sink_data_provider: Arc<CombinedSink>,
-) {
-    let user_id = user_id.to_owned();
-    let doc_id = doc_id.to_owned();
-
-    tokio::spawn(async move {
-        while let Ok(state) = subscriber.recv().await {
-            match state {
-                WSConnectState::Init => {},
-                WSConnectState::Connecting => {},
-                WSConnectState::Connected => {
-                    // self.notify_user_conn()
-                    notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await;
-                },
-                WSConnectState::Disconnected => {},
-            }
-        }
-    });
-}
-
-async fn notify_user_conn(
-    user_id: &str,
-    doc_id: &str,
-    rev_manager: Arc<RevisionManager>,
-    combined_sink: Arc<CombinedSink>,
-) {
-    let need_notify = match combined_sink.front().await {
-        None => true,
-        Some(data) => data.ty != DocumentWSDataType::UserConnect,
-    };
-
-    if need_notify {
-        let new_connect = NewDocumentUser {
-            user_id: user_id.to_owned(),
-            doc_id: doc_id.to_owned(),
-            rev_id: rev_manager.latest_rev_id(),
-        };
-
-        let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect);
-        combined_sink.push_front(data).await;
-    }
-}
-
-struct DocumentWebSocketSteamConsumerAdapter {
-    doc_id: String,
-    editor_cmd_sender: UnboundedSender<EditorCommand>,
-    rev_manager: Arc<RevisionManager>,
-    user: Arc<dyn DocumentUser>,
-    combined_sink: Arc<CombinedSink>,
-}
-
-impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
-    fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
-        let user = self.user.clone();
-        let rev_manager = self.rev_manager.clone();
-        let edit_cmd_tx = self.editor_cmd_sender.clone();
-        let combined_sink = self.combined_sink.clone();
-        let doc_id = self.doc_id.clone();
-        FutureResult::new(async move {
-            let user_id = user.user_id()?;
-            if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? {
-                combined_sink.push_back(revision.into()).await;
-            }
-            Ok(())
-        })
-    }
-
-    fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> {
-        let combined_sink = self.combined_sink.clone();
-        FutureResult::new(async move { combined_sink.ack(id, ty).await })
-    }
-
-    fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
-        FutureResult::new(async move { Ok(()) })
-    }
-
-    fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
-        let rev_manager = self.rev_manager.clone();
-        let combined_sink = self.combined_sink.clone();
-        FutureResult::new(async move {
-            let revision = rev_manager.mk_revisions(range).await?;
-            combined_sink.push_back(revision.into()).await;
-            Ok(())
-        })
-    }
-}
-
-struct DocumentWSSinkDataProviderAdapter(Arc<CombinedSink>);
-impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
-    fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError> {
-        let combined_sink = self.0.clone();
-        FutureResult::new(async move { combined_sink.next().await })
-    }
-}
-
-#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
-pub(crate) async fn handle_push_rev(
-    doc_id: &str,
-    user_id: &str,
-    edit_cmd_tx: UnboundedSender<EditorCommand>,
-    rev_manager: Arc<RevisionManager>,
-    bytes: Bytes,
-) -> FlowyResult<Option<Revision>> {
-    // Transform the revision
-    let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
-    let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret });
-    let TransformDeltas {
-        client_prime,
-        server_prime,
-        server_rev_id,
-    } = rx.await.map_err(internal_error)??;
-
-    if rev_manager.rev_id() >= server_rev_id.value {
-        // Ignore this push revision if local_rev_id >= server_rev_id
-        return Ok(None);
-    }
-
-    // compose delta
-    let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
-    let msg = EditorCommand::ComposeDelta {
-        delta: client_prime.clone(),
-        ret,
-    };
-    let _ = edit_cmd_tx.send(msg);
-    let md5 = rx.await.map_err(internal_error)??;
-
-    // update rev id
-    rev_manager.update_rev_id_counter_value(server_rev_id.clone().into());
-    let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id();
-    let delta_data = client_prime.to_bytes();
-    // save the revision
-    let revision = Revision::new(
-        &doc_id,
-        local_base_rev_id,
-        local_rev_id,
-        delta_data,
-        RevType::Remote,
-        &user_id,
-        md5.clone(),
-    );
-
-    let _ = rev_manager.add_remote_revision(&revision).await?;
-
-    // send the server_prime delta
-    let delta_data = server_prime.to_bytes();
-    Ok(Some(Revision::new(
-        &doc_id,
-        local_base_rev_id,
-        local_rev_id,
-        delta_data,
-        RevType::Remote,
-        &user_id,
-        md5,
-    )))
-}
-
-#[derive(Clone)]
-enum SourceType {
-    Shared,
-    Revision,
-}
-
-#[derive(Clone)]
-struct CombinedSink {
-    shared: Arc<RwLock<VecDeque<DocumentWSData>>>,
-    rev_manager: Arc<RevisionManager>,
-    source_ty: Arc<RwLock<SourceType>>,
-}
-
-impl CombinedSink {
-    fn new(rev_manager: Arc<RevisionManager>) -> Self {
-        CombinedSink {
-            shared: Arc::new(RwLock::new(VecDeque::new())),
-            rev_manager,
-            source_ty: Arc::new(RwLock::new(SourceType::Shared)),
-        }
-    }
-
-    // FIXME: return Option<&DocumentWSData> would be better
-    async fn front(&self) -> Option<DocumentWSData> { self.shared.read().await.front().cloned() }
-
-    async fn push_front(&self, data: DocumentWSData) { self.shared.write().await.push_front(data); }
-
-    async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); }
-
-    async fn next(&self) -> FlowyResult<Option<DocumentWSData>> {
-        let source_ty = self.source_ty.read().await.clone();
-        match source_ty {
-            SourceType::Shared => match self.shared.read().await.front() {
-                None => {
-                    *self.source_ty.write().await = SourceType::Revision;
-                    Ok(None)
-                },
-                Some(data) => {
-                    tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
-                    Ok(Some(data.clone()))
-                },
-            },
-            SourceType::Revision => {
-                if !self.shared.read().await.is_empty() {
-                    *self.source_ty.write().await = SourceType::Shared;
-                    return Ok(None);
-                }
-
-                match self.rev_manager.next_sync_revision().await? {
-                    Some(rev) => {
-                        tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id);
-                        Ok(Some(rev.into()))
-                    },
-                    None => Ok(None),
-                }
-            },
-        }
-    }
-
-    async fn ack(&self, id: String, _ty: DocumentWSDataType) -> FlowyResult<()> {
-        // let _ = self.rev_manager.ack_revision(id).await?;
-        let source_ty = self.source_ty.read().await.clone();
-        match source_ty {
-            SourceType::Shared => {
-                let should_pop = match self.shared.read().await.front() {
-                    None => false,
-                    Some(val) => {
-                        if val.id == id {
-                            true
-                        } else {
-                            tracing::error!("The front element's {} is not equal to the {}", val.id, id);
-                            false
-                        }
-                    },
-                };
-                if should_pop {
-                    let _ = self.shared.write().await.pop_front();
-                }
-            },
-            SourceType::Revision => {
-                match id.parse::<i64>() {
-                    Ok(rev_id) => {
-                        let _ = self.rev_manager.ack_revision(rev_id).await?;
-                    },
-                    Err(e) => {
-                        tracing::error!("Parse rev_id from {} failed. {}", id, e);
-                    },
-                };
-            },
-        }
-
-        Ok(())
-    }
-}
-
 #[cfg(feature = "flowy_unit_test")]
 impl ClientDocEditor {
     pub async fn doc_json(&self) -> FlowyResult<String> {

+ 0 - 0
frontend/rust-lib/flowy-document/src/services/doc/edit/editor_edit_cmd_queue.rs → frontend/rust-lib/flowy-document/src/services/doc/edit/editor_cmd_queue.rs


+ 2 - 4
frontend/rust-lib/flowy-document/src/services/doc/edit/mod.rs

@@ -1,7 +1,5 @@
 mod editor;
-mod editor_edit_cmd_queue;
-mod editor_web_socket;
+mod editor_cmd_queue;
 
 pub use editor::*;
-pub(crate) use editor_edit_cmd_queue::*;
-pub use editor_web_socket::*;
+pub(crate) use editor_cmd_queue::*;

+ 1 - 1
frontend/rust-lib/flowy-document/src/services/doc/mod.rs

@@ -1,6 +1,6 @@
 pub mod edit;
 pub mod revision;
-
+mod web_socket;
 pub use crate::services::ws_handlers::*;
 pub use edit::*;
 pub use revision::*;

+ 24 - 17
frontend/rust-lib/flowy-document/src/services/doc/edit/editor_web_socket.rs → frontend/rust-lib/flowy-document/src/services/doc/web_socket/http_ws_impl.rs

@@ -1,4 +1,7 @@
-use crate::services::doc::{DocumentWebSocket, DocumentWsHandler, SYNC_INTERVAL_IN_MILLIS};
+use crate::services::{
+    doc::{web_socket::web_socket::EditorWebSocket, SYNC_INTERVAL_IN_MILLIS},
+    ws_handlers::{DocumentWebSocket, DocumentWsHandler},
+};
 use async_stream::stream;
 use bytes::Bytes;
 use flowy_collaboration::entities::ws::{DocumentWSData, DocumentWSDataType, NewDocumentUser};
@@ -18,7 +21,7 @@ use tokio::{
     time::{interval, Duration},
 };
 
-pub(crate) struct EditorWebSocket {
+pub struct EditorHttpWebSocket {
     doc_id: String,
     data_provider: Arc<dyn DocumentWSSinkDataProvider>,
     stream_consumer: Arc<dyn DocumentWSSteamConsumer>,
@@ -29,8 +32,8 @@ pub(crate) struct EditorWebSocket {
     state: broadcast::Sender<WSConnectState>,
 }
 
-impl EditorWebSocket {
-    pub(crate) fn new(
+impl EditorHttpWebSocket {
+    pub fn new(
         doc_id: &str,
         ws: Arc<dyn DocumentWebSocket>,
         data_provider: Arc<dyn DocumentWSSinkDataProvider>,
@@ -40,7 +43,7 @@ impl EditorWebSocket {
         let (stop_sync_tx, _) = tokio::sync::broadcast::channel(2);
         let doc_id = doc_id.to_string();
         let (state, _) = broadcast::channel(2);
-        let mut manager = EditorWebSocket {
+        let mut manager = EditorHttpWebSocket {
             doc_id,
             data_provider,
             stream_consumer,
@@ -50,11 +53,11 @@ impl EditorWebSocket {
             stop_sync_tx,
             state,
         };
-        manager.start_sync();
+        manager.start_web_socket();
         manager
     }
 
-    fn start_sync(&mut self) {
+    fn start_web_socket(&mut self) {
         let ws_msg_rx = self.ws_msg_rx.take().expect("Only take once");
         let sink = DocumentWebSocketSink::new(
             &self.doc_id,
@@ -72,16 +75,20 @@ impl EditorWebSocket {
         tokio::spawn(stream.run());
     }
 
-    pub(crate) fn stop(&self) {
+    pub fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
+}
+
+impl EditorWebSocket for Arc<EditorHttpWebSocket> {
+    fn stop_web_socket(&self) {
         if self.stop_sync_tx.send(()).is_ok() {
             tracing::debug!("{} stop sync", self.doc_id)
         }
     }
 
-    pub(crate) fn scribe_state(&self) -> broadcast::Receiver<WSConnectState> { self.state.subscribe() }
+    fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.clone() }
 }
 
-impl DocumentWsHandler for EditorWebSocket {
+impl DocumentWsHandler for EditorHttpWebSocket {
     fn receive(&self, doc_data: DocumentWSData) {
         match self.ws_msg_tx.send(doc_data) {
             Ok(_) => {},
@@ -104,7 +111,7 @@ pub trait DocumentWSSteamConsumer: Send + Sync {
     fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError>;
 }
 
-pub(crate) struct DocumentWebSocketStream {
+pub struct DocumentWebSocketStream {
     doc_id: String,
     consumer: Arc<dyn DocumentWSSteamConsumer>,
     ws_msg_rx: Option<mpsc::UnboundedReceiver<DocumentWSData>>,
@@ -112,7 +119,7 @@ pub(crate) struct DocumentWebSocketStream {
 }
 
 impl DocumentWebSocketStream {
-    pub(crate) fn new(
+    pub fn new(
         doc_id: &str,
         consumer: Arc<dyn DocumentWSSteamConsumer>,
         ws_msg_rx: mpsc::UnboundedReceiver<DocumentWSData>,
@@ -197,15 +204,15 @@ impl DocumentWebSocketStream {
     }
 }
 
-pub(crate) type Tick = ();
-pub(crate) type SinkStopRx = broadcast::Receiver<()>;
-pub(crate) type SinkStopTx = broadcast::Sender<()>;
+pub type Tick = ();
+pub type SinkStopRx = broadcast::Receiver<()>;
+pub type SinkStopTx = broadcast::Sender<()>;
 
 pub trait DocumentWSSinkDataProvider: Send + Sync {
     fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError>;
 }
 
-pub(crate) struct DocumentWebSocketSink {
+pub struct DocumentWebSocketSink {
     provider: Arc<dyn DocumentWSSinkDataProvider>,
     ws_sender: Arc<dyn DocumentWebSocket>,
     stop_rx: Option<SinkStopRx>,
@@ -213,7 +220,7 @@ pub(crate) struct DocumentWebSocketSink {
 }
 
 impl DocumentWebSocketSink {
-    pub(crate) fn new(
+    pub fn new(
         doc_id: &str,
         provider: Arc<dyn DocumentWSSinkDataProvider>,
         ws_sender: Arc<dyn DocumentWebSocket>,

+ 18 - 0
frontend/rust-lib/flowy-document/src/services/doc/web_socket/local_ws_impl.rs

@@ -0,0 +1,18 @@
+use crate::services::doc::{web_socket::EditorWebSocket, DocumentWsHandler};
+use flowy_collaboration::entities::ws::DocumentWSData;
+use lib_ws::WSConnectState;
+use std::sync::Arc;
+
+pub(crate) struct EditorLocalWebSocket {}
+
+impl EditorWebSocket for Arc<EditorLocalWebSocket> {
+    fn stop_web_socket(&self) {}
+
+    fn ws_handler(&self) -> Arc<dyn DocumentWsHandler> { self.clone() }
+}
+
+impl DocumentWsHandler for EditorLocalWebSocket {
+    fn receive(&self, _doc_data: DocumentWSData) {}
+
+    fn connect_state_changed(&self, _state: &WSConnectState) {}
+}

+ 7 - 0
frontend/rust-lib/flowy-document/src/services/doc/web_socket/mod.rs

@@ -0,0 +1,7 @@
+#![allow(clippy::module_inception)]
+mod http_ws_impl;
+mod local_ws_impl;
+mod web_socket;
+
+pub(crate) use http_ws_impl::*;
+pub(crate) use web_socket::*;

+ 333 - 0
frontend/rust-lib/flowy-document/src/services/doc/web_socket/web_socket.rs

@@ -0,0 +1,333 @@
+use crate::services::doc::{
+    web_socket::{
+        local_ws_impl::EditorLocalWebSocket,
+        DocumentWSSinkDataProvider,
+        DocumentWSSteamConsumer,
+        EditorHttpWebSocket,
+    },
+    DocumentMD5,
+    DocumentWebSocket,
+    DocumentWsHandler,
+    EditorCommand,
+    RevisionManager,
+    TransformDeltas,
+};
+use bytes::Bytes;
+use flowy_collaboration::{
+    entities::ws::{DocumentWSData, DocumentWSDataBuilder, DocumentWSDataType, NewDocumentUser},
+    errors::CollaborateResult,
+};
+use flowy_error::{internal_error, FlowyError, FlowyResult};
+use lib_infra::future::FutureResult;
+use lib_ot::revision::{RevType, Revision, RevisionRange};
+use lib_ws::WSConnectState;
+use std::{collections::VecDeque, sync::Arc};
+use tokio::sync::{broadcast, mpsc::UnboundedSender, oneshot, RwLock};
+
+pub(crate) trait EditorWebSocket: Send + Sync {
+    fn stop_web_socket(&self);
+    fn ws_handler(&self) -> Arc<dyn DocumentWsHandler>;
+}
+
+pub(crate) struct DocumentWebSocketContext {
+    pub(crate) doc_id: String,
+    pub(crate) user_id: String,
+    pub(crate) editor_cmd_sender: UnboundedSender<EditorCommand>,
+    pub(crate) rev_manager: Arc<RevisionManager>,
+    pub(crate) ws: Arc<dyn DocumentWebSocket>,
+}
+
+pub(crate) async fn initialize_document_web_socket(ctx: DocumentWebSocketContext) -> Arc<dyn EditorWebSocket> {
+    if cfg!(feature = "http_server") {
+        let combined_sink = Arc::new(CombinedSink::new(ctx.rev_manager.clone()));
+        let ws_stream_consumer = Arc::new(DocumentWebSocketSteamConsumerAdapter {
+            doc_id: ctx.doc_id.clone(),
+            user_id: ctx.user_id.clone(),
+            editor_cmd_sender: ctx.editor_cmd_sender.clone(),
+            rev_manager: ctx.rev_manager.clone(),
+            combined_sink: combined_sink.clone(),
+        });
+        let ws_stream_provider = DocumentWSSinkDataProviderAdapter(combined_sink.clone());
+        let editor_ws = Arc::new(EditorHttpWebSocket::new(
+            &ctx.doc_id,
+            ctx.ws.clone(),
+            Arc::new(ws_stream_provider),
+            ws_stream_consumer,
+        ));
+
+        notify_user_conn(
+            &ctx.user_id,
+            &ctx.doc_id,
+            ctx.rev_manager.clone(),
+            combined_sink.clone(),
+        )
+        .await;
+
+        listen_document_ws_state(
+            &ctx.user_id,
+            &ctx.doc_id,
+            editor_ws.scribe_state(),
+            ctx.rev_manager.clone(),
+            combined_sink,
+        );
+
+        Arc::new(editor_ws)
+    } else {
+        Arc::new(Arc::new(EditorLocalWebSocket {}))
+    }
+}
+
+async fn notify_user_conn(
+    user_id: &str,
+    doc_id: &str,
+    rev_manager: Arc<RevisionManager>,
+    combined_sink: Arc<CombinedSink>,
+) {
+    let need_notify = match combined_sink.front().await {
+        None => true,
+        Some(data) => data.ty != DocumentWSDataType::UserConnect,
+    };
+
+    if need_notify {
+        let new_connect = NewDocumentUser {
+            user_id: user_id.to_owned(),
+            doc_id: doc_id.to_owned(),
+            rev_id: rev_manager.latest_rev_id(),
+        };
+
+        let data = DocumentWSDataBuilder::build_new_document_user_message(doc_id, new_connect);
+        combined_sink.push_front(data).await;
+    }
+}
+
+fn listen_document_ws_state(
+    user_id: &str,
+    doc_id: &str,
+    mut subscriber: broadcast::Receiver<WSConnectState>,
+    rev_manager: Arc<RevisionManager>,
+    sink_data_provider: Arc<CombinedSink>,
+) {
+    let user_id = user_id.to_owned();
+    let doc_id = doc_id.to_owned();
+
+    tokio::spawn(async move {
+        while let Ok(state) = subscriber.recv().await {
+            match state {
+                WSConnectState::Init => {},
+                WSConnectState::Connecting => {},
+                WSConnectState::Connected => {
+                    // self.notify_user_conn()
+                    notify_user_conn(&user_id, &doc_id, rev_manager.clone(), sink_data_provider.clone()).await;
+                },
+                WSConnectState::Disconnected => {},
+            }
+        }
+    });
+}
+
+pub(crate) struct DocumentWebSocketSteamConsumerAdapter {
+    pub(crate) doc_id: String,
+    pub(crate) user_id: String,
+    pub(crate) editor_cmd_sender: UnboundedSender<EditorCommand>,
+    pub(crate) rev_manager: Arc<RevisionManager>,
+    pub(crate) combined_sink: Arc<CombinedSink>,
+}
+
+impl DocumentWSSteamConsumer for DocumentWebSocketSteamConsumerAdapter {
+    fn receive_push_revision(&self, bytes: Bytes) -> FutureResult<(), FlowyError> {
+        let user_id = self.user_id.clone();
+        let rev_manager = self.rev_manager.clone();
+        let edit_cmd_tx = self.editor_cmd_sender.clone();
+        let combined_sink = self.combined_sink.clone();
+        let doc_id = self.doc_id.clone();
+        FutureResult::new(async move {
+            if let Some(revision) = handle_push_rev(&doc_id, &user_id, edit_cmd_tx, rev_manager, bytes).await? {
+                combined_sink.push_back(revision.into()).await;
+            }
+            Ok(())
+        })
+    }
+
+    fn receive_ack(&self, id: String, ty: DocumentWSDataType) -> FutureResult<(), FlowyError> {
+        let combined_sink = self.combined_sink.clone();
+        FutureResult::new(async move { combined_sink.ack(id, ty).await })
+    }
+
+    fn receive_new_user_connect(&self, _new_user: NewDocumentUser) -> FutureResult<(), FlowyError> {
+        FutureResult::new(async move { Ok(()) })
+    }
+
+    fn send_revision_in_range(&self, range: RevisionRange) -> FutureResult<(), FlowyError> {
+        let rev_manager = self.rev_manager.clone();
+        let combined_sink = self.combined_sink.clone();
+        FutureResult::new(async move {
+            let revision = rev_manager.mk_revisions(range).await?;
+            combined_sink.push_back(revision.into()).await;
+            Ok(())
+        })
+    }
+}
+
+pub(crate) struct DocumentWSSinkDataProviderAdapter(pub(crate) Arc<CombinedSink>);
+impl DocumentWSSinkDataProvider for DocumentWSSinkDataProviderAdapter {
+    fn next(&self) -> FutureResult<Option<DocumentWSData>, FlowyError> {
+        let combined_sink = self.0.clone();
+        FutureResult::new(async move { combined_sink.next().await })
+    }
+}
+
+#[tracing::instrument(level = "debug", skip(edit_cmd_tx, rev_manager, bytes))]
+pub(crate) async fn handle_push_rev(
+    doc_id: &str,
+    user_id: &str,
+    edit_cmd_tx: UnboundedSender<EditorCommand>,
+    rev_manager: Arc<RevisionManager>,
+    bytes: Bytes,
+) -> FlowyResult<Option<Revision>> {
+    // Transform the revision
+    let (ret, rx) = oneshot::channel::<CollaborateResult<TransformDeltas>>();
+    let _ = edit_cmd_tx.send(EditorCommand::ProcessRemoteRevision { bytes, ret });
+    let TransformDeltas {
+        client_prime,
+        server_prime,
+        server_rev_id,
+    } = rx.await.map_err(internal_error)??;
+
+    if rev_manager.rev_id() >= server_rev_id.value {
+        // Ignore this push revision if local_rev_id >= server_rev_id
+        return Ok(None);
+    }
+
+    // compose delta
+    let (ret, rx) = oneshot::channel::<CollaborateResult<DocumentMD5>>();
+    let msg = EditorCommand::ComposeDelta {
+        delta: client_prime.clone(),
+        ret,
+    };
+    let _ = edit_cmd_tx.send(msg);
+    let md5 = rx.await.map_err(internal_error)??;
+
+    // update rev id
+    rev_manager.update_rev_id_counter_value(server_rev_id.clone().into());
+    let (local_base_rev_id, local_rev_id) = rev_manager.next_rev_id();
+    let delta_data = client_prime.to_bytes();
+    // save the revision
+    let revision = Revision::new(
+        &doc_id,
+        local_base_rev_id,
+        local_rev_id,
+        delta_data,
+        RevType::Remote,
+        &user_id,
+        md5.clone(),
+    );
+
+    let _ = rev_manager.add_remote_revision(&revision).await?;
+
+    // send the server_prime delta
+    let delta_data = server_prime.to_bytes();
+    Ok(Some(Revision::new(
+        &doc_id,
+        local_base_rev_id,
+        local_rev_id,
+        delta_data,
+        RevType::Remote,
+        &user_id,
+        md5,
+    )))
+}
+
+#[derive(Clone)]
+enum SourceType {
+    Shared,
+    Revision,
+}
+
+#[derive(Clone)]
+pub(crate) struct CombinedSink {
+    shared: Arc<RwLock<VecDeque<DocumentWSData>>>,
+    rev_manager: Arc<RevisionManager>,
+    source_ty: Arc<RwLock<SourceType>>,
+}
+
+impl CombinedSink {
+    pub(crate) fn new(rev_manager: Arc<RevisionManager>) -> Self {
+        CombinedSink {
+            shared: Arc::new(RwLock::new(VecDeque::new())),
+            rev_manager,
+            source_ty: Arc::new(RwLock::new(SourceType::Shared)),
+        }
+    }
+
+    // FIXME: return Option<&DocumentWSData> would be better
+    pub(crate) async fn front(&self) -> Option<DocumentWSData> { self.shared.read().await.front().cloned() }
+
+    pub(crate) async fn push_front(&self, data: DocumentWSData) { self.shared.write().await.push_front(data); }
+
+    async fn push_back(&self, data: DocumentWSData) { self.shared.write().await.push_back(data); }
+
+    async fn next(&self) -> FlowyResult<Option<DocumentWSData>> {
+        let source_ty = self.source_ty.read().await.clone();
+        match source_ty {
+            SourceType::Shared => match self.shared.read().await.front() {
+                None => {
+                    *self.source_ty.write().await = SourceType::Revision;
+                    Ok(None)
+                },
+                Some(data) => {
+                    tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", data.doc_id, data.ty);
+                    Ok(Some(data.clone()))
+                },
+            },
+            SourceType::Revision => {
+                if !self.shared.read().await.is_empty() {
+                    *self.source_ty.write().await = SourceType::Shared;
+                    return Ok(None);
+                }
+
+                match self.rev_manager.next_sync_revision().await? {
+                    Some(rev) => {
+                        tracing::debug!("[DocumentSinkDataProvider]: {}:{:?}", rev.doc_id, rev.rev_id);
+                        Ok(Some(rev.into()))
+                    },
+                    None => Ok(None),
+                }
+            },
+        }
+    }
+
+    async fn ack(&self, id: String, _ty: DocumentWSDataType) -> FlowyResult<()> {
+        // let _ = self.rev_manager.ack_revision(id).await?;
+        let source_ty = self.source_ty.read().await.clone();
+        match source_ty {
+            SourceType::Shared => {
+                let should_pop = match self.shared.read().await.front() {
+                    None => false,
+                    Some(val) => {
+                        if val.id == id {
+                            true
+                        } else {
+                            tracing::error!("The front element's {} is not equal to the {}", val.id, id);
+                            false
+                        }
+                    },
+                };
+                if should_pop {
+                    let _ = self.shared.write().await.pop_front();
+                }
+            },
+            SourceType::Revision => {
+                match id.parse::<i64>() {
+                    Ok(rev_id) => {
+                        let _ = self.rev_manager.ack_revision(rev_id).await?;
+                    },
+                    Err(e) => {
+                        tracing::error!("Parse rev_id from {} failed. {}", id, e);
+                    },
+                };
+            },
+        }
+
+        Ok(())
+    }
+}