12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455 |
- import 'dart:isolate';
- import 'dart:async';
- import 'dart:typed_data';
- import 'dart:ffi';
- import 'package:flowy_log/flowy_log.dart';
- import 'protobuf/flowy-dart-notify/subject.pb.dart';
/// Signature of a callback that is handed a single [ObservableSubject] and
/// returns nothing.
///
/// NOTE(review): not referenced by the code visible in this file; presumably
/// consumed by callers elsewhere — verify before removing.
- typedef ObserverCallback = void Function(ObservableSubject observable);
/// Receives raw byte messages posted to a [RawReceivePort], decodes each one
/// as an [ObservableSubject], and republishes it on a broadcast stream.
///
/// The class is a singleton: the unnamed factory constructor always returns
/// [shared]. The sender is expected to post to the native port id exposed by
/// [port].
class RustStreamReceiver {
  /// The single shared instance returned by the factory constructor.
  static RustStreamReceiver shared = RustStreamReceiver._internal();

  /// Port on which incoming messages arrive.
  final RawReceivePort _ffiPort = RawReceivePort();

  /// Single-subscription pipe carrying the raw bytes delivered to [_ffiPort].
  final StreamController<Uint8List> _streamController = StreamController();

  /// Broadcast controller that fans decoded subjects out to every listener.
  final StreamController<ObservableSubject> _observableController =
      StreamController.broadcast();

  /// Internal subscription that feeds [streamCallback] from
  /// [_streamController].
  late final StreamSubscription<Uint8List> _ffiSubscription;

  /// Native port id of the receive port, to be handed to the sending side.
  int get port => _ffiPort.sendPort.nativePort;

  /// Broadcast controller on which decoded [ObservableSubject]s are published.
  StreamController<ObservableSubject> get observable => _observableController;

  RustStreamReceiver._internal() {
    // Every message delivered to the port is pushed into the byte stream,
    // which in turn is decoded by [streamCallback].
    _ffiPort.handler = _streamController.add;
    _ffiSubscription = _streamController.stream.listen(streamCallback);
  }

  /// Returns the [shared] instance; never creates a new receiver.
  factory RustStreamReceiver() {
    return shared;
  }

  /// Subscribes [callback] to the decoded subjects of the [shared] receiver.
  ///
  /// Returns the resulting subscription so the caller can cancel it when it is
  /// no longer interested; ignoring the return value keeps the previous
  /// fire-and-forget behavior.
  static StreamSubscription<ObservableSubject> listen(
    void Function(ObservableSubject subject) callback,
  ) {
    return RustStreamReceiver.shared.observable.stream.listen(callback);
  }

  /// Decodes [bytes] into an [ObservableSubject] and publishes it on
  /// [observable].
  ///
  /// Any failure is logged together with its stack trace and then rethrown.
  void streamCallback(Uint8List bytes) {
    try {
      final observable = ObservableSubject.fromBuffer(bytes);
      _observableController.add(observable);
    } catch (e, s) {
      Log.error(
          'RustStreamReceiver ObservableSubject deserialize error: ${e.runtimeType}');
      Log.error('Stack trace \n $s');
      rethrow;
    }
  }

  /// Stops receiving messages and closes both controllers.
  Future<void> dispose() async {
    // Close the port first: while the awaits below are pending, a message
    // arriving on a still-open port would be forwarded to
    // `_streamController.add` after that controller was closed, which throws
    // a StateError.
    _ffiPort.close();
    await _ffiSubscription.cancel();
    await _streamController.close();
    await _observableController.close();
  }
}
|