ws.rs 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. use crate::{
  2. connect::{WsConnectionFuture, WsStream},
  3. errors::WsError,
  4. WsMessage,
  5. WsModule,
  6. };
  7. use bytes::Bytes;
  8. use dashmap::DashMap;
  9. use flowy_net::errors::ServerError;
  10. use futures_channel::mpsc::{UnboundedReceiver, UnboundedSender};
  11. use futures_core::{ready, Stream};
  12. use parking_lot::RwLock;
  13. use pin_project::pin_project;
  14. use std::{
  15. convert::TryFrom,
  16. future::Future,
  17. pin::Pin,
  18. sync::Arc,
  19. task::{Context, Poll},
  20. };
  21. use tokio::sync::{broadcast, oneshot};
  22. use tokio_tungstenite::tungstenite::{
  23. protocol::{frame::coding::CloseCode, CloseFrame},
  24. Message,
  25. };
  26. pub type MsgReceiver = UnboundedReceiver<Message>;
  27. pub type MsgSender = UnboundedSender<Message>;
  28. type Handlers = DashMap<WsModule, Arc<dyn WsMessageHandler>>;
  29. pub trait WsMessageHandler: Sync + Send + 'static {
  30. fn source(&self) -> WsModule;
  31. fn receive_message(&self, msg: WsMessage);
  32. }
  33. #[derive(Clone)]
  34. pub enum WsState {
  35. Init,
  36. Connected(Arc<WsSender>),
  37. Disconnected(WsError),
  38. }
  39. pub struct WsController {
  40. handlers: Handlers,
  41. state_notify: Arc<broadcast::Sender<WsState>>,
  42. sender: RwLock<Option<Arc<WsSender>>>,
  43. }
  44. impl WsController {
  45. pub fn new() -> Self {
  46. let (state_notify, _) = broadcast::channel(16);
  47. let controller = Self {
  48. handlers: DashMap::new(),
  49. sender: RwLock::new(None),
  50. state_notify: Arc::new(state_notify),
  51. };
  52. controller
  53. }
  54. pub fn add_handler(&self, handler: Arc<dyn WsMessageHandler>) -> Result<(), WsError> {
  55. let source = handler.source();
  56. if self.handlers.contains_key(&source) {
  57. log::error!("WsSource's {:?} is already registered", source);
  58. }
  59. self.handlers.insert(source, handler);
  60. Ok(())
  61. }
  62. pub async fn connect(&self, addr: String) -> Result<(), ServerError> {
  63. let (ret, rx) = oneshot::channel::<Result<(), ServerError>>();
  64. self._connect(addr.clone(), ret);
  65. rx.await?
  66. }
  67. #[allow(dead_code)]
  68. pub fn state_subscribe(&self) -> broadcast::Receiver<WsState> { self.state_notify.subscribe() }
  69. pub fn sender(&self) -> Result<Arc<WsSender>, WsError> {
  70. match &*self.sender.read() {
  71. None => Err(WsError::internal().context("WsSender is not initialized, should call connect first")),
  72. Some(sender) => Ok(sender.clone()),
  73. }
  74. }
  75. fn _connect(&self, addr: String, ret: oneshot::Sender<Result<(), ServerError>>) {
  76. log::debug!("๐Ÿด ws connect: {}", &addr);
  77. let (connection, handlers) = self.make_connect(addr.clone());
  78. let state_notify = self.state_notify.clone();
  79. let sender = self
  80. .sender
  81. .read()
  82. .clone()
  83. .expect("Sender should be not empty after calling make_connect");
  84. tokio::spawn(async move {
  85. match connection.await {
  86. Ok(stream) => {
  87. let _ = state_notify.send(WsState::Connected(sender));
  88. let _ = ret.send(Ok(()));
  89. spawn_steam_and_handlers(stream, handlers, state_notify).await;
  90. },
  91. Err(e) => {
  92. let _ = state_notify.send(WsState::Disconnected(e.clone()));
  93. let _ = ret.send(Err(ServerError::internal().context(e)));
  94. },
  95. }
  96. });
  97. }
  98. fn make_connect(&self, addr: String) -> (WsConnectionFuture, WsHandlerFuture) {
  99. // Stream User
  100. // โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”
  101. // โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
  102. // โ”‚Serverโ”‚โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ–ถโ”‚ ws_read โ”‚โ”€โ”€โ”ผโ”€โ”€โ”€โ–ถโ”‚ msg_tx โ”‚โ”€โ”€โ”€โ”ผโ”€โ–ถโ”‚ msg_rx โ”‚ โ”‚
  103. // โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
  104. // โ–ฒ โ”‚ โ”‚ โ”‚ โ”‚
  105. // โ”‚ โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚ โ”Œโ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ” โ”‚
  106. // โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”ผโ”€โ”€โ”‚ws_write โ”‚โ—€โ”€โ”ผโ”€โ”€โ”€โ”€โ”‚ ws_rx โ”‚โ—€โ”€โ”€โ”ผโ”€โ”€โ”‚ ws_tx โ”‚ โ”‚
  107. // โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ”‚
  108. // โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜ โ””โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”˜
  109. let (msg_tx, msg_rx) = futures_channel::mpsc::unbounded();
  110. let (ws_tx, ws_rx) = futures_channel::mpsc::unbounded();
  111. let handlers = self.handlers.clone();
  112. *self.sender.write() = Some(Arc::new(WsSender { ws_tx }));
  113. (
  114. WsConnectionFuture::new(msg_tx, ws_rx, addr),
  115. WsHandlerFuture::new(handlers, msg_rx),
  116. )
  117. }
  118. }
  119. async fn spawn_steam_and_handlers(
  120. stream: WsStream,
  121. handlers: WsHandlerFuture,
  122. state_notify: Arc<broadcast::Sender<WsState>>,
  123. ) {
  124. tokio::select! {
  125. result = stream => {
  126. match result {
  127. Ok(_) => {},
  128. Err(e) => {
  129. // TODO: retry?
  130. log::error!("ws stream error {:?}", e);
  131. let _ = state_notify.send(WsState::Disconnected(e));
  132. }
  133. }
  134. },
  135. result = handlers => log::debug!("handlers completed {:?}", result),
  136. };
  137. }
  138. #[pin_project]
  139. pub struct WsHandlerFuture {
  140. #[pin]
  141. msg_rx: MsgReceiver,
  142. // Opti: Hashmap would be better
  143. handlers: Handlers,
  144. }
  145. impl WsHandlerFuture {
  146. fn new(handlers: Handlers, msg_rx: MsgReceiver) -> Self { Self { msg_rx, handlers } }
  147. fn handler_ws_message(&self, message: Message) {
  148. match message {
  149. Message::Binary(bytes) => self.handle_binary_message(bytes),
  150. _ => {},
  151. }
  152. }
  153. fn handle_binary_message(&self, bytes: Vec<u8>) {
  154. let bytes = Bytes::from(bytes);
  155. match WsMessage::try_from(bytes) {
  156. Ok(message) => match self.handlers.get(&message.module) {
  157. None => log::error!("Can't find any handler for message: {:?}", message),
  158. Some(handler) => handler.receive_message(message.clone()),
  159. },
  160. Err(e) => {
  161. log::error!("Deserialize binary ws message failed: {:?}", e);
  162. },
  163. }
  164. }
  165. }
  166. impl Future for WsHandlerFuture {
  167. type Output = ();
  168. fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
  169. loop {
  170. match ready!(self.as_mut().project().msg_rx.poll_next(cx)) {
  171. None => {
  172. return Poll::Ready(());
  173. },
  174. Some(message) => self.handler_ws_message(message),
  175. }
  176. }
  177. }
  178. }
  179. #[derive(Debug, Clone)]
  180. pub struct WsSender {
  181. ws_tx: MsgSender,
  182. }
  183. impl WsSender {
  184. pub fn send_msg<T: Into<WsMessage>>(&self, msg: T) -> Result<(), WsError> {
  185. let msg = msg.into();
  186. let _ = self
  187. .ws_tx
  188. .unbounded_send(msg.into())
  189. .map_err(|e| WsError::internal().context(e))?;
  190. Ok(())
  191. }
  192. pub fn send_text(&self, source: &WsModule, text: &str) -> Result<(), WsError> {
  193. let msg = WsMessage {
  194. module: source.clone(),
  195. data: text.as_bytes().to_vec(),
  196. };
  197. self.send_msg(msg)
  198. }
  199. pub fn send_binary(&self, source: &WsModule, bytes: Vec<u8>) -> Result<(), WsError> {
  200. let msg = WsMessage {
  201. module: source.clone(),
  202. data: bytes,
  203. };
  204. self.send_msg(msg)
  205. }
  206. pub fn send_disconnect(&self, reason: &str) -> Result<(), WsError> {
  207. let frame = CloseFrame {
  208. code: CloseCode::Normal,
  209. reason: reason.to_owned().into(),
  210. };
  211. let msg = Message::Close(Some(frame));
  212. let _ = self
  213. .ws_tx
  214. .unbounded_send(msg)
  215. .map_err(|e| WsError::internal().context(e))?;
  216. Ok(())
  217. }
  218. }
  219. // #[cfg(test)]
  220. // mod tests {
  221. // use super::WsController;
  222. //
  223. // #[tokio::test]
  224. // async fn connect() {
  225. // std::env::set_var("RUST_LOG", "Debug");
  226. // env_logger::init();
  227. //
  228. // let mut controller = WsController::new();
  229. // let addr = format!("{}/123", flowy_net::config::WS_ADDR.as_str());
  230. // let (a, b) = controller.make_connect(addr);
  231. // tokio::select! {
  232. // r = a => println!("write completed {:?}", r),
  233. // _ = b => println!("read completed"),
  234. // };
  235. // }
  236. // }