Bläddra i källkod

refactor: add document editor trait

appflowy 2 år sedan
förälder
incheckning
177908db5b

+ 0 - 0
frontend/rust-lib/flowy-document/src/editor/mod.rs


+ 2 - 2
frontend/rust-lib/flowy-document/src/entities.rs

@@ -9,13 +9,13 @@ pub enum ExportType {
     Link = 2,
 }
 
-impl std::default::Default for ExportType {
+impl Default for ExportType {
     fn default() -> Self {
         ExportType::Text
     }
 }
 
-impl std::convert::From<i32> for ExportType {
+impl From<i32> for ExportType {
     fn from(val: i32) -> Self {
         match val {
             0 => ExportType::Text,

+ 2 - 2
frontend/rust-lib/flowy-document/src/event_handler.rs

@@ -12,7 +12,7 @@ pub(crate) async fn get_document_handler(
 ) -> DataResult<DocumentSnapshotPB, FlowyError> {
     let document_id: DocumentIdPB = data.into_inner();
     let editor = manager.open_document_editor(&document_id).await?;
-    let operations_str = editor.get_operation_str().await?;
+    let operations_str = editor.get_operations_str().await?;
     data_result(DocumentSnapshotPB {
         doc_id: document_id.into(),
         snapshot: operations_str,
@@ -35,7 +35,7 @@ pub(crate) async fn export_handler(
 ) -> DataResult<ExportDataPB, FlowyError> {
     let params: ExportParams = data.into_inner().try_into()?;
     let editor = manager.open_document_editor(&params.view_id).await?;
-    let operations_str = editor.get_operation_str().await?;
+    let operations_str = editor.get_operations_str().await?;
     data_result(ExportDataPB {
         data: operations_str,
         export_type: params.export_type,

+ 2 - 3
frontend/rust-lib/flowy-document/src/lib.rs

@@ -1,12 +1,11 @@
-pub mod editor;
 mod entities;
 mod event_handler;
 pub mod event_map;
 pub mod manager;
-mod queue;
-mod web_socket;
 
+pub mod old_editor;
 pub mod protobuf;
+
 pub use manager::*;
 pub mod errors {
     pub use flowy_error::{internal_error, ErrorCode, FlowyError};

+ 40 - 14
frontend/rust-lib/flowy-document/src/manager.rs

@@ -1,6 +1,6 @@
-use crate::editor::DocumentRevisionCompactor;
 use crate::entities::EditParams;
-use crate::{editor::DocumentEditor, errors::FlowyError, DocumentCloudService};
+use crate::old_editor::editor::{DocumentRevisionCompactor, OldDocumentEditor};
+use crate::{errors::FlowyError, DocumentCloudService};
 use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_database::ConnectionPool;
@@ -15,6 +15,8 @@ use flowy_sync::entities::{
     ws_data::ServerRevisionWSData,
 };
 use lib_infra::future::FutureResult;
+use lib_ws::WSConnectState;
+use std::any::Any;
 use std::{convert::TryInto, sync::Arc};
 
 pub trait DocumentUser: Send + Sync {
@@ -24,6 +26,25 @@ pub trait DocumentUser: Send + Sync {
     fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
 }
 
+pub trait DocumentEditor: Send + Sync {
+    fn get_operations_str(&self) -> FutureResult<String, FlowyError>;
+    fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError>;
+    fn close(&self);
+
+    fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError>;
+    fn receive_ws_state(&self, state: &WSConnectState);
+
+    /// Returns the `Any` reference that can be used to downcast back to the original,
+    /// concrete type.
+    ///
+    /// The indirection through `as_any` is because using `downcast_ref`
+    /// on `Box<A>` *directly* only lets us downcast back to `&A` again. You can take a look at [this](https://stackoverflow.com/questions/33687447/how-to-get-a-reference-to-a-concrete-type-from-a-trait-object)
+    /// for more information.
+    ///
+    ///
+    fn as_any(&self) -> &dyn Any;
+}
+
 pub struct DocumentManager {
     cloud_service: Arc<dyn DocumentCloudService>,
     rev_web_socket: Arc<dyn RevisionWebSocket>,
@@ -52,7 +73,10 @@ impl DocumentManager {
     }
 
     #[tracing::instrument(level = "trace", skip(self, editor_id), fields(editor_id), err)]
-    pub async fn open_document_editor<T: AsRef<str>>(&self, editor_id: T) -> Result<Arc<DocumentEditor>, FlowyError> {
+    pub async fn open_document_editor<T: AsRef<str>>(
+        &self,
+        editor_id: T,
+    ) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
         let editor_id = editor_id.as_ref();
         tracing::Span::current().record("editor_id", &editor_id);
         self.get_document_editor(editor_id).await
@@ -75,7 +99,7 @@ impl DocumentManager {
         let _ = editor
             .compose_local_operations(Bytes::from(payload.operations_str))
             .await?;
-        let operations_str = editor.get_operation_str().await?;
+        let operations_str = editor.get_operations_str().await?;
         Ok(DocumentOperationsPB {
             doc_id: payload.doc_id.clone(),
             operations_str,
@@ -127,7 +151,7 @@ impl DocumentManager {
     ///
     /// returns: Result<Arc<DocumentEditor>, FlowyError>
     ///
-    async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<DocumentEditor>> {
+    async fn get_document_editor(&self, doc_id: &str) -> FlowyResult<Arc<dyn DocumentEditor>> {
         match self.editor_map.get(doc_id) {
             None => {
                 let db_pool = self.user.db_pool()?;
@@ -151,7 +175,7 @@ impl DocumentManager {
         &self,
         doc_id: &str,
         pool: Arc<ConnectionPool>,
-    ) -> Result<Arc<DocumentEditor>, FlowyError> {
+    ) -> Result<Arc<dyn DocumentEditor>, FlowyError> {
         let user = self.user.clone();
         let token = self.user.token()?;
         let rev_manager = self.make_document_rev_manager(doc_id, pool.clone())?;
@@ -159,8 +183,10 @@ impl DocumentManager {
             token,
             server: self.cloud_service.clone(),
         });
-        let editor = DocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
-        self.editor_map.insert(doc_id, &editor);
+        let editor =
+            OldDocumentEditor::new(doc_id, user, rev_manager, self.rev_web_socket.clone(), cloud_service).await?;
+        let editor: Arc<dyn DocumentEditor> = Arc::new(editor);
+        self.editor_map.insert(doc_id, editor.clone());
         Ok(editor)
     }
 
@@ -222,7 +248,7 @@ impl RevisionCloudService for DocumentRevisionCloudService {
 }
 
 pub struct DocumentEditorMap {
-    inner: DashMap<String, Arc<DocumentEditor>>,
+    inner: DashMap<String, Arc<dyn DocumentEditor>>,
 }
 
 impl DocumentEditorMap {
@@ -230,20 +256,20 @@ impl DocumentEditorMap {
         Self { inner: DashMap::new() }
     }
 
-    pub(crate) fn insert(&self, editor_id: &str, doc: &Arc<DocumentEditor>) {
+    pub(crate) fn insert(&self, editor_id: &str, editor: Arc<dyn DocumentEditor>) {
         if self.inner.contains_key(editor_id) {
-            log::warn!("Doc:{} already exists in cache", editor_id);
+            log::warn!("Editor:{} already exists in cache", editor_id);
         }
-        self.inner.insert(editor_id.to_string(), doc.clone());
+        self.inner.insert(editor_id.to_string(), editor);
     }
 
-    pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<DocumentEditor>> {
+    pub(crate) fn get(&self, editor_id: &str) -> Option<Arc<dyn DocumentEditor>> {
         Some(self.inner.get(editor_id)?.clone())
     }
 
     pub(crate) fn remove(&self, editor_id: &str) {
         if let Some(editor) = self.get(editor_id) {
-            editor.stop()
+            editor.close()
         }
         self.inner.remove(editor_id);
     }

+ 0 - 0
frontend/rust-lib/flowy-document/src/old_editor/conflict.rs


+ 66 - 54
frontend/rust-lib/flowy-document/src/editor.rs → frontend/rust-lib/flowy-document/src/old_editor/editor.rs

@@ -1,9 +1,9 @@
-use crate::web_socket::EditorCommandSender;
-use crate::{
-    errors::FlowyError,
-    queue::{EditDocumentQueue, EditorCommand},
-    DocumentUser,
-};
+#![allow(unused_attributes)]
+#![allow(unused_attributes)]
+
+use crate::old_editor::queue::{EditDocumentQueue, EditorCommand};
+use crate::old_editor::web_socket::EditorCommandSender;
+use crate::{errors::FlowyError, DocumentEditor, DocumentUser};
 use bytes::Bytes;
 use flowy_error::{internal_error, FlowyResult};
 use flowy_revision::{
@@ -16,16 +16,18 @@ use flowy_sync::{
     errors::CollaborateResult,
     util::make_operations_from_revisions,
 };
+use lib_infra::future::FutureResult;
 use lib_ot::core::{AttributeEntry, AttributeHashMap};
 use lib_ot::{
     core::{DeltaOperation, Interval},
     text_delta::TextOperations,
 };
 use lib_ws::WSConnectState;
+use std::any::Any;
 use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};
 
-pub struct DocumentEditor {
+pub struct OldDocumentEditor {
     pub doc_id: String,
     #[allow(dead_code)]
     rev_manager: Arc<RevisionManager>,
@@ -34,7 +36,59 @@ pub struct DocumentEditor {
     edit_cmd_tx: EditorCommandSender,
 }
 
-impl DocumentEditor {
+impl DocumentEditor for Arc<OldDocumentEditor> {
+    fn get_operations_str(&self) -> FutureResult<String, FlowyError> {
+        let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
+        let msg = EditorCommand::GetOperationsString { ret };
+        let edit_cmd_tx = self.edit_cmd_tx.clone();
+        FutureResult::new(async move {
+            let _ = edit_cmd_tx.send(msg).await;
+            let json = rx.await.map_err(internal_error)??;
+            Ok(json)
+        })
+    }
+
+    fn compose_local_operations(&self, data: Bytes) -> FutureResult<(), FlowyError> {
+        let edit_cmd_tx = self.edit_cmd_tx.clone();
+        FutureResult::new(async move {
+            let operations = TextOperations::from_bytes(&data)?;
+            let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
+            let msg = EditorCommand::ComposeLocalOperations { operations, ret };
+
+            let _ = edit_cmd_tx.send(msg).await;
+            let _ = rx.await.map_err(internal_error)??;
+            Ok(())
+        })
+    }
+
+    fn close(&self) {
+        #[cfg(feature = "sync")]
+        self.ws_manager.stop();
+    }
+
+    #[allow(unused_variables)]
+    fn receive_ws_data(&self, data: ServerRevisionWSData) -> FutureResult<(), FlowyError> {
+        let cloned_self = self.clone();
+        FutureResult::new(async move {
+            #[cfg(feature = "sync")]
+            let _ = cloned_self.ws_manager.receive_ws_data(data).await?;
+
+            Ok(())
+        })
+    }
+
+    #[allow(unused_variables)]
+    fn receive_ws_state(&self, state: &WSConnectState) {
+        #[cfg(feature = "sync")]
+        self.ws_manager.connect_state_changed(state.clone());
+    }
+
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
+}
+
+impl OldDocumentEditor {
     #[allow(unused_variables)]
     pub(crate) async fn new(
         doc_id: &str,
@@ -51,7 +105,7 @@ impl DocumentEditor {
 
         let edit_cmd_tx = spawn_edit_queue(user, rev_manager.clone(), operations);
         #[cfg(feature = "sync")]
-        let ws_manager = crate::web_socket::make_document_ws_manager(
+        let ws_manager = crate::old_editor::web_socket::make_document_ws_manager(
             doc_id.clone(),
             user_id.clone(),
             edit_cmd_tx.clone(),
@@ -142,51 +196,9 @@ impl DocumentEditor {
         let _ = rx.await.map_err(internal_error)??;
         Ok(())
     }
-
-    pub async fn get_operation_str(&self) -> FlowyResult<String> {
-        let (ret, rx) = oneshot::channel::<CollaborateResult<String>>();
-        let msg = EditorCommand::StringifyOperations { ret };
-        let _ = self.edit_cmd_tx.send(msg).await;
-        let json = rx.await.map_err(internal_error)??;
-        Ok(json)
-    }
-
-    #[tracing::instrument(level = "trace", skip(self, data), err)]
-    pub(crate) async fn compose_local_operations(&self, data: Bytes) -> Result<(), FlowyError> {
-        let operations = TextOperations::from_bytes(&data)?;
-        let (ret, rx) = oneshot::channel::<CollaborateResult<()>>();
-        let msg = EditorCommand::ComposeLocalOperations { operations, ret };
-        let _ = self.edit_cmd_tx.send(msg).await;
-        let _ = rx.await.map_err(internal_error)??;
-        Ok(())
-    }
-
-    #[cfg(feature = "sync")]
-    pub fn stop(&self) {
-        self.ws_manager.stop();
-    }
-
-    #[cfg(not(feature = "sync"))]
-    pub fn stop(&self) {}
-
-    #[cfg(feature = "sync")]
-    pub(crate) async fn receive_ws_data(&self, data: ServerRevisionWSData) -> Result<(), FlowyError> {
-        self.ws_manager.receive_ws_data(data).await
-    }
-    #[cfg(not(feature = "sync"))]
-    pub(crate) async fn receive_ws_data(&self, _data: ServerRevisionWSData) -> Result<(), FlowyError> {
-        Ok(())
-    }
-
-    #[cfg(feature = "sync")]
-    pub(crate) fn receive_ws_state(&self, state: &WSConnectState) {
-        self.ws_manager.connect_state_changed(state.clone());
-    }
-    #[cfg(not(feature = "sync"))]
-    pub(crate) fn receive_ws_state(&self, _state: &WSConnectState) {}
 }
 
-impl std::ops::Drop for DocumentEditor {
+impl std::ops::Drop for OldDocumentEditor {
     fn drop(&mut self) {
         tracing::trace!("{} DocumentEditor was dropped", self.doc_id)
     }
@@ -214,10 +226,10 @@ fn spawn_edit_queue(
 }
 
 #[cfg(feature = "flowy_unit_test")]
-impl DocumentEditor {
+impl OldDocumentEditor {
     pub async fn document_operations(&self) -> FlowyResult<TextOperations> {
         let (ret, rx) = oneshot::channel::<CollaborateResult<TextOperations>>();
-        let msg = EditorCommand::ReadOperations { ret };
+        let msg = EditorCommand::GetOperations { ret };
         let _ = self.edit_cmd_tx.send(msg).await;
         let delta = rx.await.map_err(internal_error)??;
         Ok(delta)

+ 4 - 0
frontend/rust-lib/flowy-document/src/old_editor/mod.rs

@@ -0,0 +1,4 @@
+pub mod conflict;
+pub mod editor;
+pub mod queue;
+mod web_socket;

+ 7 - 7
frontend/rust-lib/flowy-document/src/queue.rs → frontend/rust-lib/flowy-document/src/old_editor/queue.rs

@@ -1,4 +1,4 @@
-use crate::web_socket::{DocumentResolveOperations, EditorCommandReceiver};
+use crate::old_editor::web_socket::{DocumentResolveOperations, EditorCommandReceiver};
 use crate::DocumentUser;
 use async_stream::stream;
 use flowy_error::FlowyError;
@@ -161,11 +161,11 @@ impl EditDocumentQueue {
                 let _ = self.save_local_operations(operations, md5).await?;
                 let _ = ret.send(Ok(()));
             }
-            EditorCommand::StringifyOperations { ret } => {
+            EditorCommand::GetOperationsString { ret } => {
                 let data = self.document.read().await.get_operations_json();
                 let _ = ret.send(Ok(data));
             }
-            EditorCommand::ReadOperations { ret } => {
+            EditorCommand::GetOperations { ret } => {
                 let operations = self.document.read().await.get_operations().clone();
                 let _ = ret.send(Ok(operations));
             }
@@ -235,11 +235,11 @@ pub(crate) enum EditorCommand {
     Redo {
         ret: Ret<()>,
     },
-    StringifyOperations {
+    GetOperationsString {
         ret: Ret<String>,
     },
     #[allow(dead_code)]
-    ReadOperations {
+    GetOperations {
         ret: Ret<TextOperations>,
     },
 }
@@ -259,8 +259,8 @@ impl std::fmt::Debug for EditorCommand {
             EditorCommand::CanRedo { .. } => "CanRedo",
             EditorCommand::Undo { .. } => "Undo",
             EditorCommand::Redo { .. } => "Redo",
-            EditorCommand::StringifyOperations { .. } => "StringifyOperations",
-            EditorCommand::ReadOperations { .. } => "ReadOperations",
+            EditorCommand::GetOperationsString { .. } => "StringifyOperations",
+            EditorCommand::GetOperations { .. } => "ReadOperations",
         };
         f.write_str(s)
     }

+ 0 - 0
frontend/rust-lib/flowy-document/src/old_editor/util.rs


+ 3 - 4
frontend/rust-lib/flowy-document/src/web_socket.rs → frontend/rust-lib/flowy-document/src/old_editor/web_socket.rs

@@ -1,9 +1,10 @@
-use crate::queue::TextTransformOperations;
-use crate::{queue::EditorCommand, TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS};
+use crate::old_editor::queue::{EditorCommand, TextTransformOperations};
+use crate::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
 use bytes::Bytes;
 use flowy_error::{internal_error, FlowyError, FlowyResult};
 use flowy_revision::*;
 use flowy_sync::entities::revision::Revision;
+use flowy_sync::util::make_operations_from_revisions;
 use flowy_sync::{
     entities::{
         revision::RevisionRange,
@@ -12,8 +13,6 @@ use flowy_sync::{
     errors::CollaborateResult,
 };
 use lib_infra::future::{BoxResultFuture, FutureResult};
-
-use flowy_sync::util::make_operations_from_revisions;
 use lib_ot::text_delta::TextOperations;
 use lib_ws::WSConnectState;
 use std::{sync::Arc, time::Duration};

+ 9 - 5
frontend/rust-lib/flowy-document/tests/document/script.rs

@@ -1,4 +1,4 @@
-use flowy_document::editor::DocumentEditor;
+use flowy_document::old_editor::editor::OldDocumentEditor;
 use flowy_document::TEXT_BLOCK_SYNC_INTERVAL_IN_MILLIS;
 use flowy_revision::disk::RevisionState;
 use flowy_test::{helper::ViewTest, FlowySDKTest};
@@ -17,21 +17,25 @@ pub enum EditorScript {
     AssertJson(&'static str),
 }
 
-pub struct DocumentEditorTest {
+pub struct OldDocumentEditorTest {
     pub sdk: FlowySDKTest,
-    pub editor: Arc<DocumentEditor>,
+    pub editor: Arc<OldDocumentEditor>,
 }
 
-impl DocumentEditorTest {
+impl OldDocumentEditorTest {
     pub async fn new() -> Self {
         let sdk = FlowySDKTest::default();
         let _ = sdk.init_user().await;
         let test = ViewTest::new_text_block_view(&sdk).await;
-        let editor = sdk
+        let document_editor = sdk
             .text_block_manager
             .open_document_editor(&test.view.id)
             .await
             .unwrap();
+        let editor = match document_editor.as_any().downcast_ref::<Arc<OldDocumentEditor>>() {
+            None => panic!(),
+            Some(editor) => editor.clone(),
+        };
         Self { sdk, editor }
     }
 

+ 8 - 8
frontend/rust-lib/flowy-document/tests/document/text_block_test.rs

@@ -14,7 +14,7 @@ async fn text_block_sync_current_rev_id_check() {
         AssertNextSyncRevId(None),
         AssertJson(r#"[{"insert":"123\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -28,7 +28,7 @@ async fn text_block_sync_state_check() {
         AssertRevisionState(3, RevisionState::Ack),
         AssertJson(r#"[{"insert":"123\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -40,7 +40,7 @@ async fn text_block_sync_insert_test() {
         AssertJson(r#"[{"insert":"123\n"}]"#),
         AssertNextSyncRevId(None),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -52,7 +52,7 @@ async fn text_block_sync_insert_in_chinese() {
         InsertText("好", offset),
         AssertJson(r#"[{"insert":"你好\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -64,7 +64,7 @@ async fn text_block_sync_insert_with_emoji() {
         InsertText("☺️", offset),
         AssertJson(r#"[{"insert":"😁☺️\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -76,7 +76,7 @@ async fn text_block_sync_delete_in_english() {
         Delete(Interval::new(0, 2)),
         AssertJson(r#"[{"insert":"3\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -89,7 +89,7 @@ async fn text_block_sync_delete_in_chinese() {
         Delete(Interval::new(0, offset)),
         AssertJson(r#"[{"insert":"好\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }
 
 #[tokio::test]
@@ -101,5 +101,5 @@ async fn text_block_sync_replace_test() {
         Replace(Interval::new(0, 3), "abc"),
         AssertJson(r#"[{"insert":"abc\n"}]"#),
     ];
-    DocumentEditorTest::new().await.run_scripts(scripts).await;
+    OldDocumentEditorTest::new().await.run_scripts(scripts).await;
 }

+ 1 - 1
frontend/rust-lib/flowy-sdk/src/deps_resolve/folder_deps.rs

@@ -175,7 +175,7 @@ impl ViewDataProcessor for DocumentViewDataProcessor {
         let manager = self.0.clone();
         FutureResult::new(async move {
             let editor = manager.open_document_editor(view_id).await?;
-            let delta_bytes = Bytes::from(editor.get_operation_str().await?);
+            let delta_bytes = Bytes::from(editor.get_operations_str().await?);
             Ok(delta_bytes)
         })
     }