// http_ws.rs
  1. use crate::ws::connection::{FlowyRawWebSocket, FlowyWebSocket};
  2. use flowy_error::internal_error;
  3. pub use flowy_error::FlowyError;
  4. use lib_infra::future::FutureResult;
  5. pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
  6. use lib_ws::{WSController, WSSender};
  7. use futures_util::future::BoxFuture;
  8. use std::sync::Arc;
  9. use tokio::sync::broadcast::Receiver;
  10. impl FlowyRawWebSocket for Arc<WSController> {
  11. fn initialize(&self) -> FutureResult<(), FlowyError> {
  12. FutureResult::new(async { Ok(()) })
  13. }
  14. fn start_connect(&self, addr: String, _user_id: String) -> FutureResult<(), FlowyError> {
  15. let cloned_ws = self.clone();
  16. FutureResult::new(async move {
  17. let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
  18. Ok(())
  19. })
  20. }
  21. fn stop_connect(&self) -> FutureResult<(), FlowyError> {
  22. let controller = self.clone();
  23. FutureResult::new(async move {
  24. controller.stop().await;
  25. Ok(())
  26. })
  27. }
  28. fn subscribe_connect_state(&self) -> BoxFuture<Receiver<WSConnectState>> {
  29. let cloned_ws = self.clone();
  30. Box::pin(async move { cloned_ws.subscribe_state().await })
  31. }
  32. fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
  33. let cloned_ws = self.clone();
  34. FutureResult::new(async move {
  35. let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
  36. Ok(())
  37. })
  38. }
  39. fn add_msg_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
  40. let _ = self.add_ws_message_receiver(receiver).map_err(internal_error)?;
  41. Ok(())
  42. }
  43. fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, FlowyError> {
  44. let cloned_self = self.clone();
  45. FutureResult::new(async move {
  46. match cloned_self.ws_message_sender().await.map_err(internal_error)? {
  47. None => Ok(None),
  48. Some(sender) => {
  49. let sender = sender as Arc<dyn FlowyWebSocket>;
  50. Ok(Some(sender))
  51. }
  52. }
  53. })
  54. }
  55. }
  56. impl FlowyWebSocket for WSSender {
  57. fn send(&self, msg: WebSocketRawMessage) -> Result<(), FlowyError> {
  58. let _ = self.send_msg(msg).map_err(internal_error)?;
  59. Ok(())
  60. }
  61. }