Quellcode durchsuchen

add feature flowy_unit_test

appflowy vor 3 Jahren
Ursprung
Commit
6b338d4cc5

+ 1 - 1
backend/Cargo.toml

@@ -100,6 +100,6 @@ futures-util = "0.3.15"
 backend = { path = ".", features = ["flowy_test"]}
 flowy-sdk = { path = "../frontend/rust-lib/flowy-sdk", features = ["http_server"] }
 flowy-user = { path = "../frontend/rust-lib/flowy-user", features = ["http_server"] }
-flowy-document = { path = "../frontend/rust-lib/flowy-document", features = ["flowy_test", "http_server"] }
+flowy-document = { path = "../frontend/rust-lib/flowy-document", features = ["flowy_unit_test", "http_server"] }
 flowy-test = { path = "../frontend/rust-lib/flowy-test" }
 

+ 2 - 1
frontend/rust-lib/flowy-document/Cargo.toml

@@ -47,6 +47,7 @@ pin-project = "1.0.0"
 
 [dev-dependencies]
 flowy-test = { path = "../flowy-test" }
+flowy-document = { path = "../flowy-document", features = ["flowy_unit_test"]}
 color-eyre = { version = "0.5", default-features = false }
 criterion = "0.3"
 rand = "0.7.3"
@@ -55,4 +56,4 @@ env_logger = "0.8.2"
 
 [features]
 http_server = []
-flowy_test = []
+flowy_unit_test = ["lib-ot/flowy_unit_test"]

+ 14 - 9
frontend/rust-lib/flowy-document/src/services/doc/controller.rs

@@ -5,7 +5,7 @@ use crate::{
         cache::DocCache,
         doc::{
             edit::{ClientDocEditor, EditDocWsHandler},
-            revision::RevisionServer,
+            revision::{RevisionCache, RevisionManager, RevisionServer},
         },
         server::Server,
         ws::WsDocumentManager,
@@ -96,22 +96,27 @@ impl DocController {
         doc_id: &str,
         pool: Arc<ConnectionPool>,
     ) -> Result<Arc<ClientDocEditor>, DocError> {
+        let user = self.user.clone();
+        let rev_manager = self.make_rev_manager(doc_id, pool.clone())?;
+        let edit_ctx = ClientDocEditor::new(doc_id, user, pool, rev_manager, self.ws_manager.ws()).await?;
+        let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone()));
+        self.ws_manager.register_handler(doc_id, ws_handler);
+        self.cache.set(edit_ctx.clone());
+        Ok(edit_ctx)
+    }
+
+    fn make_rev_manager(&self, doc_id: &str, pool: Arc<ConnectionPool>) -> Result<RevisionManager, DocError> {
         // Opti: require upgradable_read lock and then upgrade to write lock using
         // RwLockUpgradableReadGuard::upgrade(xx) of ws
         // let doc = self.read_doc(doc_id, pool.clone()).await?;
-        let ws = self.ws_manager.ws();
+        let ws_sender = self.ws_manager.ws();
         let token = self.user.token()?;
-        let user = self.user.clone();
         let server = Arc::new(RevisionServerImpl {
             token,
             server: self.server.clone(),
         });
-
-        let edit_ctx = Arc::new(ClientDocEditor::new(doc_id, pool, ws, server, user).await?);
-        let ws_handler = Arc::new(EditDocWsHandler(edit_ctx.clone()));
-        self.ws_manager.register_handler(doc_id, ws_handler);
-        self.cache.set(edit_ctx.clone());
-        Ok(edit_ctx)
+        let cache = Arc::new(RevisionCache::new(doc_id, pool, server));
+        Ok(RevisionManager::new(doc_id, cache, ws_sender))
     }
 }
 

+ 47 - 61
frontend/rust-lib/flowy-document/src/services/doc/edit/editor.rs

