ws.rs 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. use crate::connection::{FlowyRawWebSocket, FlowyWebSocket};
  2. use crate::WSErrorCode;
  3. use futures_util::future::BoxFuture;
  4. use lib_infra::future::FutureResult;
  5. pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
  6. use lib_ws::{WSController, WSSender};
  7. use std::sync::Arc;
  8. use tokio::sync::broadcast::Receiver;
  9. impl FlowyRawWebSocket for Arc<WSController> {
  10. fn initialize(&self) -> FutureResult<(), WSErrorCode> {
  11. FutureResult::new(async { Ok(()) })
  12. }
  13. fn start_connect(&self, addr: String, _user_id: i64) -> FutureResult<(), WSErrorCode> {
  14. let cloned_ws = self.clone();
  15. FutureResult::new(async move {
  16. cloned_ws.start(addr).await.map_err(internal_error)?;
  17. Ok(())
  18. })
  19. }
  20. fn stop_connect(&self) -> FutureResult<(), WSErrorCode> {
  21. let controller = self.clone();
  22. FutureResult::new(async move {
  23. controller.stop().await;
  24. Ok(())
  25. })
  26. }
  27. fn subscribe_connect_state(&self) -> BoxFuture<Receiver<WSConnectState>> {
  28. let cloned_ws = self.clone();
  29. Box::pin(async move { cloned_ws.subscribe_state().await })
  30. }
  31. fn reconnect(&self, count: usize) -> FutureResult<(), WSErrorCode> {
  32. let cloned_ws = self.clone();
  33. FutureResult::new(async move {
  34. cloned_ws.retry(count).await.map_err(internal_error)?;
  35. Ok(())
  36. })
  37. }
  38. fn add_msg_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), WSErrorCode> {
  39. self
  40. .add_ws_message_receiver(receiver)
  41. .map_err(internal_error)?;
  42. Ok(())
  43. }
  44. fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, WSErrorCode> {
  45. let cloned_self = self.clone();
  46. FutureResult::new(async move {
  47. match cloned_self
  48. .ws_message_sender()
  49. .await
  50. .map_err(internal_error)?
  51. {
  52. None => Ok(None),
  53. Some(sender) => {
  54. let sender = sender as Arc<dyn FlowyWebSocket>;
  55. Ok(Some(sender))
  56. },
  57. }
  58. })
  59. }
  60. }
  61. impl FlowyWebSocket for WSSender {
  62. fn send(&self, msg: WebSocketRawMessage) -> Result<(), WSErrorCode> {
  63. self.send_msg(msg).map_err(internal_error)?;
  64. Ok(())
  65. }
  66. }
  67. fn internal_error<T>(_e: T) -> WSErrorCode
  68. where
  69. T: std::fmt::Debug,
  70. {
  71. WSErrorCode::Internal
  72. }