Преглед на файлове

refactor backend editor

appflowy преди 3 години
родител
ревизия
d3bfca10e9

+ 26 - 26
backend/Cargo.lock

@@ -455,9 +455,9 @@ dependencies = [
  "config",
  "dashmap",
  "derive_more",
+ "flowy-collaboration",
  "flowy-core-infra",
  "flowy-document",
- "flowy-ot",
  "flowy-sdk",
  "flowy-test",
  "flowy-user",
@@ -1195,6 +1195,26 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "flowy-collaboration"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "chrono",
+ "flowy-derive",
+ "lib-ot",
+ "log",
+ "md5",
+ "parking_lot",
+ "protobuf",
+ "serde",
+ "strum",
+ "strum_macros",
+ "tokio",
+ "tracing",
+ "url",
+]
+
 [[package]]
 name = "flowy-core"
 version = "0.1.0"
@@ -1209,11 +1229,11 @@ dependencies = [
  "derive_more",
  "diesel",
  "diesel_derives",
+ "flowy-collaboration",
  "flowy-core-infra",
  "flowy-database",
  "flowy-derive",
  "flowy-document",
- "flowy-ot",
  "futures",
  "futures-core",
  "lazy_static",
@@ -1239,8 +1259,8 @@ dependencies = [
  "bytes",
  "chrono",
  "derive_more",
+ "flowy-collaboration",
  "flowy-derive",
- "flowy-ot",
  "log",
  "protobuf",
  "strum",
@@ -1284,9 +1304,9 @@ dependencies = [
  "derive_more",
  "diesel",
  "diesel_derives",
+ "flowy-collaboration",
  "flowy-database",
  "flowy-derive",
- "flowy-ot",
  "futures",
  "futures-core",
  "futures-util",
@@ -1309,26 +1329,6 @@ dependencies = [
  "url",
 ]
 
-[[package]]
-name = "flowy-ot"
-version = "0.1.0"
-dependencies = [
- "bytes",
- "chrono",
- "flowy-derive",
- "lib-ot",
- "log",
- "md5",
- "parking_lot",
- "protobuf",
- "serde",
- "strum",
- "strum_macros",
- "tokio",
- "tracing",
- "url",
-]
-
 [[package]]
 name = "flowy-sdk"
 version = "0.1.0"
@@ -1336,10 +1336,10 @@ dependencies = [
  "backend-service",
  "bytes",
  "color-eyre",
+ "flowy-collaboration",
  "flowy-core",
  "flowy-database",
  "flowy-document",
- "flowy-ot",
  "flowy-user",
  "futures-core",
  "lib-dispatch",
@@ -1360,9 +1360,9 @@ dependencies = [
  "bincode",
  "bytes",
  "claim",
+ "flowy-collaboration",
  "flowy-core",
  "flowy-document",
- "flowy-ot",
  "flowy-sdk",
  "flowy-user",
  "futures-util",

+ 0 - 123
backend/src/services/doc/edit/edit_actor.rs

@@ -1,123 +0,0 @@
-use crate::{
-    services::doc::edit::ServerDocEditor,
-    web_socket::{entities::Socket, WsUser},
-};
-use actix_web::web::Data;
-use async_stream::stream;
-use backend_service::errors::{internal_error, Result as DocResult, ServerError};
-use flowy_collaboration::protobuf::Doc;
-use futures::stream::StreamExt;
-use lib_ot::protobuf::Revision;
-use sqlx::PgPool;
-use std::sync::{atomic::Ordering::SeqCst, Arc};
-use tokio::{
-    sync::{mpsc, oneshot},
-    task::spawn_blocking,
-};
-
-#[derive(Clone)]
-pub struct EditUser {
-    user: Arc<WsUser>,
-    pub(crate) socket: Socket,
-}
-
-impl EditUser {
-    pub fn id(&self) -> String { self.user.id().to_string() }
-}
-
-#[derive(Debug)]
-pub enum EditMsg {
-    Revision {
-        user: Arc<WsUser>,
-        socket: Socket,
-        revision: Revision,
-        ret: oneshot::Sender<DocResult<()>>,
-    },
-    DocumentJson {
-        ret: oneshot::Sender<DocResult<String>>,
-    },
-    DocumentRevId {
-        ret: oneshot::Sender<DocResult<i64>>,
-    },
-    NewDocUser {
-        user: Arc<WsUser>,
-        socket: Socket,
-        rev_id: i64,
-        ret: oneshot::Sender<DocResult<()>>,
-    },
-}
-
-pub struct EditDocActor {
-    receiver: Option<mpsc::Receiver<EditMsg>>,
-    edit_doc: Arc<ServerDocEditor>,
-    pg_pool: Data<PgPool>,
-}
-
-impl EditDocActor {
-    pub fn new(receiver: mpsc::Receiver<EditMsg>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
-        let edit_doc = Arc::new(ServerDocEditor::new(doc)?);
-        Ok(Self {
-            receiver: Some(receiver),
-            edit_doc,
-            pg_pool,
-        })
-    }
-
-    pub async fn run(mut self) {
-        let mut receiver = self
-            .receiver
-            .take()
-            .expect("DocActor's receiver should only take one time");
-
-        let stream = stream! {
-            loop {
-                match receiver.recv().await {
-                    Some(msg) => yield msg,
-                    None => break,
-                }
-            }
-        };
-        stream.for_each(|msg| self.handle_message(msg)).await;
-    }
-
-    async fn handle_message(&self, msg: EditMsg) {
-        match msg {
-            EditMsg::Revision {
-                user,
-                socket,
-                revision,
-                ret,
-            } => {
-                let user = EditUser {
-                    user: user.clone(),
-                    socket: socket.clone(),
-                };
-                let _ = ret.send(self.edit_doc.apply_revision(user, revision, self.pg_pool.clone()).await);
-            },
-            EditMsg::DocumentJson { ret } => {
-                let edit_context = self.edit_doc.clone();
-                let json = spawn_blocking(move || edit_context.document_json())
-                    .await
-                    .map_err(internal_error);
-                let _ = ret.send(json);
-            },
-            EditMsg::DocumentRevId { ret } => {
-                let edit_context = self.edit_doc.clone();
-                let _ = ret.send(Ok(edit_context.rev_id.load(SeqCst)));
-            },
-            EditMsg::NewDocUser {
-                user,
-                socket,
-                rev_id,
-                ret,
-            } => {
-                log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id);
-                let user = EditUser {
-                    user: user.clone(),
-                    socket: socket.clone(),
-                };
-                let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await);
-            },
-        }
-    }
-}

+ 0 - 265
backend/src/services/doc/edit/editor.rs

@@ -1,265 +0,0 @@
-use crate::{
-    services::{
-        doc::{edit::edit_actor::EditUser, update_doc},
-        util::md5,
-    },
-    web_socket::{entities::Socket, WsMessageAdaptor},
-};
-use actix_web::web::Data;
-use backend_service::errors::{internal_error, ServerError};
-use dashmap::DashMap;
-use flowy_collaboration::{
-    core::document::Document,
-    entities::ws::{WsDataType, WsDocumentData},
-    protobuf::{Doc, UpdateDocParams},
-};
-use lib_ot::{
-    core::OperationTransformable,
-    protobuf::{RevId, RevType, Revision, RevisionRange},
-    rich_text::RichTextDelta,
-};
-use parking_lot::RwLock;
-use protobuf::Message;
-use sqlx::PgPool;
-use std::{
-    cmp::Ordering,
-    sync::{
-        atomic::{AtomicI64, Ordering::SeqCst},
-        Arc,
-    },
-    time::Duration,
-};
-
-pub struct ServerDocEditor {
-    pub doc_id: String,
-    pub rev_id: AtomicI64,
-    document: Arc<RwLock<Document>>,
-    users: DashMap<String, EditUser>,
-}
-
-impl ServerDocEditor {
-    pub fn new(doc: Doc) -> Result<Self, ServerError> {
-        let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?;
-        let document = Arc::new(RwLock::new(Document::from_delta(delta)));
-        let users = DashMap::new();
-        Ok(Self {
-            doc_id: doc.id.clone(),
-            rev_id: AtomicI64::new(doc.rev_id),
-            document,
-            users,
-        })
-    }
-
-    #[tracing::instrument(
-        level = "debug",
-        skip(self, user),
-        fields(
-            user_id = %user.id(),
-            rev_id = %rev_id,
-        )
-    )]
-    pub async fn new_doc_user(&self, user: EditUser, rev_id: i64) -> Result<(), ServerError> {
-        self.users.insert(user.id(), user.clone());
-        let cur_rev_id = self.rev_id.load(SeqCst);
-        match cur_rev_id.cmp(&rev_id) {
-            Ordering::Less => {
-                user.socket
-                    .do_send(mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id))
-                    .map_err(internal_error)?;
-            },
-            Ordering::Equal => {},
-            Ordering::Greater => {
-                let doc_delta = self.document.read().delta().clone();
-                let cli_revision = self.mk_revision(rev_id, doc_delta);
-                let ws_cli_revision = mk_push_message(&self.doc_id, cli_revision);
-                user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
-            },
-        }
-
-        Ok(())
-    }
-
-    #[tracing::instrument(
-        level = "debug",
-        skip(self, user, pg_pool, revision),
-        fields(
-            cur_rev_id = %self.rev_id.load(SeqCst),
-            base_rev_id = %revision.base_rev_id,
-            rev_id = %revision.rev_id,
-        ),
-        err
-    )]
-    pub async fn apply_revision(
-        &self,
-        user: EditUser,
-        revision: Revision,
-        pg_pool: Data<PgPool>,
-    ) -> Result<(), ServerError> {
-        self.users.insert(user.id(), user.clone());
-        let cur_rev_id = self.rev_id.load(SeqCst);
-        match cur_rev_id.cmp(&revision.rev_id) {
-            Ordering::Less => {
-                let next_rev_id = next(cur_rev_id);
-                if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
-                    // The rev is in the right order, just compose it.
-                    let _ = self.compose_revision(&revision).await?;
-                    let _ = send_acked_msg(&user.socket, &revision)?;
-                    let _ = self.save_revision(&revision, pg_pool).await?;
-                } else {
-                    // The server document is outdated, pull the missing revision from the client.
-                    let _ = send_pull_message(&user.socket, &self.doc_id, next_rev_id, revision.rev_id)?;
-                }
-            },
-            Ordering::Equal => {
-                // Do nothing
-                log::warn!("Applied revision rev_id is the same as cur_rev_id");
-            },
-            Ordering::Greater => {
-                // The client document is outdated. Transform the client revision delta and then
-                // send the prime delta to the client. Client should compose the this prime
-                // delta.
-                let cli_revision = self.transform_revision(&revision)?;
-                let _ = send_push_message(&user.socket, &self.doc_id, cli_revision)?;
-            },
-        }
-        Ok(())
-    }
-
-    pub fn document_json(&self) -> String { self.document.read().to_json() }
-
-    async fn compose_revision(&self, revision: &Revision) -> Result<(), ServerError> {
-        let delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?;
-        let _ = self.compose_delta(delta)?;
-        let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "debug", skip(self, revision))]
-    fn transform_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
-        let cli_delta = RichTextDelta::from_bytes(&revision.delta_data).map_err(internal_error)?;
-        let (cli_prime, server_prime) = self
-            .document
-            .read()
-            .delta()
-            .transform(&cli_delta)
-            .map_err(internal_error)?;
-
-        let _ = self.compose_delta(server_prime)?;
-        let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
-        Ok(cli_revision)
-    }
-
-    fn mk_revision(&self, base_rev_id: i64, delta: RichTextDelta) -> Revision {
-        let delta_data = delta.to_bytes().to_vec();
-        let md5 = md5(&delta_data);
-        Revision {
-            base_rev_id,
-            rev_id: self.rev_id.load(SeqCst),
-            delta_data,
-            md5,
-            doc_id: self.doc_id.to_string(),
-            ty: RevType::Remote,
-            ..Default::default()
-        }
-    }
-
-    #[tracing::instrument(
-        level = "debug",
-        skip(self, delta),
-        fields(
-            revision_delta = %delta.to_json(),
-            result,
-        )
-    )]
-    fn compose_delta(&self, delta: RichTextDelta) -> Result<(), ServerError> {
-        if delta.is_empty() {
-            log::warn!("Composed delta is empty");
-        }
-
-        match self.document.try_write_for(Duration::from_millis(300)) {
-            None => {
-                log::error!("Failed to acquire write lock of document");
-            },
-            Some(mut write_guard) => {
-                let _ = write_guard.compose_delta(delta).map_err(internal_error)?;
-                tracing::Span::current().record("result", &write_guard.to_json().as_str());
-            },
-        }
-        Ok(())
-    }
-
-    #[tracing::instrument(level = "debug", skip(self, revision, pg_pool), err)]
-    async fn save_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
-        // Opti: save with multiple revisions
-        let mut params = UpdateDocParams::new();
-        params.set_doc_id(self.doc_id.clone());
-        params.set_data(self.document.read().to_json());
-        params.set_rev_id(revision.rev_id);
-        let _ = update_doc(pg_pool.get_ref(), params).await?;
-        Ok(())
-    }
-}
-
-#[tracing::instrument(level = "debug", skip(socket, doc_id, revision), err)]
-fn send_push_message(socket: &Socket, doc_id: &str, revision: Revision) -> Result<(), ServerError> {
-    let msg = mk_push_message(doc_id, revision);
-    socket.try_send(msg).map_err(internal_error)
-}
-
-fn mk_push_message(doc_id: &str, revision: Revision) -> WsMessageAdaptor {
-    let bytes = revision.write_to_bytes().unwrap();
-    let data = WsDocumentData {
-        doc_id: doc_id.to_string(),
-        ty: WsDataType::PushRev,
-        data: bytes,
-    };
-    data.into()
-}
-
-#[tracing::instrument(level = "debug", skip(socket, doc_id), err)]
-fn send_pull_message(socket: &Socket, doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> Result<(), ServerError> {
-    let msg = mk_pull_message(doc_id, from_rev_id, to_rev_id);
-    socket.try_send(msg).map_err(internal_error)
-}
-
-fn mk_pull_message(doc_id: &str, from_rev_id: i64, to_rev_id: i64) -> WsMessageAdaptor {
-    let range = RevisionRange {
-        doc_id: doc_id.to_string(),
-        start: from_rev_id,
-        end: to_rev_id,
-        ..Default::default()
-    };
-
-    let bytes = range.write_to_bytes().unwrap();
-    let data = WsDocumentData {
-        doc_id: doc_id.to_string(),
-        ty: WsDataType::PullRev,
-        data: bytes,
-    };
-    data.into()
-}
-
-#[tracing::instrument(level = "debug", skip(socket, revision), err)]
-fn send_acked_msg(socket: &Socket, revision: &Revision) -> Result<(), ServerError> {
-    let msg = mk_acked_message(revision);
-    socket.try_send(msg).map_err(internal_error)
-}
-
-fn mk_acked_message(revision: &Revision) -> WsMessageAdaptor {
-    // let mut wtr = vec![];
-    // let _ = wtr.write_i64::<BigEndian>(revision.rev_id);
-    let mut rev_id = RevId::new();
-    rev_id.set_value(revision.rev_id);
-    let data = rev_id.write_to_bytes().unwrap();
-
-    let data = WsDocumentData {
-        doc_id: revision.doc_id.clone(),
-        ty: WsDataType::Acked,
-        data,
-    };
-
-    data.into()
-}
-
-#[inline]
-fn next(rev_id: i64) -> i64 { rev_id + 1 }

+ 0 - 5
backend/src/services/doc/edit/mod.rs

@@ -1,5 +0,0 @@
-pub(crate) mod edit_actor;
-mod editor;
-
-pub use edit_actor::*;
-pub use editor::*;

+ 149 - 0
backend/src/services/doc/editor.rs

@@ -0,0 +1,149 @@
+use crate::{
+    services::doc::update_doc,
+    web_socket::{entities::Socket, WsMessageAdaptor, WsUser},
+};
+use actix_web::web::Data;
+use backend_service::errors::{internal_error, ServerError};
+use dashmap::DashMap;
+use flowy_collaboration::{
+    core::{
+        document::Document,
+        sync::{RevisionSynchronizer, RevisionUser, SyncResponse},
+    },
+    protobuf::{Doc, UpdateDocParams},
+};
+use lib_ot::{protobuf::Revision, rich_text::RichTextDelta};
+use sqlx::PgPool;
+use std::{
+    convert::TryInto,
+    sync::{
+        atomic::{AtomicI64, Ordering::SeqCst},
+        Arc,
+    },
+};
+
+#[rustfmt::skip]
+//                            ┌──────────────────────┐     ┌────────────┐
+//                       ┌───▶│ RevisionSynchronizer │────▶│  Document  │
+//                       │    └──────────────────────┘     └────────────┘
+// ┌────────────────┐    │
+// │ServerDocEditor │────┤                                          ┌───────────┐
+// └────────────────┘    │                                     ┌───▶│  WsUser   │
+//                       │                                     │    └───────────┘
+//                       │    ┌────────┐       ┌───────────┐   │    ┌───────────┐
+//                       └───▶│ Users  │◆──────│  DocUser  ├───┼───▶│  Socket   │
+//                            └────────┘       └───────────┘   │    └───────────┘
+//                                                             │    ┌───────────┐
+//                                                             └───▶│  PgPool   │
+//                                                                  └───────────┘
+pub struct ServerDocEditor {
+    pub doc_id: String,
+    pub rev_id: AtomicI64,
+    synchronizer: Arc<RevisionSynchronizer>,
+    users: DashMap<String, DocUser>,
+}
+
+impl ServerDocEditor {
+    pub fn new(doc: Doc) -> Result<Self, ServerError> {
+        let delta = RichTextDelta::from_bytes(&doc.data).map_err(internal_error)?;
+        let users = DashMap::new();
+        let synchronizer = Arc::new(RevisionSynchronizer::new(
+            &doc.id,
+            doc.rev_id,
+            Document::from_delta(delta),
+        ));
+
+        Ok(Self {
+            doc_id: doc.id.clone(),
+            rev_id: AtomicI64::new(doc.rev_id),
+            synchronizer,
+            users,
+        })
+    }
+
+    #[tracing::instrument(
+        level = "debug",
+        skip(self, user),
+        fields(
+            user_id = %user.id(),
+            rev_id = %rev_id,
+        )
+    )]
+    pub async fn new_doc_user(&self, user: DocUser, rev_id: i64) -> Result<(), ServerError> {
+        self.users.insert(user.id(), user.clone());
+        self.synchronizer.new_conn(user, rev_id);
+        Ok(())
+    }
+
+    #[tracing::instrument(
+        level = "debug",
+        skip(self, user, revision),
+        fields(
+            cur_rev_id = %self.rev_id.load(SeqCst),
+            base_rev_id = %revision.base_rev_id,
+            rev_id = %revision.rev_id,
+        ),
+        err
+    )]
+    pub async fn apply_revision(&self, user: DocUser, mut revision: Revision) -> Result<(), ServerError> {
+        self.users.insert(user.id(), user.clone());
+        let revision = (&mut revision).try_into().map_err(internal_error)?;
+        self.synchronizer.apply_revision(user, revision).unwrap();
+        Ok(())
+    }
+
+    pub fn document_json(&self) -> String { self.synchronizer.doc_json() }
+}
+
+#[derive(Clone)]
+pub struct DocUser {
+    pub user: Arc<WsUser>,
+    pub(crate) socket: Socket,
+    pub pg_pool: Data<PgPool>,
+}
+
+impl DocUser {
+    pub fn id(&self) -> String { self.user.id().to_string() }
+}
+
+impl RevisionUser for DocUser {
+    fn recv(&self, resp: SyncResponse) {
+        let result = match resp {
+            SyncResponse::Pull(data) => {
+                let msg: WsMessageAdaptor = data.into();
+                self.socket.try_send(msg).map_err(internal_error)
+            },
+            SyncResponse::Push(data) => {
+                let msg: WsMessageAdaptor = data.into();
+                self.socket.try_send(msg).map_err(internal_error)
+            },
+            SyncResponse::Ack(data) => {
+                let msg: WsMessageAdaptor = data.into();
+                self.socket.try_send(msg).map_err(internal_error)
+            },
+            SyncResponse::NewRevision {
+                rev_id,
+                doc_id,
+                doc_json,
+            } => {
+                let pg_pool = self.pg_pool.clone();
+                tokio::task::spawn(async move {
+                    let mut params = UpdateDocParams::new();
+                    params.set_doc_id(doc_id);
+                    params.set_data(doc_json);
+                    params.set_rev_id(rev_id);
+                    match update_doc(pg_pool.get_ref(), params).await {
+                        Ok(_) => {},
+                        Err(e) => log::error!("{}", e),
+                    }
+                });
+                Ok(())
+            },
+        };
+
+        match result {
+            Ok(_) => {},
+            Err(e) => log::error!("{}", e),
+        }
+    }
+}

