import 'dart:async';
import 'dart:typed_data';

import 'package:app_flowy/workspace/infrastructure/repos/helper.dart';
import 'package:dartz/dartz.dart';
import 'package:app_flowy/workspace/domain/i_user.dart';
import 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';
import 'package:flowy_sdk/protobuf/flowy-observable/protobuf.dart';
import 'package:flowy_sdk/protobuf/flowy-user/errors.pb.dart' as user_error;
import 'package:flowy_sdk/protobuf/flowy-user/observable.pb.dart' as user;
import 'package:flowy_sdk/protobuf/flowy-workspace/errors.pb.dart';
import 'package:flowy_sdk/protobuf/flowy-workspace/observable.pb.dart';
import 'package:flowy_sdk/rust_stream.dart';

export 'package:app_flowy/workspace/domain/i_user.dart';
export 'package:app_flowy/workspace/infrastructure/repos/user_repo.dart';

/// [IUser] implementation that forwards every operation to a [UserRepo].
class IUserImpl extends IUser {
  /// Repository that performs the actual user and workspace operations.
  UserRepo repo;

  IUserImpl({
    required this.repo,
  });

  /// Deletes the workspace identified by [workspaceId] through [repo].
  @override
  Future<Either<Unit, WorkspaceError>> deleteWorkspace(String workspaceId) {
    return repo.deleteWorkspace(workspaceId: workspaceId);
  }

  /// Fetches the profile of the user identified by [userId] through [repo].
  @override
  Future<Either<UserProfile, UserError>> fetchUserProfile(String userId) {
    return repo.fetchUserProfile(userId: userId);
  }

  /// Signs the user out through [repo].
  @override
  Future<Either<Unit, UserError>> signOut() {
    return repo.signOut();
  }

  /// The user profile held by [repo].
  @override
  UserProfile get user => repo.user;

  /// Fetches the workspaces through [repo].
  @override
  Future<Either<List<Workspace>, WorkspaceError>> fetchWorkspaces() {
    return repo.getWorkspaces();
  }
}

/// [IUserWatch] implementation that listens to [RustStreamReceiver] and
/// forwards parsed workspace and user observables to the registered callbacks.
///
/// Callbacks are optional; an observable whose callback has not been set is
/// dropped.
class IUserWatchImpl extends IUserWatch {
  StreamSubscription<ObservableSubject>? _subscription;
  WorkspacesUpdatedCallback? _workspacesUpdated;
  AuthChangedCallback? _authChanged;
  UserProfileUpdateCallback? _profileUpdated;

  // Created in [startWatching]; only read from the stream listener that
  // [startWatching] installs right after assigning them.
  late WorkspaceObservableParser _workspaceParser;
  late UserObservableParser _userParser;

  // The user whose token is passed as the `id` of both parsers.
  final UserProfile _user;

  IUserWatchImpl({
    required UserProfile user,
  }) : _user = user;

  /// Creates the parsers and starts listening to [RustStreamReceiver].
  ///
  /// A subscription left over from a previous call is cancelled first, so
  /// calling this twice does not leave two listeners delivering every
  /// observable twice.
  @override
  void startWatching() {
    _subscription?.cancel();

    _workspaceParser = WorkspaceObservableParser(
      id: _user.token,
      callback: _workspaceObservableCallback,
    );
    _userParser = UserObservableParser(
      id: _user.token,
      callback: _userObservableCallback,
    );

    _subscription = RustStreamReceiver.listen((observable) {
      _workspaceParser.parse(observable);
      _userParser.parse(observable);
    });
  }

  /// Cancels the stream subscription, if any, and forgets it.
  @override
  Future<void> stopWatching() async {
    final subscription = _subscription;
    _subscription = null;
    await subscription?.cancel();
  }

  /// Registers [authCallback], replacing any previously set callback.
  @override
  void setAuthCallback(AuthChangedCallback authCallback) {
    _authChanged = authCallback;
  }

  /// Registers [profileCallback], replacing any previously set callback.
  @override
  void setProfileCallback(UserProfileUpdateCallback profileCallback) {
    _profileUpdated = profileCallback;
  }

  /// Registers [workspacesCallback], replacing any previously set callback.
  @override
  void setWorkspacesCallback(WorkspacesUpdatedCallback workspacesCallback) {
    _workspacesUpdated = workspacesCallback;
  }

  /// Dispatches a parsed workspace observable to the matching callback.
  ///
  /// Create, delete and list-updated observables decode the payload as a
  /// [RepeatedWorkspace] and report its items (or the error) to the
  /// workspaces callback. `UserUnauthorized` reports a [UserError] with code
  /// `UserUnauthorized` to the auth callback, but only when [result] holds an
  /// error. Every other observable type is ignored.
  void _workspaceObservableCallback(
    WorkspaceObservable ty,
    Either<Uint8List, WorkspaceError> result,
  ) {
    switch (ty) {
      case WorkspaceObservable.UserCreateWorkspace:
      case WorkspaceObservable.UserDeleteWorkspace:
      case WorkspaceObservable.WorkspaceListUpdated:
        // Copy to a local so the null check promotes and no `!` is needed.
        final onWorkspacesUpdated = _workspacesUpdated;
        if (onWorkspacesUpdated != null) {
          result.fold(
            (payload) {
              final workspaces = RepeatedWorkspace.fromBuffer(payload);
              onWorkspacesUpdated(left(workspaces.items));
            },
            (error) => onWorkspacesUpdated(right(error)),
          );
        }
        break;
      case WorkspaceObservable.UserUnauthorized:
        final onAuthChanged = _authChanged;
        if (onAuthChanged != null) {
          result.fold(
            (_) {},
            // Block body on purpose: `=> { ... }` would build a Set literal.
            // The incoming workspace error is not forwarded; a fresh
            // UserError carrying the UserUnauthorized code is reported.
            (_) {
              onAuthChanged(
                right(
                  UserError.create()
                    ..code = user_error.ErrorCode.UserUnauthorized,
                ),
              );
            },
          );
        }
        break;
      default:
        break;
    }
  }

  /// Dispatches a parsed user observable to the profile callback.
  ///
  /// On `UserUnauthorized` the payload is decoded as a [UserProfile] and
  /// reported (or the error is reported) to the profile callback. Every other
  /// observable type is ignored.
  void _userObservableCallback(
    user.UserObservable ty,
    Either<Uint8List, UserError> result,
  ) {
    switch (ty) {
      // NOTE(review): the profile callback is driven by the `UserUnauthorized`
      // case. This looks like it may have been meant for a profile-update
      // observable; confirm against the `UserObservable` enum. Behaviour is
      // kept as is.
      case user.UserObservable.UserUnauthorized:
        final onProfileUpdated = _profileUpdated;
        if (onProfileUpdated != null) {
          result.fold(
            (payload) {
              final userProfile = UserProfile.fromBuffer(payload);
              onProfileUpdated(left(userProfile));
            },
            (error) => onProfileUpdated(right(error)),
          );
        }
        break;
      default:
        break;
    }
  }
}