Ver código fonte

chore: refactor user listenr

appflowy 2 anos atrás
pai
commit
957b83ecd5
23 arquivos alterados com 304 adições e 250 exclusões
  1. 39 0
      frontend/app_flowy/lib/core/folder_notification.dart
  2. 39 0
      frontend/app_flowy/lib/core/grid_notification.dart
  3. 0 62
      frontend/app_flowy/lib/core/notification_helper.dart
  4. 39 0
      frontend/app_flowy/lib/core/user_notification.dart
  5. 5 8
      frontend/app_flowy/lib/startup/deps_resolver.dart
  6. 90 57
      frontend/app_flowy/lib/user/application/user_listener.dart
  7. 0 3
      frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart
  8. 1 1
      frontend/app_flowy/lib/workspace/application/app/app_listener.dart
  9. 1 2
      frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart
  10. 1 1
      frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart
  11. 1 1
      frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart
  12. 1 1
      frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart
  13. 1 1
      frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart
  14. 4 6
      frontend/app_flowy/lib/workspace/application/home/home_bloc.dart
  15. 1 1
      frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart
  16. 21 26
      frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart
  17. 1 1
      frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart
  18. 1 1
      frontend/app_flowy/lib/workspace/application/view/view_listener.dart
  19. 5 5
      frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart
  20. 36 60
      frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart
  21. 2 2
      frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart
  22. 1 1
      frontend/rust-lib/flowy-user/src/handlers/user_handler.rs
  23. 14 10
      frontend/rust-lib/flowy-user/src/services/user_session.rs

+ 39 - 0
frontend/app_flowy/lib/core/folder_notification.dart

@@ -0,0 +1,39 @@
+import 'dart:async';
+import 'dart:typed_data';
+import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
+import 'package:dartz/dartz.dart';
+import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
+import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart';
+import 'package:flowy_sdk/rust_stream.dart';
+
+import 'notification_helper.dart';
+
+// Folder
+typedef FolderNotificationCallback = void Function(FolderNotification, Either<Uint8List, FlowyError>);
+
+class FolderNotificationParser extends NotificationParser<FolderNotification, FlowyError> {
+  FolderNotificationParser({String? id, required FolderNotificationCallback callback})
+      : super(
+          id: id,
+          callback: callback,
+          tyParser: (ty) => FolderNotification.valueOf(ty),
+          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
+        );
+}
+
+typedef FolderNotificationHandler = Function(FolderNotification ty, Either<Uint8List, FlowyError> result);
+
+class FolderNotificationListener {
+  StreamSubscription<SubscribeObject>? _subscription;
+  FolderNotificationParser? _parser;
+
+  FolderNotificationListener({required String objectId, required FolderNotificationHandler handler})
+      : _parser = FolderNotificationParser(id: objectId, callback: handler) {
+    _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable));
+  }
+
+  Future<void> stop() async {
+    _parser = null;
+    await _subscription?.cancel();
+  }
+}

+ 39 - 0
frontend/app_flowy/lib/core/grid_notification.dart

@@ -0,0 +1,39 @@
+import 'dart:async';
+import 'dart:typed_data';
+import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
+import 'package:dartz/dartz.dart';
+import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
+import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
+import 'package:flowy_sdk/rust_stream.dart';
+
+import 'notification_helper.dart';
+
+// Grid
+typedef GridNotificationCallback = void Function(GridNotification, Either<Uint8List, FlowyError>);
+
+class GridNotificationParser extends NotificationParser<GridNotification, FlowyError> {
+  GridNotificationParser({String? id, required GridNotificationCallback callback})
+      : super(
+          id: id,
+          callback: callback,
+          tyParser: (ty) => GridNotification.valueOf(ty),
+          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
+        );
+}
+
+typedef GridNotificationHandler = Function(GridNotification ty, Either<Uint8List, FlowyError> result);
+
+class GridNotificationListener {
+  StreamSubscription<SubscribeObject>? _subscription;
+  GridNotificationParser? _parser;
+
+  GridNotificationListener({required String objectId, required GridNotificationHandler handler})
+      : _parser = GridNotificationParser(id: objectId, callback: handler) {
+    _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable));
+  }
+
+  Future<void> stop() async {
+    _parser = null;
+    await _subscription?.cancel();
+  }
+}

+ 0 - 62
frontend/app_flowy/lib/core/notification_helper.dart

@@ -1,68 +1,6 @@
-import 'dart:async';
 import 'dart:typed_data';
 import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
-import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart';
 import 'package:dartz/dartz.dart';
