i_user_impl.dart 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. import 'dart:typed_data';
  2. import 'package:app_flowy/workspace/infrastructure/repos/helper.dart';
  3. import 'package:dartz/dartz.dart';
  4. import 'package:app_flowy/workspace/domain/i_user.dart';
  5. import 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
  6. import 'package:flowy_sdk/protobuf/flowy-observable/protobuf.dart';
  7. import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart' as user_error;
  8. import 'package:flowy_sdk/protobuf/flowy-user/observable.pb.dart' as user;
  9. import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
  10. import 'package:flowy_sdk/protobuf/flowy-workspace/observable.pb.dart';
  11. export 'package:app_flowy/workspace/domain/i_user.dart';
  12. export 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
  13. import 'package:flowy_sdk/rust_stream.dart';
  14. import 'dart:async';
  15. class IUserImpl extends IUser {
  16. UserRepo repo;
  17. IUserImpl({
  18. required this.repo,
  19. });
  20. @override
  21. Future<Either<Unit, WorkspaceError>> deleteWorkspace(String workspaceId) {
  22. return repo.deleteWorkspace(workspaceId: workspaceId);
  23. }
  24. @override
  25. Future<Either<UserProfile, UserError>> fetchUserProfile(String userId) {
  26. return repo.fetchUserProfile(userId: userId);
  27. }
  28. @override
  29. Future<Either<Unit, UserError>> signOut() {
  30. return repo.signOut();
  31. }
  32. @override
  33. UserProfile get user => repo.user;
  34. @override
  35. Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces() {
  36. return repo.getWorkspaces();
  37. }
  38. }
  39. class IUserWatchImpl extends IUserWatch {
  40. StreamSubscription<ObservableSubject>? _subscription;
  41. WorkspacesUpdatedCallback? _workspacesUpdated;
  42. AuthChangedCallback? _authChanged;
  43. UserProfileUpdateCallback? _profileUpdated;
  44. late WorkspaceObservableParser _workspaceParser;
  45. late UserObservableParser _userParser;
  46. late UserProfile _user;
  47. IUserWatchImpl({
  48. required UserProfile user,
  49. }) {
  50. _user = user;
  51. }
  52. @override
  53. void startWatching() {
  54. _workspaceParser = WorkspaceObservableParser(
  55. id: _user.token, callback: _workspaceObservableCallback);
  56. _userParser = UserObservableParser(
  57. id: _user.token, callback: _userObservableCallback);
  58. _subscription = RustStreamReceiver.listen((observable) {
  59. _workspaceParser.parse(observable);
  60. _userParser.parse(observable);
  61. });
  62. }
  63. @override
  64. Future<void> stopWatching() async {
  65. await _subscription?.cancel();
  66. }
  67. @override
  68. void setAuthCallback(AuthChangedCallback authCallback) {
  69. _authChanged = authCallback;
  70. }
  71. @override
  72. void setProfileCallback(UserProfileUpdateCallback profileCallback) {
  73. _profileUpdated = profileCallback;
  74. }
  75. @override
  76. void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback) {
  77. _workspacesUpdated = workspacesCallback;
  78. }
  79. void _workspaceObservableCallback(
  80. WorkspaceObservable ty, Either<Uint8List, WorkspaceError> result) {
  81. switch (ty) {
  82. case WorkspaceObservable.UserCreateWorkspace:
  83. case WorkspaceObservable.UserDeleteWorkspace:
  84. case WorkspaceObservable.WorkspaceListUpdated:
  85. if (_workspacesUpdated != null) {
  86. result.fold(
  87. (payload) {
  88. final workspaces = RepeatedWorkspace.fromBuffer(payload);
  89. _workspacesUpdated!(left(workspaces.items));
  90. },
  91. (error) => _workspacesUpdated!(right(error)),
  92. );
  93. }
  94. break;
  95. case WorkspaceObservable.UserUnauthorized:
  96. if (_authChanged != null) {
  97. result.fold(
  98. (_) {},
  99. (error) => {
  100. _authChanged!(right(UserError.create()
  101. ..code = user_error.ErrorCode.UserUnauthorized))
  102. },
  103. );
  104. }
  105. break;
  106. default:
  107. break;
  108. }
  109. }
  110. void _userObservableCallback(
  111. user.UserObservable ty, Either<Uint8List, UserError> result) {
  112. switch (ty) {
  113. case user.UserObservable.UserUnauthorized:
  114. if (_profileUpdated != null) {
  115. result.fold(
  116. (payload) {
  117. final userProfile = UserProfile.fromBuffer(payload);
  118. _profileUpdated!(left(userProfile));
  119. },
  120. (error) => _profileUpdated!(right(error)),
  121. );
  122. }
  123. break;
  124. default:
  125. break;
  126. }
  127. }
  128. }