Explorar o código

handle revision acked && fix bugs

appflowy %!s(int64=3) %!d(string=hai) anos
pai
achega
3cbce2c505

+ 5 - 0
backend/src/service/doc/edit/edit_doc.rs

@@ -112,6 +112,7 @@ impl ServerEditDoc {
             },
             Ordering::Equal => {
                 // Do nothing
+                log::warn!("Applied revision rev_id is the same as cur_rev_id");
             },
             Ordering::Greater => {
                 // The client document is outdated. Transform the client revision delta and then
@@ -175,6 +176,10 @@ impl ServerEditDoc {
         )
     )]
     fn compose_delta(&self, delta: Delta) -> Result<(), ServerError> {
+        if delta.is_empty() {
+            log::warn!("Composed delta is empty");
+        }
+
         match self.document.try_write_for(Duration::from_millis(300)) {
             None => {
                 log::error!("Failed to acquire write lock of document");

+ 1 - 1
rust-lib/flowy-document/src/entities/doc/revision.rs

@@ -156,7 +156,7 @@ impl RevisionRange {
     pub fn len(&self) -> i64 {
         debug_assert!(self.end >= self.start);
         if self.end >= self.start {
-            self.end - self.start
+            self.end - self.start + 1
         } else {
             0
         }

+ 4 - 3
rust-lib/flowy-document/src/services/doc/edit/edit_doc.rs

@@ -12,7 +12,7 @@ use crate::{
                 message::{DocumentMsg, TransformDeltas},
                 model::OpenDocAction,
             },
-            revision::{DocRevision, RevisionManager, RevisionServer, RevisionStoreActor},
+            revision::{RevisionManager, RevisionServer},
             UndoResult,
         },
         ws::{DocumentWebSocket, WsDocumentHandler},
@@ -45,7 +45,7 @@ impl ClientEditDoc {
         server: Arc<dyn RevisionServer>,
         user: Arc<dyn DocumentUser>,
     ) -> DocResult<Self> {
-        let (sender, mut receiver) = mpsc::channel(1);
+        let (sender, receiver) = mpsc::channel(1);
         let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender);
         spawn_rev_receiver(receiver, ws.clone());
 
@@ -255,7 +255,7 @@ impl ClientEditDoc {
         );
         let _ = self.ws.send(revision.into());
 
-        save_document(self.document.clone(), local_rev_id.into()).await;
+        let _ = save_document(self.document.clone(), local_rev_id.into()).await?;
         Ok(())
     }
 
@@ -306,6 +306,7 @@ fn spawn_rev_receiver(mut receiver: mpsc::Receiver<Revision>, ws: Arc<dyn Docume
     tokio::spawn(async move {
         loop {
             while let Some(revision) = receiver.recv().await {
+                log::debug!("Send revision:{} to server", revision.rev_id);
                 match ws.send(revision.into()) {
                     Ok(_) => {},
                     Err(e) => log::error!("Send revision failed: {:?}", e),

+ 5 - 5
rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -1,13 +1,13 @@
 use crate::{
     entities::doc::{Doc, RevId, RevType, Revision, RevisionRange},
-    errors::{internal_error, DocError, DocResult},
-    services::{doc::revision::RevisionStoreActor, util::RevIdCounter, ws::DocumentWebSocket},
+    errors::{DocError, DocResult},
+    services::{doc::revision::RevisionStore, util::RevIdCounter},
 };
 use flowy_database::ConnectionPool;
 use flowy_infra::future::ResultFuture;
 use flowy_ot::core::{Delta, OperationTransformable};
 use std::sync::Arc;
-use tokio::sync::{mpsc, oneshot};
+use tokio::sync::mpsc;
 
 pub struct DocRevision {
     pub base_rev_id: RevId,
@@ -22,7 +22,7 @@ pub trait RevisionServer: Send + Sync {
 pub struct RevisionManager {
     doc_id: String,
     rev_id_counter: RevIdCounter,
-    rev_store: Arc<RevisionStoreActor>,
+    rev_store: Arc<RevisionStore>,
 }
 
 impl RevisionManager {
@@ -32,7 +32,7 @@ impl RevisionManager {
         server: Arc<dyn RevisionServer>,
         pending_rev_sender: mpsc::Sender<Revision>,
     ) -> Self {
-        let rev_store = Arc::new(RevisionStoreActor::new(doc_id, pool, server, pending_rev_sender));
+        let rev_store = RevisionStore::new(doc_id, pool, server, pending_rev_sender);
         let rev_id_counter = RevIdCounter::new(0);
         Self {
             doc_id: doc_id.to_string(),

+ 124 - 7
rust-lib/flowy-document/src/services/doc/revision/model.rs

@@ -1,9 +1,17 @@
-use crate::{entities::doc::Revision, errors::DocResult, services::ws::DocumentWebSocket, sql_tables::RevState};
+use crate::{
+    entities::doc::{Revision, RevisionRange},
+    errors::{internal_error, DocError, DocResult},
+    sql_tables::{RevState, RevTableSql},
+};
+use async_stream::stream;
+use flowy_database::ConnectionPool;
+use flowy_infra::future::ResultFuture;
+use futures::{stream::StreamExt, TryFutureExt};
+use std::{sync::Arc, time::Duration};
+use tokio::sync::{broadcast, mpsc};
 
-use tokio::sync::oneshot;
-
-pub type PendingRevSender = oneshot::Sender<DocResult<()>>;
-pub type PendingRevReceiver = oneshot::Receiver<DocResult<()>>;
+pub type RevIdReceiver = broadcast::Receiver<i64>;
+pub type RevIdSender = broadcast::Sender<i64>;
 
 pub struct RevisionContext {
     pub revision: Revision,
@@ -21,9 +29,118 @@ impl RevisionContext {
 
 pub(crate) struct PendingRevId {
     pub rev_id: i64,
-    pub sender: PendingRevSender,
+    pub sender: RevIdSender,
 }
 
 impl PendingRevId {
-    pub(crate) fn new(rev_id: i64, sender: PendingRevSender) -> Self { Self { rev_id, sender } }
+    pub(crate) fn new(rev_id: i64, sender: RevIdSender) -> Self { Self { rev_id, sender } }
+
+    pub(crate) fn finish(&self, rev_id: i64) -> bool {
+        if self.rev_id > rev_id {
+            false
+        } else {
+            self.sender.send(self.rev_id);
+            true
+        }
+    }
+}
+
+pub(crate) struct Persistence {
+    pub(crate) rev_sql: Arc<RevTableSql>,
+    pub(crate) pool: Arc<ConnectionPool>,
+}
+
+impl Persistence {
+    pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self {
+        let rev_sql = Arc::new(RevTableSql {});
+        Self { rev_sql, pool }
+    }
+
+    pub(crate) fn create_revs(&self, revisions_state: Vec<(Revision, RevState)>) -> DocResult<()> {
+        let conn = &*self.pool.get().map_err(internal_error)?;
+        conn.immediate_transaction::<_, DocError, _>(|| {
+            let _ = self.rev_sql.create_rev_table(revisions_state, conn)?;
+            Ok(())
+        })
+    }
+
+    pub(crate) fn read_rev_with_range(&self, doc_id: &str, range: RevisionRange) -> DocResult<Vec<Revision>> {
+        let conn = &*self.pool.get().map_err(internal_error).unwrap();
+        let revisions = self.rev_sql.read_rev_tables_with_range(doc_id, range, conn)?;
+        Ok(revisions)
+    }
+
+    pub(crate) fn read_rev(&self, doc_id: &str, rev_id: &i64) -> DocResult<Option<Revision>> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let some = self.rev_sql.read_rev_table(&doc_id, rev_id, &*conn)?;
+        Ok(some)
+    }
+}
+
+pub trait RevisionIterator: Send + Sync {
+    fn next(&self) -> ResultFuture<Option<Revision>, DocError>;
+}
+
+pub(crate) enum PendingMsg {
+    Revision { ret: RevIdReceiver },
+}
+
+pub(crate) type PendingSender = mpsc::UnboundedSender<PendingMsg>;
+pub(crate) type PendingReceiver = mpsc::UnboundedReceiver<PendingMsg>;
+
+pub(crate) struct PendingRevisionStream {
+    revisions: Arc<dyn RevisionIterator>,
+    receiver: Option<PendingReceiver>,
+    next_revision: mpsc::Sender<Revision>,
+}
+
+impl PendingRevisionStream {
+    pub(crate) fn new(
+        revisions: Arc<dyn RevisionIterator>,
+        pending_rx: PendingReceiver,
+        next_revision: mpsc::Sender<Revision>,
+    ) -> Self {
+        Self {
+            revisions,
+            receiver: Some(pending_rx),
+            next_revision,
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut receiver = self.receiver.take().expect("Should only call once");
+        let stream = stream! {
+            loop {
+                match receiver.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+        stream
+            .for_each(|msg| async {
+                match self.handle_msg(msg).await {
+                    Ok(_) => {},
+                    Err(e) => log::error!("{:?}", e),
+                }
+            })
+            .await;
+    }
+
+    async fn handle_msg(&self, msg: PendingMsg) -> DocResult<()> {
+        match msg {
+            PendingMsg::Revision { ret } => self.prepare_next_pending_rev(ret).await,
+        }
+    }
+
+    async fn prepare_next_pending_rev(&self, mut ret: RevIdReceiver) -> DocResult<()> {
+        match self.revisions.next().await? {
+            None => Ok(()),
+            Some(revision) => {
+                self.next_revision.send(revision).await.map_err(internal_error);
+                let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
+                Ok(())
+            },
+        }
+    }
 }

+ 65 - 188
rust-lib/flowy-document/src/services/doc/revision/rev_store.rs

@@ -2,67 +2,59 @@ use crate::{
     entities::doc::{revision_from_doc, Doc, RevId, RevType, Revision, RevisionRange},
     errors::{internal_error, DocError, DocResult},
     services::doc::revision::{
-        model::{PendingRevId, PendingRevReceiver, RevisionContext},
+        model::{RevisionIterator, *},
         RevisionServer,
     },
-    sql_tables::{RevState, RevTableSql},
+    sql_tables::RevState,
 };
-use async_stream::stream;
-use dashmap::{mapref::one::Ref, DashMap, DashSet};
+
+use dashmap::DashMap;
 use flowy_database::ConnectionPool;
+use flowy_infra::future::ResultFuture;
 use flowy_ot::core::{Delta, OperationTransformable};
 use futures::{stream::StreamExt, TryFutureExt};
-use std::{
-    collections::{HashMap, VecDeque},
-    sync::Arc,
-    time::Duration,
-};
+use std::{collections::VecDeque, sync::Arc, time::Duration};
 use tokio::{
-    sync::{mpsc, oneshot, RwLock, RwLockWriteGuard},
+    sync::{broadcast, mpsc, oneshot, RwLock},
     task::{spawn_blocking, JoinHandle},
 };
 
-pub struct RevisionStoreActor {
+pub struct RevisionStore {
     doc_id: String,
     persistence: Arc<Persistence>,
     revs_map: Arc<DashMap<i64, RevisionContext>>,
-    pending_revs_sender: RevSender,
+    pending_tx: PendingSender,
     pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
     delay_save: RwLock<Option<JoinHandle<()>>>,
     server: Arc<dyn RevisionServer>,
 }
 
-impl RevisionStoreActor {
+impl RevisionStore {
     pub fn new(
         doc_id: &str,
         pool: Arc<ConnectionPool>,
         server: Arc<dyn RevisionServer>,
-        pending_rev_sender: mpsc::Sender<Revision>,
-    ) -> RevisionStoreActor {
+        next_revision: mpsc::Sender<Revision>,
+    ) -> Arc<RevisionStore> {
         let doc_id = doc_id.to_owned();
         let persistence = Arc::new(Persistence::new(pool));
         let revs_map = Arc::new(DashMap::new());
-        let (pending_revs_sender, receiver) = mpsc::unbounded_channel();
+        let (pending_tx, pending_rx) = mpsc::unbounded_channel();
         let pending_revs = Arc::new(RwLock::new(VecDeque::new()));
-        let pending = PendingRevision::new(
-            &doc_id,
-            receiver,
-            persistence.clone(),
-            revs_map.clone(),
-            pending_rev_sender,
-            pending_revs.clone(),
-        );
-        tokio::spawn(pending.run());
 
-        Self {
+        let store = Arc::new(Self {
             doc_id,
             persistence,
             revs_map,
-            pending_revs_sender,
             pending_revs,
+            pending_tx,
             delay_save: RwLock::new(None),
             server,
-        }
+        });
+
+        tokio::spawn(PendingRevisionStream::new(store.clone(), pending_rx, next_revision).run());
+
+        store
     }
 
     #[tracing::instrument(level = "debug", skip(self, revision))]
@@ -71,36 +63,36 @@ impl RevisionStoreActor {
             return Err(DocError::duplicate_rev().context(format!("Duplicate revision id: {}", revision.rev_id)));
         }
 
-        self.pending_revs_sender.send(PendingRevisionMsg::Revision {
-            revision: revision.clone(),
+        let (sender, receiver) = broadcast::channel(1);
+        let revs_map = self.revs_map.clone();
+        let mut rx = sender.subscribe();
+        tokio::spawn(async move {
+            match rx.recv().await {
+                Ok(rev_id) => match revs_map.get_mut(&rev_id) {
+                    None => {},
+                    Some(mut rev) => rev.value_mut().state = RevState::Acked,
+                },
+                Err(_) => {},
+            }
         });
+
+        let pending_rev = PendingRevId::new(revision.rev_id, sender);
+        self.pending_revs.write().await.push_back(pending_rev);
         self.revs_map.insert(revision.rev_id, RevisionContext::new(revision));
+
+        let _ = self.pending_tx.send(PendingMsg::Revision { ret: receiver });
         self.save_revisions().await;
         Ok(())
     }
 
-    #[tracing::instrument(level = "debug", skip(self, rev_id))]
+    #[tracing::instrument(level = "debug", skip(self))]
     pub async fn handle_revision_acked(&self, rev_id: RevId) {
         let rev_id = rev_id.value;
-        log::debug!("Receive revision acked: {}", rev_id);
-        match self.pending_revs.write().await.pop_front() {
-            None => {},
-            Some(pending) => {
-                debug_assert!(pending.rev_id == rev_id);
-                if pending.rev_id != rev_id {
-                    log::error!(
-                        "Acked: expected rev_id: {:?}, but receive: {:?}",
-                        pending.rev_id,
-                        rev_id
-                    );
-                }
-                pending.sender.send(Ok(()));
-            },
-        }
-        match self.revs_map.get_mut(&rev_id) {
-            None => {},
-            Some(mut rev) => rev.value_mut().state = RevState::Acked,
-        }
+        self.pending_revs
+            .write()
+            .await
+            .retain(|pending| !pending.finish(rev_id));
+
         self.save_revisions().await;
     }
 
@@ -124,14 +116,7 @@ impl RevisionStoreActor {
                 .map(|kv| (kv.revision.clone(), kv.state))
                 .collect::<Vec<(Revision, RevState)>>();
 
-            // TODO: Ok to unwrap?
-            let conn = &*persistence.pool.get().map_err(internal_error).unwrap();
-            let result = conn.immediate_transaction::<_, DocError, _>(|| {
-                let _ = persistence.rev_sql.create_rev_table(revisions_state, conn).unwrap();
-                Ok(())
-            });
-
-            match result {
+            match persistence.create_revs(revisions_state) {
                 Ok(_) => revs_map.retain(|k, _| !ids.contains(k)),
                 Err(e) => log::error!("Save revision failed: {:?}", e),
             }
@@ -152,14 +137,9 @@ impl RevisionStoreActor {
         } else {
             let doc_id = self.doc_id.clone();
             let persistence = self.persistence.clone();
-            let result = spawn_blocking(move || {
-                let conn = &*persistence.pool.get().map_err(internal_error).unwrap();
-                let revisions = persistence.rev_sql.read_rev_tables_with_range(&doc_id, range, conn)?;
-                Ok(revisions)
-            })
-            .await
-            .map_err(internal_error)?;
-
+            let result = spawn_blocking(move || persistence.read_rev_with_range(&doc_id, range))
+                .await
+                .map_err(internal_error)?;
             result
         }
     }
@@ -172,20 +152,29 @@ impl RevisionStoreActor {
 
         let doc = self.server.fetch_document_from_remote(&self.doc_id).await?;
         let revision = revision_from_doc(doc.clone(), RevType::Remote);
-        let conn = &*self.persistence.pool.get().map_err(internal_error).unwrap();
-        let _ = conn.immediate_transaction::<_, DocError, _>(|| {
-            let _ = self
-                .persistence
-                .rev_sql
-                .create_rev_table(vec![(revision, RevState::Acked)], conn)
-                .unwrap();
-            Ok(())
-        })?;
-
+        let _ = self.persistence.create_revs(vec![(revision, RevState::Acked)])?;
         Ok(doc)
     }
 }
 
+impl RevisionIterator for RevisionStore {
+    fn next(&self) -> ResultFuture<Option<Revision>, DocError> {
+        let pending_revs = self.pending_revs.clone();
+        let revs_map = self.revs_map.clone();
+        let persistence = self.persistence.clone();
+        let doc_id = self.doc_id.clone();
+        ResultFuture::new(async move {
+            match pending_revs.read().await.front() {
+                None => Ok(None),
+                Some(pending) => match revs_map.get(&pending.rev_id) {
+                    None => persistence.read_rev(&doc_id, &pending.rev_id),
+                    Some(context) => Ok(Some(context.revision.clone())),
+                },
+            }
+        })
+    }
+}
+
 async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocResult<Doc> {
     let doc_id = doc_id.to_owned();
     spawn_blocking(move || {
@@ -220,118 +209,6 @@ async fn fetch_from_local(doc_id: &str, persistence: Arc<Persistence>) -> DocRes
     .map_err(internal_error)?
 }
 
-struct Persistence {
-    rev_sql: Arc<RevTableSql>,
-    pool: Arc<ConnectionPool>,
-}
-
-impl Persistence {
-    fn new(pool: Arc<ConnectionPool>) -> Self {
-        let rev_sql = Arc::new(RevTableSql {});
-        Self { rev_sql, pool }
-    }
-}
-
-enum PendingRevisionMsg {
-    Revision { revision: Revision },
-}
-
-type RevSender = mpsc::UnboundedSender<PendingRevisionMsg>;
-type RevReceiver = mpsc::UnboundedReceiver<PendingRevisionMsg>;
-
-struct PendingRevision {
-    doc_id: String,
-    pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
-    persistence: Arc<Persistence>,
-    revs_map: Arc<DashMap<i64, RevisionContext>>,
-    msg_receiver: Option<RevReceiver>,
-    next_rev: mpsc::Sender<Revision>,
-}
-
-impl PendingRevision {
-    pub fn new(
-        doc_id: &str,
-        msg_receiver: RevReceiver,
-        persistence: Arc<Persistence>,
-        revs_map: Arc<DashMap<i64, RevisionContext>>,
-        next_rev: mpsc::Sender<Revision>,
-        pending_revs: Arc<RwLock<VecDeque<PendingRevId>>>,
-    ) -> Self {
-        Self {
-            doc_id: doc_id.to_owned(),
-            pending_revs,
-            msg_receiver: Some(msg_receiver),
-            persistence,
-            revs_map,
-            next_rev,
-        }
-    }
-
-    pub async fn run(mut self) {
-        let mut receiver = self.msg_receiver.take().expect("Should only call once");
-        let stream = stream! {
-            loop {
-                match receiver.recv().await {
-                    Some(msg) => yield msg,
-                    None => break,
-                }
-            }
-        };
-        stream
-            .for_each(|msg| async {
-                match self.handle_msg(msg).await {
-                    Ok(_) => {},
-                    Err(e) => log::error!("{:?}", e),
-                }
-            })
-            .await;
-    }
-
-    async fn handle_msg(&self, msg: PendingRevisionMsg) -> DocResult<()> {
-        match msg {
-            PendingRevisionMsg::Revision { revision } => self.handle_revision(revision).await,
-        }
-    }
-
-    async fn handle_revision(&self, revision: Revision) -> DocResult<()> {
-        let (sender, receiver) = oneshot::channel();
-        let pending_rev = PendingRevId {
-            rev_id: revision.rev_id,
-            sender,
-        };
-        self.pending_revs.write().await.push_back(pending_rev);
-        let _ = self.prepare_next_pending_rev(receiver).await?;
-        Ok(())
-    }
-
-    async fn prepare_next_pending_rev(&self, done: PendingRevReceiver) -> DocResult<()> {
-        let next_rev_notify = self.next_rev.clone();
-        let doc_id = self.doc_id.clone();
-        let _ = match self.pending_revs.read().await.front() {
-            None => Ok(()),
-            Some(pending) => match self.revs_map.get(&pending.rev_id) {
-                None => {
-                    let conn = self.persistence.pool.get().map_err(internal_error)?;
-                    let some = self
-                        .persistence
-                        .rev_sql
-                        .read_rev_table(&doc_id, &pending.rev_id, &*conn)?;
-                    match some {
-                        Some(revision) => next_rev_notify.send(revision).await.map_err(internal_error),
-                        None => Ok(()),
-                    }
-                },
-                Some(context) => next_rev_notify
-                    .send(context.revision.clone())
-                    .await
-                    .map_err(internal_error),
-            },
-        }?;
-        let _ = tokio::time::timeout(Duration::from_millis(2000), done).await;
-        Ok(())
-    }
-}
-
 // fn update_revisions(&self) {
 //     let rev_ids = self
 //         .revs