فهرست منبع

push rev if client doc is outdated

appflowy 3 سال پیش
والد
کامیت
15d628c750

+ 18 - 1
backend/src/service/doc/edit/edit_actor.rs

@@ -35,6 +35,12 @@ pub enum EditMsg {
     DocumentJson {
         ret: oneshot::Sender<DocResult<String>>,
     },
+    NewDocUser {
+        user: Arc<WsUser>,
+        socket: Socket,
+        rev_id: i64,
+        ret: oneshot::Sender<DocResult<()>>,
+    },
 }
 
 pub struct EditDocActor {
@@ -78,7 +84,6 @@ impl EditDocActor {
                 revision,
                 ret,
             } => {
-                // ret.send(self.handle_client_data(client_data, pool).await);
                 let user = EditUser {
                     user: user.clone(),
                     socket: socket.clone(),
@@ -92,6 +97,18 @@ impl EditDocActor {
                     .map_err(internal_error);
                 let _ = ret.send(json);
             },
+            EditMsg::NewDocUser {
+                user,
+                socket,
+                rev_id,
+                ret,
+            } => {
+                let user = EditUser {
+                    user: user.clone(),
+                    socket: socket.clone(),
+                };
+                let _ = ret.send(self.edit_doc.new_connection(user, rev_id, self.pg_pool.clone()).await);
+            },
         }
     }
 }

+ 64 - 41
backend/src/service/doc/edit/edit_doc.rs

@@ -5,6 +5,7 @@ use crate::service::{
 };
 use actix_web::web::Data;
 
