connection.rs 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. use futures_util::future::BoxFuture;
  2. use lib_infra::future::FutureResult;
  3. use lib_ws::WSController;
  4. pub use lib_ws::{WSConnectState, WSMessageReceiver, WebSocketRawMessage};
  5. use parking_lot::RwLock;
  6. use serde_repr::*;
  7. use std::sync::Arc;
  8. use thiserror::Error;
  9. use tokio::sync::broadcast;
  10. #[derive(Debug, Clone, PartialEq, Eq, Error, Serialize_repr, Deserialize_repr)]
  11. #[repr(u8)]
  12. pub enum WSErrorCode {
  13. #[error("Internal error")]
  14. Internal = 0,
  15. }
  16. pub trait FlowyRawWebSocket: Send + Sync {
  17. fn initialize(&self) -> FutureResult<(), WSErrorCode>;
  18. fn start_connect(&self, addr: String, user_id: String) -> FutureResult<(), WSErrorCode>;
  19. fn stop_connect(&self) -> FutureResult<(), WSErrorCode>;
  20. fn subscribe_connect_state(&self) -> BoxFuture<broadcast::Receiver<WSConnectState>>;
  21. fn reconnect(&self, count: usize) -> FutureResult<(), WSErrorCode>;
  22. fn add_msg_receiver(&self, receiver: Arc<dyn WSMessageReceiver>) -> Result<(), WSErrorCode>;
  23. fn ws_msg_sender(&self) -> FutureResult<Option<Arc<dyn FlowyWebSocket>>, WSErrorCode>;
  24. }
  25. pub trait FlowyWebSocket: Send + Sync {
  26. fn send(&self, msg: WebSocketRawMessage) -> Result<(), WSErrorCode>;
  27. }
  28. #[derive(Debug, Clone, Eq, PartialEq)]
  29. pub enum NetworkType {
  30. Unknown = 0,
  31. Wifi = 1,
  32. Cell = 2,
  33. Ethernet = 3,
  34. Bluetooth = 4,
  35. VPN = 5,
  36. }
  37. impl std::default::Default for NetworkType {
  38. fn default() -> Self {
  39. NetworkType::Unknown
  40. }
  41. }
  42. impl NetworkType {
  43. pub fn is_connect(&self) -> bool {
  44. !matches!(self, NetworkType::Unknown | NetworkType::Bluetooth)
  45. }
  46. }
  47. pub struct FlowyWebSocketConnect {
  48. inner: Arc<dyn FlowyRawWebSocket>,
  49. connect_type: RwLock<NetworkType>,
  50. status_notifier: broadcast::Sender<NetworkType>,
  51. addr: String,
  52. }
  53. impl FlowyWebSocketConnect {
  54. pub fn new(addr: String) -> Self {
  55. let ws = Arc::new(Arc::new(WSController::new()));
  56. let (status_notifier, _) = broadcast::channel(10);
  57. FlowyWebSocketConnect {
  58. inner: ws,
  59. connect_type: RwLock::new(NetworkType::default()),
  60. status_notifier,
  61. addr,
  62. }
  63. }
  64. pub fn from_local(addr: String, ws: Arc<dyn FlowyRawWebSocket>) -> Self {
  65. let (status_notifier, _) = broadcast::channel(10);
  66. FlowyWebSocketConnect {
  67. inner: ws,
  68. connect_type: RwLock::new(NetworkType::default()),
  69. status_notifier,
  70. addr,
  71. }
  72. }
  73. pub async fn init(&self) {
  74. match self.inner.initialize().await {
  75. Ok(_) => {},
  76. Err(e) => tracing::error!("FlowyWebSocketConnect init error: {:?}", e),
  77. }
  78. }
  79. pub async fn start(&self, token: String, user_id: String) -> Result<(), WSErrorCode> {
  80. let addr = format!("{}/{}", self.addr, &token);
  81. self.inner.stop_connect().await?;
  82. self.inner.start_connect(addr, user_id).await?;
  83. Ok(())
  84. }
  85. pub async fn stop(&self) {
  86. let _ = self.inner.stop_connect().await;
  87. }
  88. pub fn update_network_type(&self, new_type: NetworkType) {
  89. tracing::debug!("Network new state: {:?}", new_type);
  90. let old_type = self.connect_type.read().clone();
  91. let _ = self.status_notifier.send(new_type.clone());
  92. if old_type != new_type {
  93. tracing::debug!("Connect type switch from {:?} to {:?}", old_type, new_type);
  94. match (old_type.is_connect(), new_type.is_connect()) {
  95. (false, true) => {
  96. let ws_controller = self.inner.clone();
  97. tokio::spawn(async move { retry_connect(ws_controller, 100).await });
  98. },
  99. (true, false) => {
  100. //
  101. },
  102. _ => {},
  103. }
  104. *self.connect_type.write() = new_type;
  105. }
  106. }
  107. pub async fn subscribe_websocket_state(&self) -> broadcast::Receiver<WSConnectState> {
  108. self.inner.subscribe_connect_state().await
  109. }
  110. pub fn subscribe_network_ty(&self) -> broadcast::Receiver<NetworkType> {
  111. self.status_notifier.subscribe()
  112. }
  113. pub fn add_ws_message_receiver(
  114. &self,
  115. receiver: Arc<dyn WSMessageReceiver>,
  116. ) -> Result<(), WSErrorCode> {
  117. self.inner.add_msg_receiver(receiver)?;
  118. Ok(())
  119. }
  120. pub async fn web_socket(&self) -> Result<Option<Arc<dyn FlowyWebSocket>>, WSErrorCode> {
  121. self.inner.ws_msg_sender().await
  122. }
  123. }
  124. #[tracing::instrument(level = "debug", skip(ws_conn))]
  125. pub fn listen_on_websocket(ws_conn: Arc<FlowyWebSocketConnect>) {
  126. let raw_web_socket = ws_conn.inner.clone();
  127. let _ = tokio::spawn(async move {
  128. let mut notify = ws_conn.inner.subscribe_connect_state().await;
  129. loop {
  130. match notify.recv().await {
  131. Ok(state) => {
  132. tracing::info!("Websocket state changed: {}", state);
  133. match state {
  134. WSConnectState::Init => {},
  135. WSConnectState::Connected => {},
  136. WSConnectState::Connecting => {},
  137. WSConnectState::Disconnected => retry_connect(raw_web_socket.clone(), 100).await,
  138. }
  139. },
  140. Err(e) => {
  141. tracing::error!("Websocket state notify error: {:?}", e);
  142. break;
  143. },
  144. }
  145. }
  146. });
  147. }
  148. async fn retry_connect(ws: Arc<dyn FlowyRawWebSocket>, count: usize) {
  149. match ws.reconnect(count).await {
  150. Ok(_) => {},
  151. Err(e) => {
  152. tracing::error!("websocket connect failed: {:?}", e);
  153. },
  154. }
  155. }