-import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
-import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart';
-import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
-import 'package:flowy_sdk/rust_stream.dart';
-
-// User
-typedef UserNotificationCallback = void Function(UserNotification, Either<Uint8List, FlowyError>);
-
-class UserNotificationParser extends NotificationParser<UserNotification, FlowyError> {
-  UserNotificationParser({required String id, required UserNotificationCallback callback})
-      : super(
-          id: id,
-          callback: callback,
-          tyParser: (ty) => UserNotification.valueOf(ty),
-          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
-        );
-}
-
-// Folder
-typedef FolderNotificationCallback = void Function(FolderNotification, Either<Uint8List, FlowyError>);
-
-class FolderNotificationParser extends NotificationParser<FolderNotification, FlowyError> {
-  FolderNotificationParser({String? id, required FolderNotificationCallback callback})
-      : super(
-          id: id,
-          callback: callback,
-          tyParser: (ty) => FolderNotification.valueOf(ty),
-          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
-        );
-}
-
-// Grid
-typedef GridNotificationCallback = void Function(GridNotification, Either<Uint8List, FlowyError>);
-
-class GridNotificationParser extends NotificationParser<GridNotification, FlowyError> {
-  GridNotificationParser({String? id, required GridNotificationCallback callback})
-      : super(
-          id: id,
-          callback: callback,
-          tyParser: (ty) => GridNotification.valueOf(ty),
-          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
-        );
-}
-
-typedef GridNotificationHandler = Function(GridNotification ty, Either<Uint8List, FlowyError> result);
-
-class GridNotificationListener {
-  StreamSubscription<SubscribeObject>? _subscription;
-  GridNotificationParser? _parser;
-
-  GridNotificationListener({required String objectId, required GridNotificationHandler handler})
-      : _parser = GridNotificationParser(id: objectId, callback: handler) {
-    _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable));
-  }
-
-  Future<void> stop() async {
-    _parser = null;
-    await _subscription?.cancel();
-  }
-}
 
 class NotificationParser<T, E> {
   String? id;

+ 39 - 0
frontend/app_flowy/lib/core/user_notification.dart

@@ -0,0 +1,39 @@
+import 'dart:async';
+import 'dart:typed_data';
+import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
+import 'package:flowy_sdk/protobuf/flowy-user/protobuf.dart';
+import 'package:dartz/dartz.dart';
+import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
+import 'package:flowy_sdk/rust_stream.dart';
+
+import 'notification_helper.dart';
+
+// User
+typedef UserNotificationCallback = void Function(UserNotification, Either<Uint8List, FlowyError>);
+
+class UserNotificationParser extends NotificationParser<UserNotification, FlowyError> {
+  UserNotificationParser({required String id, required UserNotificationCallback callback})
+      : super(
+          id: id,
+          callback: callback,
+          tyParser: (ty) => UserNotification.valueOf(ty),
+          errorParser: (bytes) => FlowyError.fromBuffer(bytes),
+        );
+}
+
+typedef UserNotificationHandler = Function(UserNotification ty, Either<Uint8List, FlowyError> result);
+
+class UserNotificationListener {
+  StreamSubscription<SubscribeObject>? _subscription;
+  UserNotificationParser? _parser;
+
+  UserNotificationListener({required String objectId, required UserNotificationHandler handler})
+      : _parser = UserNotificationParser(id: objectId, callback: handler) {
+    _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable));
+  }
+
+  Future<void> stop() async {
+    _parser = null;
+    await _subscription?.cancel();
+  }
+}

+ 5 - 8
frontend/app_flowy/lib/startup/deps_resolver.dart

