Переглянути джерело

fix: potential async errors (#1772)

Nathan.fooo 2 роки тому
батько
коміт
71022ed934

+ 1 - 0
frontend/appflowy_tauri/src-tauri/Cargo.lock

@@ -1215,6 +1215,7 @@ dependencies = [
  "revision-model",
  "tokio",
  "tracing",
+ "user-model",
  "ws-model",
 ]
 

+ 6 - 6
frontend/appflowy_tauri/src/services/backend/index.ts

@@ -1,6 +1,6 @@
-export * from './classes/flowy-user';
-export * from './classes/flowy-document';
-export * from './classes/flowy-grid';
-export * from './classes/flowy-folder';
-export * from './classes/flowy-net';
-export * from './classes/flowy-error';
+export * from "./classes/flowy-user";
+export * from "./classes/flowy-document";
+export * from "./classes/flowy-database";
+export * from "./classes/flowy-folder";
+export * from "./classes/flowy-net";
+export * from "./classes/flowy-error";

+ 1 - 0
frontend/rust-lib/Cargo.lock

@@ -1003,6 +1003,7 @@ dependencies = [
  "revision-model",
  "tokio",
  "tracing",
+ "user-model",
  "ws-model",
 ]
 

+ 5 - 3
frontend/rust-lib/flowy-core/Cargo.toml

@@ -10,12 +10,13 @@ lib-dispatch = { path = "../lib-dispatch" }
 lib-log = { path = "../lib-log" }
 flowy-user = { path = "../flowy-user" }
 flowy-net = { path = "../flowy-net" }
-flowy-folder = { path = "../flowy-folder", default-features = false }
-flowy-database = { path = "../flowy-database", default-features = false }
+flowy-folder = { path = "../flowy-folder" }
+flowy-database = { path = "../flowy-database" }
 grid-model = { path = "../../../shared-lib/grid-model" }
+user-model = { path = "../../../shared-lib/user-model" }
 flowy-client-ws = { path = "../../../shared-lib/flowy-client-ws" }
 flowy-sqlite = { path = "../flowy-sqlite", optional = true }
-flowy-document = { path = "../flowy-document", default-features = false }
+flowy-document = { path = "../flowy-document" }
 flowy-revision = { path = "../flowy-revision" }
 flowy-error = { path = "../flowy-error", features = ["adaptor_ws"] }
 flowy-task = { path = "../flowy-task" }
@@ -32,6 +33,7 @@ lib-ws = { path = "../../../shared-lib/lib-ws" }
 lib-infra = { path = "../../../shared-lib/lib-infra" }
 
 [features]
+default = ["rev-sqlite"]
 http_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
 native_sync = ["flowy-folder/cloud_sync", "flowy-document/cloud_sync"]
 use_bunyan = ["lib-log/use_bunyan"]

+ 100 - 97
frontend/rust-lib/flowy-core/src/lib.rs

@@ -1,21 +1,23 @@
 mod deps_resolve;
 pub mod module;
-pub use flowy_net::get_client_server_configuration;
-
 use crate::deps_resolve::*;
-
 use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
 use flowy_database::manager::DatabaseManager;
 use flowy_document::entities::DocumentVersionPB;
 use flowy_document::{DocumentConfig, DocumentManager};
+use flowy_error::FlowyResult;
 use flowy_folder::entities::ViewDataFormatPB;
 use flowy_folder::{errors::FlowyError, manager::FolderManager};
+pub use flowy_net::get_client_server_configuration;
 use flowy_net::local_server::LocalServer;
 use flowy_net::ClientServerConfiguration;
 use flowy_task::{TaskDispatcher, TaskRunner};
-use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
+use flowy_user::event_map::UserStatusCallback;
+use flowy_user::services::{UserSession, UserSessionConfig};
 use lib_dispatch::prelude::*;
 use lib_dispatch::runtime::tokio_default_runtime;
+
+use lib_infra::future::{to_fut, Fut};
 use module::make_plugins;
 pub use module::*;
 use std::time::Duration;
@@ -27,6 +29,7 @@ use std::{
     },
 };
 use tokio::sync::{broadcast, RwLock};
