Browse Source

feat: user awareness (#3185)

* refactor: separate functions

* feat: init user awareness object

* test: create reminder event test

* docs: add documentation
Nathan.fooo 2 years ago
parent
commit
27b1f00e17
37 changed files with 913 additions and 497 deletions
  1. 9 7
      frontend/appflowy_tauri/src-tauri/Cargo.toml
  2. 26 10
      frontend/rust-lib/Cargo.lock
  3. 8 6
      frontend/rust-lib/Cargo.toml
  4. 2 2
      frontend/rust-lib/flowy-core/src/deps_resolve/collab_deps.rs
  5. 4 4
      frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs
  6. 4 4
      frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs
  7. 4 4
      frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs
  8. 5 1
      frontend/rust-lib/flowy-core/src/integrate/server.rs
  9. 28 33
      frontend/rust-lib/flowy-core/src/lib.rs
  10. 2 2
      frontend/rust-lib/flowy-core/src/module.rs
  11. 1 1
      frontend/rust-lib/flowy-document2/tests/document/util.rs
  12. 4 0
      frontend/rust-lib/flowy-server/src/local_server/impls/user.rs
  13. 6 0
      frontend/rust-lib/flowy-server/src/self_host/impls/user.rs
  14. 23 3
      frontend/rust-lib/flowy-server/src/supabase/api/user.rs
  15. 1 0
      frontend/rust-lib/flowy-server/src/supabase/define.rs
  16. 6 6
      frontend/rust-lib/flowy-server/src/supabase/server.rs
  17. 3 3
      frontend/rust-lib/flowy-server/tests/supabase_test/util.rs
  18. 1 1
      frontend/rust-lib/flowy-test/tests/folder/supabase_test/helper.rs
  19. 1 0
      frontend/rust-lib/flowy-test/tests/user/local_test/mod.rs
  20. 33 0
      frontend/rust-lib/flowy-test/tests/user/local_test/user_awareness_test.rs
  21. 2 0
      frontend/rust-lib/flowy-user-deps/src/cloud.rs
  22. 1 0
      frontend/rust-lib/flowy-user/Cargo.toml
  23. 2 0
      frontend/rust-lib/flowy-user/src/entities/mod.rs
  24. 67 0
      frontend/rust-lib/flowy-user/src/entities/reminder.rs
  25. 1 1
      frontend/rust-lib/flowy-user/src/entities/user_profile.rs
  26. 101 78
      frontend/rust-lib/flowy-user/src/event_handler.rs
  27. 19 5
      frontend/rust-lib/flowy-user/src/event_map.rs
  28. 1 0
      frontend/rust-lib/flowy-user/src/lib.rs
  29. 100 314
      frontend/rust-lib/flowy-user/src/manager.rs
  30. 2 1
      frontend/rust-lib/flowy-user/src/migrations/define.rs
  31. 1 1
      frontend/rust-lib/flowy-user/src/migrations/historical_document.rs
  32. 5 2
      frontend/rust-lib/flowy-user/src/migrations/migration.rs
  33. 50 2
      frontend/rust-lib/flowy-user/src/services/entities.rs
  34. 91 0
      frontend/rust-lib/flowy-user/src/services/historical_user.rs
  35. 6 6
      frontend/rust-lib/flowy-user/src/services/mod.rs
  36. 168 0
      frontend/rust-lib/flowy-user/src/services/user_awareness.rs
  37. 125 0
      frontend/rust-lib/flowy-user/src/services/user_workspace.rs

+ 9 - 7
frontend/appflowy_tauri/src-tauri/Cargo.toml

@@ -34,13 +34,14 @@ default = ["custom-protocol"]
 custom-protocol = ["tauri/custom-protocol"]
 
 [patch.crates-io]
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
 
 #collab = { path = "../../../../AppFlowy-Collab/collab" }
 #collab-folder = { path = "../../../../AppFlowy-Collab/collab-folder" }
@@ -48,6 +49,7 @@ collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev =
 #collab-database = { path = "../../../../AppFlowy-Collab/collab-database" }
 #appflowy-integrate = { path = "../../../../AppFlowy-Collab/appflowy-integrate" }
 #collab-plugins = { path = "../../../../AppFlowy-Collab/collab-plugins" }
+#collab-user = { path = "../../../../AppFlowy-Collab/collab-user" }
 
 
 

+ 26 - 10
frontend/rust-lib/Cargo.lock

@@ -96,7 +96,7 @@ checksum = "9c7d0618f0e0b7e8ff11427422b64564d5fb0be1940354bfe2e0529b18a9d9b8"
 [[package]]
 name = "appflowy-integrate"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "collab",
@@ -587,7 +587,7 @@ dependencies = [
 [[package]]
 name = "collab"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "bytes",
@@ -605,7 +605,7 @@ dependencies = [
 [[package]]
 name = "collab-client-ws"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "bytes",
  "collab-sync",
@@ -623,7 +623,7 @@ dependencies = [
 [[package]]
 name = "collab-database"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -650,7 +650,7 @@ dependencies = [
 [[package]]
 name = "collab-derive"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -662,7 +662,7 @@ dependencies = [
 [[package]]
 name = "collab-document"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "collab",
@@ -681,7 +681,7 @@ dependencies = [
 [[package]]
 name = "collab-folder"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "chrono",
@@ -701,7 +701,7 @@ dependencies = [
 [[package]]
 name = "collab-persistence"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "bincode",
  "chrono",
@@ -721,7 +721,7 @@ dependencies = [
 [[package]]
 name = "collab-plugins"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -749,7 +749,7 @@ dependencies = [
 [[package]]
 name = "collab-sync"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=3881ba#3881bab021229020837ae65df604b9b87d0e8497"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
 dependencies = [
  "bytes",
  "collab",
@@ -768,6 +768,21 @@ dependencies = [
  "yrs",
 ]
 
+[[package]]
+name = "collab-user"
+version = "0.1.0"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=7f26d5#7f26d568b87fb0a14242bfa018f8f1df0d03665c"
+dependencies = [
+ "anyhow",
+ "collab",
+ "parking_lot 0.12.1",
+ "serde",
+ "serde_json",
+ "tokio",
+ "tokio-stream",
+ "tracing",
+]
+
 [[package]]
 name = "config"
 version = "0.10.1"
@@ -1667,6 +1682,7 @@ dependencies = [
  "collab",
  "collab-document",
  "collab-folder",
+ "collab-user",
  "diesel",
  "diesel_derives",
  "fake",

+ 8 - 6
frontend/rust-lib/Cargo.toml

@@ -38,12 +38,13 @@ opt-level = 3
 incremental = false
 
 [patch.crates-io]
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
-collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "3881ba" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "7f26d5" }
 
 #collab = { path = "../AppFlowy-Collab/collab" }
 #collab-folder = { path = "../AppFlowy-Collab/collab-folder" }
@@ -51,4 +52,5 @@ collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev =
 #collab-document = { path = "../AppFlowy-Collab/collab-document" }
 #collab-plugins = { path = "../AppFlowy-Collab/collab-plugins" }
 #appflowy-integrate = { path = "../AppFlowy-Collab/appflowy-integrate" }
+#collab-user = { path = "../AppFlowy-Collab/collab-user" }
 

+ 2 - 2
frontend/rust-lib/flowy-core/src/deps_resolve/collab_deps.rs

@@ -11,10 +11,10 @@ use flowy_sqlite::{
   prelude::*,
   schema::{collab_snapshot, collab_snapshot::dsl},
 };
-use flowy_user::services::UserSession;
+use flowy_user::manager::UserManager;
 use lib_infra::util::timestamp;
 
-pub struct SnapshotDBImpl(pub Weak<UserSession>);
+pub struct SnapshotDBImpl(pub Weak<UserManager>);
 
 impl SnapshotPersistence for SnapshotDBImpl {
   fn get_snapshots(&self, uid: i64, object_id: &str) -> Vec<CollabSnapshot> {

+ 4 - 4
frontend/rust-lib/flowy-core/src/deps_resolve/database_deps.rs

@@ -8,18 +8,18 @@ use flowy_database2::{DatabaseManager, DatabaseUser};
 use flowy_database_deps::cloud::DatabaseCloudService;
 use flowy_error::FlowyError;
 use flowy_task::TaskDispatcher;
-use flowy_user::services::UserSession;
+use flowy_user::manager::UserManager;
 
 pub struct DatabaseDepsResolver();
 
 impl DatabaseDepsResolver {
   pub async fn resolve(
-    user_session: Weak<UserSession>,
+    user_manager: Weak<UserManager>,
     task_scheduler: Arc<RwLock<TaskDispatcher>>,
     collab_builder: Arc<AppFlowyCollabBuilder>,
     cloud_service: Arc<dyn DatabaseCloudService>,
   ) -> Arc<DatabaseManager> {
-    let user = Arc::new(DatabaseUserImpl(user_session));
+    let user = Arc::new(DatabaseUserImpl(user_manager));
     Arc::new(DatabaseManager::new(
       user,
       task_scheduler,
@@ -29,7 +29,7 @@ impl DatabaseDepsResolver {
   }
 }
 
-struct DatabaseUserImpl(Weak<UserSession>);
+struct DatabaseUserImpl(Weak<UserManager>);
 impl DatabaseUser for DatabaseUserImpl {
   fn user_id(&self) -> Result<i64, FlowyError> {
     self

+ 4 - 4
frontend/rust-lib/flowy-core/src/deps_resolve/document_deps.rs

@@ -7,17 +7,17 @@ use flowy_database2::DatabaseManager;
 use flowy_document2::manager::{DocumentManager, DocumentUser};
 use flowy_document_deps::cloud::DocumentCloudService;
 use flowy_error::FlowyError;
-use flowy_user::services::UserSession;
+use flowy_user::manager::UserManager;
 
 pub struct DocumentDepsResolver();
 impl DocumentDepsResolver {
   pub fn resolve(
-    user_session: Weak<UserSession>,
+    user_manager: Weak<UserManager>,
     _database_manager: &Arc<DatabaseManager>,
     collab_builder: Arc<AppFlowyCollabBuilder>,
     cloud_service: Arc<dyn DocumentCloudService>,
   ) -> Arc<DocumentManager> {
-    let user: Arc<dyn DocumentUser> = Arc::new(DocumentUserImpl(user_session));
+    let user: Arc<dyn DocumentUser> = Arc::new(DocumentUserImpl(user_manager));
     Arc::new(DocumentManager::new(
       user.clone(),
       collab_builder,
@@ -26,7 +26,7 @@ impl DocumentDepsResolver {
   }
 }
 
-struct DocumentUserImpl(Weak<UserSession>);
+struct DocumentUserImpl(Weak<UserManager>);
 impl DocumentUser for DocumentUserImpl {
   fn user_id(&self) -> Result<i64, FlowyError> {
     self

+ 4 - 4
frontend/rust-lib/flowy-core/src/deps_resolve/folder_deps.rs

@@ -23,20 +23,20 @@ use flowy_folder2::view_operation::{
 };
 use flowy_folder2::ViewLayout;
 use flowy_folder_deps::cloud::FolderCloudService;
-use flowy_user::services::UserSession;
+use flowy_user::manager::UserManager;
 use lib_dispatch::prelude::ToBytes;
 use lib_infra::future::FutureResult;
 
 pub struct FolderDepsResolver();
 impl FolderDepsResolver {
   pub async fn resolve(
-    user_session: Weak<UserSession>,
+    user_manager: Weak<UserManager>,
     document_manager: &Arc<DocumentManager>,
     database_manager: &Arc<DatabaseManager>,
     collab_builder: Arc<AppFlowyCollabBuilder>,
     folder_cloud: Arc<dyn FolderCloudService>,
   ) -> Arc<FolderManager> {
-    let user: Arc<dyn FolderUser> = Arc::new(FolderUserImpl(user_session.clone()));
+    let user: Arc<dyn FolderUser> = Arc::new(FolderUserImpl(user_manager.clone()));
 
     let handlers = folder_operation_handlers(document_manager.clone(), database_manager.clone());
     Arc::new(
@@ -63,7 +63,7 @@ fn folder_operation_handlers(
   Arc::new(map)
 }
 
-struct FolderUserImpl(Weak<UserSession>);
+struct FolderUserImpl(Weak<UserManager>);
 impl FolderUser for FolderUserImpl {
   fn user_id(&self) -> Result<i64, FlowyError> {
     self

+ 5 - 1
frontend/rust-lib/flowy-core/src/integrate/server.rs

@@ -152,7 +152,7 @@ impl AppFlowyServerProvider {
 }
 
 impl UserCloudServiceProvider for AppFlowyServerProvider {
-  fn update_supabase_config(&self, supabase_config: &SupabaseConfiguration) {
+  fn set_supabase_config(&self, supabase_config: &SupabaseConfiguration) {
     self
       .supabase_config
       .write()
@@ -187,6 +187,10 @@ impl UserCloudServiceProvider for AppFlowyServerProvider {
     }
   }
 
+  fn set_device_id(&self, device_id: &str) {
+    *self.device_id.lock() = device_id.to_string();
+  }
+
   /// Returns the [UserService] base on the current [ServerProviderType].
   /// Creates a new [AppFlowyServer] if it doesn't exist.
   fn get_user_service(&self) -> Result<Arc<dyn UserService>, FlowyError> {

+ 28 - 33
frontend/rust-lib/flowy-core/src/lib.rs

@@ -1,5 +1,6 @@
 #![allow(unused_doc_comments)]
 
+use std::sync::Weak;
 use std::time::Duration;
 use std::{
   fmt,
@@ -20,7 +21,7 @@ use flowy_folder2::manager::{FolderInitializeData, FolderManager};
 use flowy_sqlite::kv::StorePreferences;
 use flowy_task::{TaskDispatcher, TaskRunner};
 use flowy_user::event_map::{SignUpContext, UserCloudServiceProvider, UserStatusCallback};
-use flowy_user::services::{get_supabase_config, UserSession, UserSessionConfig};
+use flowy_user::manager::{get_supabase_config, UserManager, UserSessionConfig};
 use flowy_user_deps::entities::{AuthType, UserProfile, UserWorkspace};
 use lib_dispatch::prelude::*;
 use lib_dispatch::runtime::tokio_default_runtime;
@@ -114,14 +115,14 @@ fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
 pub struct AppFlowyCore {
   #[allow(dead_code)]
   pub config: AppFlowyCoreConfig,
-  pub user_session: Arc<UserSession>,
+  pub user_manager: Arc<UserManager>,
   pub document_manager: Arc<DocumentManager>,
   pub folder_manager: Arc<FolderManager>,
   pub database_manager: Arc<DatabaseManager>,
   pub event_dispatcher: Arc<AFPluginDispatcher>,
   pub server_provider: Arc<AppFlowyServerProvider>,
   pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
-  pub storage_preference: Arc<StorePreferences>,
+  pub store_preference: Arc<StorePreferences>,
 }
 
 impl AppFlowyCore {
@@ -153,23 +154,27 @@ impl AppFlowyCore {
     ));
 
     let (
-      user_session,
+      user_manager,
       folder_manager,
       server_provider,
       database_manager,
       document_manager,
       collab_builder,
     ) = runtime.block_on(async {
-      let user_session = mk_user_session(&config, &store_preference, server_provider.clone());
       /// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded
       /// on demand based on the [CollabPluginConfig].
-      let collab_builder = Arc::new(AppFlowyCollabBuilder::new(
+      let collab_builder = Arc::new(AppFlowyCollabBuilder::new(server_provider.clone()));
+      let user_manager = mk_user_session(
+        &config,
+        &store_preference,
         server_provider.clone(),
-        Some(Arc::new(SnapshotDBImpl(Arc::downgrade(&user_session)))),
-      ));
+        Arc::downgrade(&collab_builder),
+      );
+      collab_builder
+        .set_snapshot_persistence(Arc::new(SnapshotDBImpl(Arc::downgrade(&user_manager))));
 
       let database_manager = DatabaseDepsResolver::resolve(
-        Arc::downgrade(&user_session),
+        Arc::downgrade(&user_manager),
         task_dispatcher.clone(),
         collab_builder.clone(),
         server_provider.clone(),
@@ -177,14 +182,14 @@ impl AppFlowyCore {
       .await;
 
       let document_manager = DocumentDepsResolver::resolve(
-        Arc::downgrade(&user_session),
+        Arc::downgrade(&user_manager),
         &database_manager,
         collab_builder.clone(),
         server_provider.clone(),
       );
 
       let folder_manager = FolderDepsResolver::resolve(
-        Arc::downgrade(&user_session),
+        Arc::downgrade(&user_manager),
         &document_manager,
         &database_manager,
         collab_builder.clone(),
@@ -193,7 +198,7 @@ impl AppFlowyCore {
       .await;
 
       (
-        user_session,
+        user_manager,
         folder_manager,
         server_provider,
         database_manager,
@@ -211,7 +216,7 @@ impl AppFlowyCore {
       config: config.clone(),
     };
 
-    let cloned_user_session = Arc::downgrade(&user_session);
+    let cloned_user_session = Arc::downgrade(&user_manager);
     runtime.block_on(async move {
       if let Some(user_session) = cloned_user_session.upgrade() {
         user_session.init(user_status_listener).await;
@@ -222,21 +227,21 @@ impl AppFlowyCore {
       make_plugins(
         Arc::downgrade(&folder_manager),
         Arc::downgrade(&database_manager),
-        Arc::downgrade(&user_session),
+        Arc::downgrade(&user_manager),
         Arc::downgrade(&document_manager),
       )
     }));
 
     Self {
       config,
-      user_session,
+      user_manager,
       document_manager,
       folder_manager,
       database_manager,
       event_dispatcher,
       server_provider,
       task_dispatcher,
-      storage_preference: store_preference,
+      store_preference,
     }
   }
 
@@ -260,12 +265,14 @@ fn mk_user_session(
   config: &AppFlowyCoreConfig,
   storage_preference: &Arc<StorePreferences>,
   user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>,
-) -> Arc<UserSession> {
+  collab_builder: Weak<AppFlowyCollabBuilder>,
+) -> Arc<UserManager> {
   let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
-  Arc::new(UserSession::new(
+  Arc::new(UserManager::new(
     user_config,
     user_cloud_service_provider,
     storage_preference.clone(),
+    collab_builder,
   ))
 }
 
@@ -286,7 +293,7 @@ impl UserStatusCallback for UserStatusCallbackImpl {
     &self,
     user_id: i64,
     user_workspace: &UserWorkspace,
-    device_id: &str,
+    _device_id: &str,
   ) -> Fut<FlowyResult<()>> {
     let user_id = user_id.to_owned();
     let user_workspace = user_workspace.clone();
@@ -295,9 +302,6 @@ impl UserStatusCallback for UserStatusCallbackImpl {
     let database_manager = self.database_manager.clone();
     let document_manager = self.document_manager.clone();
 
-    self.server_provider.set_sync_device(device_id);
-    self.collab_builder.set_sync_device(device_id.to_owned());
-
     to_fut(async move {
       collab_builder.initialize(user_workspace.id.clone());
       folder_manager
@@ -321,20 +325,15 @@ impl UserStatusCallback for UserStatusCallbackImpl {
     &self,
     user_id: i64,
     user_workspace: &UserWorkspace,
-    device_id: &str,
+    _device_id: &str,
   ) -> Fut<FlowyResult<()>> {
     let user_id = user_id.to_owned();
     let user_workspace = user_workspace.clone();
-    let collab_builder = self.collab_builder.clone();
     let folder_manager = self.folder_manager.clone();
     let database_manager = self.database_manager.clone();
     let document_manager = self.document_manager.clone();
 
-    self.server_provider.set_sync_device(device_id);
-    self.collab_builder.set_sync_device(device_id.to_owned());
-
     to_fut(async move {
-      collab_builder.initialize(user_workspace.id.clone());
       folder_manager
         .initialize_with_workspace_id(user_id, &user_workspace.id)
         .await?;
@@ -357,19 +356,15 @@ impl UserStatusCallback for UserStatusCallbackImpl {
     context: SignUpContext,
     user_profile: &UserProfile,
     user_workspace: &UserWorkspace,
-    device_id: &str,
+    _device_id: &str,
   ) -> Fut<FlowyResult<()>> {
     let user_profile = user_profile.clone();
-    let collab_builder = self.collab_builder.clone();
     let folder_manager = self.folder_manager.clone();
     let database_manager = self.database_manager.clone();
     let user_workspace = user_workspace.clone();
     let document_manager = self.document_manager.clone();
 
-    self.server_provider.set_sync_device(device_id);
-    self.collab_builder.set_sync_device(device_id.to_owned());
     to_fut(async move {
-      collab_builder.initialize(user_workspace.id.clone());
       folder_manager
         .initialize_with_new_user(
           user_profile.id,

+ 2 - 2
frontend/rust-lib/flowy-core/src/module.rs

@@ -3,13 +3,13 @@ use std::sync::Weak;
 use flowy_database2::DatabaseManager;
 use flowy_document2::manager::DocumentManager as DocumentManager2;
 use flowy_folder2::manager::FolderManager;
-use flowy_user::services::UserSession;
+use flowy_user::manager::UserManager;
 use lib_dispatch::prelude::AFPlugin;
 
 pub fn make_plugins(
   folder_manager: Weak<FolderManager>,
   database_manager: Weak<DatabaseManager>,
-  user_session: Weak<UserSession>,
+  user_session: Weak<UserManager>,
   document_manager2: Weak<DocumentManager2>,
 ) -> Vec<AFPlugin> {
   let store_preferences = user_session

+ 1 - 1
frontend/rust-lib/flowy-document2/tests/document/util.rs

@@ -81,7 +81,7 @@ pub fn db() -> Arc<RocksCollabDB> {
 }
 
 pub fn default_collab_builder() -> Arc<AppFlowyCollabBuilder> {
-  let builder = AppFlowyCollabBuilder::new(DefaultCollabStorageProvider(), None);
+  let builder = AppFlowyCollabBuilder::new(DefaultCollabStorageProvider());
   builder.set_sync_device(uuid::Uuid::new_v4().to_string());
   Arc::new(builder)
 }

+ 4 - 0
frontend/rust-lib/flowy-server/src/local_server/impls/user.rs

@@ -110,6 +110,10 @@ impl UserService for LocalServerUserAuthServiceImpl {
   ) -> FutureResult<(), Error> {
     FutureResult::new(async { Ok(()) })
   }
+
+  fn get_user_awareness_updates(&self, _uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
+    FutureResult::new(async { Ok(vec![]) })
+  }
 }
 
 fn make_user_workspace() -> UserWorkspace {

+ 6 - 0
frontend/rust-lib/flowy-server/src/self_host/impls/user.rs

@@ -1,4 +1,5 @@
 use anyhow::Error;
+
 use flowy_error::{ErrorCode, FlowyError};
 use flowy_user_deps::cloud::UserService;
 use flowy_user_deps::entities::*;
@@ -121,6 +122,11 @@ impl UserService for SelfHostedUserAuthServiceImpl {
     // TODO(nathan): implement the RESTful API for this
     FutureResult::new(async { Ok(()) })
   }
+
+  fn get_user_awareness_updates(&self, _uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
+    // TODO(nathan): implement the RESTful API for this
+    FutureResult::new(async { Ok(vec![]) })
+  }
 }
 
 pub async fn user_sign_up_request(

+ 23 - 3
frontend/rust-lib/flowy-server/src/supabase/api/user.rs

@@ -2,6 +2,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::Error;
+use tokio::sync::oneshot::channel;
 use uuid::Uuid;
 
 use flowy_user_deps::cloud::*;
@@ -10,6 +11,7 @@ use flowy_user_deps::DEFAULT_USER_NAME;
 use lib_infra::box_any::BoxAny;
 use lib_infra::future::FutureResult;
 
+use crate::supabase::api::request::FetchObjectUpdateAction;
 use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
 use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
 use crate::supabase::define::*;
@@ -17,17 +19,17 @@ use crate::supabase::entities::GetUserProfileParams;
 use crate::supabase::entities::UidResponse;
 use crate::supabase::entities::UserProfileResponse;
 
-pub struct RESTfulSupabaseUserAuthServiceImpl<T> {
+pub struct SupabaseUserServiceImpl<T> {
   server: T,
 }
 
-impl<T> RESTfulSupabaseUserAuthServiceImpl<T> {
+impl<T> SupabaseUserServiceImpl<T> {
   pub fn new(server: T) -> Self {
     Self { server }
   }
 }
 
-impl<T> UserService for RESTfulSupabaseUserAuthServiceImpl<T>
+impl<T> UserService for SupabaseUserServiceImpl<T>
 where
   T: SupabaseServerService,
 {
@@ -201,6 +203,24 @@ where
   ) -> FutureResult<(), Error> {
     todo!()
   }
+
+  fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
+    let try_get_postgrest = self.server.try_get_weak_postgrest();
+    let awareness_id = uid.to_string();
+    let (tx, rx) = channel();
+    tokio::spawn(async move {
+      tx.send(
+        async move {
+          let postgrest = try_get_postgrest?;
+          let action =
+            FetchObjectUpdateAction::new(awareness_id, CollabType::UserAwareness, postgrest);
+          action.run_with_fix_interval(5, 10).await
+        }
+        .await,
+      )
+    });
+    FutureResult::new(async { rx.await? })
+  }
 }
 
 async fn get_user_profile(

+ 1 - 0
frontend/rust-lib/flowy-server/src/supabase/define.rs

@@ -28,6 +28,7 @@ pub fn table_name(ty: &CollabType) -> String {
     CollabType::Database => format!("{}_database", AF_COLLAB_UPDATE_TABLE),
     CollabType::WorkspaceDatabase => format!("{}_w_database", AF_COLLAB_UPDATE_TABLE),
     CollabType::Folder => format!("{}_folder", AF_COLLAB_UPDATE_TABLE),
+    CollabType::UserAwareness => format!("{}_user_awareness", AF_COLLAB_UPDATE_TABLE),
   }
 }
 

+ 6 - 6
frontend/rust-lib/flowy-server/src/supabase/server.rs

@@ -12,9 +12,9 @@ use flowy_server_config::supabase_config::SupabaseConfiguration;
 use flowy_user_deps::cloud::UserService;
 
 use crate::supabase::api::{
-  RESTfulPostgresServer, RESTfulSupabaseUserAuthServiceImpl, SupabaseCollabStorageImpl,
-  SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl, SupabaseFolderServiceImpl,
-  SupabaseServerServiceImpl,
+  RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
+  SupabaseDocumentServiceImpl, SupabaseFolderServiceImpl, SupabaseServerServiceImpl,
+  SupabaseUserServiceImpl,
 };
 use crate::supabase::entities::RealtimeCollabUpdateEvent;
 use crate::AppFlowyServer;
@@ -102,9 +102,9 @@ impl AppFlowyServer for SupabaseServer {
   }
 
   fn user_service(&self) -> Arc<dyn UserService> {
-    Arc::new(RESTfulSupabaseUserAuthServiceImpl::new(
-      SupabaseServerServiceImpl(self.restful_postgres.clone()),
-    ))
+    Arc::new(SupabaseUserServiceImpl::new(SupabaseServerServiceImpl(
+      self.restful_postgres.clone(),
+    )))
   }
 
   fn folder_service(&self) -> Arc<dyn FolderCloudService> {

+ 3 - 3
frontend/rust-lib/flowy-server/tests/supabase_test/util.rs

@@ -7,8 +7,8 @@ use uuid::Uuid;
 use flowy_database_deps::cloud::DatabaseCloudService;
 use flowy_folder_deps::cloud::FolderCloudService;
 use flowy_server::supabase::api::{
-  RESTfulPostgresServer, RESTfulSupabaseUserAuthServiceImpl, SupabaseCollabStorageImpl,
-  SupabaseDatabaseServiceImpl, SupabaseFolderServiceImpl, SupabaseServerServiceImpl,
+  RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
+  SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
 };
 use flowy_server::supabase::define::{USER_EMAIL, USER_UUID};
 use flowy_server_config::supabase_config::SupabaseConfiguration;
@@ -42,7 +42,7 @@ pub fn database_service() -> Arc<dyn DatabaseCloudService> {
 pub fn user_auth_service() -> Arc<dyn UserService> {
   let config = SupabaseConfiguration::from_env().unwrap();
   let server = Arc::new(RESTfulPostgresServer::new(config));
-  Arc::new(RESTfulSupabaseUserAuthServiceImpl::new(
+  Arc::new(SupabaseUserServiceImpl::new(
     SupabaseServerServiceImpl::new(server),
   ))
 }

+ 1 - 1
frontend/rust-lib/flowy-test/tests/folder/supabase_test/helper.rs

@@ -44,7 +44,7 @@ impl FlowySupabaseFolderTest {
   pub async fn get_collab_update(&self, workspace_id: &str) -> Vec<u8> {
     let cloud_service = self.folder_manager.get_cloud_service().clone();
     let remote_updates = cloud_service
-      .get_folder_updates(workspace_id, self.user_session.user_id().unwrap())
+      .get_folder_updates(workspace_id, self.user_manager.user_id().unwrap())
       .await
       .unwrap();
 

+ 1 - 0
frontend/rust-lib/flowy-test/tests/user/local_test/mod.rs

@@ -1,3 +1,4 @@
 mod auth_test;
 mod helper;
+mod user_awareness_test;
 mod user_profile_test;

+ 33 - 0
frontend/rust-lib/flowy-test/tests/user/local_test/user_awareness_test.rs

@@ -0,0 +1,33 @@
+use flowy_test::event_builder::EventBuilder;
+use flowy_test::FlowyCoreTest;
+use flowy_user::entities::{ReminderPB, RepeatedReminderPB};
+use flowy_user::event_map::UserEvent::*;
+
+#[tokio::test]
+async fn user_update_with_name() {
+  let sdk = FlowyCoreTest::new();
+  let _ = sdk.sign_up_as_guest().await;
+  let payload = ReminderPB {
+    id: "".to_string(),
+    scheduled_at: 0,
+    is_ack: false,
+    ty: 0,
+    title: "".to_string(),
+    message: "".to_string(),
+    reminder_object_id: "".to_string(),
+  };
+  let _ = EventBuilder::new(sdk.clone())
+    .event(CreateReminder)
+    .payload(payload)
+    .async_send()
+    .await;
+
+  let reminders = EventBuilder::new(sdk.clone())
+    .event(GetAllReminders)
+    .async_send()
+    .await
+    .parse::<RepeatedReminderPB>()
+    .items;
+
+  assert_eq!(reminders.len(), 1);
+}

+ 2 - 0
frontend/rust-lib/flowy-user-deps/src/cloud.rs

@@ -58,6 +58,8 @@ pub trait UserService: Send + Sync {
     user_email: String,
     workspace_id: String,
   ) -> FutureResult<(), Error>;
+
+  fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error>;
 }
 
 pub fn third_party_params_from_box_any(any: BoxAny) -> Result<ThirdPartyParams, Error> {

+ 1 - 0
frontend/rust-lib/flowy-user/Cargo.toml

@@ -17,6 +17,7 @@ appflowy-integrate = { version = "0.1.0" }
 collab = { version = "0.1.0" }
 collab-folder = { version = "0.1.0" }
 collab-document = { version = "0.1.0" }
+collab-user = { version = "0.1.0" }
 flowy-user-deps = { path = "../flowy-user-deps" }
 
 tracing = { version = "0.1", features = ["log"] }

+ 2 - 0
frontend/rust-lib/flowy-user/src/entities/mod.rs

@@ -1,10 +1,12 @@
 pub use auth::*;
 pub use realtime::*;
+pub use reminder::*;
 pub use user_profile::*;
 pub use user_setting::*;
 
 pub mod auth;
 pub mod parser;
 pub mod realtime;
+mod reminder;
 mod user_profile;
 mod user_setting;

+ 67 - 0
frontend/rust-lib/flowy-user/src/entities/reminder.rs

@@ -0,0 +1,67 @@
+use collab_user::core::Reminder;
+
+use flowy_derive::ProtoBuf;
+
+#[derive(ProtoBuf, Default, Clone)]
+pub struct ReminderPB {
+  #[pb(index = 1)]
+  pub id: String,
+
+  #[pb(index = 2)]
+  pub scheduled_at: i64,
+
+  #[pb(index = 3)]
+  pub is_ack: bool,
+
+  #[pb(index = 4)]
+  pub ty: i64,
+
+  #[pb(index = 5)]
+  pub title: String,
+
+  #[pb(index = 6)]
+  pub message: String,
+
+  #[pb(index = 7)]
+  pub reminder_object_id: String,
+}
+
+#[derive(ProtoBuf, Default, Clone)]
+pub struct RepeatedReminderPB {
+  #[pb(index = 1)]
+  pub items: Vec<ReminderPB>,
+}
+
+impl From<ReminderPB> for Reminder {
+  fn from(value: ReminderPB) -> Self {
+    Self {
+      id: value.id,
+      scheduled_at: value.scheduled_at,
+      is_ack: value.is_ack,
+      ty: value.ty,
+      title: value.title,
+      message: value.message,
+      reminder_object_id: value.reminder_object_id,
+    }
+  }
+}
+
+impl From<Reminder> for ReminderPB {
+  fn from(value: Reminder) -> Self {
+    Self {
+      id: value.id,
+      scheduled_at: value.scheduled_at,
+      is_ack: value.is_ack,
+      ty: value.ty,
+      title: value.title,
+      message: value.message,
+      reminder_object_id: value.reminder_object_id,
+    }
+  }
+}
+
+impl From<Vec<ReminderPB>> for RepeatedReminderPB {
+  fn from(value: Vec<ReminderPB>) -> Self {
+    Self { items: value }
+  }
+}

+ 1 - 1
frontend/rust-lib/flowy-user/src/entities/user_profile.rs

@@ -6,7 +6,7 @@ use flowy_user_deps::entities::*;
 use crate::entities::parser::{UserEmail, UserIcon, UserName, UserOpenaiKey, UserPassword};
 use crate::entities::AuthTypePB;
 use crate::errors::ErrorCode;
-use crate::services::HistoricalUser;
+use crate::services::entities::HistoricalUser;
 
 #[derive(Default, ProtoBuf)]
 pub struct UserTokenPB {

+ 101 - 78
frontend/rust-lib/flowy-user/src/event_handler.rs

@@ -12,13 +12,13 @@ use lib_dispatch::prelude::*;
 use lib_infra::box_any::BoxAny;
 
 use crate::entities::*;
-use crate::services::{get_supabase_config, UserSession};
+use crate::manager::{get_supabase_config, UserManager};
 
-fn upgrade_session(session: AFPluginState<Weak<UserSession>>) -> FlowyResult<Arc<UserSession>> {
-  let session = session
+fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
+  let manager = manager
     .upgrade()
     .ok_or(FlowyError::internal().context("The user session is already drop"))?;
-  Ok(session)
+  Ok(manager)
 }
 
 fn upgrade_store_preferences(
@@ -30,17 +30,17 @@ fn upgrade_store_preferences(
   Ok(store)
 }
 
-#[tracing::instrument(level = "debug", name = "sign_in", skip(data, session), fields(email = %data.email), err)]
+#[tracing::instrument(level = "debug", name = "sign_in", skip(data, manager), fields(email = %data.email), err)]
 pub async fn sign_in(
   data: AFPluginData<SignInPayloadPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params: SignInParams = data.into_inner().try_into()?;
   let auth_type = params.auth_type.clone();
-  session.update_auth_type(&auth_type).await;
+  manager.update_auth_type(&auth_type).await;
 
-  let user_profile: UserProfilePB = session
+  let user_profile: UserProfilePB = manager
     .sign_in(BoxAny::new(params), auth_type)
     .await?
     .into();
@@ -50,7 +50,7 @@ pub async fn sign_in(
 #[tracing::instrument(
     level = "debug",
     name = "sign_up",
-    skip(data, session),
+    skip(data, manager),
     fields(
         email = %data.email,
         name = %data.name,
@@ -59,60 +59,60 @@ pub async fn sign_in(
 )]
 pub async fn sign_up(
   data: AFPluginData<SignUpPayloadPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params: SignUpParams = data.into_inner().try_into()?;
   let auth_type = params.auth_type.clone();
-  session.update_auth_type(&auth_type).await;
+  manager.update_auth_type(&auth_type).await;
 
-  let user_profile = session.sign_up(auth_type, BoxAny::new(params)).await?;
+  let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?;
   data_result_ok(user_profile.into())
 }
 
-#[tracing::instrument(level = "debug", skip(session))]
+#[tracing::instrument(level = "debug", skip(manager))]
 pub async fn init_user_handler(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
-  session.init_user().await?;
+  let manager = upgrade_manager(manager)?;
+  manager.init_user().await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(session))]
+#[tracing::instrument(level = "debug", skip(manager))]
 pub async fn check_user_handler(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
-  session.check_user().await?;
+  let manager = upgrade_manager(manager)?;
+  manager.check_user().await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(session))]
+#[tracing::instrument(level = "debug", skip(manager))]
 pub async fn get_user_profile_handler(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
-  let session = upgrade_session(session)?;
-  let uid = session.get_session()?.user_id;
-  let user_profile: UserProfilePB = session.get_user_profile(uid, true).await?.into();
+  let manager = upgrade_manager(manager)?;
+  let uid = manager.get_session()?.user_id;
+  let user_profile: UserProfilePB = manager.get_user_profile(uid, true).await?.into();
   data_result_ok(user_profile)
 }
 
-#[tracing::instrument(level = "debug", skip(session))]
-pub async fn sign_out(session: AFPluginState<Weak<UserSession>>) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
-  session.sign_out().await?;
+#[tracing::instrument(level = "debug", skip(manager))]
+pub async fn sign_out(manager: AFPluginState<Weak<UserManager>>) -> Result<(), FlowyError> {
+  let manager = upgrade_manager(manager)?;
+  manager.sign_out().await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session))]
+#[tracing::instrument(level = "debug", skip(data, manager))]
 pub async fn update_user_profile_handler(
   data: AFPluginData<UpdateUserProfilePayloadPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params: UpdateUserProfileParams = data.into_inner().try_into()?;
-  session.update_user_profile(params).await?;
+  manager.update_user_profile(params).await?;
   Ok(())
 }
 
@@ -158,104 +158,103 @@ pub async fn get_appearance_setting(
 
 #[tracing::instrument(level = "debug", skip_all, err)]
 pub async fn get_user_setting(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<UserSettingPB, FlowyError> {
-  let session = upgrade_session(session)?;
-  let user_setting = session.user_setting()?;
+  let manager = upgrade_manager(manager)?;
+  let user_setting = manager.user_setting()?;
   data_result_ok(user_setting)
 }
 
 /// Only used for third party auth.
 /// Use [UserEvent::SignIn] or [UserEvent::SignUp] If the [AuthType] is Local or SelfHosted
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn third_party_auth_handler(
   data: AFPluginData<ThirdPartyAuthPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<UserProfilePB, FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params = data.into_inner();
   let auth_type: AuthType = params.auth_type.into();
-  session.update_auth_type(&auth_type).await;
-  let user_profile = session.sign_up(auth_type, BoxAny::new(params.map)).await?;
+  manager.update_auth_type(&auth_type).await;
+  let user_profile = manager.sign_up(auth_type, BoxAny::new(params.map)).await?;
   data_result_ok(user_profile.into())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn set_supabase_config_handler(
   data: AFPluginData<SupabaseConfigPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let config = SupabaseConfiguration::try_from(data.into_inner())?;
-  session.save_supabase_config(config);
+  manager.save_supabase_config(config);
   Ok(())
 }
 
 #[tracing::instrument(level = "debug", skip_all, err)]
 pub async fn get_supabase_config_handler(
   store_preferences: AFPluginState<Weak<StorePreferences>>,
-  _session: AFPluginState<Weak<UserSession>>,
 ) -> DataResult<SupabaseConfigPB, FlowyError> {
   let store_preferences = upgrade_store_preferences(store_preferences)?;
   let config = get_supabase_config(&store_preferences).unwrap_or_default();
   data_result_ok(config.into())
 }
 
-#[tracing::instrument(level = "debug", skip(session), err)]
+#[tracing::instrument(level = "debug", skip(manager), err)]
 pub async fn get_all_user_workspace_handler(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<RepeatedUserWorkspacePB, FlowyError> {
-  let session = upgrade_session(session)?;
-  let uid = session.get_session()?.user_id;
-  let user_workspaces = session.get_all_user_workspaces(uid)?;
+  let manager = upgrade_manager(manager)?;
+  let uid = manager.get_session()?.user_id;
+  let user_workspaces = manager.get_all_user_workspaces(uid)?;
   data_result_ok(user_workspaces.into())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn open_workspace_handler(
   data: AFPluginData<UserWorkspacePB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params = data.into_inner();
-  session.open_workspace(&params.id).await?;
+  manager.open_workspace(&params.id).await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn add_user_to_workspace_handler(
   data: AFPluginData<AddWorkspaceUserPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params = data.into_inner();
-  session
+  manager
     .add_user_to_workspace(params.email, params.workspace_id)
     .await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn remove_user_from_workspace_handler(
   data: AFPluginData<RemoveWorkspaceUserPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let params = data.into_inner();
-  session
+  manager
     .remove_user_to_workspace(params.email, params.workspace_id)
     .await?;
   Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(data, session), err)]
+#[tracing::instrument(level = "debug", skip(data, manager), err)]
 pub async fn update_network_state_handler(
   data: AFPluginData<NetworkStatePB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let reachable = data.into_inner().ty.is_reachable();
-  session
+  manager
     .user_status_callback
     .read()
     .await
@@ -265,39 +264,63 @@ pub async fn update_network_state_handler(
 
 #[tracing::instrument(level = "debug", skip_all, err)]
 pub async fn get_historical_users_handler(
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> DataResult<RepeatedHistoricalUserPB, FlowyError> {
-  let session = upgrade_session(session)?;
-  let users = RepeatedHistoricalUserPB::from(session.get_historical_users());
+  let manager = upgrade_manager(manager)?;
+  let users = RepeatedHistoricalUserPB::from(manager.get_historical_users());
   data_result_ok(users)
 }
 
 #[tracing::instrument(level = "debug", skip_all, err)]
 pub async fn open_historical_users_handler(
   user: AFPluginData<HistoricalUserPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
   let user = user.into_inner();
-  let session = upgrade_session(session)?;
+  let manager = upgrade_manager(manager)?;
   let auth_type = AuthType::from(user.auth_type);
-  session.open_historical_user(user.user_id, user.device_id, auth_type)?;
+  manager.open_historical_user(user.user_id, user.device_id, auth_type)?;
   Ok(())
 }
 
 #[tracing::instrument(level = "debug", skip_all, err)]
 pub async fn push_realtime_event_handler(
   payload: AFPluginData<RealtimePayloadPB>,
-  session: AFPluginState<Weak<UserSession>>,
+  manager: AFPluginState<Weak<UserManager>>,
 ) -> Result<(), FlowyError> {
   match serde_json::from_str::<Value>(&payload.into_inner().json_str) {
     Ok(json) => {
-      let session = upgrade_session(session)?;
-      session.receive_realtime_event(json).await;
+      let manager = upgrade_manager(manager)?;
+      manager.receive_realtime_event(json).await;
     },
     Err(e) => {
       tracing::error!("Deserialize RealtimePayload failed: {:?}", e);
     },
   }
+  Ok(())
+}
 
+#[tracing::instrument(level = "debug", skip_all, err)]
+pub async fn create_reminder_event_handler(
+  data: AFPluginData<ReminderPB>,
+  manager: AFPluginState<Weak<UserManager>>,
+) -> Result<(), FlowyError> {
+  let manager = upgrade_manager(manager)?;
+  let params = data.into_inner();
+  manager.add_reminder(params).await?;
   Ok(())
 }
+
+#[tracing::instrument(level = "debug", skip_all, err)]
+pub async fn get_all_reminder_event_handler(
+  manager: AFPluginState<Weak<UserManager>>,
+) -> DataResult<RepeatedReminderPB, FlowyError> {
+  let manager = upgrade_manager(manager)?;
+  let reminders = manager
+    .get_all_reminders()
+    .await
+    .into_iter()
+    .map(ReminderPB::from)
+    .collect::<Vec<_>>();
+  data_result_ok(reminders.into())
+}

+ 19 - 5
frontend/rust-lib/flowy-user/src/event_map.rs

@@ -12,10 +12,11 @@ use flowy_user_deps::entities::*;
 use lib_dispatch::prelude::*;
 use lib_infra::future::{to_fut, Fut};
 
+use crate::errors::FlowyError;
 use crate::event_handler::*;
-use crate::{errors::FlowyError, services::UserSession};
+use crate::manager::UserManager;
 
-pub fn init(user_session: Weak<UserSession>) -> AFPlugin {
+pub fn init(user_session: Weak<UserManager>) -> AFPlugin {
   let store_preferences = user_session
     .upgrade()
     .map(|session| session.get_store_preferences())
@@ -51,6 +52,8 @@ pub fn init(user_session: Weak<UserSession>) -> AFPlugin {
     .event(UserEvent::GetHistoricalUsers, get_historical_users_handler)
     .event(UserEvent::OpenHistoricalUser, open_historical_users_handler)
     .event(UserEvent::PushRealtimeEvent, push_realtime_event_handler)
+    .event(UserEvent::CreateReminder, create_reminder_event_handler)
+    .event(UserEvent::GetAllReminders, get_all_reminder_event_handler)
 }
 
 pub struct SignUpContext {
@@ -98,8 +101,9 @@ pub trait UserStatusCallback: Send + Sync + 'static {
 /// The user cloud service provider.
 /// The provider can be supabase, firebase, aws, or any other cloud service.
 pub trait UserCloudServiceProvider: Send + Sync + 'static {
-  fn update_supabase_config(&self, supabase_config: &SupabaseConfiguration);
+  fn set_supabase_config(&self, supabase_config: &SupabaseConfiguration);
   fn set_auth_type(&self, auth_type: AuthType);
+  fn set_device_id(&self, device_id: &str);
   fn get_user_service(&self) -> Result<Arc<dyn UserService>, FlowyError>;
   fn service_name(&self) -> String;
 }
@@ -108,14 +112,18 @@ impl<T> UserCloudServiceProvider for Arc<T>
 where
   T: UserCloudServiceProvider,
 {
-  fn update_supabase_config(&self, supabase_config: &SupabaseConfiguration) {
-    (**self).update_supabase_config(supabase_config)
+  fn set_supabase_config(&self, supabase_config: &SupabaseConfiguration) {
+    (**self).set_supabase_config(supabase_config)
   }
 
   fn set_auth_type(&self, auth_type: AuthType) {
     (**self).set_auth_type(auth_type)
   }
 
+  fn set_device_id(&self, device_id: &str) {
+    (**self).set_device_id(device_id)
+  }
+
   fn get_user_service(&self) -> Result<Arc<dyn UserService>, FlowyError> {
     (**self).get_user_service()
   }
@@ -247,4 +255,10 @@ pub enum UserEvent {
   /// when the auth type is: [AuthType::Supabase].
   #[event(input = "RealtimePayloadPB")]
   PushRealtimeEvent = 27,
+
+  #[event(input = "ReminderPB")]
+  CreateReminder = 28,
+
+  #[event(output = "RepeatedReminderPB")]
+  GetAllReminders = 29,
 }

+ 1 - 0
frontend/rust-lib/flowy-user/src/lib.rs

@@ -4,6 +4,7 @@ extern crate flowy_sqlite;
 pub mod entities;
 mod event_handler;
 pub mod event_map;
+pub mod manager;
 mod migrations;
 mod notification;
 pub mod protobuf;

+ 100 - 314
frontend/rust-lib/flowy-user/src/services/user_session.rs → frontend/rust-lib/flowy-user/src/manager.rs

@@ -1,25 +1,23 @@
-use std::convert::TryFrom;
 use std::string::ToString;
 use std::sync::{Arc, Weak};
 
+use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
 use appflowy_integrate::RocksCollabDB;
 use collab_folder::core::FolderData;
-use serde::{Deserialize, Serialize};
+use collab_user::core::MutexUserAwareness;
 use serde_json::Value;
-use tokio::sync::RwLock;
+use tokio::sync::{Mutex, RwLock};
 use uuid::Uuid;
 
-use flowy_error::{internal_error, ErrorCode, FlowyResult};
+use flowy_error::{internal_error, ErrorCode};
 use flowy_server_config::supabase_config::SupabaseConfiguration;
 use flowy_sqlite::kv::StorePreferences;
-use flowy_sqlite::schema::{user_table, user_workspace_table};
+use flowy_sqlite::schema::user_table;
 use flowy_sqlite::ConnectionPool;
 use flowy_sqlite::{query_dsl::*, DBConnection, ExpressionMethods};
 use flowy_user_deps::entities::*;
 use lib_infra::box_any::BoxAny;
-use lib_infra::util::timestamp;
 
-use crate::entities::{AuthTypePB, RepeatedUserWorkspacePB};
 use crate::entities::{UserProfilePB, UserSettingPB};
 use crate::event_map::{
   DefaultUserStatusCallback, SignUpContext, UserCloudServiceProvider, UserStatusCallback,
@@ -29,12 +27,12 @@ use crate::migrations::local_user_to_cloud::migration_user_to_cloud;
 use crate::migrations::migration::UserLocalDataMigration;
 use crate::migrations::UserMigrationContext;
 use crate::services::database::UserDB;
-use crate::services::session_serde::Session;
+use crate::services::entities::Session;
+use crate::services::user_awareness::UserAwarenessDataSource;
 use crate::services::user_sql::{UserTable, UserTableChangeset};
-use crate::services::user_workspace_sql::UserWorkspaceTable;
+use crate::services::user_workspace::save_user_workspaces;
 use crate::{errors::FlowyError, notification::*};
 
-const HISTORICAL_USER: &str = "af_historical_users";
 const SUPABASE_CONFIG_CACHE_KEY: &str = "af_supabase_config";
 
 pub struct UserSessionConfig {
@@ -56,19 +54,22 @@ impl UserSessionConfig {
   }
 }
 
-pub struct UserSession {
+pub struct UserManager {
   database: UserDB,
   session_config: UserSessionConfig,
-  cloud_services: Arc<dyn UserCloudServiceProvider>,
-  store_preferences: Arc<StorePreferences>,
+  pub(crate) cloud_services: Arc<dyn UserCloudServiceProvider>,
+  pub(crate) store_preferences: Arc<StorePreferences>,
+  pub(crate) user_awareness: Arc<Mutex<Option<MutexUserAwareness>>>,
   pub(crate) user_status_callback: RwLock<Arc<dyn UserStatusCallback>>,
+  pub(crate) collab_builder: Weak<AppFlowyCollabBuilder>,
 }
 
-impl UserSession {
+impl UserManager {
   pub fn new(
     session_config: UserSessionConfig,
     cloud_services: Arc<dyn UserCloudServiceProvider>,
     store_preferences: Arc<StorePreferences>,
+    collab_builder: Weak<AppFlowyCollabBuilder>,
   ) -> Self {
     let database = UserDB::new(&session_config.root_dir);
     let user_status_callback: RwLock<Arc<dyn UserStatusCallback>> =
@@ -78,7 +79,9 @@ impl UserSession {
       session_config,
       cloud_services,
       store_preferences,
+      user_awareness: Arc::new(Default::default()),
       user_status_callback,
+      collab_builder,
     }
   }
 
@@ -86,8 +89,15 @@ impl UserSession {
     Arc::downgrade(&self.store_preferences)
   }
 
+  /// Initializes the user session, including data migrations and user awareness configuration.
+  ///
+  /// This asynchronous function starts by retrieving the current session. If the session is successfully obtained,
+  /// it will attempt a local data migration for the user. After ensuring the user's data is migrated and up-to-date,
+  /// the function will set up the collaboration configuration and initialize the user's awareness. Upon successful
+  /// completion, a user status callback is invoked to signify that the initialization process is complete.
   pub async fn init<C: UserStatusCallback + 'static>(&self, user_status_callback: C) {
     if let Ok(session) = self.get_session() {
+      // Do the user data migration if needed
       match (
         self.database.get_collab_db(session.user_id),
         self.database.get_pool(session.user_id),
@@ -106,12 +116,16 @@ impl UserSession {
         },
         _ => tracing::error!("Failed to get collab db or sqlite pool"),
       }
-
+      self.set_collab_config(&session);
+      // Init the user awareness
+      self
+        .initialize_user_awareness(&session, UserAwarenessDataSource::Local)
+        .await;
       if let Err(e) = user_status_callback
         .did_init(session.user_id, &session.user_workspace, &session.device_id)
         .await
       {
-        tracing::error!("Failed to call did_sign_in callback: {:?}", e);
+        tracing::error!("Failed to call did_init callback: {:?}", e);
       }
     }
     *self.user_status_callback.write().await = Arc::new(user_status_callback);
@@ -121,12 +135,6 @@ impl UserSession {
     self.database.get_connection(uid)
   }
 
-  // The caller will be not 'Sync' before of the return value,
-  // PooledConnection<ConnectionManager> is not sync. You can use
-  // db_connection_pool function to require the ConnectionPool that is 'Sync'.
-  //
-  // let pool = self.db_connection_pool()?;
-  // let conn: PooledConnection<ConnectionManager> = pool.get()?;
   pub fn db_pool(&self, uid: i64) -> Result<Arc<ConnectionPool>, FlowyError> {
     self.database.get_pool(uid)
   }
@@ -138,17 +146,14 @@ impl UserSession {
       .map(|collab_db| Arc::downgrade(&collab_db))
   }
 
-  async fn migrate_local_user_to_cloud(
-    &self,
-    old_user: &UserMigrationContext,
-    new_user: &UserMigrationContext,
-  ) -> Result<Option<FolderData>, FlowyError> {
-    let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
-    let new_collab_db = self.database.get_collab_db(new_user.session.user_id)?;
-    let folder_data = migration_user_to_cloud(old_user, &old_collab_db, new_user, &new_collab_db)?;
-    Ok(folder_data)
-  }
-
+  /// Performs a user sign-in, initializing user awareness and sending relevant notifications.
+  ///
+  /// This asynchronous function interacts with an external user service to authenticate and sign in a user
+  /// based on provided parameters. Once signed in, it updates the collaboration configuration, logs the user,
+  /// saves their workspaces, and initializes their user awareness.
+  ///
+  /// A sign-in notification is also sent after a successful sign-in.
+  ///
   #[tracing::instrument(level = "debug", skip(self, params))]
   pub async fn sign_in(
     &self,
@@ -163,29 +168,25 @@ impl UserSession {
     let session: Session = response.clone().into();
     let uid = session.user_id;
     let device_id = session.device_id.clone();
-    self.set_current_session(Some(session))?;
-
-    self.log_user(
+    self.set_collab_config(&session);
+    self.set_current_session(Some(session.clone()))?;
+    self.log_historical_user(
       uid,
       &response.device_id,
       response.name.clone(),
       &auth_type,
       self.user_dir(uid),
     );
-
     let user_workspace = response.latest_workspace.clone();
-    save_user_workspaces(
-      self.db_pool(uid)?,
-      response
-        .user_workspaces
-        .iter()
-        .flat_map(|user_workspace| UserWorkspaceTable::try_from((uid, user_workspace)).ok())
-        .collect(),
-    )?;
+    save_user_workspaces(uid, self.db_pool(uid)?, &response.user_workspaces)?;
     let user_profile: UserProfile = self
       .save_user(uid, (response, auth_type).into())
       .await?
       .into();
+    let _ = self
+      .initialize_user_awareness(&session, UserAwarenessDataSource::Remote)
+      .await;
+
     if let Err(e) = self
       .user_status_callback
       .read()
@@ -195,10 +196,10 @@ impl UserSession {
     {
       tracing::error!("Failed to call did_sign_in callback: {:?}", e);
     }
+
     send_sign_in_notification()
       .payload::<UserProfilePB>(user_profile.clone().into())
       .send();
-
     Ok(user_profile)
   }
 
@@ -212,6 +213,13 @@ impl UserSession {
     self.cloud_services.set_auth_type(auth_type.clone());
   }
 
+  /// Manages the user sign-up process, potentially migrating data if necessary.
+  ///
+  /// This asynchronous function interacts with an external authentication service to register and sign up a user
+  /// based on the provided parameters. Following a successful sign-up, it handles configuration updates, logging,
+  /// and saving workspace information. If a user is signing up with a new profile and previously had guest data,
+  /// this function may migrate that data over to the new account.
+  ///
   #[tracing::instrument(level = "debug", skip(self, params))]
   pub async fn sign_up(
     &self,
@@ -241,27 +249,25 @@ impl UserSession {
     };
     let new_session = Session::from(&response);
     self.set_current_session(Some(new_session.clone()))?;
+    self.set_collab_config(&new_session);
     let uid = response.user_id;
-    self.log_user(
+    self.log_historical_user(
       uid,
       &response.device_id,
       response.name.clone(),
       &auth_type,
       self.user_dir(uid),
     );
-    save_user_workspaces(
-      self.db_pool(uid)?,
-      response
-        .user_workspaces
-        .iter()
-        .flat_map(|user_workspace| UserWorkspaceTable::try_from((uid, user_workspace)).ok())
-        .collect(),
-    )?;
-    let user_table = self
+    save_user_workspaces(uid, self.db_pool(uid)?, &response.user_workspaces)?;
+    let new_user_profile: UserProfile = self
       .save_user(uid, (response, auth_type.clone()).into())
-      .await?;
-    let new_user_profile: UserProfile = user_table.into();
-
+      .await?
+      .into();
+    let user_awareness_source = if sign_up_context.is_new {
+      UserAwarenessDataSource::Local
+    } else {
+      UserAwarenessDataSource::Remote
+    };
     // Only migrate the data if the user is login in as a guest and sign up as a new user if the current
     // auth type is not [AuthType::Local].
     if sign_up_context.is_new {
@@ -271,7 +277,6 @@ impl UserSession {
             user_profile: new_user_profile.clone(),
             session: new_session.clone(),
           };
-
           tracing::info!(
             "Migrate old user data from {:?} to {:?}",
             old_user.user_profile.id,
@@ -281,13 +286,16 @@ impl UserSession {
             Ok(folder_data) => sign_up_context.local_folder = folder_data,
             Err(e) => tracing::error!("{:?}", e),
           }
-
           // close the old user db
           let _ = self.database.close(old_user.session.user_id);
         }
       }
     }
 
+    self
+      .initialize_user_awareness(&new_session, user_awareness_source)
+      .await;
+
     let _ = self
       .user_status_callback
       .read()
@@ -318,6 +326,12 @@ impl UserSession {
     Ok(())
   }
 
+  /// Updates the user's profile with the given parameters.
+  ///
+  /// This function modifies the user's profile based on the provided update parameters. After updating, it
+  /// sends a notification about the change. It's also responsible for handling interactions with the underlying
+  /// database and updates user profile.
+  ///
   #[tracing::instrument(level = "debug", skip(self))]
   pub async fn update_user_profile(
     &self,
@@ -366,50 +380,11 @@ impl UserSession {
     Ok(())
   }
 
-  pub async fn open_workspace(&self, workspace_id: &str) -> FlowyResult<()> {
-    let uid = self.user_id()?;
-    if let Some(user_workspace) = self.get_user_workspace(uid, workspace_id) {
-      if let Err(err) = self
-        .user_status_callback
-        .read()
-        .await
-        .open_workspace(uid, &user_workspace)
-        .await
-      {
-        tracing::error!("Open workspace failed: {:?}", err);
-      }
-    }
-    Ok(())
-  }
-
-  pub async fn add_user_to_workspace(
-    &self,
-    user_email: String,
-    to_workspace_id: String,
-  ) -> FlowyResult<()> {
-    self
-      .cloud_services
-      .get_user_service()?
-      .add_workspace_member(user_email, to_workspace_id)
-      .await?;
-    Ok(())
-  }
-
-  pub async fn remove_user_to_workspace(
-    &self,
-    user_email: String,
-    from_workspace_id: String,
-  ) -> FlowyResult<()> {
-    self
-      .cloud_services
-      .get_user_service()?
-      .remove_workspace_member(user_email, from_workspace_id)
-      .await?;
-    Ok(())
-  }
-
-  /// Get the user profile from the database
-  /// If the refresh is true, it will try to get the user profile from the server
+  /// Fetches the user profile for the given user ID.
+  ///
+  /// This function retrieves the user profile from the local database. If the `refresh` flag is set to `true`,
+  /// it also attempts to update the user profile from a cloud service, and then sends a notification about the
+  /// profile update.
   pub async fn get_user_profile(&self, uid: i64, refresh: bool) -> Result<UserProfile, FlowyError> {
     let user_id = uid.to_string();
     let user = user_table::dsl::user_table
@@ -467,7 +442,7 @@ impl UserSession {
   }
 
   pub fn save_supabase_config(&self, config: SupabaseConfiguration) {
-    self.cloud_services.update_supabase_config(&config);
+    self.cloud_services.set_supabase_config(&config);
     let _ = self
       .store_preferences
       .set_object(SUPABASE_CONFIG_CACHE_KEY, config);
@@ -507,78 +482,7 @@ impl UserSession {
     Ok(user)
   }
 
-  pub fn get_user_workspace(&self, uid: i64, workspace_id: &str) -> Option<UserWorkspace> {
-    let conn = self.db_connection(uid).ok()?;
-    let row = user_workspace_table::dsl::user_workspace_table
-      .filter(user_workspace_table::id.eq(workspace_id))
-      .first::<UserWorkspaceTable>(&*conn)
-      .ok()?;
-    Some(UserWorkspace::from(row))
-  }
-
-  pub fn get_all_user_workspaces(&self, uid: i64) -> FlowyResult<Vec<UserWorkspace>> {
-    let conn = self.db_connection(uid)?;
-    let rows = user_workspace_table::dsl::user_workspace_table
-      .filter(user_workspace_table::uid.eq(uid))
-      .load::<UserWorkspaceTable>(&*conn)?;
-
-    if let Ok(service) = self.cloud_services.get_user_service() {
-      if let Ok(pool) = self.db_pool(uid) {
-        tokio::spawn(async move {
-          if let Ok(new_user_workspaces) = service.get_user_workspaces(uid).await {
-            let _ = save_user_workspaces(
-              pool,
-              new_user_workspaces
-                .iter()
-                .flat_map(|user_workspace| UserWorkspaceTable::try_from((uid, user_workspace)).ok())
-                .collect(),
-            );
-
-            let repeated_workspace_pbs = RepeatedUserWorkspacePB::from(new_user_workspaces);
-            send_notification(&uid.to_string(), UserNotification::DidUpdateUserWorkspaces)
-              .payload(repeated_workspace_pbs)
-              .send();
-          }
-        });
-      }
-    }
-    Ok(rows.into_iter().map(UserWorkspace::from).collect())
-  }
-
-  pub async fn save_user_workspaces(
-    &self,
-    uid: i64,
-    user_workspaces: Vec<UserWorkspaceTable>,
-  ) -> FlowyResult<()> {
-    let conn = self.db_connection(uid)?;
-    conn.immediate_transaction(|| {
-      for user_workspace in user_workspaces {
-        if let Err(err) = diesel::update(
-          user_workspace_table::dsl::user_workspace_table
-            .filter(user_workspace_table::id.eq(user_workspace.id.clone())),
-        )
-        .set((
-          user_workspace_table::name.eq(&user_workspace.name),
-          user_workspace_table::created_at.eq(&user_workspace.created_at),
-          user_workspace_table::database_storage_id.eq(&user_workspace.database_storage_id),
-        ))
-        .execute(&*conn)
-        .and_then(|rows| {
-          if rows == 0 {
-            let _ = diesel::insert_into(user_workspace_table::table)
-              .values(user_workspace)
-              .execute(&*conn)?;
-          }
-          Ok(())
-        }) {
-          tracing::error!("Error saving user workspace: {:?}", err);
-        }
-      }
-      Ok::<(), FlowyError>(())
-    })
-  }
-
-  fn set_current_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
+  pub(crate) fn set_current_session(&self, session: Option<Session>) -> Result<(), FlowyError> {
     tracing::debug!("Set current user: {:?}", session);
     match &session {
       None => self
@@ -594,63 +498,6 @@ impl UserSession {
     Ok(())
   }
 
-  fn log_user(
-    &self,
-    uid: i64,
-    device_id: &str,
-    user_name: String,
-    auth_type: &AuthType,
-    storage_path: String,
-  ) {
-    let mut logger_users = self
-      .store_preferences
-      .get_object::<HistoricalUsers>(HISTORICAL_USER)
-      .unwrap_or_default();
-    logger_users.add_user(HistoricalUser {
-      user_id: uid,
-      user_name,
-      auth_type: auth_type.clone(),
-      sign_in_timestamp: timestamp(),
-      storage_path,
-      device_id: device_id.to_string(),
-    });
-    let _ = self
-      .store_preferences
-      .set_object(HISTORICAL_USER, logger_users);
-  }
-
-  pub fn get_historical_users(&self) -> Vec<HistoricalUser> {
-    let mut users = self
-      .store_preferences
-      .get_object::<HistoricalUsers>(HISTORICAL_USER)
-      .unwrap_or_default()
-      .users;
-    users.sort_by(|a, b| b.sign_in_timestamp.cmp(&a.sign_in_timestamp));
-    users
-  }
-
-  pub fn open_historical_user(
-    &self,
-    uid: i64,
-    device_id: String,
-    auth_type: AuthType,
-  ) -> FlowyResult<()> {
-    let conn = self.db_connection(uid)?;
-    let row = user_workspace_table::dsl::user_workspace_table
-      .filter(user_workspace_table::uid.eq(uid))
-      .first::<UserWorkspaceTable>(&*conn)?;
-    let user_workspace = UserWorkspace::from(row);
-    let session = Session {
-      user_id: uid,
-      device_id,
-      user_workspace,
-    };
-    debug_assert!(auth_type.is_local());
-    self.cloud_services.set_auth_type(auth_type);
-    self.set_current_session(Some(session))?;
-    Ok(())
-  }
-
   pub async fn receive_realtime_event(&self, json: Value) {
     self
       .user_status_callback
@@ -672,6 +519,24 @@ impl UserSession {
       Some(session) => Ok(session),
     }
   }
+
+  fn set_collab_config(&self, session: &Session) {
+    let collab_builder = self.collab_builder.upgrade().unwrap();
+    collab_builder.set_sync_device(session.device_id.clone());
+    collab_builder.initialize(session.user_workspace.id.clone());
+    self.cloud_services.set_device_id(&session.device_id);
+  }
+
+  async fn migrate_local_user_to_cloud(
+    &self,
+    old_user: &UserMigrationContext,
+    new_user: &UserMigrationContext,
+  ) -> Result<Option<FolderData>, FlowyError> {
+    let old_collab_db = self.database.get_collab_db(old_user.session.user_id)?;
+    let new_collab_db = self.database.get_collab_db(new_user.session.user_id)?;
+    let folder_data = migration_user_to_cloud(old_user, &old_collab_db, new_user, &new_collab_db)?;
+    Ok(folder_data)
+  }
 }
 
 pub fn get_supabase_config(
@@ -682,82 +547,3 @@ pub fn get_supabase_config(
     .and_then(|s| serde_json::from_str(&s).ok())
     .unwrap_or_else(|| SupabaseConfiguration::from_env().ok())
 }
-
-pub fn save_user_workspaces(
-  pool: Arc<ConnectionPool>,
-  user_workspaces: Vec<UserWorkspaceTable>,
-) -> FlowyResult<()> {
-  let conn = pool.get()?;
-  conn.immediate_transaction(|| {
-    for user_workspace in user_workspaces {
-      if let Err(err) = diesel::update(
-        user_workspace_table::dsl::user_workspace_table
-          .filter(user_workspace_table::id.eq(user_workspace.id.clone())),
-      )
-      .set((
-        user_workspace_table::name.eq(&user_workspace.name),
-        user_workspace_table::created_at.eq(&user_workspace.created_at),
-        user_workspace_table::database_storage_id.eq(&user_workspace.database_storage_id),
-      ))
-      .execute(&*conn)
-      .and_then(|rows| {
-        if rows == 0 {
-          let _ = diesel::insert_into(user_workspace_table::table)
-            .values(user_workspace)
-            .execute(&*conn)?;
-        }
-        Ok(())
-      }) {
-        tracing::error!("Error saving user workspace: {:?}", err);
-      }
-    }
-    Ok::<(), FlowyError>(())
-  })
-}
-
-impl From<AuthTypePB> for AuthType {
-  fn from(pb: AuthTypePB) -> Self {
-    match pb {
-      AuthTypePB::Supabase => AuthType::Supabase,
-      AuthTypePB::Local => AuthType::Local,
-      AuthTypePB::SelfHosted => AuthType::SelfHosted,
-    }
-  }
-}
-
-impl From<AuthType> for AuthTypePB {
-  fn from(auth_type: AuthType) -> Self {
-    match auth_type {
-      AuthType::Supabase => AuthTypePB::Supabase,
-      AuthType::Local => AuthTypePB::Local,
-      AuthType::SelfHosted => AuthTypePB::SelfHosted,
-    }
-  }
-}
-
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-pub struct HistoricalUsers {
-  pub(crate) users: Vec<HistoricalUser>,
-}
-
-impl HistoricalUsers {
-  pub fn add_user(&mut self, new_user: HistoricalUser) {
-    self.users.retain(|user| user.user_id != new_user.user_id);
-    self.users.push(new_user);
-  }
-}
-
-#[derive(Debug, Clone, Default, Serialize, Deserialize)]
-pub struct HistoricalUser {
-  pub user_id: i64,
-  #[serde(default = "flowy_user_deps::DEFAULT_USER_NAME")]
-  pub user_name: String,
-  #[serde(default = "DEFAULT_AUTH_TYPE")]
-  pub auth_type: AuthType,
-  pub sign_in_timestamp: i64,
-  pub storage_path: String,
-  #[serde(default)]
-  pub device_id: String,
-}
-
-const DEFAULT_AUTH_TYPE: fn() -> AuthType = || AuthType::Local;

+ 2 - 1
frontend/rust-lib/flowy-user/src/migrations/define.rs

@@ -1,6 +1,7 @@
-use crate::services::session_serde::Session;
 use flowy_user_deps::entities::UserProfile;
 
+use crate::services::entities::Session;
+
 pub struct UserMigrationContext {
   pub user_profile: UserProfile,
   pub session: Session,

+ 1 - 1
frontend/rust-lib/flowy-user/src/migrations/historical_document.rs

@@ -10,7 +10,7 @@ use collab_folder::core::Folder;
 use flowy_error::{internal_error, FlowyResult};
 
 use crate::migrations::migration::UserDataMigration;
-use crate::services::session_serde::Session;
+use crate::services::entities::Session;
 
 /// Migrate the first level documents of the workspace by inserting documents
 pub struct HistoricalEmptyDocumentMigration;

+ 5 - 2
frontend/rust-lib/flowy-user/src/migrations/migration.rs

@@ -1,11 +1,14 @@
-use crate::services::session_serde::Session;
+use std::sync::Arc;
+
 use appflowy_integrate::RocksCollabDB;
 use chrono::NaiveDateTime;
 use diesel::{RunQueryDsl, SqliteConnection};
+
 use flowy_error::FlowyResult;
 use flowy_sqlite::schema::user_data_migration_records;
 use flowy_sqlite::ConnectionPool;
-use std::sync::Arc;
+
+use crate::services::entities::Session;
 
 pub struct UserLocalDataMigration {
   session: Session,

+ 50 - 2
frontend/rust-lib/flowy-user/src/services/session_serde.rs → frontend/rust-lib/flowy-user/src/services/entities.rs

@@ -4,12 +4,14 @@ use base64::engine::general_purpose::STANDARD;
 use base64::Engine;
 use chrono::prelude::*;
 use serde::de::{Deserializer, MapAccess, Visitor};
-use serde::Deserialize;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 
+use flowy_user_deps::entities::AuthType;
 use flowy_user_deps::entities::{SignInResponse, SignUpResponse, UserWorkspace};
 
+use crate::entities::AuthTypePB;
+
 #[derive(Debug, Clone, Serialize)]
 pub struct Session {
   pub user_id: i64,
@@ -160,3 +162,49 @@ mod tests {
     );
   }
 }
+
+impl From<AuthTypePB> for AuthType {
+  fn from(pb: AuthTypePB) -> Self {
+    match pb {
+      AuthTypePB::Supabase => AuthType::Supabase,
+      AuthTypePB::Local => AuthType::Local,
+      AuthTypePB::SelfHosted => AuthType::SelfHosted,
+    }
+  }
+}
+
+impl From<AuthType> for AuthTypePB {
+  fn from(auth_type: AuthType) -> Self {
+    match auth_type {
+      AuthType::Supabase => AuthTypePB::Supabase,
+      AuthType::Local => AuthTypePB::Local,
+      AuthType::SelfHosted => AuthTypePB::SelfHosted,
+    }
+  }
+}
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct HistoricalUsers {
+  pub(crate) users: Vec<HistoricalUser>,
+}
+
+impl HistoricalUsers {
+  pub fn add_user(&mut self, new_user: HistoricalUser) {
+    self.users.retain(|user| user.user_id != new_user.user_id);
+    self.users.push(new_user);
+  }
+}
+
+#[derive(Debug, Clone, Default, Serialize, Deserialize)]
+pub struct HistoricalUser {
+  pub user_id: i64,
+  #[serde(default = "flowy_user_deps::DEFAULT_USER_NAME")]
+  pub user_name: String,
+  #[serde(default = "DEFAULT_AUTH_TYPE")]
+  pub auth_type: AuthType,
+  pub sign_in_timestamp: i64,
+  pub storage_path: String,
+  #[serde(default)]
+  pub device_id: String,
+}
+const DEFAULT_AUTH_TYPE: fn() -> AuthType = || AuthType::Local;

+ 91 - 0
frontend/rust-lib/flowy-user/src/services/historical_user.rs

@@ -0,0 +1,91 @@
+use diesel::RunQueryDsl;
+
+use flowy_error::FlowyResult;
+use flowy_sqlite::schema::user_workspace_table;
+use flowy_sqlite::{query_dsl::*, ExpressionMethods};
+use flowy_user_deps::entities::{AuthType, UserWorkspace};
+use lib_infra::util::timestamp;
+
+use crate::manager::UserManager;
+use crate::services::entities::{HistoricalUser, HistoricalUsers, Session};
+use crate::services::user_workspace_sql::UserWorkspaceTable;
+
+const HISTORICAL_USER: &str = "af_historical_users";
+impl UserManager {
+  /// Logs a user's details for historical tracking.
+  ///
+  /// This function adds a user's details to a local historical tracking system, useful for
+  /// keeping track of past sign-ins or any other historical activities.
+  ///
+  /// # Parameters
+  /// - `uid`: The user ID.
+  /// - `device_id`: The ID of the device the user is using.
+  /// - `user_name`: The name of the user.
+  /// - `auth_type`: The type of authentication used.
+  /// - `storage_path`: Path where user data is stored.
+  ///
+  pub fn log_historical_user(
+    &self,
+    uid: i64,
+    device_id: &str,
+    user_name: String,
+    auth_type: &AuthType,
+    storage_path: String,
+  ) {
+    let mut logger_users = self
+      .store_preferences
+      .get_object::<HistoricalUsers>(HISTORICAL_USER)
+      .unwrap_or_default();
+    logger_users.add_user(HistoricalUser {
+      user_id: uid,
+      user_name,
+      auth_type: auth_type.clone(),
+      sign_in_timestamp: timestamp(),
+      storage_path,
+      device_id: device_id.to_string(),
+    });
+    let _ = self
+      .store_preferences
+      .set_object(HISTORICAL_USER, logger_users);
+  }
+
+  /// Fetches a list of historical users, sorted by their sign-in timestamp.
+  ///
+  /// This function retrieves a list of users who have previously been logged for historical tracking.
+  pub fn get_historical_users(&self) -> Vec<HistoricalUser> {
+    let mut users = self
+      .store_preferences
+      .get_object::<HistoricalUsers>(HISTORICAL_USER)
+      .unwrap_or_default()
+      .users;
+    users.sort_by(|a, b| b.sign_in_timestamp.cmp(&a.sign_in_timestamp));
+    users
+  }
+
+  /// Opens a historical user's session based on their user ID, device ID, and authentication type.
+  ///
+  /// This function facilitates the re-opening of a user's session from historical tracking.
+  /// It retrieves the user's workspace and establishes a new session for the user.
+  ///
+  pub fn open_historical_user(
+    &self,
+    uid: i64,
+    device_id: String,
+    auth_type: AuthType,
+  ) -> FlowyResult<()> {
+    let conn = self.db_connection(uid)?;
+    let row = user_workspace_table::dsl::user_workspace_table
+      .filter(user_workspace_table::uid.eq(uid))
+      .first::<UserWorkspaceTable>(&*conn)?;
+    let user_workspace = UserWorkspace::from(row);
+    let session = Session {
+      user_id: uid,
+      device_id,
+      user_workspace,
+    };
+    debug_assert!(auth_type.is_local());
+    self.cloud_services.set_auth_type(auth_type);
+    self.set_current_session(Some(session))?;
+    Ok(())
+  }
+}

+ 6 - 6
frontend/rust-lib/flowy-user/src/services/mod.rs

@@ -1,7 +1,7 @@
-pub use user_session::*;
-
 pub mod database;
-pub mod session_serde;
-mod user_session;
-mod user_sql;
-mod user_workspace_sql;
+pub mod entities;
+pub(crate) mod historical_user;
+pub(crate) mod user_awareness;
+pub(crate) mod user_sql;
+pub(crate) mod user_workspace;
+pub(crate) mod user_workspace_sql;

+ 168 - 0
frontend/rust-lib/flowy-user/src/services/user_awareness.rs

@@ -0,0 +1,168 @@
+use std::sync::{Arc, Weak};
+
+use appflowy_integrate::{CollabType, RocksCollabDB};
+use collab::core::collab::{CollabRawData, MutexCollab};
+use collab_user::core::{MutexUserAwareness, Reminder, UserAwareness};
+
+use flowy_error::{ErrorCode, FlowyError, FlowyResult};
+
+use crate::entities::ReminderPB;
+use crate::manager::UserManager;
+use crate::services::entities::Session;
+
+impl UserManager {
+  /// Adds a new reminder based on the given payload.
+  ///
+  /// This function creates a new `Reminder` from the provided payload and adds it to the user's reminders.
+  /// It leverages the `with_awareness` function to ensure the reminder is added in the context of the
+  /// current user's awareness.
+  ///
+  /// # Parameters
+  /// - `reminder_pb`: The pb for the new reminder.
+  ///
+  /// # Returns
+  /// - Returns `Ok(())` if the reminder is successfully added.
+  /// - May return errors of type `FlowyError` if any issues arise during the process.
+  ///
+  pub async fn add_reminder(&self, reminder_pb: ReminderPB) -> FlowyResult<()> {
+    let reminder = Reminder::from(reminder_pb);
+    self
+      .with_awareness((), |user_awareness| {
+        user_awareness.add_reminder(reminder);
+      })
+      .await;
+    Ok(())
+  }
+
+  /// Retrieves all reminders for the user.
+  ///
+  /// This function fetches all reminders associated with the current user. It leverages the
+  /// `with_awareness` function to ensure the reminders are retrieved in the context of the
+  /// current user's awareness.
+  ///
+  /// # Returns
+  /// - Returns a vector of `Reminder` objects containing all reminders for the user.
+  ///
+  pub async fn get_all_reminders(&self) -> Vec<Reminder> {
+    self
+      .with_awareness(vec![], |user_awareness| user_awareness.get_all_reminders())
+      .await
+  }
+
+  pub async fn initialize_user_awareness(
+    &self,
+    session: &Session,
+    source: UserAwarenessDataSource,
+  ) {
+    match self.try_initial_user_awareness(session, source).await {
+      Ok(_) => {
+        tracing::trace!("User awareness initialized");
+      },
+      Err(e) => {
+        tracing::error!("Failed to initialize user awareness: {:?}", e);
+      },
+    }
+  }
+
+  /// Initializes the user's awareness based on the specified data source.
+  ///
+  /// This asynchronous function attempts to initialize the user's awareness from either a local or remote data source.
+  /// Depending on the chosen source, it will either construct the user awareness from an empty dataset or fetch it
+  /// from a remote service. Once obtained, the user's awareness is stored in a shared mutex-protected structure.
+  ///
+  /// # Parameters
+  /// - `session`: The current user's session data.
+  /// - `source`: The source from which the user's awareness data should be obtained, either local or remote.
+  ///
+  /// # Returns
+  /// - Returns `Ok(())` if the user's awareness is successfully initialized.
+  /// - May return errors of type `FlowyError` if any issues arise during the initialization.
+  async fn try_initial_user_awareness(
+    &self,
+    session: &Session,
+    source: UserAwarenessDataSource,
+  ) -> FlowyResult<()> {
+    tracing::trace!("Initializing user awareness from {:?}", source);
+    let collab_db = self.get_collab_db(session.user_id)?;
+    let user_awareness = match source {
+      UserAwarenessDataSource::Local => {
+        let collab = self.collab_for_user_awareness(session, collab_db, vec![])?;
+        MutexUserAwareness::new(UserAwareness::create(collab, None))
+      },
+      UserAwarenessDataSource::Remote => {
+        let data = self
+          .cloud_services
+          .get_user_service()?
+          .get_user_awareness_updates(session.user_id)
+          .await?;
+        let collab = self.collab_for_user_awareness(session, collab_db, data)?;
+        MutexUserAwareness::new(UserAwareness::create(collab, None))
+      },
+    };
+    self.user_awareness.lock().await.replace(user_awareness);
+    Ok(())
+  }
+
+  /// Creates a collaboration instance tailored for user awareness.
+  ///
+  /// This function constructs a collaboration instance based on the given session and raw data,
+  /// using a collaboration builder. This instance is specifically geared towards handling
+  /// user awareness.
+  fn collab_for_user_awareness(
+    &self,
+    session: &Session,
+    collab_db: Weak<RocksCollabDB>,
+    raw_data: CollabRawData,
+  ) -> Result<Arc<MutexCollab>, FlowyError> {
+    let collab_builder = self.collab_builder.upgrade().ok_or(FlowyError::new(
+      ErrorCode::Internal,
+      "Unexpected error: collab builder is not available",
+    ))?;
+    let collab = collab_builder.build(
+      session.user_id,
+      &session.user_id.to_string(),
+      CollabType::UserAwareness,
+      raw_data,
+      collab_db,
+    )?;
+    Ok(collab)
+  }
+
+  /// Executes a function with user awareness.
+  ///
+  /// This function takes an asynchronous closure `f` that accepts a reference to a `UserAwareness`
+  /// and returns an `Output`. If the current user awareness is set (i.e., is `Some`), it invokes
+  /// the closure `f` with the user awareness. If the user awareness is not set (i.e., is `None`),
+  /// it attempts to initialize the user awareness via a remote session. If the session fetch
+  /// or user awareness initialization fails, it returns the provided `default_value`.
+  ///
+  /// # Parameters
+  /// - `default_value`: A default value to return if the user awareness is `None` and cannot be initialized.
+  /// - `f`: The asynchronous closure to execute with the user awareness.
+  async fn with_awareness<F, Output>(&self, default_value: Output, f: F) -> Output
+  where
+    F: FnOnce(&UserAwareness) -> Output,
+  {
+    let user_awareness = self.user_awareness.lock().await;
+    match &*user_awareness {
+      None => {
+        if let Ok(session) = self.get_session() {
+          self
+            .initialize_user_awareness(&session, UserAwarenessDataSource::Remote)
+            .await;
+        }
+        default_value
+      },
+      Some(user_awareness) => f(&user_awareness.lock()),
+    }
+  }
+}
+
+/// Indicate using which data source to initialize the user awareness
+/// If the user is not a new user, the local data source is used. Otherwise, the remote data source is used.
+/// When using the remote data source, the user awareness will be initialized from the remote server.
+#[derive(Debug)]
+pub enum UserAwarenessDataSource {
+  Local,
+  Remote,
+}

+ 125 - 0
frontend/rust-lib/flowy-user/src/services/user_workspace.rs

@@ -0,0 +1,125 @@
+use std::convert::TryFrom;
+use std::sync::Arc;
+
+use flowy_error::{FlowyError, FlowyResult};
+use flowy_sqlite::schema::user_workspace_table;
+use flowy_sqlite::{query_dsl::*, ConnectionPool, ExpressionMethods};
+use flowy_user_deps::entities::UserWorkspace;
+
+use crate::entities::RepeatedUserWorkspacePB;
+use crate::manager::UserManager;
+use crate::notification::{send_notification, UserNotification};
+use crate::services::user_workspace_sql::UserWorkspaceTable;
+
+impl UserManager {
+  pub async fn open_workspace(&self, workspace_id: &str) -> FlowyResult<()> {
+    let uid = self.user_id()?;
+    if let Some(user_workspace) = self.get_user_workspace(uid, workspace_id) {
+      if let Err(err) = self
+        .user_status_callback
+        .read()
+        .await
+        .open_workspace(uid, &user_workspace)
+        .await
+      {
+        tracing::error!("Open workspace failed: {:?}", err);
+      }
+    }
+    Ok(())
+  }
+
+  pub async fn add_user_to_workspace(
+    &self,
+    user_email: String,
+    to_workspace_id: String,
+  ) -> FlowyResult<()> {
+    self
+      .cloud_services
+      .get_user_service()?
+      .add_workspace_member(user_email, to_workspace_id)
+      .await?;
+    Ok(())
+  }
+
+  pub async fn remove_user_to_workspace(
+    &self,
+    user_email: String,
+    from_workspace_id: String,
+  ) -> FlowyResult<()> {
+    self
+      .cloud_services
+      .get_user_service()?
+      .remove_workspace_member(user_email, from_workspace_id)
+      .await?;
+    Ok(())
+  }
+
+  pub fn get_user_workspace(&self, uid: i64, workspace_id: &str) -> Option<UserWorkspace> {
+    let conn = self.db_connection(uid).ok()?;
+    let row = user_workspace_table::dsl::user_workspace_table
+      .filter(user_workspace_table::id.eq(workspace_id))
+      .first::<UserWorkspaceTable>(&*conn)
+      .ok()?;
+    Some(UserWorkspace::from(row))
+  }
+
+  pub fn get_all_user_workspaces(&self, uid: i64) -> FlowyResult<Vec<UserWorkspace>> {
+    let conn = self.db_connection(uid)?;
+    let rows = user_workspace_table::dsl::user_workspace_table
+      .filter(user_workspace_table::uid.eq(uid))
+      .load::<UserWorkspaceTable>(&*conn)?;
+
+    if let Ok(service) = self.cloud_services.get_user_service() {
+      if let Ok(pool) = self.db_pool(uid) {
+        tokio::spawn(async move {
+          if let Ok(new_user_workspaces) = service.get_user_workspaces(uid).await {
+            let _ = save_user_workspaces(uid, pool, &new_user_workspaces);
+            let repeated_workspace_pbs = RepeatedUserWorkspacePB::from(new_user_workspaces);
+            send_notification(&uid.to_string(), UserNotification::DidUpdateUserWorkspaces)
+              .payload(repeated_workspace_pbs)
+              .send();
+          }
+        });
+      }
+    }
+    Ok(rows.into_iter().map(UserWorkspace::from).collect())
+  }
+}
+
+pub fn save_user_workspaces(
+  uid: i64,
+  pool: Arc<ConnectionPool>,
+  user_workspaces: &[UserWorkspace],
+) -> FlowyResult<()> {
+  let user_workspaces = user_workspaces
+    .iter()
+    .flat_map(|user_workspace| UserWorkspaceTable::try_from((uid, user_workspace)).ok())
+    .collect::<Vec<UserWorkspaceTable>>();
+
+  let conn = pool.get()?;
+  conn.immediate_transaction(|| {
+    for user_workspace in user_workspaces {
+      if let Err(err) = diesel::update(
+        user_workspace_table::dsl::user_workspace_table
+          .filter(user_workspace_table::id.eq(user_workspace.id.clone())),
+      )
+      .set((
+        user_workspace_table::name.eq(&user_workspace.name),
+        user_workspace_table::created_at.eq(&user_workspace.created_at),
+        user_workspace_table::database_storage_id.eq(&user_workspace.database_storage_id),
+      ))
+      .execute(&*conn)
+      .and_then(|rows| {
+        if rows == 0 {
+          let _ = diesel::insert_into(user_workspace_table::table)
+            .values(user_workspace)
+            .execute(&*conn)?;
+        }
+        Ok(())
+      }) {
+        tracing::error!("Error saving user workspace: {:?}", err);
+      }
+    }
+    Ok::<(), FlowyError>(())
+  })
+}