import 'dart:isolate';
import 'dart:async';
import 'dart:typed_data';
import 'dart:ffi';

import 'package:appflowy_backend/log.dart';
import 'protobuf/flowy-notification/subject.pb.dart';

/// Signature of a callback that receives a decoded [SubscribeObject].
typedef ObserverCallback = void Function(SubscribeObject observable);

/// Receives raw byte messages on a native port, decodes each one as a
/// [SubscribeObject], and re-publishes the decoded objects on a broadcast
/// stream.
///
/// The class is a singleton: the unnamed factory constructor always returns
/// [shared]. Judging by the name, the bytes are presumably posted by the Rust
/// backend through [port] -- confirm against the FFI setup code.
class RustStreamReceiver {
  /// Shared instance returned by the unnamed factory constructor.
  static RustStreamReceiver shared = RustStreamReceiver._internal();

  /// Port on which raw messages arrive.
  final RawReceivePort _ffiPort = RawReceivePort();

  /// Carries the raw bytes from the port handler to [_streamCallback].
  final StreamController<Uint8List> _streamController = StreamController();

  /// Broadcast controller that publishes the decoded objects.
  final StreamController<SubscribeObject> _observableController =
      StreamController.broadcast();

  /// Subscription that feeds [_streamCallback]; cancelled in [dispose].
  late final StreamSubscription<Uint8List> _ffiSubscription;

  /// Native port identifier of the underlying receive port.
  int get port => _ffiPort.sendPort.nativePort;

  /// Controller whose stream emits every successfully decoded
  /// [SubscribeObject].
  StreamController<SubscribeObject> get observable => _observableController;

  RustStreamReceiver._internal() {
    // Every message delivered to the port is forwarded into the byte stream,
    // which in turn is decoded by [_streamCallback].
    _ffiPort.handler = _streamController.add;
    _ffiSubscription = _streamController.stream.listen(_streamCallback);
  }

  /// Returns the [shared] instance; never creates a new receiver.
  factory RustStreamReceiver() => shared;

  /// Subscribes [callback] to the decoded objects of the [shared] receiver.
  ///
  /// Returns the subscription so the caller can cancel it.
  static StreamSubscription<SubscribeObject> listen(
    void Function(SubscribeObject subject) callback,
  ) {
    return shared.observable.stream.listen(callback);
  }

  /// Decodes [bytes] as a [SubscribeObject] and publishes it on [observable].
  ///
  /// Any failure is logged together with its stack trace and then rethrown.
  void _streamCallback(Uint8List bytes) {
    try {
      final observable = SubscribeObject.fromBuffer(bytes);
      _observableController.add(observable);
    } catch (e, s) {
      Log.error(
        'RustStreamReceiver SubscribeObject deserialize error: ${e.runtimeType}',
      );
      Log.error('Stack trace \n $s');
      rethrow;
    }
  }

  /// Stops receiving messages and releases the port, the subscription and
  /// both stream controllers.
  Future<void> dispose() async {
    // Close the port before anything else: its handler is
    // `_streamController.add`, and adding to a controller that is closing or
    // closed throws a StateError. Closing the port last would leave a window
    // (across the awaits below) in which an incoming message could hit that.
    _ffiPort.close();
    await _ffiSubscription.cancel();
    await _streamController.close();
    await _observableController.close();
  }
}