+use user_model::UserProfile;
 
 static INIT_LOG: AtomicBool = AtomicBool::new(false);
 
@@ -83,9 +86,10 @@ fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
     filters.push(format!("flowy_folder={}", level));
     filters.push(format!("flowy_user={}", level));
     filters.push(format!("flowy_document={}", level));
-    filters.push(format!("flowy_grid={}", level));
-    filters.push(format!("flowy_collaboration={}", "info"));
-    filters.push(format!("flowy_notification={}", level));
+    filters.push(format!("flowy_database={}", level));
+    filters.push(format!("flowy_sync={}", "info"));
+    filters.push(format!("flowy_client_sync={}", "info"));
+    filters.push(format!("flowy_notification={}", "info"));
     filters.push(format!("lib_ot={}", level));
     filters.push(format!("lib_ws={}", level));
     filters.push(format!("lib_infra={}", level));
@@ -162,6 +166,21 @@ impl FlowySDK {
             )
         });
 
+        let user_status_listener = UserStatusListener {
+            document_manager: document_manager.clone(),
+            folder_manager: folder_manager.clone(),
+            grid_manager: grid_manager.clone(),
+            ws_conn: ws_conn.clone(),
+            config: config.clone(),
+        };
+        let user_status_callback = UserStatusCallbackImpl {
+            listener: Arc::new(user_status_listener),
+        };
+        let cloned_user_session = user_session.clone();
+        runtime.block_on(async move {
+            cloned_user_session.clone().init(user_status_callback).await;
+        });
+
         let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
             make_plugins(
                 &ws_conn,
@@ -171,16 +190,7 @@ impl FlowySDK {
                 &document_manager,
             )
         }));