+use crate::service::doc::edit::interval::Interval;
 use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_document::{
@@ -22,6 +23,7 @@ use parking_lot::RwLock;
 use protobuf::Message;
 use sqlx::PgPool;
 use std::{
+    cmp::Ordering,
     convert::TryInto,
     sync::{
         atomic::{AtomicI64, Ordering::SeqCst},
@@ -52,6 +54,28 @@ impl ServerEditDoc {
 
     pub fn document_json(&self) -> String { self.document.read().to_json() }
 
+    pub async fn new_connection(&self, user: EditUser, rev_id: i64, _pg_pool: Data<PgPool>) -> Result<(), ServerError> {
+        self.users.insert(user.id(), user.clone());
+        let cur_rev_id = self.rev_id.load(SeqCst);
+        if cur_rev_id > rev_id {
+            let doc_delta = self.document.read().delta().clone();
+            let cli_revision = self.mk_revision(rev_id, doc_delta);
+            let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
+            user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
+        }
+
+        Ok(())
+    }
+
+    #[tracing::instrument(
+        level = "debug",
+        skip(self, user, pg_pool),
+        fields(
+            rev_id = %self.rev_id.load(SeqCst),
+            revision_rev_id = %revision.rev_id,
+            revision_base_rev_id = %revision.base_rev_id
+        )
+    )]
     pub async fn apply_revision(
         &self,
         user: EditUser,
@@ -60,49 +84,48 @@ impl ServerEditDoc {
     ) -> Result<(), ServerError> {
         // Opti: find out another way to keep the user socket available.
         self.users.insert(user.id(), user.clone());
-        log::debug!(
-            "cur_base_rev_id: {}, expect_base_rev_id: {} rev_id: {}",
-            self.rev_id.load(SeqCst),
-            revision.base_rev_id,
-            revision.rev_id
-        );
-
         let cur_rev_id = self.rev_id.load(SeqCst);
-        if cur_rev_id > revision.rev_id {
-            // The client document is outdated. Transform the client revision delta and then
-            // send the prime delta to the client. Client should compose the this prime
-            // delta.
+        match cur_rev_id.cmp(&revision.rev_id) {
+            Ordering::Less => {
+                if cur_rev_id != revision.base_rev_id {
+                    // The server document is outdated, try to get the missing revision from the
+                    // client.
+                    user.socket
+                        .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
+                        .map_err(internal_error)?;
+                } else {
+                    let _ = self.compose_revision(&revision, pg_pool).await?;
+                    user.socket
+                        .do_send(mk_acked_ws_message(&revision))
+                        .map_err(internal_error)?;
+                }
+            },
+            Ordering::Equal => {},
+            Ordering::Greater => {
+                // The client document is outdated. Transform the client revision delta and then
+                // send the prime delta to the client. Client should compose the this prime
+                // delta.
+                let cli_revision = self.transform_client_revision(&revision)?;
+                let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
+                user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
+            },
+        }
+        Ok(())
+    }
 
-            let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
-            let _ = self.update_document_delta(server_prime)?;
+    async fn compose_revision(&self, revision: &Revision, pg_pool: Data<PgPool>) -> Result<(), ServerError> {
+        let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
+        let _ = self.compose_delta(delta)?;
+        let _ = self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
+        let _ = self.save_revision(&revision, pg_pool).await?;
+        Ok(())
+    }
 
-            log::debug!("{} client delta: {}", self.doc_id, cli_prime.to_json());
-            let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
-            let ws_cli_revision = mk_push_rev_ws_message(&self.doc_id, cli_revision);
-            user.socket.do_send(ws_cli_revision).map_err(internal_error)?;
-            Ok(())
-        } else if cur_rev_id < revision.rev_id {
-            if cur_rev_id != revision.base_rev_id {
-                // The server document is outdated, try to get the missing revision from the
-                // client.
-                user.socket
-                    .do_send(mk_pull_rev_ws_message(&self.doc_id, cur_rev_id, revision.rev_id))
-                    .map_err(internal_error)?;
-            } else {
-                let delta = Delta::from_bytes(&revision.delta_data).map_err(internal_error)?;
-                let _ = self.update_document_delta(delta)?;
-                user.socket
-                    .do_send(mk_acked_ws_message(&revision))
-                    .map_err(internal_error)?;
-                self.rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(revision.rev_id));
-                let _ = self.save_revision(&revision, pg_pool).await?;
-            }
-
-            Ok(())
-        } else {
-            log::error!("Client rev_id should not equal to server rev_id");
-            Ok(())
-        }
+    fn transform_client_revision(&self, revision: &Revision) -> Result<Revision, ServerError> {
+        let (cli_prime, server_prime) = self.transform(&revision.delta_data).map_err(internal_error)?;
+        let _ = self.compose_delta(server_prime)?;
+        let cli_revision = self.mk_revision(revision.rev_id, cli_prime);
+        Ok(cli_revision)
     }
 
     fn mk_revision(&self, base_rev_id: i64, delta: Delta) -> Revision {
@@ -133,7 +156,7 @@ impl ServerEditDoc {
     }
 
     #[tracing::instrument(level = "debug", skip(self), err)]
-    fn update_document_delta(&self, delta: Delta) -> Result<(), ServerError> {
+    fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
         // Opti: push each revision into queue and process it one by one.
         match self.document.try_write_for(Duration::from_millis(300)) {
             None => {

+ 57 - 0
backend/src/service/doc/edit/interval.rs

@@ -0,0 +1,57 @@
+use std::cmp::{max, min};
+
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub struct Interval {
+    pub start: i64,
+    pub end: i64,
+}
+
+impl Interval {
+    /// Construct a new `Interval` representing the range [start..end).
+    /// It is an invariant that `start <= end`.
+    pub fn new(start: i64, end: i64) -> Interval {
+        debug_assert!(start <= end);
+        Interval { start, end }
+    }
+
+    pub fn start(&self) -> i64 { self.start }
+
+    pub fn end(&self) -> i64 { self.end }
+
+    pub fn is_before(&self, val: i64) -> bool { self.end <= val }
+
+    pub fn contains(&self, val: i64) -> bool { self.start <= val && val < self.end }
+
+    pub fn contains_range(&self, start: i64, end: i64) -> bool { !self.intersect(Interval::new(start, end)).is_empty() }
+
+    pub fn is_after(&self, val: i64) -> bool { self.start > val }
+
+    pub fn is_empty(&self) -> bool { self.end <= self.start }
+
+    pub fn intersect(&self, other: Interval) -> Interval {
+        let start = max(self.start, other.start);
+        let end = min(self.end, other.end);
+        Interval {
+            start,
+            end: max(start, end),
+        }
+    }
+
+    // the first half of self - other
+    pub fn prefix(&self, other: Interval) -> Interval {
+        Interval {
+            start: min(self.start, other.start),
+            end: min(self.end, other.start),
+        }
+    }
+
+    // the second half of self - other
+    pub fn suffix(&self, other: Interval) -> Interval {
+        Interval {
+            start: max(self.start, other.end),
+            end: max(self.end, other.end),
+        }
+    }
+
+    pub fn size(&self) -> i64 { self.end - self.start }
+}

+ 2 - 0
backend/src/service/doc/edit/mod.rs

@@ -1,6 +1,8 @@
 mod edit_actor;
 mod edit_doc;
+mod interval;
 mod open_handle;
 
+pub use edit_actor::*;
 pub use edit_doc::*;
 pub use open_handle::*;

+ 14 - 2
backend/src/service/doc/edit/open_handle.rs

@@ -10,7 +10,7 @@ use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};
 
 pub struct DocHandle {
-    sender: mpsc::Sender<EditMsg>,
+    pub sender: mpsc::Sender<EditMsg>,
 }
 
 impl DocHandle {
@@ -22,6 +22,18 @@ impl DocHandle {
         Ok(Self { sender })
     }
 
+    pub async fn handle_new_user(&self, user: Arc<WsUser>, rev_id: i64, socket: Socket) -> Result<(), ServerError> {
+        let (ret, rx) = oneshot::channel();
+        let msg = EditMsg::NewDocUser {
+            user,
+            socket,
+            rev_id,
+            ret,
+        };
+        let _ = self.send(msg, rx).await?;
+        Ok(())
+    }
+
     #[tracing::instrument(level = "debug", skip(self, user, socket, revision))]
     pub async fn apply_revision(
         &self,
@@ -46,7 +58,7 @@ impl DocHandle {
         self.send(msg, rx).await?
     }
 
-    async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
+    pub(crate) async fn send<T>(&self, msg: EditMsg, rx: oneshot::Receiver<T>) -> DocResult<T> {
         let _ = self.sender.send(msg).await.map_err(internal_error)?;
         let result = rx.await?;
         Ok(result)

+ 28 - 13
backend/src/service/doc/ws_actor.rs

@@ -1,5 +1,5 @@
 use crate::service::{
-    doc::doc::DocManager,
+    doc::{doc::DocManager, edit::DocHandle},
     util::{md5, parse_from_bytes},
     ws::{entities::Socket, WsClientData, WsUser},
 };
@@ -74,21 +74,29 @@ impl DocWsActor {
         match document_data.ty {
             WsDataType::Acked => Ok(()),
             WsDataType::PushRev => self.handle_push_rev(user, socket, data, pool).await,
-            WsDataType::NewDocUser => self.handle_new_doc_user(socket, data).await,
+            WsDataType::NewDocUser => self.handle_new_doc_user(user, socket, data, pool).await,
             WsDataType::PullRev => Ok(()),
             WsDataType::Conflict => Ok(()),
         }
     }
 
-    async fn handle_new_doc_user(&self, _socket: Socket, data: Vec<u8>) -> DocResult<()> {
-        let _user = spawn_blocking(move || {
+    async fn handle_new_doc_user(
+        &self,
+        user: Arc<WsUser>,
+        socket: Socket,
+        data: Vec<u8>,
+        pool: Data<PgPool>,
+    ) -> DocResult<()> {
+        let doc_user = spawn_blocking(move || {
             let user: NewDocUser = parse_from_bytes(&data)?;
             DocResult::Ok(user)
         })
         .await
         .map_err(internal_error)??;
-
-        unimplemented!()
+        if let Some(handle) = self.doc_handle(&doc_user.doc_id, pool).await {
+            handle.handle_new_user(user, doc_user.rev_id, socket).await?;
+        }
+        Ok(())
     }
 
     async fn handle_push_rev(
@@ -105,15 +113,22 @@ impl DocWsActor {
         })
         .await
         .map_err(internal_error)??;
+        if let Some(handle) = self.doc_handle(&revision.doc_id, pool).await {
+            handle.apply_revision(user, socket, revision).await?;
+        }
+        Ok(())
+    }
 
-        match self.doc_manager.get(&revision.doc_id, pool).await? {
-            Some(edit_doc) => {
-                edit_doc.apply_revision(user, socket, revision).await?;
-                Ok(())
+    async fn doc_handle(&self, doc_id: &str, pool: Data<PgPool>) -> Option<Arc<DocHandle>> {
+        match self.doc_manager.get(doc_id, pool).await {
+            Ok(Some(edit_doc)) => Some(edit_doc),
+            Ok(None) => {
+                log::error!("Document with id: {} not exist", doc_id);
+                None
             },
-            None => {
-                log::error!("Document with id: {} not exist", &revision.doc_id);
-                Ok(())
+            Err(e) => {
+                log::error!("Get doc handle failed: {:?}", e);
+                None
             },
         }
     }