@@ -52,7 +52,7 @@ void _resolveHomeDeps(GetIt getIt) {
   getIt.registerSingleton(MenuSharedState());
 
   getIt.registerFactoryParam<UserListener, UserProfile, void>(
-    (user, _) => UserListener(user: user),
+    (user, _) => UserListener(userProfile: user),
   );
 
   //
@@ -61,7 +61,7 @@ void _resolveHomeDeps(GetIt getIt) {
   getIt.registerFactoryParam<WelcomeBloc, UserProfile, void>(
     (user, _) => WelcomeBloc(
       userService: UserService(userId: user.id),
-      userListener: getIt<UserListener>(param1: user),
+      userWorkspaceListener: UserWorkspaceListener(userProfile: user),
     ),
   );
 
@@ -73,8 +73,8 @@ void _resolveHomeDeps(GetIt getIt) {
 
 void _resolveFolderDeps(GetIt getIt) {
   //workspace
-  getIt.registerFactoryParam<WorkspaceListener, UserProfile, String>((user, workspaceId) =>
-      WorkspaceListener(service: WorkspaceListenerService(user: user, workspaceId: workspaceId)));
+  getIt.registerFactoryParam<WorkspaceListener, UserProfile, String>(
+      (user, workspaceId) => WorkspaceListener(user: user, workspaceId: workspaceId));
 
   // View
   getIt.registerFactoryParam<ViewListener, View, void>(
@@ -98,10 +98,7 @@ void _resolveFolderDeps(GetIt getIt) {
   );
 
   getIt.registerFactoryParam<MenuUserBloc, UserProfile, void>(
-    (user, _) => MenuUserBloc(
-      user,
-      getIt<UserListener>(param1: user),
-    ),
+    (user, _) => MenuUserBloc(user),
   );
 
   // App

+ 90 - 57
frontend/app_flowy/lib/user/application/user_listener.dart

@@ -1,10 +1,11 @@
 import 'dart:async';
+import 'package:app_flowy/core/folder_notification.dart';
+import 'package:app_flowy/core/user_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/flowy-error-code/code.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder-data-model/workspace.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'package:flowy_sdk/protobuf/dart-notify/protobuf.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart';
@@ -14,108 +15,140 @@ import 'package:flowy_sdk/rust_stream.dart';
 
 typedef UserProfileNotifyValue = Either<UserProfile, FlowyError>;
 typedef AuthNotifyValue = Either<Unit, FlowyError>;
-typedef WorkspaceListNotifyValue = Either<List<Workspace>, FlowyError>;
-typedef WorkspaceSettingNotifyValue = Either<CurrentWorkspaceSetting, FlowyError>;
 
 class UserListener {
   StreamSubscription<SubscribeObject>? _subscription;
-  final _profileNotifier = PublishNotifier<UserProfileNotifyValue>();
-  final _authNotifier = PublishNotifier<AuthNotifyValue>();
-  final _workspaceListNotifier = PublishNotifier<WorkspaceListNotifyValue>();
-  final _workSettingNotifier = PublishNotifier<WorkspaceSettingNotifyValue>();
+  PublishNotifier<AuthNotifyValue>? _authNotifier = PublishNotifier();
+  PublishNotifier<UserProfileNotifyValue>? _profileNotifier = PublishNotifier();
 
-  FolderNotificationParser? _workspaceParser;
   UserNotificationParser? _userParser;
-  final UserProfile _user;
+  final UserProfile _userProfile;
   UserListener({
-    required UserProfile user,
-  }) : _user = user;
+    required UserProfile userProfile,
+  }) : _userProfile = userProfile;
 
   void start({
     void Function(AuthNotifyValue)? onAuthChanged,
     void Function(UserProfileNotifyValue)? onProfileUpdated,
-    void Function(WorkspaceListNotifyValue)? onWorkspaceListUpdated,
-    void Function(WorkspaceSettingNotifyValue)? onWorkspaceSettingUpdated,
   }) {
-    if (onAuthChanged != null) {
-      _authNotifier.addListener(() {
-        onAuthChanged(_authNotifier.currentValue!);
-      });
-    }
-
     if (onProfileUpdated != null) {
-      _profileNotifier.addListener(() {
-        onProfileUpdated(_profileNotifier.currentValue!);
-      });
-    }
-
-    if (onWorkspaceListUpdated != null) {
-      _workspaceListNotifier.addListener(() {
-        onWorkspaceListUpdated(_workspaceListNotifier.currentValue!);
-      });
+      _profileNotifier?.addPublishListener(onProfileUpdated);
     }
 
-    if (onWorkspaceSettingUpdated != null) {
-      _workSettingNotifier.addListener(() {
-        onWorkspaceSettingUpdated(_workSettingNotifier.currentValue!);
-      });
+    if (onAuthChanged != null) {
+      _authNotifier?.addPublishListener(onAuthChanged);
     }
 
-    _workspaceParser = FolderNotificationParser(id: _user.token, callback: _notificationCallback);
-    _userParser = UserNotificationParser(id: _user.token, callback: _userNotificationCallback);
+    _userParser = UserNotificationParser(id: _userProfile.token, callback: _userNotificationCallback);
     _subscription = RustStreamReceiver.listen((observable) {
-      _workspaceParser?.parse(observable);
       _userParser?.parse(observable);
     });
   }
 
   Future<void> stop() async {
-    _workspaceParser = null;
     _userParser = null;
     await _subscription?.cancel();
-    _profileNotifier.dispose();
-    _authNotifier.dispose();
-    _workspaceListNotifier.dispose();
+    _profileNotifier?.dispose();
+    _profileNotifier = null;
+
+    _authNotifier?.dispose();
+    _authNotifier = null;
+  }
+
+  void _userNotificationCallback(user.UserNotification ty, Either<Uint8List, FlowyError> result) {
+    switch (ty) {
+      case user.UserNotification.UserUnauthorized:
+        result.fold(
+          (_) {},
+          (error) => _authNotifier?.value = right(error),
+        );
+        break;
+      case user.UserNotification.UserProfileUpdated:
+        result.fold(
+          (payload) => _profileNotifier?.value = left(UserProfile.fromBuffer(payload)),
+          (error) => _profileNotifier?.value = right(error),
+        );
+        break;
+      default:
+        break;
+    }
+  }
+}
+
+typedef WorkspaceListNotifyValue = Either<List<Workspace>, FlowyError>;
+typedef WorkspaceSettingNotifyValue = Either<CurrentWorkspaceSetting, FlowyError>;
+
+class UserWorkspaceListener {
+  PublishNotifier<AuthNotifyValue>? _authNotifier = PublishNotifier();
+  PublishNotifier<WorkspaceListNotifyValue>? _workspacesChangedNotifier = PublishNotifier();
+  PublishNotifier<WorkspaceSettingNotifyValue>? _settingChangedNotifier = PublishNotifier();
+
+  FolderNotificationListener? _listener;
+  final UserProfile _userProfile;
+
+  UserWorkspaceListener({
+    required UserProfile userProfile,
+  }) : _userProfile = userProfile;
+
+  void start({
+    void Function(AuthNotifyValue)? onAuthChanged,
+    void Function(WorkspaceListNotifyValue)? onWorkspacesUpdated,
+    void Function(WorkspaceSettingNotifyValue)? onSettingUpdated,
+  }) {
+    if (onAuthChanged != null) {
+      _authNotifier?.addPublishListener(onAuthChanged);
+    }
+
+    if (onWorkspacesUpdated != null) {
+      _workspacesChangedNotifier?.addPublishListener(onWorkspacesUpdated);
+    }
+
+    if (onSettingUpdated != null) {
+      _settingChangedNotifier?.addPublishListener(onSettingUpdated);
+    }
+
+    _listener = FolderNotificationListener(
+      objectId: _userProfile.token,
+      handler: _handleObservableType,
+    );
   }
 
-  void _notificationCallback(FolderNotification ty, Either<Uint8List, FlowyError> result) {
+  void _handleObservableType(FolderNotification ty, Either<Uint8List, FlowyError> result) {
     switch (ty) {
       case FolderNotification.UserCreateWorkspace:
       case FolderNotification.UserDeleteWorkspace:
       case FolderNotification.WorkspaceListUpdated:
         result.fold(
-          (payload) => _workspaceListNotifier.value = left(RepeatedWorkspace.fromBuffer(payload).items),
-          (error) => _workspaceListNotifier.value = right(error),
+          (payload) => _workspacesChangedNotifier?.value = left(RepeatedWorkspace.fromBuffer(payload).items),
+          (error) => _workspacesChangedNotifier?.value = right(error),
         );
         break;
       case FolderNotification.WorkspaceSetting:
         result.fold(
-          (payload) => _workSettingNotifier.value = left(CurrentWorkspaceSetting.fromBuffer(payload)),
-          (error) => _workSettingNotifier.value = right(error),
+          (payload) => _settingChangedNotifier?.value = left(CurrentWorkspaceSetting.fromBuffer(payload)),
+          (error) => _settingChangedNotifier?.value = right(error),
         );
         break;
       case FolderNotification.UserUnauthorized:
         result.fold(
           (_) {},
-          (error) => _authNotifier.value = right(FlowyError.create()..code = ErrorCode.UserUnauthorized.value),
+          (error) => _authNotifier?.value = right(FlowyError.create()..code = ErrorCode.UserUnauthorized.value),
         );
         break;
-
       default:
         break;
     }
   }
 
-  void _userNotificationCallback(user.UserNotification ty, Either<Uint8List, FlowyError> result) {
-    switch (ty) {
-      case user.UserNotification.UserUnauthorized:
-        result.fold(
-          (payload) => _profileNotifier.value = left(UserProfile.fromBuffer(payload)),
-          (error) => _profileNotifier.value = right(error),
-        );
-        break;
-      default:
-        break;
-    }
+  Future<void> stop() async {
+    await _listener?.stop();
+    _workspacesChangedNotifier?.dispose();
+    _workspacesChangedNotifier = null;
+
+    _settingChangedNotifier?.dispose();
+    _settingChangedNotifier = null;
+
+    _authNotifier?.dispose();
+    _authNotifier = null;
   }
 }

+ 0 - 3
frontend/app_flowy/lib/user/presentation/skip_log_in_screen.dart

@@ -1,5 +1,4 @@
 import 'package:app_flowy/user/application/auth_service.dart';
-import 'package:app_flowy/user/application/user_listener.dart';
 import 'package:app_flowy/user/presentation/router.dart';
 import 'package:app_flowy/user/presentation/widgets/background.dart';
 import 'package:easy_localization/easy_localization.dart';
@@ -34,8 +33,6 @@ class SkipLogInScreen extends StatefulWidget {
 }
 
 class _SkipLogInScreenState extends State<SkipLogInScreen> {
-  UserListener? userListener;
-
   @override
   Widget build(BuildContext context) {
     return Scaffold(

+ 1 - 1
frontend/app_flowy/lib/workspace/application/app/app_listener.dart

@@ -1,7 +1,7 @@
 import 'dart:async';
 import 'dart:typed_data';
+import 'package:app_flowy/core/folder_notification.dart';
 import 'package:dartz/dartz.dart';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:flowy_sdk/log.dart';
 import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';

+ 1 - 2
frontend/app_flowy/lib/workspace/application/grid/block/block_listener.dart

@@ -1,7 +1,6 @@
 import 'dart:async';
 import 'dart:typed_data';
-
-import 'package:app_flowy/core/notification_helper.dart';
+import 'package:app_flowy/core/grid_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';

+ 1 - 1
frontend/app_flowy/lib/workspace/application/grid/cell/cell_listener.dart

@@ -1,10 +1,10 @@
+import 'package:app_flowy/core/grid_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'dart:async';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
 
 typedef UpdateFieldNotifiedValue = Either<Unit, FlowyError>;
 

+ 1 - 1
frontend/app_flowy/lib/workspace/application/grid/field/field_listener.dart

@@ -1,10 +1,10 @@
+import 'package:app_flowy/core/grid_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'dart:async';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart';
 
 typedef UpdateFieldNotifiedValue = Either<Field, FlowyError>;

+ 1 - 1
frontend/app_flowy/lib/workspace/application/grid/field/grid_listenr.dart

@@ -1,10 +1,10 @@
+import 'package:app_flowy/core/grid_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'dart:async';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart';
 
 typedef UpdateFieldNotifiedValue = Either<GridFieldChangeset, FlowyError>;

+ 1 - 1
frontend/app_flowy/lib/workspace/application/grid/row/row_listener.dart

@@ -1,10 +1,10 @@
+import 'package:app_flowy/core/grid_notification.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/block_entities.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/dart_notification.pb.dart';
 import 'package:flowy_infra/notifier.dart';
 import 'dart:async';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/flowy-grid/field_entities.pb.dart';
 

+ 4 - 6
frontend/app_flowy/lib/workspace/application/home/home_bloc.dart

@@ -11,19 +11,17 @@ import 'package:dartz/dartz.dart';
 part 'home_bloc.freezed.dart';
 
 class HomeBloc extends Bloc<HomeEvent, HomeState> {
-  final UserListener _listener;
+  final UserWorkspaceListener _listener;
 
   HomeBloc(UserProfile user, CurrentWorkspaceSetting workspaceSetting)
-      : _listener = UserListener(user: user),
+      : _listener = UserWorkspaceListener(userProfile: user),
         super(HomeState.initial(workspaceSetting)) {
     on<HomeEvent>((event, emit) async {
       await event.map(
         initial: (_Initial value) {
           _listener.start(
-            onAuthChanged: (result) {
-              _authDidChanged(result);
-            },
-            onWorkspaceSettingUpdated: (result) {
+            onAuthChanged: (result) => _authDidChanged(result),
+            onSettingUpdated: (result) {
               result.fold(
                 (setting) => add(HomeEvent.didReceiveWorkspaceSetting(setting)),
                 (r) => Log.error(r),

+ 1 - 1
frontend/app_flowy/lib/workspace/application/menu/menu_bloc.dart

@@ -22,7 +22,7 @@ class MenuBloc extends Bloc<MenuEvent, MenuState> {
     on<MenuEvent>((event, emit) async {
       await event.map(
         initial: (e) async {
-          listener.start(addAppCallback: _handleAppsOrFail);
+          listener.start(appsChanged: _handleAppsOrFail);
           await _fetchApps(emit);
         },
         openPage: (e) async {

+ 21 - 26
frontend/app_flowy/lib/workspace/application/menu/menu_user_bloc.dart

@@ -12,29 +12,33 @@ part 'menu_user_bloc.freezed.dart';
 
 class MenuUserBloc extends Bloc<MenuUserEvent, MenuUserState> {
   final UserService _userService;
-  final UserListener userListener;
+  final UserListener _userListener;
+  final UserWorkspaceListener _userWorkspaceListener;
   final UserProfile userProfile;
 
-  MenuUserBloc(this.userProfile, this.userListener)
-      : _userService = UserService(userId: userProfile.id),
+  MenuUserBloc(this.userProfile)
+      : _userListener = UserListener(userProfile: userProfile),
+        _userWorkspaceListener = UserWorkspaceListener(userProfile: userProfile),
+        _userService = UserService(userId: userProfile.id),
         super(MenuUserState.initial(userProfile)) {
     on<MenuUserEvent>((event, emit) async {
-      await event.map(
-        initial: (_) async {
-          userListener.start(
-            onProfileUpdated: _profileUpdated,
-            onWorkspaceListUpdated: _workspaceListUpdated,
-          );
+      await event.when(
+        initial: () async {
+          _userListener.start(onProfileUpdated: _profileUpdated);
+          _userWorkspaceListener.start(onWorkspacesUpdated: _workspaceListUpdated);
           await _initUser();
         },
-        fetchWorkspaces: (_FetchWorkspaces value) async {},
+        fetchWorkspaces: () async {
+          //
+        },
       );
     });
   }
 
   @override
   Future<void> close() async {
-    await userListener.stop();
+    await _userListener.stop();
+    await _userWorkspaceListener.stop();
     super.close();
   }
 
@@ -43,19 +47,10 @@ class MenuUserBloc extends Bloc<MenuUserEvent, MenuUserState> {
     result.fold((l) => null, (error) => Log.error(error));
   }
 
-  void _profileUpdated(Either<UserProfile, FlowyError> userOrFailed) {}
+  void _profileUpdated(Either<UserProfile, FlowyError> userProfileOrFailed) {}
+
   void _workspaceListUpdated(Either<List<Workspace>, FlowyError> workspacesOrFailed) {
-    // fetch workspaces
-    // iUserImpl.fetchWorkspaces().then((result) {
-    //   result.fold(
-    //     (workspaces) async* {
-    //       yield state.copyWith(workspaces: some(workspaces));
-    //     },
-    //     (error) async* {
-    //       yield state.copyWith(successOrFailure: right(error.msg));
-    //     },
-    //   );
-    // });
+    // Do nothing by now
   }
 }
 
@@ -68,13 +63,13 @@ class MenuUserEvent with _$MenuUserEvent {
 @freezed
 class MenuUserState with _$MenuUserState {
   const factory MenuUserState({
-    required UserProfile user,
+    required UserProfile userProfile,
     required Option<List<Workspace>> workspaces,
     required Either<Unit, String> successOrFailure,
   }) = _MenuUserState;
 
-  factory MenuUserState.initial(UserProfile user) => MenuUserState(
-        user: user,
+  factory MenuUserState.initial(UserProfile userProfile) => MenuUserState(
+        userProfile: userProfile,
         workspaces: none(),
         successOrFailure: left(unit),
       );

+ 1 - 1
frontend/app_flowy/lib/workspace/application/trash/trash_listener.dart

@@ -1,7 +1,7 @@
 import 'dart:async';
 import 'dart:typed_data';
+import 'package:app_flowy/core/folder_notification.dart';
 import 'package:dartz/dartz.dart';
-import 'package:app_flowy/core/notification_helper.dart';
 import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';

+ 1 - 1
frontend/app_flowy/lib/workspace/application/view/view_listener.dart

@@ -1,6 +1,6 @@
 import 'dart:async';
 import 'dart:typed_data';
-import 'package:app_flowy/core/notification_helper.dart';
+import 'package:app_flowy/core/folder_notification.dart';
 import 'package:dartz/dartz.dart';
 import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder-data-model/view.pb.dart';

+ 5 - 5
frontend/app_flowy/lib/workspace/application/workspace/welcome_bloc.dart

@@ -11,13 +11,13 @@ part 'welcome_bloc.freezed.dart';
 
 class WelcomeBloc extends Bloc<WelcomeEvent, WelcomeState> {
   final UserService userService;
-  final UserListener userListener;
-  WelcomeBloc({required this.userService, required this.userListener}) : super(WelcomeState.initial()) {
+  final UserWorkspaceListener userWorkspaceListener;
+  WelcomeBloc({required this.userService, required this.userWorkspaceListener}) : super(WelcomeState.initial()) {
     on<WelcomeEvent>(
       (event, emit) async {
         await event.map(initial: (e) async {
-          userListener.start(
-            onWorkspaceListUpdated: (result) => add(WelcomeEvent.workspacesReveived(result)),
+          userWorkspaceListener.start(
+            onWorkspacesUpdated: (result) => add(WelcomeEvent.workspacesReveived(result)),
           );
           //
           await _fetchWorkspaces(emit);
@@ -37,7 +37,7 @@ class WelcomeBloc extends Bloc<WelcomeEvent, WelcomeState> {
 
   @override
   Future<void> close() async {
-    await userListener.stop();
+    await userWorkspaceListener.stop();
     super.close();
   }
 

+ 36 - 60
frontend/app_flowy/lib/workspace/application/workspace/workspace_listener.dart

@@ -1,97 +1,73 @@
 import 'dart:async';
 import 'dart:typed_data';
-
-import 'package:app_flowy/core/notification_helper.dart';
+import 'package:app_flowy/core/folder_notification.dart';
 import 'package:dartz/dartz.dart';
-import 'package:flowy_sdk/log.dart';
-import 'package:flowy_sdk/protobuf/dart-notify/subject.pb.dart';
+import 'package:flowy_infra/notifier.dart';
 import 'package:flowy_sdk/protobuf/flowy-user-data-model/protobuf.dart' show UserProfile;
 import 'package:flowy_sdk/protobuf/flowy-folder-data-model/app.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder-data-model/workspace.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-folder/dart_notification.pb.dart';
-import 'package:flowy_sdk/rust_stream.dart';
 
-typedef WorkspaceAppsChangedCallback = void Function(Either<List<App>, FlowyError> appsOrFail);
-typedef WorkspaceUpdatedCallback = void Function(String name, String desc);
+typedef AppListNotifyValue = Either<List<App>, FlowyError>;
+typedef WorkspaceNotifyValue = Either<Workspace, FlowyError>;
 
 class WorkspaceListener {
-  WorkspaceListenerService service;
-  WorkspaceListener({
-    required this.service,
-  });
-
-  void start({WorkspaceAppsChangedCallback? addAppCallback, WorkspaceUpdatedCallback? updatedCallback}) {
-    service.startListening(appsChanged: addAppCallback, update: updatedCallback);
-  }
-
-  Future<void> stop() async {
-    await service.close();
-  }
-}
+  PublishNotifier<AppListNotifyValue>? _appsChangedNotifier = PublishNotifier();
+  PublishNotifier<WorkspaceNotifyValue>? _workspaceUpdatedNotifier = PublishNotifier();
 
-class WorkspaceListenerService {
-  StreamSubscription<SubscribeObject>? _subscription;
-  WorkspaceAppsChangedCallback? _appsChanged;
-  WorkspaceUpdatedCallback? _update;
-  FolderNotificationParser? _parser;
+  FolderNotificationListener? _listener;
   final UserProfile user;
   final String workspaceId;
 
-  WorkspaceListenerService({
+  WorkspaceListener({
     required this.user,
     required this.workspaceId,
   });
 
-  void startListening({
-    WorkspaceAppsChangedCallback? appsChanged,
-    WorkspaceUpdatedCallback? update,
+  void start({
+    void Function(AppListNotifyValue)? appsChanged,
+    void Function(WorkspaceNotifyValue)? onWorkspaceUpdated,
   }) {
-    _appsChanged = appsChanged;
-    _update = update;
+    if (appsChanged != null) {
+      _appsChangedNotifier?.addPublishListener(appsChanged);
+    }
 
-    _parser = FolderNotificationParser(
-      id: workspaceId,
-      callback: (ty, result) {
-        _handleObservableType(ty, result);
-      },
-    );
+    if (onWorkspaceUpdated != null) {
+      _workspaceUpdatedNotifier?.addPublishListener(onWorkspaceUpdated);
+    }
 
-    _subscription = RustStreamReceiver.listen((observable) => _parser?.parse(observable));
+    _listener = FolderNotificationListener(
+      objectId: workspaceId,
+      handler: _handleObservableType,
+    );
   }
 
   void _handleObservableType(FolderNotification ty, Either<Uint8List, FlowyError> result) {
     switch (ty) {
       case FolderNotification.WorkspaceUpdated:
-        if (_update != null) {
-          result.fold(
-            (payload) {
-              final workspace = Workspace.fromBuffer(payload);
-              _update!(workspace.name, workspace.desc);
-            },
-            (error) => Log.error(error),
-          );
-        }
+        result.fold(
+          (payload) => _workspaceUpdatedNotifier?.value = left(Workspace.fromBuffer(payload)),
+          (error) => _workspaceUpdatedNotifier?.value = right(error),
+        );
         break;
       case FolderNotification.WorkspaceAppsChanged:
-        if (_appsChanged != null) {
-          result.fold(
-            (payload) => _appsChanged!(
-              left(RepeatedApp.fromBuffer(payload).items),
-            ),
-            (error) => _appsChanged!(right(error)),
-          );
-        }
+        result.fold(
+          (payload) => _appsChangedNotifier?.value = left(RepeatedApp.fromBuffer(payload).items),
+          (error) => _appsChangedNotifier?.value = right(error),
+        );
         break;
       default:
         break;
     }
   }
 
-  Future<void> close() async {
-    _parser = null;
-    await _subscription?.cancel();
-    // _appsChanged = null;
-    // _update = null;
+  Future<void> stop() async {
+    await _listener?.stop();
+    _appsChangedNotifier?.dispose();
+    _appsChangedNotifier = null;
+
+    _workspaceUpdatedNotifier?.dispose();
+    _workspaceUpdatedNotifier = null;
   }
 }

+ 2 - 2
frontend/app_flowy/lib/workspace/presentation/home/menu/menu_user.dart

@@ -58,9 +58,9 @@ class MenuUser extends StatelessWidget {
   }
 
   Widget _renderUserName(BuildContext context) {
-    String name = context.read<MenuUserBloc>().state.user.name;
+    String name = context.read<MenuUserBloc>().state.userProfile.name;
     if (name.isEmpty) {
-      name = context.read<MenuUserBloc>().state.user.email;
+      name = context.read<MenuUserBloc>().state.userProfile.email;
     }
     return FlowyText(name, fontSize: 12);
   }

+ 1 - 1
frontend/rust-lib/flowy-user/src/handlers/user_handler.rs

@@ -20,7 +20,7 @@ pub async fn check_user_handler(session: AppData<Arc<UserSession>>) -> DataResul
 
 #[tracing::instrument(level = "debug", skip(session))]
 pub async fn get_user_profile_handler(session: AppData<Arc<UserSession>>) -> DataResult<UserProfile, FlowyError> {
-    let user_profile = session.user_profile().await?;
+    let user_profile = session.get_user_profile().await?;
     data_result(user_profile)
 }
 

+ 14 - 10
frontend/rust-lib/flowy-user/src/services/user_session.rs

@@ -82,7 +82,7 @@ impl UserSession {
     #[tracing::instrument(level = "debug", skip(self))]
     pub async fn sign_in(&self, params: SignInParams) -> Result<UserProfile, FlowyError> {
         if self.is_user_login(&params.email) {
-            self.user_profile().await
+            self.get_user_profile().await
         } else {
             let resp = self.cloud_service.sign_in(params).await?;
             let session: Session = resp.clone().into();
@@ -97,7 +97,7 @@ impl UserSession {
     #[tracing::instrument(level = "debug", skip(self))]
     pub async fn sign_up(&self, params: SignUpParams) -> Result<UserProfile, FlowyError> {
         if self.is_user_login(&params.email) {
-            self.user_profile().await
+            self.get_user_profile().await
         } else {
             let resp = self.cloud_service.sign_up(params).await?;
             let session: Session = resp.clone().into();
@@ -131,6 +131,10 @@ impl UserSession {
         let changeset = UserTableChangeset::new(params.clone());
         diesel_update_table!(user_table, changeset, &*self.db_connection()?);
 
+        let user_profile = self.get_user_profile().await?;
+        dart_notify(&session.token, UserNotification::UserProfileUpdated)
+            .payload(user_profile)
+            .send();
         let _ = self.update_user_on_server(&session.token, params).await?;
         Ok(())
     }
@@ -150,7 +154,7 @@ impl UserSession {
         Ok(user.into())
     }
 
-    pub async fn user_profile(&self) -> Result<UserProfile, FlowyError> {
+    pub async fn get_user_profile(&self) -> Result<UserProfile, FlowyError> {
         let (user_id, token) = self.get_session()?.into_part();
         let user = dsl::user_table
             .filter(user_table::id.eq(&user_id))
@@ -185,14 +189,14 @@ impl UserSession {
         tokio::spawn(async move {
             match server.get_user(&token).await {
                 Ok(profile) => {
-                    dart_notify(&token, UserNotification::UserProfileUpdated)
-                        .payload(profile)
-                        .send();
+                    // dart_notify(&token, UserNotification::UserProfileUpdated)
+                    //     .payload(profile)
+                    //     .send();
                 }
-                Err(e) => {
-                    dart_notify(&token, UserNotification::UserProfileUpdated)
-                        .error(e)
-                        .send();
+                Err(_e) => {
+                    // dart_notify(&token, UserNotification::UserProfileUpdated)
+                    //     .error(e)
+                    //     .send();
                 }
             }
         });