rust_stream.dart 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. import 'dart:isolate';
  2. import 'dart:async';
  3. import 'dart:typed_data';
  4. import 'dart:ffi';
  5. import 'package:appflowy_backend/log.dart';
  6. import 'protobuf/flowy-notification/subject.pb.dart';
  /// Signature of a callback that is handed each [SubscribeObject] decoded
  /// from the bytes received on the native port.
  typedef ObserverCallback = void Function(SubscribeObject observable);

  /// Receives serialized [SubscribeObject] messages on a [RawReceivePort] and
  /// republishes the decoded objects on a broadcast stream.
  ///
  /// Data flow: native port -> single-subscription byte stream ->
  /// [streamCallback] (decode) -> broadcast [observable] controller.
  ///
  /// The unnamed constructor is a factory that always returns [shared], so
  /// every `RustStreamReceiver()` call yields the same instance.
  class RustStreamReceiver {
    /// The instance returned by the factory constructor and used by [listen].
    // NOTE(review): kept non-final so the existing setter stays available;
    // confirm nothing reassigns it, then consider `static final`.
    static RustStreamReceiver shared = RustStreamReceiver._internal();

    /// Port whose native handle ([port]) is given to the sending side.
    final RawReceivePort _ffiPort = RawReceivePort();

    /// Carries the raw bytes delivered to [_ffiPort] to [streamCallback].
    final StreamController<Uint8List> _streamController = StreamController();

    /// Broadcasts every successfully decoded [SubscribeObject].
    final StreamController<SubscribeObject> _observableController =
        StreamController.broadcast();

    /// Subscription that feeds [streamCallback]; cancelled in [dispose].
    late final StreamSubscription<Uint8List> _ffiSubscription;

    RustStreamReceiver._internal() {
      // Forward every message posted to the port into the byte stream, then
      // start decoding what arrives on that stream.
      _ffiPort.handler = _streamController.add;
      _ffiSubscription = _streamController.stream.listen(streamCallback);
    }

    /// Returns the shared instance; never creates a new receiver.
    factory RustStreamReceiver() => shared;

    /// The native port identifier of the underlying receive port.
    int get port => _ffiPort.sendPort.nativePort;

    /// The broadcast controller that emits decoded [SubscribeObject]s.
    StreamController<SubscribeObject> get observable => _observableController;

    /// Subscribes [callback] to the decoded objects of the [shared] receiver.
    ///
    /// Returns the subscription so the caller can cancel it.
    static StreamSubscription<SubscribeObject> listen(
      ObserverCallback callback,
    ) {
      return shared.observable.stream.listen(callback);
    }

    /// Decodes [bytes] into a [SubscribeObject] and publishes it on
    /// [observable].
    ///
    /// Any error raised while decoding or publishing is logged and then
    /// rethrown. Because this method runs as a stream listener, the rethrown
    /// error leaves the listener callback and is reported as an uncaught
    /// asynchronous error.
    void streamCallback(Uint8List bytes) {
      try {
        final decoded = SubscribeObject.fromBuffer(bytes);
        _observableController.add(decoded);
      } catch (e, s) {
        Log.error(
          'RustStreamReceiver SubscribeObject deserialize error: ${e.runtimeType}',
        );
        Log.error('Stack trace \n $s');
        rethrow;
      }
    }

    /// Stops receiving native messages and closes both stream controllers.
    Future<void> dispose() async {
      // Close the port before anything else. The awaits below yield to the
      // event loop; if the port were still open, a message arriving after
      // `_streamController.close()` would invoke `add` on a closed controller
      // and throw a StateError.
      _ffiPort.close();
      await _ffiSubscription.cancel();
      await _streamController.close();
      await _observableController.close();
    }
  }