manager.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. use crate::{
  2. entities::NetworkType,
  3. services::ws::{local_web_socket, FlowyWebSocket, FlowyWsSender},
  4. };
  5. use flowy_error::{internal_error, FlowyError};
  6. use lib_infra::future::FutureResult;
  7. use lib_ws::{WSConnectState, WSController, WSMessageReceiver, WSSender, WebScoketRawMessage};
  8. use parking_lot::RwLock;
  9. use std::sync::Arc;
  10. use tokio::sync::{broadcast, broadcast::Receiver};
  11. pub struct WsManager {
  12. inner: Arc<dyn FlowyWebSocket>,
  13. connect_type: RwLock<NetworkType>,
  14. status_notifier: broadcast::Sender<NetworkType>,
  15. addr: String,
  16. }
  17. impl WsManager {
  18. pub fn new(addr: String) -> Self {
  19. let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
  20. Arc::new(Arc::new(WSController::new()))
  21. } else {
  22. local_web_socket()
  23. };
  24. let (status_notifier, _) = broadcast::channel(10);
  25. WsManager {
  26. inner: ws,
  27. connect_type: RwLock::new(NetworkType::default()),
  28. status_notifier,
  29. addr,
  30. }
  31. }
  32. pub async fn start(&self, token: String) -> Result<(), FlowyError> {
  33. let addr = format!("{}/{}", self.addr, token);
  34. self.inner.stop_connect().await?;
  35. let _ = self.inner.start_connect(addr).await?;
  36. Ok(())
  37. }
  38. pub async fn stop(&self) { let _ = self.inner.stop_connect().await; }
  39. pub fn update_network_type(&self, new_type: &NetworkType) {
  40. tracing::debug!("Network new state: {:?}", new_type);
  41. let old_type = self.connect_type.read().clone();
  42. let _ = self.status_notifier.send(new_type.clone());
  43. if &old_type != new_type {
  44. tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type);
  45. match (old_type.is_connect(), new_type.is_connect()) {
  46. (false, true) => {
  47. let ws_controller = self.inner.clone();
  48. tokio::spawn(async move { retry_connect(ws_controller, 100).await });
  49. },
  50. (true, false) => {
  51. //
  52. },
  53. _ => {},
  54. }
  55. *self.connect_type.write() = new_type.clone();
  56. }
  57. }
  58. pub fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
  59. self.inner.subscribe_connect_state()
  60. }
  61. pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> { self.status_notifier.subscribe() }
  62. pub fn add_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
  63. let _ = self.inner.add_message_receiver(handler)?;
  64. Ok(())
  65. }
  66. pub fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> { self.inner.ws_sender() }
  67. }
  68. #[tracing::instrument(level = "debug", skip(manager))]
  69. pub fn listen_on_websocket(manager: Arc<WsManager>) {
  70. if cfg!(feature = "http_server") {
  71. let ws = manager.inner.clone();
  72. let mut notify = manager.inner.subscribe_connect_state();
  73. let _ = tokio::spawn(async move {
  74. loop {
  75. match notify.recv().await {
  76. Ok(state) => {
  77. tracing::info!("Websocket state changed: {}", state);
  78. match state {
  79. WSConnectState::Init => {},
  80. WSConnectState::Connected => {},
  81. WSConnectState::Connecting => {},
  82. WSConnectState::Disconnected => retry_connect(ws.clone(), 100).await,
  83. }
  84. },
  85. Err(e) => {
  86. tracing::error!("Websocket state notify error: {:?}", e);
  87. break;
  88. },
  89. }
  90. }
  91. });
  92. } else {
  93. // do nothing
  94. };
  95. }
  96. async fn retry_connect(ws: Arc<dyn FlowyWebSocket>, count: usize) {
  97. match ws.reconnect(count).await {
  98. Ok(_) => {},
  99. Err(e) => {
  100. tracing::error!("websocket connect failed: {:?}", e);
  101. },
  102. }
  103. }
  104. impl FlowyWebSocket for Arc<WSController> {
  105. fn start_connect(&self, addr: String) -> FutureResult<(), FlowyError> {
  106. let cloned_ws = self.clone();
  107. FutureResult::new(async move {
  108. let _ = cloned_ws.start(addr).await.map_err(internal_error)?;
  109. Ok(())
  110. })
  111. }
  112. fn stop_connect(&self) -> FutureResult<(), FlowyError> {
  113. let controller = self.clone();
  114. FutureResult::new(async move {
  115. controller.stop().await;
  116. Ok(())
  117. })
  118. }
  119. fn subscribe_connect_state(&self) -> Receiver<WSConnectState> { self.subscribe_state() }
  120. fn reconnect(&self, count: usize) -> FutureResult<(), FlowyError> {
  121. let cloned_ws = self.clone();
  122. FutureResult::new(async move {
  123. let _ = cloned_ws.retry(count).await.map_err(internal_error)?;
  124. Ok(())
  125. })
  126. }
  127. fn add_message_receiver(&self, handler: Arc<dyn WSMessageReceiver>) -> Result<(), FlowyError> {
  128. let _ = self.add_receiver(handler).map_err(internal_error)?;
  129. Ok(())
  130. }
  131. fn ws_sender(&self) -> Result<Arc<dyn FlowyWsSender>, FlowyError> {
  132. let sender = self.sender().map_err(internal_error)?;
  133. Ok(sender)
  134. }
  135. }
  136. impl FlowyWsSender for WSSender {
  137. fn send(&self, msg: WebScoketRawMessage) -> Result<(), FlowyError> {
  138. let _ = self.send_msg(msg).map_err(internal_error)?;
  139. Ok(())
  140. }
  141. }