appflowy 3 лет назад
Родитель
Сommit
aeb448c4de

+ 1 - 0
backend/Cargo.lock

@@ -1366,6 +1366,7 @@ dependencies = [
  "backend-service",
  "bytes",
  "dart-notify",
+ "dashmap",
  "derive_more",
  "diesel",
  "diesel_derives",

+ 1 - 1
backend/tests/document/helper.rs

@@ -66,7 +66,7 @@ impl ScriptContext {
             client_edit_context: None,
             client_sdk,
             client_user_session: user_session,
-            server_doc_manager: server.app_ctx.doc_biz.manager.clone(),
+            server_doc_manager: server.app_ctx.document_core.manager.clone(),
             server_pg_pool: Data::new(server.pg_pool.clone()),
             doc_id,
         }

+ 1 - 2
frontend/rust-lib/flowy-document/src/services/doc/controller.rs

@@ -14,9 +14,8 @@ use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_database::ConnectionPool;
 use flowy_document_infra::entities::doc::{Doc, DocDelta, DocIdentifier};
-use lib_infra::future::{wrap_future, FnFuture, ResultFuture};
+use lib_infra::future::ResultFuture;
 use std::sync::Arc;
-use tokio::time::{interval, Duration};
 
 pub(crate) struct DocController {
     server: Server,

+ 0 - 1
frontend/rust-lib/flowy-document/src/services/doc/revision/manager.rs

@@ -46,7 +46,6 @@ impl RevisionManager {
 
     pub async fn add_revision(&self, revision: &Revision) -> Result<(), DocError> {
         let _ = self.cache.add_revision(revision.clone()).await?;
-
         Ok(())
     }
 

+ 1 - 1
frontend/rust-lib/flowy-document/src/services/ws/ws_manager.rs

@@ -39,7 +39,7 @@ impl WsDocumentManager {
 
     pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); }
 
-    pub fn handle_ws_data(&self, data: Bytes) {
+    pub fn did_receive_ws_data(&self, data: Bytes) {
         let data: WsDocumentData = data.try_into().unwrap();
         match self.handlers.get(&data.doc_id) {
             None => {

+ 0 - 14
frontend/rust-lib/flowy-document/tests/editor/revision_test.rs

@@ -1,4 +1,3 @@
-use flowy_document_infra::core::{Document, FlowyDoc};
 use flowy_test::editor::{EditorScript::*, *};
 use lib_ot::{revision::RevState, rich_text::RichTextDeltaBuilder};
 
@@ -51,16 +50,3 @@ async fn doc_push_test() {
     ];
     EditorTest::new().await.run_scripts(scripts).await;
 }
-
-#[tokio::test]
-async fn doc_push_test2() {
-    let mut document = Document::new::<FlowyDoc>();
-    let delta_1 = document.insert(0, "123").unwrap();
-    let json = document.to_json();
-
-    let scripts = vec![
-        SimulatePushRevisionMessageWithDelta(delta_1),
-        AssertJson(r#"[{"insert":"\n123"}]"#),
-    ];
-    EditorTest::new().await.run_scripts(scripts).await;
-}

+ 11 - 13
frontend/rust-lib/flowy-sdk/src/deps_resolve/document_deps.rs

@@ -29,7 +29,7 @@ impl DocumentDepsResolver {
             user: self.user_session.clone(),
         });
         let ws_manager = Arc::new(WsDocumentManager::new(sender));
-        let ws_handler = Arc::new(WsDocumentReceiver {
+        let ws_handler = Arc::new(DocumentWsMessageReceiver {
             inner: ws_manager.clone(),
         });
         self.user_session.add_ws_handler(ws_handler);
@@ -74,15 +74,13 @@ struct WsSenderImpl {
 
 impl DocumentWebSocket for WsSenderImpl {
     fn send(&self, data: WsDocumentData) -> Result<(), DocError> {
-        if cfg!(feature = "http_server") {
-            let bytes: Bytes = data.try_into().unwrap();
-            let msg = WsMessage {
-                module: WsModule::Doc,
-                data: bytes.to_vec(),
-            };
-            let sender = self.user.ws_sender().map_err(internal_error)?;
-            sender.send_msg(msg).map_err(internal_error)?;
-        }
+        let bytes: Bytes = data.try_into().unwrap();
+        let msg = WsMessage {
+            module: WsModule::Doc,
+            data: bytes.to_vec(),
+        };
+        let sender = self.user.ws_sender().map_err(internal_error)?;
+        sender.send(msg).map_err(internal_error)?;
 
         Ok(())
     }
@@ -90,15 +88,15 @@ impl DocumentWebSocket for WsSenderImpl {
     fn state_notify(&self) -> WsStateReceiver { self.user.ws_state_notifier() }
 }
 
-struct WsDocumentReceiver {
+struct DocumentWsMessageReceiver {
     inner: Arc<WsDocumentManager>,
 }
 
-impl WsMessageHandler for WsDocumentReceiver {
+impl WsMessageHandler for DocumentWsMessageReceiver {
     fn source(&self) -> WsModule { WsModule::Doc }
 
     fn receive_message(&self, msg: WsMessage) {
         let data = Bytes::from(msg.data);
-        self.inner.handle_ws_data(data);
+        self.inner.did_receive_ws_data(data);
     }
 }

+ 1 - 0
frontend/rust-lib/flowy-user/Cargo.toml

@@ -37,6 +37,7 @@ tokio = { version = "1", features = ["rt"] }
 pin-project = "1.0.0"
 futures-core = { version = "0.3", default-features = false }
 r2d2 = "0.8.9"
+dashmap = "4.0"
 
 [dev-dependencies]
 flowy-test = { path = "../flowy-test" }

+ 9 - 8
frontend/rust-lib/flowy-user/src/services/user/user_session.rs

@@ -9,7 +9,10 @@ use crate::{
     notify::*,
     services::{
         server::{construct_user_server, Server},
-        user::{notifier::UserNotifier, ws_manager::WsManager},
+        user::{
+            notifier::UserNotifier,
+            ws_manager::{FlowyWsSender, WsManager},
+        },
     },
 };
 use backend_service::configuration::ClientServerConfiguration;
@@ -23,7 +26,7 @@ use flowy_database::{
 use flowy_user_infra::entities::{SignInResponse, SignUpResponse};
 use lib_infra::{entities::network_state::NetworkState, kv::KV};
 use lib_sqlite::ConnectionPool;
-use lib_ws::{WsConnectState, WsMessageHandler, WsSender};
+use lib_ws::{WsConnectState, WsMessageHandler};
 use parking_lot::RwLock;
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
@@ -196,8 +199,8 @@ impl UserSession {
         self.notifier.update_network_type(&new_state.ty);
     }
 
-    pub fn ws_sender(&self) -> Result<Arc<WsSender>, UserError> {
-        let sender = self.ws_manager.sender()?;
+    pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> {
+        let sender = self.ws_manager.ws_sender()?;
         Ok(sender)
     }
 
@@ -301,10 +304,8 @@ impl UserSession {
 
     #[tracing::instrument(level = "debug", skip(self, token))]
     pub async fn start_ws_connection(&self, token: &str) -> Result<(), UserError> {
-        if cfg!(feature = "http_server") {
-            let addr = format!("{}/{}", self.server.ws_addr(), token);
-            let _ = self.ws_manager.start(addr).await?;
-        }
+        let addr = format!("{}/{}", self.server.ws_addr(), token);
+        let _ = self.ws_manager.start(addr).await?;
         Ok(())
     }
 }

+ 149 - 15
frontend/rust-lib/flowy-user/src/services/user/ws_manager.rs

@@ -1,12 +1,25 @@
 use crate::errors::UserError;
-use lib_infra::entities::network_state::NetworkType;
-use lib_ws::{WsConnectState, WsController};
+
+use lib_infra::{entities::network_state::NetworkType, future::ResultFuture};
+use lib_ws::{WsConnectState, WsController, WsMessage, WsMessageHandler, WsSender};
 use parking_lot::RwLock;
 use std::sync::Arc;
-use tokio::sync::broadcast;
+use tokio::sync::{broadcast, broadcast::Receiver};
+
+pub trait FlowyWebSocket: Send + Sync {
+    fn start_connect(&self, addr: String) -> ResultFuture<(), UserError>;
+    fn conn_state_subscribe(&self) -> broadcast::Receiver<WsConnectState>;
+    fn reconnect(&self, count: usize) -> ResultFuture<(), UserError>;
+    fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError>;
+    fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError>;
+}
+
+pub trait FlowyWsSender: Send + Sync {
+    fn send(&self, msg: WsMessage) -> Result<(), UserError>;
+}
 
 pub struct WsManager {
-    inner: Arc<WsController>,
+    inner: Arc<dyn FlowyWebSocket>,
     connect_type: RwLock<NetworkType>,
 }
 
@@ -38,12 +51,10 @@ impl WsManager {
         }
     }
 
-    pub fn state_subscribe(&self) -> broadcast::Receiver<WsConnectState> { self.inner.state_subscribe() }
-
     #[tracing::instrument(level = "debug", skip(self))]
     fn listen_on_websocket(&self) {
-        let mut notify = self.inner.state_subscribe();
-        let ws_controller = self.inner.clone();
+        let mut notify = self.inner.conn_state_subscribe();
+        let ws = self.inner.clone();
         let _ = tokio::spawn(async move {
             loop {
                 match notify.recv().await {
@@ -53,7 +64,7 @@ impl WsManager {
                             WsConnectState::Init => {},
                             WsConnectState::Connected => {},
                             WsConnectState::Connecting => {},
-                            WsConnectState::Disconnected => retry_connect(ws_controller.clone(), 100).await,
+                            WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
                         }
                     },
                     Err(e) => {
@@ -64,10 +75,22 @@ impl WsManager {
             }
         });
     }
+
+    pub fn state_subscribe(&self) -> broadcast::Receiver<WsConnectState> { self.inner.conn_state_subscribe() }
+
+    pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> {
+        let _ = self.inner.add_handler(handler)?;
+        Ok(())
+    }
+
+    pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> {
+        //
+        self.inner.ws_sender()
+    }
 }
 
-async fn retry_connect(ws_controller: Arc<WsController>, count: usize) {
-    match ws_controller.retry(count).await {
+async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
+    match ws.reconnect(count).await {
         Ok(_) => {},
         Err(e) => {
             log::error!("websocket connect failed: {:?}", e);
@@ -77,15 +100,126 @@ async fn retry_connect(ws_controller: Arc<WsController>, count: usize) {
 
 impl std::default::Default for WsManager {
     fn default() -> Self {
+        let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
+            Arc::new(Arc::new(WsController::new()))
+        } else {
+            mock::MockWebSocket::new()
+        };
+
         WsManager {
-            inner: Arc::new(WsController::new()),
+            inner: ws,
             connect_type: RwLock::new(NetworkType::default()),
         }
     }
 }
 
-impl std::ops::Deref for WsManager {
-    type Target = WsController;
+impl FlowyWebSocket for Arc<WsController> {
+    fn start_connect(&self, addr: String) -> ResultFuture<(), UserError> {
+        let cloned_ws = self.clone();
+        ResultFuture::new(async move {
+            let _ = cloned_ws.start(addr).await?;
+            Ok(())
+        })
+    }
+
+    fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_subscribe() }
+
+    fn reconnect(&self, count: usize) -> ResultFuture<(), UserError> {
+        let cloned_ws = self.clone();
+        ResultFuture::new(async move {
+            let _ = cloned_ws.retry(count).await?;
+            Ok(())
+        })
+    }
+
+    fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> {
+        let _ = self.add_handler(handler)?;
+        Ok(())
+    }
+
+    fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> {
+        let sender = self.sender()?;
+        Ok(sender)
+    }
+}
+
+impl FlowyWsSender for WsSender {
+    fn send(&self, msg: WsMessage) -> Result<(), UserError> {
+        let _ = self.send_msg(msg)?;
+        Ok(())
+    }
+}
+
+// #[cfg(not(feature = "http_server"))]
+mod mock {
+    use crate::{
+        errors::UserError,
+        services::user::ws_manager::{FlowyWebSocket, FlowyWsSender},
+    };
+    use dashmap::DashMap;
+    use lib_infra::future::ResultFuture;
+    use lib_ws::{WsConnectState, WsMessage, WsMessageHandler, WsModule};
+    use std::sync::Arc;
+    use tokio::sync::{broadcast, broadcast::Receiver};
+
+    pub struct MockWebSocket {
+        handlers: DashMap<WsModule, Arc<dyn WsMessageHandler>>,
+        state_sender: broadcast::Sender<WsConnectState>,
+        ws_sender: broadcast::Sender<WsMessage>,
+    }
+
+    impl std::default::Default for MockWebSocket {
+        fn default() -> Self {
+            let (state_sender, _) = broadcast::channel(16);
+            let (ws_sender, _) = broadcast::channel(16);
+            MockWebSocket {
+                handlers: DashMap::new(),
+                state_sender,
+                ws_sender,
+            }
+        }
+    }
+
+    impl MockWebSocket {
+        pub fn new() -> Arc<MockWebSocket> {
+            let ws = Arc::new(MockWebSocket::default());
+            let mut ws_receiver = ws.ws_sender.subscribe();
+            let cloned_ws = ws.clone();
+            tokio::spawn(async move {
+                while let Ok(message) = ws_receiver.recv().await {
+                    match cloned_ws.handlers.get(&message.module) {
+                        None => log::error!("Can't find any handler for message: {:?}", message),
+                        Some(handler) => handler.receive_message(message.clone()),
+                    }
+                }
+            });
+            ws
+        }
+    }
+
+    impl FlowyWebSocket for MockWebSocket {
+        fn start_connect(&self, _addr: String) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) }
+
+        fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
+
+        fn reconnect(&self, _count: usize) -> ResultFuture<(), UserError> { ResultFuture::new(async { Ok(()) }) }
+
+        fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), UserError> {
+            let source = handler.source();
+            if self.handlers.contains_key(&source) {
+                log::error!("WsSource's {:?} is already registered", source);
+            }
+            self.handlers.insert(source, handler);
+            Ok(())
+        }
+
+        fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, UserError> { Ok(Arc::new(self.ws_sender.clone())) }
+    }
 
-    fn deref(&self) -> &Self::Target { &self.inner }
+    impl FlowyWsSender for broadcast::Sender<WsMessage> {
+        fn send(&self, _msg: WsMessage) -> Result<(), UserError> {
+            // let _ = self.send(msg).unwrap();
+            Ok(())
+        }
+    }
 }

+ 1 - 1
shared-lib/lib-ws/src/ws.rs

@@ -68,7 +68,7 @@ impl WsController {
         Ok(())
     }
 
-    pub async fn start_connect(&self, addr: String) -> Result<(), ServerError> {
+    pub async fn start(&self, addr: String) -> Result<(), ServerError> {
         *self.addr.write() = Some(addr.clone());
 
         let strategy = FixedInterval::from_millis(5000).take(3);