rust_stream.dart 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import 'dart:isolate';
  2. import 'dart:async';
  3. import 'dart:typed_data';
  4. import 'dart:ffi';
  5. import 'package:flowy_log/flowy_log.dart';
  6. import 'protobuf/flowy-observable/subject.pb.dart';
  /// Signature of a callback that is invoked with a single [ObservableSubject].
  typedef ObserverCallback = void Function(ObservableSubject subject);
  /// Receives serialized [ObservableSubject] messages on a [RawReceivePort] and
  /// republishes the decoded values on a broadcast stream.
  ///
  /// Every message delivered to the port is queued on an internal
  /// single-subscription stream, decoded in [streamCallback] with
  /// `ObservableSubject.fromBuffer`, and then added to [observable].
  ///
  /// The native id of the port is exposed through [port]; presumably it is
  /// handed to the Rust side so it can post bytes back (confirm against the FFI
  /// setup code, which is not part of this file).
  class RustStreamReceiver {
    /// The shared instance that the unnamed factory constructor returns.
    static RustStreamReceiver shared = RustStreamReceiver._internal();

    /// Port on which raw messages arrive.
    final RawReceivePort _ffiPort;

    /// Queue between the port handler and [streamCallback].
    final StreamController<Uint8List> _streamController;

    /// Broadcast controller that carries the decoded subjects.
    final StreamController<ObservableSubject> _observableController;

    /// Subscription that feeds [_streamController] events to [streamCallback].
    ///
    /// Assigned in the constructor body because it needs an instance tear-off,
    /// which is not available in the initializer list.
    late final StreamSubscription<Uint8List> _ffiSubscription;

    /// The native port id of the underlying receive port.
    int get port => _ffiPort.sendPort.nativePort;

    /// The broadcast controller on which decoded subjects are published.
    StreamController<ObservableSubject> get observable => _observableController;

    RustStreamReceiver._internal()
        : _ffiPort = RawReceivePort(),
          _streamController = StreamController<Uint8List>(),
          _observableController =
              StreamController<ObservableSubject>.broadcast() {
      _ffiPort.handler = _streamController.add;
      _ffiSubscription = _streamController.stream.listen(streamCallback);
    }

    /// Returns the [shared] instance; no new receiver is created.
    factory RustStreamReceiver() {
      return shared;
    }

    /// Subscribes [callback] to every subject published by the [shared]
    /// receiver.
    ///
    /// Returns the resulting subscription so the caller can cancel it once it
    /// is no longer interested. Callers that ignore the return value keep
    /// working as before.
    static StreamSubscription<ObservableSubject> listen(
      void Function(ObservableSubject subject) callback,
    ) {
      return RustStreamReceiver.shared.observable.stream.listen(callback);
    }

    /// Decodes [bytes] into an [ObservableSubject] and publishes it on
    /// [observable].
    ///
    /// Any error raised while decoding or publishing is logged together with
    /// its stack trace and then rethrown.
    void streamCallback(Uint8List bytes) {
      try {
        final observable = ObservableSubject.fromBuffer(bytes);
        _observableController.add(observable);
      } catch (e, s) {
        Log.error(
            'RustStreamReceiver ObservableSubject deserialize error: ${e.runtimeType}');
        Log.error('Stack trace \n $s');
        rethrow;
      }
    }

    /// Stops receiving messages and closes both stream controllers.
    Future<void> dispose() async {
      // Close the port before the controllers: the port handler is
      // `_streamController.add`, and adding to an already closed controller
      // throws a StateError. Once the port is closed, incoming messages are
      // dropped instead of reaching the handler.
      _ffiPort.close();
      await _ffiSubscription.cancel();
      await _streamController.close();
      await _observableController.close();
    }
  }