stream_sender.rs 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. use crate::entities::SubscribeObject;
  2. use bytes::Bytes;
  3. use lazy_static::lazy_static;
  4. use std::{convert::TryInto, sync::RwLock};
  5. lazy_static! {
  6. static ref DART_STREAM_SENDER: RwLock<DartStreamSender> = RwLock::new(DartStreamSender::new());
  7. }
  8. pub struct DartStreamSender {
  9. #[allow(dead_code)]
  10. isolate: Option<allo_isolate::Isolate>,
  11. }
  12. impl DartStreamSender {
  13. fn new() -> Self {
  14. Self { isolate: None }
  15. }
  16. fn inner_set_port(&mut self, port: i64) {
  17. log::info!("Setup rust to flutter stream with port {}", port);
  18. self.isolate = Some(allo_isolate::Isolate::new(port));
  19. }
  20. #[allow(dead_code)]
  21. fn inner_post(&self, observable_subject: SubscribeObject) -> Result<(), String> {
  22. match self.isolate {
  23. Some(ref isolate) => {
  24. let bytes: Bytes = observable_subject.try_into().unwrap();
  25. isolate.post(bytes.to_vec());
  26. Ok(())
  27. }
  28. None => Err("Isolate is not set".to_owned()),
  29. }
  30. }
  31. pub fn set_port(port: i64) {
  32. match DART_STREAM_SENDER.write() {
  33. Ok(mut stream) => stream.inner_set_port(port),
  34. Err(e) => {
  35. let msg = format!("Get rust to flutter stream lock fail. {:?}", e);
  36. log::error!("{:?}", msg);
  37. }
  38. }
  39. }
  40. pub fn post(_observable_subject: SubscribeObject) -> Result<(), String> {
  41. #[cfg(feature = "dart")]
  42. match DART_STREAM_SENDER.read() {
  43. Ok(stream) => stream.inner_post(_observable_subject),
  44. Err(e) => Err(format!("Get rust to flutter stream lock fail. {:?}", e)),
  45. }
  46. #[cfg(not(feature = "dart"))]
  47. Ok(())
  48. }
  49. }