浏览代码

start ws connect after sign up

appflowy 3 年之前
父节点
当前提交
84d5d2c2f1

+ 3 - 1
frontend/rust-lib/flowy-net/src/services/mock/ws_mock.rs

@@ -61,7 +61,9 @@ impl FlowyWebSocket for Arc<MockWebSocket> {
         FutureResult::new(async { Ok(()) })
     }
 
-    fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
+    fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
+
+    fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
 
     fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
 

+ 2 - 1
frontend/rust-lib/flowy-net/src/services/ws/conn.rs

@@ -7,7 +7,8 @@ pub use lib_ws::{WsConnectState, WsMessage, WsMessageHandler};
 
 pub trait FlowyWebSocket: Send + Sync {
     fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError>;
-    fn conn_state_subscribe(&self) -> broadcast::Receiver<WsConnectState>;
+    fn stop_connect(&self) -> FutureResult<(), FlowyError>;
+    fn subscribe_connect_state(&self) -> broadcast::Receiver<WsConnectState>;
     fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError>;
     fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError>;
     fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError>;

+ 40 - 28
frontend/rust-lib/flowy-net/src/services/ws/manager.rs

@@ -23,8 +23,8 @@ impl WsManager {
         } else {
             local_web_socket()
         };
-
         let (status_notifier, _) = broadcast::channel(10);
+        listen_on_websocket(ws.clone());
         WsManager {
             inner: ws,
             connect_type: RwLock::new(NetworkType::default()),
@@ -35,11 +35,14 @@ impl WsManager {
 
     pub async fn start(&self, token: String) -> Result<(), FlowyError> {
         let addr = format!("{}/{}", self.addr, token);
-        self.listen_on_websocket();
+        self.inner.stop_connect().await;
+
         let _ = self.inner.start_connect(addr).await?;
         Ok(())
     }
 
+    pub async fn stop(&self) { self.inner.stop_connect().await; }
+
     pub fn update_network_type(&self, new_type: &NetworkType) {
         tracing::debug!("Network new state: {:?}", new_type);
         let old_type = self.connect_type.read().clone();
@@ -62,33 +65,10 @@ impl WsManager {
         }
     }
 
-    #[tracing::instrument(level = "debug", skip(self))]
-    fn listen_on_websocket(&self) {
-        let mut notify = self.inner.conn_state_subscribe();
-        let ws = self.inner.clone();
-        let _ = tokio::spawn(async move {
-            loop {
-                match notify.recv().await {
-                    Ok(state) => {
-                        tracing::info!("Websocket state changed: {}", state);
-                        match state {
-                            WsConnectState::Init => {},
-                            WsConnectState::Connected => {},
-                            WsConnectState::Connecting => {},
-                            WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
-                        }
-                    },
-                    Err(e) => {
-                        tracing::error!("Websocket state notify error: {:?}", e);
-                        break;
-                    },
-                }
-            }
-        });
+    pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WsConnectState> {
+        self.inner.subscribe_connect_state()
     }
 
-    pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WsConnectState> { self.inner.conn_state_subscribe() }
-
     pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
 
     pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), FlowyError> {
@@ -99,6 +79,30 @@ impl WsManager {
     pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() }
 }
 
+#[tracing::instrument(level = "debug", skip(ws))]
+fn listen_on_websocket(ws: Arc<dyn FlowyWebSocket>) {
+    let mut notify = ws.subscribe_connect_state();
+    let _ = tokio::spawn(async move {
+        loop {
+            match notify.recv().await {
+                Ok(state) => {
+                    tracing::info!("Websocket state changed: {}", state);
+                    match state {
+                        WsConnectState::Init => {},
+                        WsConnectState::Connected => {},
+                        WsConnectState::Connecting => {},
+                        WsConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
+                    }
+                },
+                Err(e) => {
+                    tracing::error!("Websocket state notify error: {:?}", e);
+                    break;
+                },
+            }
+        }
+    });
+}
+
 async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
     match ws.reconnect(count).await {
         Ok(_) => {},
@@ -117,7 +121,15 @@ impl FlowyWebSocket for Arc<WsController> {
         })
     }
 
-    fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_subscribe() }
+    fn stop_connect(&self) -> FutureResult<(), FlowyError> {
+        let controller = self.clone();
+        FutureResult::new(async move {
+            controller.stop().await;
+            Ok(())
+        })
+    }
+
+    fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.subscribe_state() }
 
     fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
         let cloned_ws = self.clone();

+ 3 - 1
frontend/rust-lib/flowy-net/src/services/ws/ws_local.rs

@@ -22,7 +22,9 @@ impl std::default::Default for LocalWebSocket {
 impl FlowyWebSocket for Arc<LocalWebSocket> {
     fn start_connect(&self, _addr: String) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
 
-    fn conn_state_subscribe(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
+    fn stop_connect(&self) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
+
+    fn subscribe_connect_state(&self) -> Receiver<WsConnectState> { self.state_sender.subscribe() }
 
     fn reconnect(&self, _count: usize) -> FutureResult<(), FlowyError> { FutureResult::new(async { Ok(()) }) }
 

+ 3 - 0
frontend/rust-lib/flowy-sdk/src/lib.rs

@@ -134,12 +134,15 @@ async fn _listen_user_status(
                 },
                 UserStatus::Logout { .. } => {
                     core.user_did_logout().await;
+                    let _ = ws_manager.stop().await;
                 },
                 UserStatus::Expired { .. } => {
                     core.user_session_expired().await;
+                    let _ = ws_manager.stop().await;
                 },
                 UserStatus::SignUp { profile, ret } => {
                     let _ = core.user_did_sign_up(&profile.token).await?;
+                    let _ = ws_manager.start(profile.token.clone()).await?;
                     let _ = ret.send(());
                 },
             }

+ 5 - 4
shared-lib/lib-ws/src/ws.rs

@@ -70,11 +70,12 @@ impl WsController {
 
     pub async fn start(&self, addr: String) -> Result<(), ServerError> {
         *self.addr.write() = Some(addr.clone());
-
         let strategy = FixedInterval::from_millis(5000).take(3);
         self.connect(addr, strategy).await
     }
 
+    pub async fn stop(&self) { self.sender_ctrl.write().set_state(WsConnectState::Disconnected); }
+
     async fn connect<T, I>(&self, addr: String, strategy: T) -> Result<(), ServerError>
     where
         T: IntoIterator<IntoIter = I, Item = Duration>,
@@ -130,7 +131,7 @@ impl WsController {
         self.connect(addr, strategy).await
     }
 
-    pub fn state_subscribe(&self) -> broadcast::Receiver<WsConnectState> { self.state_notify.subscribe() }
+    pub fn subscribe_state(&self) -> broadcast::Receiver<WsConnectState> { self.state_notify.subscribe() }
 
     pub fn sender(&self) -> Result<Arc<WsSender>, WsError> {
         match self.sender_ctrl.read().sender() {
@@ -359,8 +360,8 @@ impl WsSenderController {
             self.sender = None;
         }
 
-        self.state = state.clone();
-        let _ = self.state_notify.send(state);
+        self.state = state;
+        let _ = self.state_notify.send(self.state.clone());
     }
 
     fn set_error(&mut self, error: WsError) {