-
-        _start_listening(
-            &config,
-            &event_dispatcher,
-            &ws_conn,
-            &user_session,
-            &document_manager,
-            &folder_manager,
-            &grid_manager,
-        );
+        _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
 
         Self {
             config,
@@ -201,36 +211,17 @@ impl FlowySDK {
 }
 
 fn _start_listening(
-    config: &FlowySDKConfig,
     event_dispatcher: &AFPluginDispatcher,
     ws_conn: &Arc<FlowyWebSocketConnect>,
-    user_session: &Arc<UserSession>,
-    document_manager: &Arc<DocumentManager>,
     folder_manager: &Arc<FolderManager>,
-    grid_manager: &Arc<DatabaseManager>,
 ) {
-    let subscribe_user_status = user_session.notifier.subscribe_user_status();
     let subscribe_network_type = ws_conn.subscribe_network_ty();
     let folder_manager = folder_manager.clone();
-    let grid_manager = grid_manager.clone();
-    let cloned_folder_manager = folder_manager.clone();
+    let cloned_folder_manager = folder_manager;
     let ws_conn = ws_conn.clone();
-    let user_session = user_session.clone();
-    let document_manager = document_manager.clone();
-    let config = config.clone();
 
     event_dispatcher.spawn(async move {
-        user_session.init();
         listen_on_websocket(ws_conn.clone());
-        _listen_user_status(
-            config,
-            ws_conn.clone(),
-            subscribe_user_status,
-            document_manager,
-            folder_manager,
-            grid_manager,
-        )
-        .await;
     });
 
     event_dispatcher.spawn(async move {
@@ -253,66 +244,6 @@ fn mk_local_server(
     }
 }
 
-async fn _listen_user_status(
-    config: FlowySDKConfig,
-    ws_conn: Arc<FlowyWebSocketConnect>,
-    mut subscribe: broadcast::Receiver<UserStatus>,
-    document_manager: Arc<DocumentManager>,
-    folder_manager: Arc<FolderManager>,
-    grid_manager: Arc<DatabaseManager>,
-) {
-    while let Ok(status) = subscribe.recv().await {
-        let result = || async {
-            match status {
-                UserStatus::Login { token, user_id } => {
-                    tracing::trace!("User did login");
-                    folder_manager.initialize(&user_id, &token).await?;
-                    document_manager.initialize(&user_id).await?;
-                    grid_manager.initialize(&user_id, &token).await?;
-                    ws_conn.start(token, user_id).await?;
-                }
-                UserStatus::Logout { token: _, user_id } => {
-                    tracing::trace!("User did logout");
-                    folder_manager.clear(&user_id).await;
-                    ws_conn.stop().await;
-                }
-                UserStatus::Expired { token: _, user_id } => {
-                    tracing::trace!("User session has been expired");
-                    folder_manager.clear(&user_id).await;
-                    ws_conn.stop().await;
-                }
-                UserStatus::SignUp { profile, ret } => {
-                    tracing::trace!("User did sign up");
-
-                    let view_data_type = match config.document.version {
-                        DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
-                        DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
-                    };
-                    folder_manager
-                        .initialize_with_new_user(&profile.id, &profile.token, view_data_type)
-                        .await?;
-                    document_manager
-                        .initialize_with_new_user(&profile.id, &profile.token)
-                        .await?;
-
-                    grid_manager
-                        .initialize_with_new_user(&profile.id, &profile.token)
-                        .await?;
-
-                    ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
-                    let _ = ret.send(());
-                }
-            }
-            Ok::<(), FlowyError>(())
-        };
-
-        match result().await {
-            Ok(_) => {}
-            Err(e) => tracing::error!("{}", e),
-        }
-    }
-}
-
 async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<FolderManager>) {
     while let Ok(_new_type) = subscribe.recv().await {
         // core.network_state_changed(new_type);
@@ -345,3 +276,75 @@ fn mk_user_session(
     let cloud_service = UserDepsResolver::resolve(local_server, server_config);
     Arc::new(UserSession::new(user_config, cloud_service))
 }
+
+struct UserStatusListener {
+    document_manager: Arc<DocumentManager>,
+    folder_manager: Arc<FolderManager>,
+    grid_manager: Arc<DatabaseManager>,
+    ws_conn: Arc<FlowyWebSocketConnect>,
+    config: FlowySDKConfig,
+}
+
+impl UserStatusListener {
+    async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
+        self.folder_manager.initialize(user_id, token).await?;
+        self.document_manager.initialize(user_id).await?;
+        self.grid_manager.initialize(user_id, token).await?;
+        self.ws_conn.start(token.to_owned(), user_id.to_owned()).await?;
+        Ok(())
+    }
+
+    async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
+        let view_data_type = match self.config.document.version {
+            DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
+            DocumentVersionPB::V1 => ViewDataFormatPB::TreeFormat,
+        };
+        self.folder_manager
+            .initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type)
+            .await?;
+        self.document_manager
+            .initialize_with_new_user(&user_profile.id, &user_profile.token)
+            .await?;
+
+        self.grid_manager
+            .initialize_with_new_user(&user_profile.id, &user_profile.token)
+            .await?;
+
+        self.ws_conn
+            .start(user_profile.token.clone(), user_profile.id.clone())
+            .await?;
+        Ok(())
+    }
+
+    async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> {
+        self.folder_manager.clear(user_id).await;
+        self.ws_conn.stop().await;
+        Ok(())
+    }
+}
+
+struct UserStatusCallbackImpl {
+    listener: Arc<UserStatusListener>,
+}
+
+impl UserStatusCallback for UserStatusCallbackImpl {
+    fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
+        let listener = self.listener.clone();
+        let token = token.to_owned();
+        let user_id = user_id.to_owned();
+        to_fut(async move { listener.did_sign_in(&token, &user_id).await })
+    }
+
+    fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
+        let listener = self.listener.clone();
+        let user_profile = user_profile.clone();
+        to_fut(async move { listener.did_sign_up(&user_profile).await })
+    }
+
+    fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
+        let listener = self.listener.clone();
+        let token = token.to_owned();
+        let user_id = user_id.to_owned();
+        to_fut(async move { listener.did_expired(&token, &user_id).await })
+    }
+}

+ 1 - 1
frontend/rust-lib/flowy-database/Cargo.toml

