i_user_impl.dart 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import 'dart:typed_data';
  2. import 'package:app_flowy/workspace/infrastructure/repos/helper.dart';
  3. import 'package:dartz/dartz.dart';
  4. import 'package:app_flowy/workspace/domain/i_user.dart';
  5. import 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
  6. import 'package:flowy_sdk/protobuf/flowy-dart-notify/protobuf.dart';
  7. import 'package:flowy_sdk/protobuf/flowy-user-infra/errors.pb.dart';
  8. // import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart' as user_error;
  9. import 'package:flowy_sdk/protobuf/flowy-user/observable.pb.dart' as user;
  10. import 'package:flowy_sdk/protobuf/flowy-workspace-infra/workspace_create.pb.dart';
  11. import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
  12. import 'package:flowy_sdk/protobuf/flowy-workspace/observable.pb.dart';
  13. export 'package:app_flowy/workspace/domain/i_user.dart';
  14. export 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
  15. import 'package:flowy_sdk/rust_stream.dart';
  16. import 'dart:async';
  /// [IUser] implementation that forwards every operation to a [UserRepo].
  class IUserImpl extends IUser {
    /// Repository that every member of this class delegates to.
    UserRepo repo;

    IUserImpl({required this.repo});

    /// Forwards to [repo]'s `deleteWorkspace`, passing [workspaceId] through.
    @override
    Future<Either<Unit, WorkspaceError>> deleteWorkspace(String workspaceId) =>
        repo.deleteWorkspace(workspaceId: workspaceId);

    /// Forwards to [repo]'s `fetchUserProfile`, passing [userId] through.
    @override
    Future<Either<UserProfile, UserError>> fetchUserProfile(String userId) =>
        repo.fetchUserProfile(userId: userId);

    /// Forwards to [repo]'s `signOut`.
    @override
    Future<Either<Unit, UserError>> signOut() => repo.signOut();

    /// The user profile currently held by [repo].
    @override
    UserProfile get user => repo.user;

    /// Forwards to [repo]'s `getWorkspaces`.
    @override
    Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces() =>
        repo.getWorkspaces();

    /// Forwards to [repo]'s `initUser`.
    @override
    Future<Either<Unit, UserError>> initUser() => repo.initUser();
  }
  /// [IUserListener] implementation fed by [RustStreamReceiver].
  ///
  /// After [start], every observable delivered by the stream is handed to a
  /// [WorkspaceNotificationParser] and a [UserNotificationParser], both created
  /// with the user's token as their id. Parsed notifications are dispatched to
  /// whichever callbacks were registered through [setWorkspacesCallback],
  /// [setAuthCallback] and [setProfileCallback]; notifications with no
  /// registered callback are dropped.
  class IUserListenerImpl extends IUserListener {
    StreamSubscription<SubscribeObject>? _subscription;
    WorkspacesUpdatedCallback? _workspacesUpdated;
    AuthChangedCallback? _authChanged;
    UserProfileUpdateCallback? _profileUpdated;

    // Assigned in [start]; only read by the stream listener installed there.
    late WorkspaceNotificationParser _workspaceParser;
    late UserNotificationParser _userParser;

    final UserProfile _user;

    IUserListenerImpl({required UserProfile user}) : _user = user;

    /// Creates the notification parsers and begins listening to the stream.
    ///
    /// Calling this again replaces the previous subscription instead of
    /// stacking a second one on top of it.
    @override
    void start() {
      // Without this, a repeated start() would leave the old subscription
      // alive and every notification would be dispatched twice.
      _subscription?.cancel();

      _workspaceParser = WorkspaceNotificationParser(
        id: _user.token,
        callback: _notificationCallback,
      );
      _userParser = UserNotificationParser(
        id: _user.token,
        callback: _userNotificationCallback,
      );
      _subscription = RustStreamReceiver.listen((observable) {
        _workspaceParser.parse(observable);
        _userParser.parse(observable);
      });
    }

    /// Cancels the active subscription, if any, and forgets it.
    @override
    Future<void> stop() async {
      final subscription = _subscription;
      _subscription = null;
      await subscription?.cancel();
    }

    /// Registers the callback invoked on workspace `UserUnauthorized`
    /// notifications that carry an error.
    @override
    void setAuthCallback(AuthChangedCallback authCallback) {
      _authChanged = authCallback;
    }

    /// Registers the callback that receives decoded [UserProfile] updates.
    @override
    void setProfileCallback(UserProfileUpdateCallback profileCallback) {
      _profileUpdated = profileCallback;
    }

    /// Registers the callback that receives the updated workspace list.
    @override
    void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback) {
      _workspacesUpdated = workspacesCallback;
    }

    /// Dispatches a parsed workspace notification to the matching callback.
    void _notificationCallback(
      WorkspaceNotification ty,
      Either<Uint8List, WorkspaceError> result,
    ) {
      switch (ty) {
        case WorkspaceNotification.UserCreateWorkspace:
        case WorkspaceNotification.UserDeleteWorkspace:
        case WorkspaceNotification.WorkspaceListUpdated:
          // Capture the field once so it promotes to non-null without `!`.
          final workspacesUpdated = _workspacesUpdated;
          if (workspacesUpdated != null) {
            result.fold(
              (payload) {
                final workspaces = RepeatedWorkspace.fromBuffer(payload);
                workspacesUpdated(left(workspaces.items));
              },
              (error) => workspacesUpdated(right(error)),
            );
          }
          break;
        case WorkspaceNotification.UserUnauthorized:
          final authChanged = _authChanged;
          if (authChanged != null) {
            result.fold(
              // A payload on this notification is deliberately ignored.
              (_) {},
              // The incoming workspace error is replaced by a fresh UserError
              // carrying the UserUnauthorized code.
              (_) => authChanged(
                right(
                  UserError.create()..code = ErrorCode.UserUnauthorized.value,
                ),
              ),
            );
          }
          break;
        default:
          break;
      }
    }

    /// Dispatches a parsed user notification to the profile callback.
    void _userNotificationCallback(
      user.UserNotification ty,
      Either<Uint8List, UserError> result,
    ) {
      switch (ty) {
        // NOTE(review): this case is `UserUnauthorized`, yet the payload is
        // decoded as a UserProfile and sent to the profile callback. Confirm
        // this is the intended notification type; behaviour is left as-is.
        case user.UserNotification.UserUnauthorized:
          final profileUpdated = _profileUpdated;
          if (profileUpdated != null) {
            result.fold(
              (payload) {
                final userProfile = UserProfile.fromBuffer(payload);
                profileUpdated(left(userProfile));
              },
              (error) => profileUpdated(right(error)),
            );
          }
          break;
        default:
          break;
      }
    }
  }