+ 120 - 22
backend/src/services/doc/manager.rs

@@ -1,18 +1,20 @@
 use crate::{
     services::doc::{
-        edit::edit_actor::{EditDocActor, EditMsg},
+        editor::{DocUser, ServerDocEditor},
         read_doc,
         ws_actor::{DocWsActor, DocWsMsg},
     },
     web_socket::{entities::Socket, WsBizHandler, WsClientData, WsUser},
 };
 use actix_web::web::Data;
+use async_stream::stream;
 use backend_service::errors::{internal_error, Result as DocResult, ServerError};
 use dashmap::DashMap;
 use flowy_collaboration::protobuf::{Doc, DocIdentifier};
+use futures::stream::StreamExt;
 use lib_ot::protobuf::Revision;
 use sqlx::PgPool;
-use std::sync::Arc;
+use std::sync::{atomic::Ordering::SeqCst, Arc};
 use tokio::{
     sync::{mpsc, oneshot},
     task::spawn_blocking,
@@ -43,13 +45,17 @@ impl DocumentCore {
 }
 
 impl WsBizHandler for DocumentCore {
-    fn receive_data(&self, client_data: WsClientData) {
+    fn receive(&self, data: WsClientData) {
         let (ret, rx) = oneshot::channel();
         let sender = self.ws_sender.clone();
         let pool = self.pg_pool.clone();
 
         actix_rt::spawn(async move {
-            let msg = DocWsMsg::ClientData { client_data, ret, pool };
+            let msg = DocWsMsg::ClientData {
+                client_data: data,
+                ret,
+                pool,
+            };
             match sender.send(msg).await {
                 Ok(_) => {},
                 Err(e) => log::error!("{}", e),
@@ -63,16 +69,9 @@ impl WsBizHandler for DocumentCore {
 }
 
 #[rustfmt::skip]
-//                                              EditDocActor
-//                                             ┌────────────────────────────────────┐
-//                                             │  ServerDocEditor                   │
-//                                             │  ┌──────────────────────────────┐  │
-// ┌────────────┐ 1  n ┌───────────────┐       │  │ ┌──────────┐    ┌──────────┐ │  │
-// │ DocManager │─────▶│ OpenDocHandle │──────▶│  │ │ Document │    │  Users   │ │  │
-// └────────────┘      └───────────────┘       │  │ └──────────┘    └──────────┘ │  │
-//                                             │  └──────────────────────────────┘  │
-//                                             │                                    │
-//                                             └────────────────────────────────────┘
+// ┌────────────┐ 1    n ┌───────────────┐     ┌──────────────────┐    ┌────────────────┐
+// │ DocManager │───────▶│ OpenDocHandle │────▶│ DocMessageQueue  │───▶│ServerDocEditor │
+// └────────────┘        └───────────────┘     └──────────────────┘    └────────────────┘
 pub struct DocManager {
     open_doc_map: DashMap<String, Arc<OpenDocHandle>>,
 }
@@ -109,20 +108,20 @@ impl DocManager {
 }
 
 pub struct OpenDocHandle {
-    pub sender: mpsc::Sender<EditMsg>,
+    pub sender: mpsc::Sender<DocMessage>,
 }
 
 impl OpenDocHandle {
     pub fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
         let (sender, receiver) = mpsc::channel(100);
-        let actor = EditDocActor::new(receiver, doc, pg_pool)?;
-        tokio::task::spawn(actor.run());
+        let queue = DocMessageQueue::new(receiver, doc, pg_pool)?;
+        tokio::task::spawn(queue.run());
         Ok(Self { sender })
     }
 
     pub async fn add_user(&self, user: Arc<WsUser>, rev_id: i64, socket: Socket) -> Result<(), ServerError> {
         let (ret, rx) = oneshot::channel();
-        let msg = EditMsg::NewDocUser {
+        let msg = DocMessage::NewConnectedUser {
             user,
             socket,
             rev_id,
@@ -139,7 +138,7 @@ impl OpenDocHandle {
         revision: Revision,
     ) -> Result<(), ServerError> {
         let (ret, rx) = oneshot::channel();
-        let msg = EditMsg::Revision {
+        let msg = DocMessage::ReceiveRevision {
             user,
             socket,
             revision,
@@ -151,19 +150,118 @@ impl OpenDocHandle {
 
     pub async fn document_json(&self) -> DocResult<String> {
         let (ret, rx) = oneshot::channel();
-        let msg = EditMsg::DocumentJson { ret };
+        let msg = DocMessage::GetDocJson { ret };
         self.send(msg, rx).await?
     }
 
     pub async fn rev_id(&self) -> DocResult<i64> {
         let (ret, rx) = oneshot::channel();
-        let msg = EditMsg::DocumentRevId { ret };
+        let msg = DocMessage::GetDocRevId { ret };
         self.send(msg, rx).await?
     }
 
-    pub(crate) async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
+    pub(crate) async fn send<T>(&self, msg: DocMessage, rx: oneshot::Receiver<T>) -> DocResult<T> {
         let _ = self.sender.send(msg).await.map_err(internal_error)?;
         let result = rx.await?;
         Ok(result)
     }
 }
+
+#[derive(Debug)]
+pub enum DocMessage {
+    NewConnectedUser {
+        user: Arc<WsUser>,
+        socket: Socket,
+        rev_id: i64,
+        ret: oneshot::Sender<DocResult<()>>,
+    },
+    ReceiveRevision {
+        user: Arc<WsUser>,
+        socket: Socket,
+        revision: Revision,
+        ret: oneshot::Sender<DocResult<()>>,
+    },
+    GetDocJson {
+        ret: oneshot::Sender<DocResult<String>>,
+    },
+    GetDocRevId {
+        ret: oneshot::Sender<DocResult<i64>>,
+    },
+}
+
+struct DocMessageQueue {
+    receiver: Option<mpsc::Receiver<DocMessage>>,
+    edit_doc: Arc<ServerDocEditor>,
+    pg_pool: Data<PgPool>,
+}
+
+impl DocMessageQueue {
+    fn new(receiver: mpsc::Receiver<DocMessage>, doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
+        let edit_doc = Arc::new(ServerDocEditor::new(doc)?);
+        Ok(Self {
+            receiver: Some(receiver),
+            edit_doc,
+            pg_pool,
+        })
+    }
+
+    async fn run(mut self) {
+        let mut receiver = self
+            .receiver
+            .take()
+            .expect("DocActor's receiver should only take one time");
+
+        let stream = stream! {
+            loop {
+                match receiver.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+        stream.for_each(|msg| self.handle_message(msg)).await;
+    }
+
+    async fn handle_message(&self, msg: DocMessage) {
+        match msg {
+            DocMessage::NewConnectedUser {
+                user,
+                socket,
+                rev_id,
+                ret,
+            } => {
+                log::debug!("Receive new doc user: {:?}, rev_id: {}", user, rev_id);
+                let user = DocUser {
+                    user: user.clone(),
+                    socket: socket.clone(),
+                    pg_pool: self.pg_pool.clone(),
+                };
+                let _ = ret.send(self.edit_doc.new_doc_user(user, rev_id).await);
+            },
+            DocMessage::ReceiveRevision {
+                user,
+                socket,
+                revision,
+                ret,
+            } => {
+                let user = DocUser {
+                    user: user.clone(),
+                    socket: socket.clone(),
+                    pg_pool: self.pg_pool.clone(),
+                };
+                let _ = ret.send(self.edit_doc.apply_revision(user, revision).await);
+            },
+            DocMessage::GetDocJson { ret } => {
+                let edit_context = self.edit_doc.clone();
+                let json = spawn_blocking(move || edit_context.document_json())
+                    .await
+                    .map_err(internal_error);
+                let _ = ret.send(json);
+            },
+            DocMessage::GetDocRevId { ret } => {
+                let rev_id = self.edit_doc.rev_id.load(SeqCst);
+                let _ = ret.send(Ok(rev_id));
+            },
+        }
+    }
+}

+ 2 - 1
backend/src/services/doc/mod.rs

@@ -1,9 +1,10 @@
 #![allow(clippy::module_inception)]
+
 pub(crate) use crud::*;
 pub use router::*;
 
 pub mod crud;
-mod edit;
+mod editor;
 pub mod manager;
 pub mod router;
 mod ws_actor;

+ 1 - 1
backend/src/web_socket/biz_handler.rs

@@ -3,7 +3,7 @@ use lib_ws::WsModule;
 use std::{collections::HashMap, sync::Arc};
 
 pub trait WsBizHandler: Send + Sync {
-    fn receive_data(&self, client_data: WsClientData);
+    fn receive(&self, data: WsClientData);
 }
 
 pub type BizHandler = Arc<dyn WsBizHandler>;

+ 18 - 0
backend/src/web_socket/router.rs

@@ -12,6 +12,24 @@ use actix_web::{
 };
 use actix_web_actors::ws;
 
+#[rustfmt::skip]
+//                   WsClient
+//                  ┌─────────────┐
+//                  │ ┌────────┐  │
+//  wss://xxx ─────▶│ │ WsUser │  │───┐
+//                  │ └────────┘  │   │
+//                  └─────────────┘   │
+//                                    │
+//                                    │    ┌───────────────┐   ┌─────────────┐    ┌────────────────┐
+//                                    ├───▶│ WsBizHandlers │──▶│WsBizHandler │───▶│  WsClientData  │
+//                                    │    └───────────────┘   └─────────────┘    └────────────────┘
+//                   WsClient         │                               △
+//                  ┌─────────────┐   │                               │
+//                  │ ┌────────┐  │   │                               │
+//  wss://xxx ─────▶│ │ WsUser │  │───┘                       ┌───────────────┐
+//                  │ └────────┘  │                           │ DocumentCore  │
+//                  └─────────────┘                           └───────────────┘
+
 #[get("/{token}")]
 pub async fn establish_ws_connection(
     request: HttpRequest,

+ 1 - 1
backend/src/web_socket/ws_client.rs

@@ -75,7 +75,7 @@ impl WsClient {
                     socket,
                     data: Bytes::from(message.data),
                 };
-                handler.receive_data(client_data);
+                handler.receive(client_data);
             },
         }
     }

+ 3 - 9
frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs

@@ -15,7 +15,7 @@ use futures::stream::StreamExt;
 use lib_ot::revision::{RevId, RevisionRange};
 use std::{convert::TryFrom, sync::Arc};
 use tokio::{
-    sync::{broadcast, mpsc, mpsc::error::SendError},
+    sync::{broadcast, mpsc},
     task::spawn_blocking,
     time::{interval, Duration},
 };
@@ -175,7 +175,7 @@ impl RevisionUpStream {
         match self.revisions.next().await? {
             None => Ok(()),
             Some(record) => {
-                tracing::trace!(
+                tracing::debug!(
                     "[RevisionUpStream]: processes revision: {}:{:?}",
                     record.revision.doc_id,
                     record.revision.rev_id
@@ -190,13 +190,7 @@ impl RevisionUpStream {
 
 async fn tick(sender: mpsc::UnboundedSender<UpStreamMsg>) {
     let mut i = interval(Duration::from_secs(2));
-    loop {
-        match sender.send(UpStreamMsg::Tick) {
-            Ok(_) => {},
-            Err(_e) => {
-                break;
-            },
-        }
+    while sender.send(UpStreamMsg::Tick).is_ok() {
         i.tick().await;
     }
 }

+ 1 - 1
frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs

@@ -218,7 +218,7 @@ mod mock {
 
     impl FlowyWsSender for broadcast::Sender<WsMessage> {
         fn send(&self, _msg: WsMessage) -> Result<(), UserError> {
-            // let _ = self.send(msg).unwrap();
+            let _ = self.send(msg).unwrap();
             Ok(())
         }
     }

+ 21 - 21
shared-lib/Cargo.lock

@@ -657,6 +657,26 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "flowy-collaboration"
+version = "0.1.0"
+dependencies = [
+ "bytes",
+ "chrono",
+ "flowy-derive",
+ "lib-ot",
+ "log",
+ "md5",
+ "parking_lot",
+ "protobuf",
+ "serde",
+ "strum",
+ "strum_macros",
+ "tokio",
+ "tracing",
+ "url",
+]
+
 [[package]]
 name = "flowy-core-infra"
 version = "0.1.0"
@@ -664,8 +684,8 @@ dependencies = [
  "bytes",
  "chrono",
  "derive_more",
+ "flowy-collaboration",
  "flowy-derive",
- "flowy-ot",
  "log",
  "protobuf",
  "strum",
@@ -687,26 +707,6 @@ dependencies = [
  "trybuild",
 ]
 
-[[package]]
-name = "flowy-ot"
-version = "0.1.0"
-dependencies = [
- "bytes",
- "chrono",
- "flowy-derive",
- "lib-ot",
- "log",
- "md5",
- "parking_lot",
- "protobuf",
- "serde",
- "strum",
- "strum_macros",
- "tokio",
- "tracing",
- "url",
-]
-
 [[package]]
 name = "flowy-user-infra"
 version = "0.1.0"

+ 32 - 32
shared-lib/flowy-collaboration/src/core/sync/rev_sync.rs

@@ -21,57 +21,59 @@ use std::{
     },
     time::Duration,
 };
-use tokio::sync::mpsc;
 
-pub enum SynchronizerCommand {
+pub trait RevisionUser {
+    fn recv(&self, resp: SyncResponse);
+}
+
+pub enum SyncResponse {
     Pull(WsDocumentData),
     Push(WsDocumentData),
     Ack(WsDocumentData),
-    SaveRevision(Revision),
+    NewRevision {
+        rev_id: i64,
+        doc_json: String,
+        doc_id: String,
+    },
 }
 
-pub type CommandReceiver = Arc<dyn Fn(SynchronizerCommand)>;
-
 pub struct RevisionSynchronizer {
     pub doc_id: String,
     pub rev_id: AtomicI64,
     document: Arc<RwLock<Document>>,
-    command_receiver: CommandReceiver,
 }
 
 impl RevisionSynchronizer {
-    pub fn new(
-        doc_id: &str,
-        rev_id: i64,
-        document: Arc<RwLock<Document>>,
-        command_receiver: CommandReceiver,
-    ) -> RevisionSynchronizer {
+    pub fn new(doc_id: &str, rev_id: i64, document: Document) -> RevisionSynchronizer {
+        let document = Arc::new(RwLock::new(document));
         RevisionSynchronizer {
             doc_id: doc_id.to_string(),
             rev_id: AtomicI64::new(rev_id),
             document,
-            command_receiver,
         }
     }
 
-    pub fn new_conn(&self, rev_id: i64) {
+    pub fn new_conn<T: RevisionUser>(&self, user: T, rev_id: i64) {
         let cur_rev_id = self.rev_id.load(SeqCst);
         match cur_rev_id.cmp(&rev_id) {
             Ordering::Less => {
                 let msg = mk_pull_message(&self.doc_id, next(cur_rev_id), rev_id);
-                self.send_command(SynchronizerCommand::Pull(msg));
+                user.recv(SyncResponse::Pull(msg));
             },
             Ordering::Equal => {},
             Ordering::Greater => {
                 let doc_delta = self.document.read().delta().clone();
                 let revision = self.mk_revision(rev_id, doc_delta);
                 let data = mk_push_message(&self.doc_id, revision);
-                self.send_command(SynchronizerCommand::Push(data));
+                user.recv(SyncResponse::Push(data));
             },
         }
     }
 
-    pub fn apply_revision(&self, revision: Revision) -> Result<(), OTError> {
+    pub fn apply_revision<T>(&self, user: T, revision: Revision) -> Result<(), OTError>
+    where
+        T: RevisionUser,
+    {
         let cur_rev_id = self.rev_id.load(SeqCst);
         match cur_rev_id.cmp(&revision.rev_id) {
             Ordering::Less => {
@@ -79,12 +81,19 @@ impl RevisionSynchronizer {
                 if cur_rev_id == revision.base_rev_id || next_rev_id == revision.base_rev_id {
                     // The rev is in the right order, just compose it.
                     let _ = self.compose_revision(&revision)?;
-                    self.send_command(SynchronizerCommand::Ack(mk_acked_message(&revision)));
-                    self.send_command(SynchronizerCommand::SaveRevision(revision));
+                    user.recv(SyncResponse::Ack(mk_acked_message(&revision)));
+                    let rev_id = revision.rev_id;
+                    let doc_id = self.doc_id.clone();
+                    let doc_json = self.doc_json();
+                    user.recv(SyncResponse::NewRevision {
+                        rev_id,
+                        doc_id,
+                        doc_json,
+                    });
                 } else {
                     // The server document is outdated, pull the missing revision from the client.
                     let msg = mk_pull_message(&self.doc_id, next_rev_id, revision.rev_id);
-                    self.send_command(SynchronizerCommand::Pull(msg));
+                    user.recv(SyncResponse::Pull(msg));
                 }
             },
             Ordering::Equal => {
@@ -97,12 +106,14 @@ impl RevisionSynchronizer {
                 // delta.
                 let cli_revision = self.transform_revision(&revision)?;
                 let data = mk_push_message(&self.doc_id, cli_revision);
-                self.send_command(SynchronizerCommand::Push(data));
+                user.recv(SyncResponse::Push(data));
             },
         }
         Ok(())
     }
 
+    pub fn doc_json(&self) -> String { self.document.read().to_json() }
+
     fn compose_revision(&self, revision: &Revision) -> Result<(), OTError> {
         let delta = RichTextDelta::from_bytes(&revision.delta_data)?;
         let _ = self.compose_delta(delta)?;
@@ -120,16 +131,6 @@ impl RevisionSynchronizer {
         Ok(cli_revision)
     }
 
-    fn send_command(&self, command: SynchronizerCommand) { (self.command_receiver)(command); }
-
-    #[tracing::instrument(
-        level = "debug",
-        skip(self, delta),
-        fields(
-        revision_delta = %delta.to_json(),
-        result,
-        )
-    )]
     fn compose_delta(&self, delta: RichTextDelta) -> Result<(), OTError> {
         if delta.is_empty() {
             log::warn!("Composed delta is empty");
@@ -139,7 +140,6 @@ impl RevisionSynchronizer {
             None => log::error!("Failed to acquire write lock of document"),
             Some(mut write_guard) => {
                 let _ = write_guard.compose_delta(delta);
-                tracing::Span::current().record("result", &write_guard.to_json().as_str());
             },
         }
         Ok(())

+ 9 - 0
shared-lib/lib-ot/src/errors.rs

@@ -37,6 +37,7 @@ impl OTError {
 
     static_ot_error!(duplicate_revision, OTErrorCode::DuplicatedRevision);
     static_ot_error!(revision_id_conflict, OTErrorCode::RevisionIDConflict);
+    static_ot_error!(internal, OTErrorCode::Internal);
 }
 
 impl fmt::Display for OTError {
@@ -68,6 +69,7 @@ pub enum OTErrorCode {
     SerdeError,
     DuplicatedRevision,
     RevisionIDConflict,
+    Internal,
 }
 
 pub struct ErrorBuilder {
@@ -96,3 +98,10 @@ impl ErrorBuilder {
 
     pub fn build(mut self) -> OTError { OTError::new(self.code, &self.msg.take().unwrap_or_else(|| "".to_owned())) }
 }
+
+pub fn internal_error<T>(e: T) -> OTError
+where
+    T: std::fmt::Debug,
+{
+    OTError::internal().context(e)
+}

+ 6 - 0
shared-lib/lib-ot/src/revision/model.rs

@@ -30,6 +30,12 @@ impl Revision {
     pub fn is_empty(&self) -> bool { self.base_rev_id == self.rev_id }
 
     pub fn pair_rev_id(&self) -> (i64, i64) { (self.base_rev_id, self.rev_id) }
+
+    // pub fn from_pb(pb: &mut crate::protobuf::Revision) -> Self {
+    // pb.try_into().unwrap() }
+
+    // pub fn from_pb(mut pb: crate::protobuf::Revision) -> Self {
+    // Revision::try_from(&mut pb).unwrap() }
 }
 
 impl std::fmt::Debug for Revision {