import 'dart:async';
import 'dart:typed_data';

import 'package:app_flowy/workspace/domain/i_user.dart';
import 'package:app_flowy/workspace/infrastructure/repos/helper.dart';
import 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
import 'package:dartz/dartz.dart';
import 'package:flowy_sdk/protobuf/flowy-dart-notify/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart' as user_error;
import 'package:flowy_sdk/protobuf/flowy-user/observable.pb.dart' as user;
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/observable.pb.dart';
import 'package:flowy_sdk/rust_stream.dart';

export 'package:app_flowy/workspace/domain/i_user.dart';
export 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';

/// [IUser] implementation that forwards every call to a [UserRepo].
///
/// NOTE(review): the generic arguments of the `Future<Either<...>>` return
/// types below were reconstructed; they must match the signatures declared
/// on [IUser] and returned by [UserRepo] - confirm against those files.
class IUserImpl extends IUser {
  /// Repository that every operation is delegated to.
  UserRepo repo;

  IUserImpl({
    required this.repo,
  });

  /// Deletes the workspace identified by [workspaceId] through [repo].
  @override
  Future<Either<Unit, WorkspaceError>> deleteWorkspace(String workspaceId) =>
      repo.deleteWorkspace(workspaceId: workspaceId);

  /// Fetches the profile of the user identified by [userId] through [repo].
  @override
  Future<Either<UserProfile, UserError>> fetchUserProfile(String userId) =>
      repo.fetchUserProfile(userId: userId);

  /// Signs out through [repo].
  @override
  Future<Either<Unit, UserError>> signOut() => repo.signOut();

  /// The user profile held by [repo].
  @override
  UserProfile get user => repo.user;

  /// Fetches the workspaces through [repo].
  @override
  Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces() =>
      repo.getWorkspaces();

  /// Runs the repository's user initialisation.
  @override
  Future<Either<Unit, UserError>> initUser() => repo.initUser();
}

/// [IUserWatch] implementation that listens on [RustStreamReceiver] and
/// dispatches parsed workspace and user observables to the registered
/// callbacks.
///
/// Callbacks that have not been registered are skipped.
class IUserWatchImpl extends IUserWatch {
  StreamSubscription? _subscription;
  WorkspacesUpdatedCallback? _workspacesUpdated;
  AuthChangedCallback? _authChanged;
  UserProfileUpdateCallback? _profileUpdated;

  // Created in [startWatching]; only read from the stream listener that is
  // registered right after they are assigned.
  late WorkspaceObservableParser _workspaceParser;
  late UserObservableParser _userParser;

  /// Profile whose `token` is passed as the parser id.
  final UserProfile _user;

  IUserWatchImpl({
    required UserProfile user,
  }) : _user = user;

  /// Builds both parsers for the user's token and starts listening.
  ///
  /// Calling this again replaces the previous listener instead of stacking a
  /// second one on top of it.
  @override
  void startWatching() {
    // Cancel a listener left over from an earlier call; otherwise every
    // observable would be parsed (and every callback fired) twice.
    _subscription?.cancel();

    _workspaceParser = WorkspaceObservableParser(
      id: _user.token,
      callback: _workspaceObservableCallback,
    );
    _userParser = UserObservableParser(
      id: _user.token,
      callback: _userObservableCallback,
    );
    _subscription = RustStreamReceiver.listen((observable) {
      _workspaceParser.parse(observable);
      _userParser.parse(observable);
    });
  }

  /// Cancels the active listener, if any, and forgets it.
  @override
  Future<void> stopWatching() async {
    final subscription = _subscription;
    _subscription = null;
    await subscription?.cancel();
  }

  /// Registers the callback invoked on authorization changes.
  @override
  void setAuthCallback(AuthChangedCallback authCallback) {
    _authChanged = authCallback;
  }

  /// Registers the callback invoked with user-profile results.
  @override
  void setProfileCallback(UserProfileUpdateCallback profileCallback) {
    _profileUpdated = profileCallback;
  }

  /// Registers the callback invoked with workspace-list results.
  @override
  void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback) {
    _workspacesUpdated = workspacesCallback;
  }

  /// Handles a parsed workspace observable of kind [ty].
  ///
  /// NOTE(review): the type arguments of [result] were reconstructed from
  /// how the payload and error are used below - confirm against the parser's
  /// callback typedef.
  void _workspaceObservableCallback(
    WorkspaceObservable ty,
    Either<Uint8List, WorkspaceError> result,
  ) {
    switch (ty) {
      case WorkspaceObservable.UserCreateWorkspace:
      case WorkspaceObservable.UserDeleteWorkspace:
      case WorkspaceObservable.WorkspaceListUpdated:
        final onWorkspaces = _workspacesUpdated;
        if (onWorkspaces == null) break;
        result.fold(
          (payload) {
            // The payload is a serialized RepeatedWorkspace; only its items
            // are handed to the listener.
            final workspaces = RepeatedWorkspace.fromBuffer(payload);
            onWorkspaces(left(workspaces.items));
          },
          (error) {
            onWorkspaces(right(error));
          },
        );
        break;
      case WorkspaceObservable.UserUnauthorized:
        final onAuthChanged = _authChanged;
        if (onAuthChanged == null) break;
        result.fold(
          // A successful payload carries nothing to report here.
          (_) {},
          // The workspace error itself is dropped; listeners receive a
          // fresh UserError carrying the UserUnauthorized code instead.
          (_) {
            onAuthChanged(right(
              UserError.create()
                ..code = user_error.ErrorCode.UserUnauthorized,
            ));
          },
        );
        break;
      default:
        break;
    }
  }

  /// Handles a parsed user observable of kind [ty].
  ///
  /// NOTE(review): the type arguments of [result] were reconstructed -
  /// confirm against the parser's callback typedef.
  void _userObservableCallback(
    user.UserObservable ty,
    Either<Uint8List, UserError> result,
  ) {
    switch (ty) {
      // NOTE(review): this case decodes a UserProfile and reports it through
      // the profile callback, yet it is keyed on `UserUnauthorized`. That
      // looks like it was meant to be a profile-updated observable - confirm
      // against the UserObservable enum before changing it.
      case user.UserObservable.UserUnauthorized:
        final onProfile = _profileUpdated;
        if (onProfile == null) break;
        result.fold(
          (payload) {
            final userProfile = UserProfile.fromBuffer(payload);
            onProfile(left(userProfile));
          },
          (error) {
            onProfile(right(error));
          },
        );
        break;
      default:
        break;
    }
  }
}