ws.rs 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. use crate::{
  2. connect::{WsConnectionFuture, WsStream},
  3. errors::WsError,
  4. WsMessage,
  5. WsModule,
  6. };
  7. use bytes::Bytes;
  8. use dashmap::DashMap;
  9. use flowy_net::errors::{internal_error, ServerError};
  10. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  11. use futures_core::{future::BoxFuture, ready, Stream};
  12. use parking_lot::RwLock;
  13. use pin_project::pin_project;
  14. use std::{
  15. collections::HashMap,
  16. convert::TryFrom,
  17. future::Future,
  18. pin::Pin,
  19. sync::Arc,
  20. task::{Context, Poll},
  21. };
  22. use tokio::{
  23. sync::{broadcast, oneshot},
  24. task::JoinHandle,
  25. };
  26. use tokio_tungstenite::tungstenite::{
  27. protocol::{frame::coding::CloseCode, CloseFrame},
  28. Message,
  29. };
  30. pub type MsgReceiver = UnboundedReceiver<Message>;
  31. pub type MsgSender = UnboundedSender<Message>;
  32. type Handlers = DashMap<WsModule, Arc<dyn WsMessageHandler>>;
  33. pub trait WsMessageHandler: Sync + Send + 'static {
  34. fn source(&self) -> WsModule;
  35. fn receive_message(&self, msg: WsMessage);
  36. }
  37. type NotifyCallback = Arc<dyn Fn(&WsState) + Send + Sync + 'static>;
  38. struct WsStateNotify {
  39. #[allow(dead_code)]
  40. state: WsState,
  41. callback: Option<NotifyCallback>,
  42. }
  43. impl WsStateNotify {
  44. fn update_state(&mut self, state: WsState) {
  45. if let Some(f) = &self.callback {
  46. f(&state);
  47. }
  48. self.state = state;
  49. }
  50. }
  51. #[derive(Clone)]
  52. pub enum WsState {
  53. Init,
  54. Connected(Arc<WsSender>),
  55. Disconnected(WsError),
  56. }
  57. pub struct WsController {
  58. handlers: Handlers,
  59. state_notify: Arc<broadcast::Sender<WsState>>,
  60. sender: RwLock<Option<Arc<WsSender>>>,
  61. }
  62. impl WsController {
  63. pub fn new() -> Self {
  64. let (state_notify, _) = broadcast::channel(16);
  65. let controller = Self {
  66. handlers: DashMap::new(),
  67. sender: RwLock::new(None),
  68. state_notify: Arc::new(state_notify),
  69. };
  70. controller
  71. }
  72. pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), WsError> {
  73. let source = handler.source();
  74. if self.handlers.contains_key(&source) {
  75. log::error!("WsSource's {:?} is already registered", source);
  76. }
  77. self.handlers.insert(source, handler);
  78. Ok(())
  79. }
  80. pub async fn connect(&self, addr: String) -> Result<(), ServerError> {
  81. let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
  82. self._connect(addr.clone(), ret);
  83. rx.await?
  84. }
  85. #[allow(dead_code)]
  86. pub fn state_subscribe(&self) -> broadcast::Receiver<WsState> { self.state_notify.subscribe() }
  87. pub fn sender(&self) -> Result<Arc<WsSender>, WsError> {
  88. match &*self.sender.read() {
  89. None => Err(WsError::internal().context("WsSender is not initialized, should call connect first")),
  90. Some(sender) => Ok(sender.clone()),
  91. }
  92. }
  93. fn _connect(&self, addr: String, ret: oneshot::Sender<Result<(), ServerError>>) {
  94. log::debug!("🐴 ws connect: {}", &addr);
  95. let (connection, handlers) = self.make_connect(addr.clone());
  96. let state_notify = self.state_notify.clone();
  97. let sender = self
  98. .sender
  99. .read()
  100. .clone()
  101. .expect("Sender should be not empty after calling make_connect");
  102. tokio::spawn(async move {
  103. match connection.await {
  104. Ok(stream) => {
  105. state_notify.send(WsState::Connected(sender));
  106. ret.send(Ok(()));
  107. spawn_steam_and_handlers(stream, handlers, state_notify).await;
  108. },
  109. Err(e) => {
  110. state_notify.send(WsState::Disconnected(e.clone()));
  111. ret.send(Err(ServerError::internal().context(e)));
  112. },
  113. }
  114. });
  115. }
  116. fn make_connect(&self, addr: String) -> (WsConnectionFuture, WsHandlerFuture) {
  117. // Stream User
  118. // ┌───────────────┐ ┌──────────────┐
  119. // ┌──────┐ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
  120. // │Server│──────┼─▶│ ws_read │──┼───▶│ msg_tx │───┼─▶│ msg_rx │ │
  121. // └──────┘ │ └─────────┘ │ └────────┘ │ └────────┘ │
  122. // ▲ │ │ │ │
  123. // │ │ ┌─────────┐ │ ┌────────┐ │ ┌────────┐ │
  124. // └─────────┼──│ws_write │◀─┼────│ ws_rx │◀──┼──│ ws_tx │ │
  125. // │ └─────────┘ │ └────────┘ │ └────────┘ │
  126. // └───────────────┘ └──────────────┘
  127. let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
  128. let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
  129. let handlers = self.handlers.clone();
  130. *self.sender.write() = Some(Arc::new(WsSender { ws_tx }));
  131. (
  132. WsConnectionFuture::new(msg_tx, ws_rx, addr),
  133. WsHandlerFuture::new(handlers, msg_rx),
  134. )
  135. }
  136. }
  137. async fn spawn_steam_and_handlers(
  138. stream: WsStream,
  139. handlers: WsHandlerFuture,
  140. state_notify: Arc<broadcast::Sender<WsState>>,
  141. ) {
  142. tokio::select! {
  143. result = stream => {
  144. match result {
  145. Ok(_) => {},
  146. Err(e) => {
  147. // TODO: retry?
  148. log::error!("ws stream error {:?}", e);
  149. state_notify.send(WsState::Disconnected(e));
  150. }
  151. }
  152. },
  153. result = handlers => log::debug!("handlers completed {:?}", result),
  154. };
  155. }
  156. #[pin_project]
  157. pub struct WsHandlerFuture {
  158. #[pin]
  159. msg_rx: MsgReceiver,
  160. // Opti: Hashmap would be better
  161. handlers: Handlers,
  162. }
  163. impl WsHandlerFuture {
  164. fn new(handlers: Handlers, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } }
  165. fn handler_ws_message(&self, message: Message) {
  166. match message {
  167. Message::Binary(bytes) => self.handle_binary_message(bytes),
  168. _ => {},
  169. }
  170. }
  171. fn handle_binary_message(&self, bytes: Vec<u8>) {
  172. let bytes = Bytes::from(bytes);
  173. match WsMessage::try_from(bytes) {
  174. Ok(message) => match self.handlers.get(&message.module) {
  175. None => log::error!("Can't find any handler for message: {:?}", message),
  176. Some(handler) => handler.receive_message(message.clone()),
  177. },
  178. Err(e) => {
  179. log::error!("Deserialize binary ws message failed: {:?}", e);
  180. },
  181. }
  182. }
  183. }
  184. impl Future for WsHandlerFuture {
  185. type Output = ();
  186. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  187. loop {
  188. match ready!(self.as_mut().project().msg_rx.poll_next(cx)) {
  189. None => {
  190. return Poll::Ready(());
  191. },
  192. Some(message) => self.handler_ws_message(message),
  193. }
  194. }
  195. }
  196. }
  197. #[derive(Debug, Clone)]
  198. pub struct WsSender {
  199. ws_tx: MsgSender,
  200. }
  201. impl WsSender {
  202. pub fn send_msg<T: Into<WsMessage>>(&self, msg: T) -> Result<(), WsError> {
  203. let msg = msg.into();
  204. let _ = self
  205. .ws_tx
  206. .unbounded_send(msg.into())
  207. .map_err(|e| WsError::internal().context(e))?;
  208. Ok(())
  209. }
  210. pub fn send_text(&self, source: &WsModule, text: &str) -> Result<(), WsError> {
  211. let msg = WsMessage {
  212. module: source.clone(),
  213. data: text.as_bytes().to_vec(),
  214. };
  215. self.send_msg(msg)
  216. }
  217. pub fn send_binary(&self, source: &WsModule, bytes: Vec<u8>) -> Result<(), WsError> {
  218. let msg = WsMessage {
  219. module: source.clone(),
  220. data: bytes,
  221. };
  222. self.send_msg(msg)
  223. }
  224. pub fn send_disconnect(&self, reason: &str) -> Result<(), WsError> {
  225. let frame = CloseFrame {
  226. code: CloseCode::Normal,
  227. reason: reason.to_owned().into(),
  228. };
  229. let msg = Message::Close(Some(frame));
  230. let _ = self
  231. .ws_tx
  232. .unbounded_send(msg)
  233. .map_err(|e| WsError::internal().context(e))?;
  234. Ok(())
  235. }
  236. }
  237. // #[cfg(test)]
  238. // mod tests {
  239. // use super::WsController;
  240. //
  241. // #[tokio::test]
  242. // async fn connect() {
  243. // std::env::set_var("RUST_LOG", "Debug");
  244. // env_logger::init();
  245. //
  246. // let mut controller = WsController::new();
  247. // let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str());
  248. // let (a, b) = controller.make_connect(addr);
  249. // tokio::select! {
  250. // r = a => println!("write completed {:?}", r),
  251. // _ = b => println!("read completed"),
  252. // };
  253. // }
  254. // }