@@ -56,7 +56,7 @@ flowy-database = { path = "", features = ["flowy_unit_test"]}
 flowy-codegen = { path = "../flowy-codegen"}
 
 [features]
-default = []
+default = ["rev-sqlite"]
 rev-sqlite = ["flowy-sqlite"]
 dart = ["flowy-codegen/dart", "flowy-notification/dart"]
 ts = ["flowy-codegen/ts", "flowy-notification/ts"]

+ 1 - 0
frontend/rust-lib/flowy-document/Cargo.toml

@@ -58,6 +58,7 @@ flowy-codegen = { path = "../flowy-codegen"}
 
 
 [features]
+default = ["rev-sqlite"]
 sync = []
 cloud_sync = ["sync"]
 rev-sqlite = ["flowy-sqlite"]

+ 0 - 1
frontend/rust-lib/flowy-document/src/services/migration.rs

@@ -65,7 +65,6 @@ impl DocumentMigration {
         //
 
         KV::set_bool(&key, true);
-        tracing::debug!("Run document v1 migration");
         Ok(())
     }
 }

+ 1 - 1
frontend/rust-lib/flowy-folder/Cargo.toml

@@ -48,7 +48,7 @@ flowy-codegen = { path = "../flowy-codegen"}
 
 
 [features]
-default = []
+default = ["rev-sqlite"]
 sync = []
 cloud_sync = ["sync"]
 rev-sqlite = ["flowy-sqlite", "flowy-folder/rev-sqlite"]

+ 4 - 4
frontend/rust-lib/flowy-folder/src/services/view/controller.rs

@@ -209,7 +209,7 @@ impl ViewController {
         let deleted_view = self
             .persistence
             .begin_transaction(|transaction| {
-                let view = transaction.read_view(&view_id)?;
+                let view = transaction.read_view(view_id)?;
                 let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?;
 
                 let index = views
@@ -223,12 +223,12 @@ impl ViewController {
             })
             .await?;
 
-        send_notification(&view_id, FolderNotification::ViewMoveToTrash)
+        send_notification(view_id, FolderNotification::ViewMoveToTrash)
             .payload(deleted_view)
             .send();
 
-        let processor = self.get_data_processor_from_view_id(&view_id).await?;
-        processor.close_view(&view_id).await?;
+        let processor = self.get_data_processor_from_view_id(view_id).await?;
+        processor.close_view(view_id).await?;
         Ok(())
     }
 

+ 1 - 1
frontend/rust-lib/flowy-folder/src/services/view/event_handler.rs

@@ -62,7 +62,7 @@ pub(crate) async fn delete_view_handler(
 ) -> Result<(), FlowyError> {
     let params: RepeatedViewIdPB = data.into_inner();
     for view_id in &params.items {
-        let _ = view_controller.move_view_to_trash(&view_id).await;
+        let _ = view_controller.move_view_to_trash(view_id).await;
     }
 
     let trash = view_controller

+ 2 - 2
frontend/rust-lib/flowy-net/src/request.rs

@@ -208,7 +208,7 @@ impl HttpRequestBuilder {
                 self.response = Some(flowy_response.data);
                 Ok(self)
             }
-            Some(error) => Err(FlowyError::new(error.code.into(), &error.msg)),
+            Some(error) => Err(FlowyError::new(error.code, &error.msg)),
         }
     }
 }
@@ -231,7 +231,7 @@ async fn get_response_data(original: Response) -> Result<Bytes, FlowyError> {
         let response: HttpResponse = serde_json::from_slice(&bytes)?;
         match response.error {
             None => Ok(response.data),
-            Some(error) => Err(FlowyError::new(error.code.into(), &error.msg)),
+            Some(error) => Err(FlowyError::new(error.code, &error.msg)),
         }
     } else {
         Err(FlowyError::http().context(original))

+ 1 - 1
frontend/rust-lib/flowy-revision/src/rev_manager.rs

@@ -127,7 +127,7 @@ impl<Connection: 'static> RevisionManager<Connection> {
         }
     }
 