@@ -4,7 +4,7 @@ use crate::{
     services::{
         doc::{
             edit::{EditCommand, EditCommandQueue, OpenDocAction, TransformDeltas},
-            revision::{RevisionManager, RevisionServer},
+            revision::{RevisionDownStream, RevisionManager},
         },
         ws::{DocumentWebSocket, WsDocumentHandler},
     },
@@ -13,20 +13,17 @@ use bytes::Bytes;
 use flowy_database::ConnectionPool;
 use flowy_document_infra::{
     core::history::UndoResult,
-    entities::{
-        doc::DocDelta,
-        ws::{WsDataType, WsDocumentData},
-    },
+    entities::{doc::DocDelta, ws::WsDocumentData},
     errors::DocumentResult,
 };
 use lib_infra::retry::{ExponentialBackoff, Retry};
 use lib_ot::{
     core::Interval,
-    revision::{RevId, RevType, Revision, RevisionRange},
+    revision::{RevId, RevType, Revision},
     rich_text::{RichTextAttribute, RichTextDelta},
 };
 use lib_ws::WsConnectState;
-use std::{convert::TryFrom, sync::Arc};
+use std::sync::Arc;
 use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
 
 pub type DocId = String;
@@ -35,34 +32,48 @@ pub struct ClientDocEditor {
     pub doc_id: DocId,
     rev_manager: Arc<RevisionManager>,
     edit_tx: UnboundedSender<EditCommand>,
-    ws: Arc<dyn DocumentWebSocket>,
+    ws_sender: Arc<dyn DocumentWebSocket>,
     user: Arc<dyn DocumentUser>,
+    ws_msg_tx: UnboundedSender<WsDocumentData>,
+}
+
+#[cfg(feature = "flowy_unit_test")]
+impl ClientDocEditor {
+    pub async fn doc_json(&self) -> DocResult<String> {
+        let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
+        let msg = EditCommand::ReadDoc { ret };
+        let _ = self.edit_tx.send(msg);
+        let s = rx.await.map_err(internal_error)??;
+        Ok(s)
+    }
+
+    pub fn rev_manager(&self) -> Arc<RevisionManager> { self.rev_manager.clone() }
 }
 
 impl ClientDocEditor {
     pub(crate) async fn new(
         doc_id: &str,
-        pool: Arc<ConnectionPool>,
-        ws: Arc<dyn DocumentWebSocket>,
-        server: Arc<dyn RevisionServer>,
         user: Arc<dyn DocumentUser>,
-    ) -> DocResult<Self> {
-        let (sender, receiver) = mpsc::unbounded_channel();
-        let mut rev_manager = RevisionManager::new(doc_id, pool.clone(), server.clone(), sender);
-        spawn_rev_receiver(receiver, ws.clone());
-
+        pool: Arc<ConnectionPool>,
+        mut rev_manager: RevisionManager,
+        ws_sender: Arc<dyn DocumentWebSocket>,
+    ) -> DocResult<Arc<Self>> {
         let delta = rev_manager.load_document().await?;
         let edit_queue_tx = spawn_edit_queue(doc_id, delta, pool.clone());
         let doc_id = doc_id.to_string();
         let rev_manager = Arc::new(rev_manager);
-        let edit_doc = Self {
+        let (ws_msg_tx, ws_msg_rx) = mpsc::unbounded_channel();
+        let edit_doc = Arc::new(Self {
             doc_id,
             rev_manager,
             edit_tx: edit_queue_tx,
-            ws,
             user,
-        };
+            ws_msg_tx,
+            ws_sender,
+        });
         edit_doc.notify_open_doc();
+
+        start_sync(edit_doc.clone(), ws_msg_rx);
         Ok(edit_doc)
     }
 
@@ -180,20 +191,11 @@ impl ClientDocEditor {
         Ok(())
     }
 
-    #[cfg(feature = "flowy_test")]
-    pub async fn doc_json(&self) -> DocResult<String> {
-        let (ret, rx) = oneshot::channel::<DocumentResult<String>>();
-        let msg = EditCommand::ReadDoc { ret };
-        let _ = self.edit_tx.send(msg);
-        let s = rx.await.map_err(internal_error)??;
-        Ok(s)
-    }
-
     #[tracing::instrument(level = "debug", skip(self))]
     fn notify_open_doc(&self) {
         let rev_id: RevId = self.rev_manager.rev_id().into();
         if let Ok(user_id) = self.user.user_id() {
-            let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws);
+            let action = OpenDocAction::new(&user_id, &self.doc_id, &rev_id, &self.ws_sender);
             let strategy = ExponentialBackoff::from_millis(50).take(3);
             let retry = Retry::spawn(strategy, action);
             tokio::spawn(async move {
@@ -206,7 +208,7 @@ impl ClientDocEditor {
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
-    async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
+    pub(crate) async fn handle_push_rev(&self, bytes: Bytes) -> DocResult<()> {
         // Transform the revision
         let (ret, rx) = oneshot::channel::<DocumentResult<TransformDeltas>>();
         let _ = self.edit_tx.send(EditCommand::ProcessRemoteRevision { bytes, ret });
@@ -253,27 +255,14 @@ impl ClientDocEditor {
             &self.doc_id,
             RevType::Remote,
         );
-        let _ = self.ws.send(revision.into());
+        let _ = self.ws_sender.send(revision.into());
         Ok(())
     }
 
     async fn handle_ws_message(&self, doc_data: WsDocumentData) -> DocResult<()> {
-        let bytes = Bytes::from(doc_data.data);
-        match doc_data.ty {
-            WsDataType::PushRev => {
-                let _ = self.handle_push_rev(bytes).await?;
-            },
-            WsDataType::PullRev => {
-                let range = RevisionRange::try_from(bytes)?;
-                let revision = self.rev_manager.mk_revisions(range).await?;
-                let _ = self.ws.send(revision.into());
-            },
-            WsDataType::Acked => {
-                let rev_id = RevId::try_from(bytes)?;
-                let _ = self.rev_manager.ack_revision(rev_id).await?;
-            },
-            WsDataType::Conflict => {},
-            WsDataType::NewDocUser => {},
+        match self.ws_msg_tx.send(doc_data) {
+            Ok(_) => {},
+            Err(e) => log::error!("Propagate ws message data failed. {}", e),
         }
         Ok(())
     }
@@ -307,23 +296,20 @@ impl WsDocumentHandler for EditDocWsHandler {
     }
 }
 
-fn spawn_rev_receiver(mut receiver: mpsc::UnboundedReceiver<Revision>, ws: Arc<dyn DocumentWebSocket>) {
-    tokio::spawn(async move {
-        loop {
-            while let Some(revision) = receiver.recv().await {
-                // tracing::debug!("Send revision:{} to server", revision.rev_id);
-                match ws.send(revision.into()) {
-                    Ok(_) => {},
-                    Err(e) => log::error!("Send revision failed: {:?}", e),
-                };
-            }
-        }
-    });
-}
-
 fn spawn_edit_queue(doc_id: &str, delta: RichTextDelta, _pool: Arc<ConnectionPool>) -> UnboundedSender<EditCommand> {
     let (sender, receiver) = mpsc::unbounded_channel::<EditCommand>();
     let actor = EditCommandQueue::new(doc_id, delta, receiver);
     tokio::spawn(actor.run());
     sender
 }
+
+fn start_sync(editor: Arc<ClientDocEditor>, ws_msg_rx: mpsc::UnboundedReceiver<WsDocumentData>) {
+    let rev_manager = editor.rev_manager.clone();
+    let ws_sender = editor.ws_sender.clone();
+
+    let up_stream = editor.rev_manager.make_up_stream();
+    let down_stream = RevisionDownStream::new(editor, rev_manager, ws_msg_rx, ws_sender);
+
+    tokio::spawn(up_stream.run());
+    tokio::spawn(down_stream.run());
+}

+ 7 - 0
frontend/rust-lib/flowy-document/src/services/doc/revision/cache.rs

@@ -257,3 +257,10 @@ impl RevisionDiskCache for Persistence {
 impl Persistence {
     pub(crate) fn new(pool: Arc<ConnectionPool>) -> Self { Self { pool } }
 }
+
+#[cfg(feature = "flowy_unit_test")]
+impl RevisionCache {
+    pub fn dish_cache(&self) -> Arc<DocRevisionDeskCache> { self.dish_cache.clone() }
+
+    pub fn memory_cache(&self) -> Arc<RevisionMemoryCache> { self.memory_cache.clone() }
+}

+ 14 - 13
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -1,8 +1,10 @@
 use crate::{
     errors::{DocError, DocResult},
-    services::doc::revision::{RevisionCache, RevisionUploadStream},
+    services::{
+        doc::revision::{RevisionCache, RevisionUpStream},
+        ws::DocumentWebSocket,
+    },
 };
-use flowy_database::ConnectionPool;
 use flowy_document_infra::{entities::doc::Doc, util::RevIdCounter};
 use lib_infra::future::ResultFuture;
 use lib_ot::{
@@ -11,7 +13,6 @@ use lib_ot::{
     rich_text::RichTextDelta,
 };
 use std::sync::Arc;
-use tokio::sync::mpsc;
 
 pub trait RevisionServer: Send + Sync {
     fn fetch_document(&self, doc_id: &str) -> ResultFuture<Doc, DocError>;
@@ -21,22 +22,17 @@ pub struct RevisionManager {
     doc_id: String,
     rev_id_counter: RevIdCounter,
     cache: Arc<RevisionCache>,
+    ws_sender: Arc<dyn DocumentWebSocket>,
 }
 
 impl RevisionManager {
-    pub fn new(
-        doc_id: &str,
-        pool: Arc<ConnectionPool>,
-        server: Arc<dyn RevisionServer>,
-        ws_sender: mpsc::UnboundedSender<Revision>,
-    ) -> Self {
-        let cache = Arc::new(RevisionCache::new(doc_id, pool, server));
-        spawn_upload_stream(cache.clone(), ws_sender);
+    pub fn new(doc_id: &str, cache: Arc<RevisionCache>, ws_sender: Arc<dyn DocumentWebSocket>) -> Self {
         let rev_id_counter = RevIdCounter::new(0);
         Self {
             doc_id: doc_id.to_string(),
             rev_id_counter,
             cache,
+            ws_sender,
         }
     }
 
@@ -91,8 +87,13 @@ impl RevisionManager {
 
         Ok(revision)
     }
+
+    pub(crate) fn make_up_stream(&self) -> RevisionUpStream {
+        RevisionUpStream::new(self.cache.clone(), self.ws_sender.clone())
+    }
 }
 
-fn spawn_upload_stream(cache: Arc<RevisionCache>, ws_sender: mpsc::UnboundedSender<Revision>) {
-    tokio::spawn(RevisionUploadStream::new(cache, ws_sender).run());
+#[cfg(feature = "flowy_unit_test")]
+impl RevisionManager {
+    pub fn revision_cache(&self) -> Arc<RevisionCache> { self.cache.clone() }
 }

+ 98 - 14
frontend/rust-lib/flowy-document/src/services/doc/revision/sync.rs

@@ -1,27 +1,107 @@
 use crate::{
     errors::{internal_error, DocResult},
-    services::doc::revision::RevisionIterator,
+    services::{
+        doc::{
+            edit::ClientDocEditor,
+            revision::{RevisionIterator, RevisionManager},
+        },
+        ws::DocumentWebSocket,
+    },
 };
 use async_stream::stream;
+use bytes::Bytes;
+use flowy_document_infra::entities::ws::{WsDataType, WsDocumentData};
 use futures::stream::StreamExt;
-use lib_ot::revision::Revision;
-use std::sync::Arc;
+use lib_ot::revision::{RevId, RevisionRange};
+use std::{convert::TryFrom, sync::Arc};
 use tokio::{
     sync::mpsc,
+    task::spawn_blocking,
     time::{interval, Duration},
 };
 
-pub(crate) enum RevisionMsg {
+pub(crate) struct RevisionDownStream {
+    editor: Arc<ClientDocEditor>,
+    rev_manager: Arc<RevisionManager>,
+    receiver: Option<mpsc::UnboundedReceiver<WsDocumentData>>,
+    ws_sender: Arc<dyn DocumentWebSocket>,
+}
+
+impl RevisionDownStream {
+    pub(crate) fn new(
+        editor: Arc<ClientDocEditor>,
+        rev_manager: Arc<RevisionManager>,
+        receiver: mpsc::UnboundedReceiver<WsDocumentData>,
+        ws_sender: Arc<dyn DocumentWebSocket>,
+    ) -> Self {
+        RevisionDownStream {
+            editor,
+            rev_manager,
+            receiver: Some(receiver),
+            ws_sender,
+        }
+    }
+
+    pub async fn run(mut self) {
+        let mut receiver = self.receiver.take().expect("Only take once");
+        let stream = stream! {
+            loop {
+                match receiver.recv().await {
+                    Some(msg) => yield msg,
+                    None => break,
+                }
+            }
+        };
+        stream
+            .for_each(|msg| async {
+                match self.handle_message(msg).await {
+                    Ok(_) => {},
+                    Err(e) => log::error!("RevisionDownStream error: {}", e),
+                }
+            })
+            .await;
+    }
+
+    async fn handle_message(&self, msg: WsDocumentData) -> DocResult<()> {
+        let WsDocumentData { doc_id: _, ty, data } = msg;
+        let bytes = spawn_blocking(move || Bytes::from(data))
+            .await
+            .map_err(internal_error)?;
+        log::debug!("[RevisionDownStream]: receives new message: {:?}", ty);
+
+        match ty {
+            WsDataType::PushRev => {
+                let _ = self.editor.handle_push_rev(bytes).await?;
+            },
+            WsDataType::PullRev => {
+                let range = RevisionRange::try_from(bytes)?;
+                let revision = self.rev_manager.mk_revisions(range).await?;
+                let _ = self.ws_sender.send(revision.into());
+            },
+            WsDataType::Acked => {
+                let rev_id = RevId::try_from(bytes)?;
+                let _ = self.rev_manager.ack_revision(rev_id).await?;
+            },
+            WsDataType::Conflict => {},
+            WsDataType::NewDocUser => {},
+        }
+
+        Ok(())
+    }
+}
+
+// RevisionUpStream
+pub(crate) enum UpStreamMsg {
     Tick,
 }
 
-pub(crate) struct RevisionUploadStream {
+pub(crate) struct RevisionUpStream {
     revisions: Arc<dyn RevisionIterator>,
-    ws_sender: mpsc::UnboundedSender<Revision>,
+    ws_sender: Arc<dyn DocumentWebSocket>,
 }
 
-impl RevisionUploadStream {
-    pub(crate) fn new(revisions: Arc<dyn RevisionIterator>, ws_sender: mpsc::UnboundedSender<Revision>) -> Self {
+impl RevisionUpStream {
+    pub(crate) fn new(revisions: Arc<dyn RevisionIterator>, ws_sender: Arc<dyn DocumentWebSocket>) -> Self {
         Self { revisions, ws_sender }
     }
 
@@ -46,18 +126,22 @@ impl RevisionUploadStream {
             .await;
     }
 
-    async fn handle_msg(&self, msg: RevisionMsg) -> DocResult<()> {
+    async fn handle_msg(&self, msg: UpStreamMsg) -> DocResult<()> {
         match msg {
-            RevisionMsg::Tick => self.send_next_revision().await,
+            UpStreamMsg::Tick => self.send_next_revision().await,
         }
     }
 
     async fn send_next_revision(&self) -> DocResult<()> {
-        log::debug!("😁Tick");
         match self.revisions.next().await? {
             None => Ok(()),
             Some(record) => {
-                let _ = self.ws_sender.send(record.revision).map_err(internal_error);
+                log::debug!(
+                    "[RevisionUpStream]: processes revision: {}:{:?}",
+                    record.revision.doc_id,
+                    record.revision.rev_id
+                );
+                let _ = self.ws_sender.send(record.revision.into()).map_err(internal_error);
                 // let _ = tokio::time::timeout(Duration::from_millis(2000), ret.recv()).await;
                 Ok(())
             },
@@ -65,10 +149,10 @@ impl RevisionUploadStream {
     }
 }
 
-async fn tick(sender: mpsc::UnboundedSender<RevisionMsg>) {
+async fn tick(sender: mpsc::UnboundedSender<UpStreamMsg>) {
     let mut i = interval(Duration::from_secs(2));
     loop {
-        match sender.send(RevisionMsg::Tick) {
+        match sender.send(UpStreamMsg::Tick) {
             Ok(_) => {},
             Err(e) => log::error!("RevisionUploadStream tick error: {}", e),
         }

+ 12 - 2
frontend/rust-lib/flowy-document/tests/editor/revision_test.rs

@@ -3,6 +3,16 @@ use flowy_test::editor::*;
 #[tokio::test]
 async fn create_doc() {
     let test = EditorTest::new().await;
-    let _editor = test.create_doc().await;
-    println!("123");
+    let editor = test.create_doc().await;
+    let rev_manager = editor.rev_manager();
+    assert_eq!(rev_manager.rev_id(), 0);
+
+    let json = editor.doc_json().await.unwrap();
+    assert_eq!(json, r#"[{"insert":"\n"}]"#);
+
+    editor.insert(0, "123").await.unwrap();
+    assert_eq!(rev_manager.rev_id(), 1);
+
+    editor.insert(0, "456").await.unwrap();
+    assert_eq!(rev_manager.rev_id(), 2);
 }

+ 10 - 0
frontend/rust-lib/flowy-test/src/editor.rs

@@ -2,6 +2,7 @@ use crate::{helper::ViewTest, FlowySDKTest};
 use flowy_document::services::doc::edit::ClientDocEditor;
 use flowy_document_infra::entities::doc::DocIdentifier;
 use std::sync::Arc;
+use tokio::time::Interval;
 
 pub struct EditorTest {
     pub sdk: FlowySDKTest,
@@ -20,3 +21,12 @@ impl EditorTest {
         self.sdk.flowy_document.open(doc_identifier).await.unwrap()
     }
 }
+
+pub enum EditAction {
+    InsertText(&'static str, usize),
+    Delete(Interval),
+    Replace(Interval, &'static str),
+    Undo(),
+    Redo(),
+    AssertJson(&'static str),
+}

+ 1 - 1
frontend/rust-lib/lib-infra/src/kv/kv.rs

@@ -25,7 +25,7 @@ impl KV {
     }
 
     fn set(value: KeyValue) -> Result<(), String> {
-        log::debug!("set value: {:?}", value);
+        log::trace!("[KV]: set value: {:?}", value);
         update_cache(value.clone());
 
         let _ = diesel::replace_into(kv_table::table)

+ 2 - 0
shared-lib/lib-ot/Cargo.toml

@@ -24,5 +24,7 @@ strum_macros = "0.21"
 bytes = "1.0"
 
 
+[features]
+flowy_unit_test = []
 
 

+ 6 - 0
shared-lib/lib-ot/src/revision/cache.rs

@@ -125,3 +125,9 @@ impl RevisionRecord {
 
     pub fn ack(&mut self) { self.state = RevState::Acked; }
 }
+
+#[cfg(feature = "flowy_unit_test")]
+impl RevisionMemoryCache {
+    pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> { self.revs_map.clone() }
+    pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> { self.pending_revs.clone() }
+}