rust_stream.dart 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. import 'dart:isolate';
  2. import 'dart:async';
  3. import 'dart:typed_data';
  4. import 'dart:ffi';
  5. import 'package:flowy_sdk/log.dart';
  6. import 'protobuf/dart-notify/subject.pb.dart';
  /// Signature of a callback that is handed a single [SubscribeObject] and
  /// returns nothing.
  typedef ObserverCallback = void Function(
    SubscribeObject observable,
  );
  /// Receives byte messages posted to a [RawReceivePort] and republishes them
  /// as decoded [SubscribeObject] events.
  ///
  /// Every message delivered to the port is added to an internal
  /// single-subscription stream, parsed with [SubscribeObject.fromBuffer] in
  /// [streamCallback], and then added to the broadcast controller exposed
  /// through [observable].
  ///
  /// The unnamed factory constructor always returns [shared].
  class RustStreamReceiver {
    /// The instance returned by the factory constructor and used by [listen].
    static RustStreamReceiver shared = RustStreamReceiver._internal();

    // Field initialisers run in declaration order: port, raw byte controller,
    // then the broadcast controller.
    final RawReceivePort _ffiPort = RawReceivePort();
    final StreamController<Uint8List> _streamController = StreamController();
    final StreamController<SubscribeObject> _observableController =
        StreamController.broadcast();

    // Assigned in the constructor body (not a lazy initialiser) so that the
    // subscription exists as soon as the instance is created.
    late final StreamSubscription<Uint8List> _ffiSubscription;

    /// The native port number of the receive port's send port.
    ///
    /// NOTE(review): presumably handed to the native side so it can post
    /// messages back to this isolate; confirm against the caller.
    int get port => _ffiPort.sendPort.nativePort;

    /// The broadcast controller on which decoded [SubscribeObject]s are
    /// published.
    StreamController<SubscribeObject> get observable => _observableController;

    RustStreamReceiver._internal() {
      // Route every port message into the byte stream, then decode each
      // chunk in [streamCallback].
      _ffiPort.handler = _streamController.add;
      _ffiSubscription = _streamController.stream.listen(streamCallback);
    }

    /// Returns the [shared] instance; no new receiver is created.
    factory RustStreamReceiver() {
      return shared;
    }

    /// Subscribes [callback] to the decoded events of the [shared] receiver.
    ///
    /// Returns the subscription so the caller can cancel it.
    static StreamSubscription<SubscribeObject> listen(
      void Function(SubscribeObject subject) callback,
    ) {
      return RustStreamReceiver.shared.observable.stream.listen(callback);
    }

    /// Decodes [bytes] into a [SubscribeObject] and publishes it on
    /// [observable].
    ///
    /// If decoding or publishing throws, the error's runtime type and the
    /// stack trace are logged and the error is rethrown.
    void streamCallback(Uint8List bytes) {
      try {
        final observable = SubscribeObject.fromBuffer(bytes);
        _observableController.add(observable);
      } catch (e, s) {
        Log.error('RustStreamReceiver SubscribeObject deserialize error: ${e.runtimeType}');
        Log.error('Stack trace \n $s');
        rethrow;
      }
    }

    /// Stops receiving messages and releases the port, the internal
    /// subscription and both stream controllers.
    Future<void> dispose() async {
      // Close the port before anything else. The awaits below yield to the
      // event loop; if the port were still open, a message arriving in that
      // window would invoke `_streamController.add` on an already closed
      // controller and throw a StateError.
      _ffiPort.close();
      await _ffiSubscription.cancel();
      await _streamController.close();
      await _observableController.close();
    }
  }
  45. }