-    #[tracing::instrument(name = "revision_manager_initialize", level = "debug", skip_all, fields(deserializer, object_id, deserialize_revisions) err)]
+    #[tracing::instrument(name = "revision_manager_initialize", level = "trace", skip_all, fields(deserializer, object_id, deserialize_revisions) err)]
     pub async fn initialize<B>(&mut self, _cloud: Option<Arc<dyn RevisionCloudService>>) -> FlowyResult<B::Output>
     where
         B: RevisionObjectDeserializer,

+ 3 - 3
frontend/rust-lib/flowy-test/Cargo.toml

@@ -6,11 +6,11 @@ edition = "2018"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
 [dependencies]
-flowy-core = { path = "../flowy-core", default-features = false }
+flowy-core = { path = "../flowy-core" }
 flowy-user = { path = "../flowy-user"}
 flowy-net = { path = "../flowy-net"}
-flowy-folder = { path = "../flowy-folder", default-features = false}
-flowy-document= { path = "../flowy-document", default-features = false}
+flowy-folder = { path = "../flowy-folder" }
+flowy-document= { path = "../flowy-document" }
 lib-dispatch = { path = "../lib-dispatch" }
 
 flowy-client-sync = { path = "../flowy-client-sync"}

+ 1 - 0
frontend/rust-lib/flowy-user/Cargo.toml

@@ -34,6 +34,7 @@ flowy-test = { path = "../flowy-test" }
 nanoid = "0.4.0"
 
 [features]
+default = ["rev-sqlite"]
 rev-sqlite = ["flowy-sqlite"]
 dart = ["flowy-codegen/dart", "flowy-notification/dart"]
 ts = ["flowy-codegen/ts", "flowy-notification/ts"]

+ 13 - 1
frontend/rust-lib/flowy-user/src/entities/user_profile.rs

@@ -1,7 +1,7 @@
 use crate::errors::ErrorCode;
 use flowy_derive::ProtoBuf;
 use std::convert::TryInto;
-use user_model::{UpdateUserProfileParams, UserEmail, UserIcon, UserId, UserName, UserPassword};
+use user_model::{UpdateUserProfileParams, UserEmail, UserIcon, UserId, UserName, UserPassword, UserProfile};
 
 #[derive(Default, ProtoBuf)]
 pub struct UserTokenPB {
@@ -33,6 +33,18 @@ pub struct UserProfilePB {
     pub icon_url: String,
 }
 
+impl std::convert::From<UserProfile> for UserProfilePB {
+    fn from(user_profile: UserProfile) -> Self {
+        Self {
+            id: user_profile.id,
+            email: user_profile.email,
+            name: user_profile.name,
+            token: user_profile.token,
+            icon_url: user_profile.icon_url,
+        }
+    }
+}
+
 #[derive(ProtoBuf, Default)]
 pub struct UpdateUserProfilePayloadPB {
     #[pb(index = 1)]

+ 10 - 2
frontend/rust-lib/flowy-user/src/event_map.rs

@@ -1,9 +1,10 @@
 use crate::entities::UserProfilePB;
 use crate::{errors::FlowyError, handlers::*, services::UserSession};
 use lib_dispatch::prelude::*;
-use lib_infra::future::FutureResult;
+
+use lib_infra::future::{Fut, FutureResult};
 use std::sync::Arc;
-use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams};
+use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfile};
 
 pub fn init(user_session: Arc<UserSession>) -> AFPlugin {
     AFPlugin::new()
@@ -21,6 +22,12 @@ pub fn init(user_session: Arc<UserSession>) -> AFPlugin {
         .event(UserEvent::GetUserSetting, get_user_setting)
 }
 
+pub trait UserStatusCallback: Send + Sync + 'static {
+    fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>>;
+    fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>>;
+    fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>>;
+}
+
 pub trait UserCloudService: Send + Sync {
     fn sign_up(&self, params: SignUpParams) -> FutureResult<SignUpResponse, FlowyError>;
     fn sign_in(&self, params: SignInParams) -> FutureResult<SignInResponse, FlowyError>;
@@ -31,6 +38,7 @@ pub trait UserCloudService: Send + Sync {
 }
 
 use flowy_derive::{Flowy_Event, ProtoBuf_Enum};
+use flowy_error::FlowyResult;
 use strum_macros::Display;
 
 #[derive(Clone, Copy, PartialEq, Eq, Debug, Display, Hash, ProtoBuf_Enum, Flowy_Event)]

+ 2 - 2
frontend/rust-lib/flowy-user/src/handlers/auth_handler.rs

@@ -12,7 +12,7 @@ pub async fn sign_in(
     session: AFPluginState<Arc<UserSession>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
     let params: SignInParams = data.into_inner().try_into()?;
-    let user_profile = session.sign_in(params).await?;
+    let user_profile: UserProfilePB = session.sign_in(params).await?.into();
     data_result(user_profile)
 }
 
@@ -31,7 +31,7 @@ pub async fn sign_up(
     session: AFPluginState<Arc<UserSession>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
     let params: SignUpParams = data.into_inner().try_into()?;
-    let user_profile = session.sign_up(params).await?;
+    let user_profile: UserProfilePB = session.sign_up(params).await?.into();
 
     data_result(user_profile)
 }

+ 2 - 2
frontend/rust-lib/flowy-user/src/handlers/user_handler.rs

@@ -15,7 +15,7 @@ pub async fn init_user_handler(session: AFPluginState<Arc<UserSession>>) -> Resu
 
 #[tracing::instrument(level = "debug", skip(session))]
 pub async fn check_user_handler(session: AFPluginState<Arc<UserSession>>) -> DataResult<UserProfilePB, FlowyError> {
-    let user_profile = session.check_user().await?;
+    let user_profile: UserProfilePB = session.check_user().await?.into();
     data_result(user_profile)
 }
 
@@ -23,7 +23,7 @@ pub async fn check_user_handler(session: AFPluginState<Arc<UserSession>>) -> Dat
 pub async fn get_user_profile_handler(
     session: AFPluginState<Arc<UserSession>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
-    let user_profile = session.get_user_profile().await?;
+    let user_profile: UserProfilePB = session.get_user_profile().await?.into();
     data_result(user_profile)
 }
 

+ 3 - 4
frontend/rust-lib/flowy-user/src/services/database.rs

@@ -1,4 +1,3 @@
-use crate::entities::UserProfilePB;
 use flowy_error::{ErrorCode, FlowyError};
 use flowy_sqlite::ConnectionPool;
 use flowy_sqlite::{schema::user_table, DBConnection, Database};
@@ -6,7 +5,7 @@ use lazy_static::lazy_static;
 use parking_lot::RwLock;
 use std::path::PathBuf;
 use std::{collections::HashMap, sync::Arc, time::Duration};
-use user_model::{SignInResponse, SignUpResponse, UpdateUserProfileParams};
+use user_model::{SignInResponse, SignUpResponse, UpdateUserProfileParams, UserProfile};
 
 pub struct UserDB {
     db_dir: String,
@@ -117,9 +116,9 @@ impl std::convert::From<SignInResponse> for UserTable {
     }
 }
 
-impl std::convert::From<UserTable> for UserProfilePB {
+impl std::convert::From<UserTable> for UserProfile {
     fn from(table: UserTable) -> Self {
-        UserProfilePB {
+        UserProfile {
             id: table.id,
             email: table.email,
             name: table.name,

+ 0 - 1
frontend/rust-lib/flowy-user/src/services/mod.rs

@@ -1,4 +1,3 @@
 pub mod database;
-pub mod notifier;
 mod user_session;
 pub use user_session::*;

+ 0 - 64
frontend/rust-lib/flowy-user/src/services/notifier.rs

@@ -1,64 +0,0 @@
-use crate::entities::UserProfilePB;
-use tokio::sync::{broadcast, mpsc};
-
-#[derive(Clone)]
-pub enum UserStatus {
-    Login {
-        token: String,
-        user_id: String,
-    },
-    Logout {
-        token: String,
-        user_id: String,
-    },
-    Expired {
-        token: String,
-        user_id: String,
-    },
-    SignUp {
-        profile: UserProfilePB,
-        ret: mpsc::Sender<()>,
-    },
-}
-
-pub struct UserNotifier {
-    user_status_notifier: broadcast::Sender<UserStatus>,
-}
-
-impl std::default::Default for UserNotifier {
-    fn default() -> Self {
-        let (user_status_notifier, _) = broadcast::channel(10);
-        UserNotifier { user_status_notifier }
-    }
-}
-
-impl UserNotifier {
-    pub(crate) fn new() -> Self {
-        UserNotifier::default()
-    }
-
-    pub(crate) fn notify_login(&self, token: &str, user_id: &str) {
-        let _ = self.user_status_notifier.send(UserStatus::Login {
-            token: token.to_owned(),
-            user_id: user_id.to_owned(),
-        });
-    }
-
-    pub(crate) fn notify_sign_up(&self, ret: mpsc::Sender<()>, user_profile: &UserProfilePB) {
-        let _ = self.user_status_notifier.send(UserStatus::SignUp {
-            profile: user_profile.clone(),
-            ret,
-        });
-    }
-
-    pub(crate) fn notify_logout(&self, token: &str, user_id: &str) {
-        let _ = self.user_status_notifier.send(UserStatus::Logout {
-            token: token.to_owned(),
-            user_id: user_id.to_owned(),
-        });
-    }
-
-    pub fn subscribe_user_status(&self) -> broadcast::Receiver<UserStatus> {
-        self.user_status_notifier.subscribe()
-    }
-}

+ 48 - 27
frontend/rust-lib/flowy-user/src/services/user_session.rs

@@ -1,12 +1,10 @@
 use crate::entities::{UserProfilePB, UserSettingPB};
+use crate::event_map::UserStatusCallback;
 use crate::{
     errors::{ErrorCode, FlowyError},
     event_map::UserCloudService,
     notification::*,
-    services::{
-        database::{UserDB, UserTable, UserTableChangeset},
-        notifier::UserNotifier,
-    },
+    services::database::{UserDB, UserTable, UserTableChangeset},
 };
 use flowy_sqlite::ConnectionPool;
 use flowy_sqlite::{
@@ -17,8 +15,8 @@ use flowy_sqlite::{
 };
 use serde::{Deserialize, Serialize};
 use std::sync::Arc;
-use tokio::sync::mpsc;
-use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams};
+use tokio::sync::RwLock;
+use user_model::{SignInParams, SignInResponse, SignUpParams, SignUpResponse, UpdateUserProfileParams, UserProfile};
 
 pub struct UserSessionConfig {
     root_dir: String,
@@ -43,25 +41,26 @@ pub struct UserSession {
     database: UserDB,
     config: UserSessionConfig,
     cloud_service: Arc<dyn UserCloudService>,
-    pub notifier: UserNotifier,
+    user_status_callback: RwLock<Option<Arc<dyn UserStatusCallback>>>,
 }
 
 impl UserSession {
     pub fn new(config: UserSessionConfig, cloud_service: Arc<dyn UserCloudService>) -> Self {
         let db = UserDB::new(&config.root_dir);
-        let notifier = UserNotifier::new();
+        let user_status_callback = RwLock::new(None);
         Self {
             database: db,
             config,
             cloud_service,
-            notifier,
+            user_status_callback,
         }
     }
 
-    pub fn init(&self) {
+    pub async fn init<C: UserStatusCallback + 'static>(&self, user_status_callback: C) {
         if let Ok(session) = self.get_session() {
-            self.notifier.notify_login(&session.token, &session.user_id);
+            let _ = user_status_callback.did_sign_in(&session.token, &session.user_id).await;
         }
+        *self.user_status_callback.write().await = Some(Arc::new(user_status_callback));
     }
 
     pub fn db_connection(&self) -> Result<DBConnection, FlowyError> {
@@ -81,11 +80,13 @@ impl UserSession {
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
-    pub async fn sign_in(&self, params: SignInParams) -> Result<UserProfilePB, FlowyError> {
+    pub async fn sign_in(&self, params: SignInParams) -> Result<UserProfile, FlowyError> {
         if self.is_user_login(&params.email) {
             match self.get_user_profile().await {
                 Ok(profile) => {
-                    send_sign_in_notification().payload(profile.clone()).send();
+                    send_sign_in_notification()
+                        .payload::<UserProfilePB>(profile.clone().into())
+                        .send();
                     Ok(profile)
                 }
                 Err(err) => Err(err),
@@ -94,16 +95,24 @@ impl UserSession {
             let resp = self.cloud_service.sign_in(params).await?;
             let session: Session = resp.clone().into();
             self.set_session(Some(session))?;
-            let user_table = self.save_user(resp.into()).await?;
-            let user_profile: UserProfilePB = user_table.into();
-            self.notifier.notify_login(&user_profile.token, &user_profile.id);
-            send_sign_in_notification().payload(user_profile.clone()).send();
+            let user_profile: UserProfile = self.save_user(resp.into()).await?.into();
+            let _ = self
+                .user_status_callback
+                .read()
+                .await
+                .as_ref()
+                .unwrap()
+                .did_sign_in(&user_profile.token, &user_profile.id)
+                .await;
+            send_sign_in_notification()
+                .payload::<UserProfilePB>(user_profile.clone().into())
+                .send();
             Ok(user_profile)
         }
     }
 
     #[tracing::instrument(level = "debug", skip(self))]
-    pub async fn sign_up(&self, params: SignUpParams) -> Result<UserProfilePB, FlowyError> {
+    pub async fn sign_up(&self, params: SignUpParams) -> Result<UserProfile, FlowyError> {
         if self.is_user_login(&params.email) {
             self.get_user_profile().await
         } else {
@@ -111,11 +120,15 @@ impl UserSession {
             let session: Session = resp.clone().into();
             self.set_session(Some(session))?;
             let user_table = self.save_user(resp.into()).await?;
-            let user_profile: UserProfilePB = user_table.into();
-            let (ret, mut tx) = mpsc::channel(1);
-            self.notifier.notify_sign_up(ret, &user_profile);
-
-            let _ = tx.recv().await;
+            let user_profile: UserProfile = user_table.into();
+            let _ = self
+                .user_status_callback
+                .read()
+                .await
+                .as_ref()
+                .unwrap()
+                .did_sign_up(&user_profile)
+                .await;
             Ok(user_profile)
         }
     }
@@ -127,7 +140,14 @@ impl UserSession {
             diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*(self.db_connection()?))?;
         self.database.close_user_db(&session.user_id)?;
         self.set_session(None)?;
-        self.notifier.notify_logout(&session.token, &session.user_id);
+        let _ = self
+            .user_status_callback
+            .read()
+            .await
+            .as_ref()
+            .unwrap()
+            .did_expired(&session.token, &session.user_id)
+            .await;
         self.sign_out_on_server(&session.token).await?;
 
         Ok(())
@@ -140,8 +160,9 @@ impl UserSession {
         diesel_update_table!(user_table, changeset, &*self.db_connection()?);
 
         let user_profile = self.get_user_profile().await?;
+        let profile_pb: UserProfilePB = user_profile.into();
         send_notification(&session.token, UserNotification::UserProfileUpdated)
-            .payload(user_profile)
+            .payload(profile_pb)
             .send();
         self.update_user_on_server(&session.token, params).await?;
         Ok(())
@@ -151,7 +172,7 @@ impl UserSession {
         Ok(())
     }
 
-    pub async fn check_user(&self) -> Result<UserProfilePB, FlowyError> {
+    pub async fn check_user(&self) -> Result<UserProfile, FlowyError> {
         let (user_id, token) = self.get_session()?.into_part();
 
         let user = dsl::user_table
@@ -162,7 +183,7 @@ impl UserSession {
         Ok(user.into())
     }
 
-    pub async fn get_user_profile(&self) -> Result<UserProfilePB, FlowyError> {
+    pub async fn get_user_profile(&self) -> Result<UserProfile, FlowyError> {
         let (user_id, token) = self.get_session()?.into_part();
         let user = dsl::user_table
             .filter(user_table::id.eq(&user_id))