use crate::{entities::ws::WsDocumentData, errors::DocError}; use bytes::Bytes; use dashmap::DashMap; use flowy_ws::WsState; use std::{collections::HashMap, convert::TryInto, sync::Arc}; use tokio::sync::broadcast::error::RecvError; pub(crate) trait WsDocumentHandler: Send + Sync { fn receive(&self, data: WsDocumentData); fn state_changed(&self, state: &WsState); } pub type WsStateReceiver = tokio::sync::broadcast::Receiver; pub trait DocumentWebSocket: Send + Sync { fn send(&self, data: WsDocumentData) -> Result<(), DocError>; fn state_notify(&self) -> WsStateReceiver; } pub struct WsDocumentManager { ws: Arc, // key: the document id handlers: Arc>>, } impl WsDocumentManager { pub fn new(ws: Arc) -> Self { let handlers: Arc>> = Arc::new(DashMap::new()); listen_ws_state_changed(ws.clone(), handlers.clone()); Self { ws, handlers } } pub(crate) fn register_handler(&self, id: &str, handler: Arc) { if self.handlers.contains_key(id) { log::error!("Duplicate handler registered for {:?}", id); } self.handlers.insert(id.to_string(), handler); } pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); } pub fn handle_ws_data(&self, data: Bytes) { let data: WsDocumentData = data.try_into().unwrap(); match self.handlers.get(&data.doc_id) { None => { log::error!("Can't find any source handler for {:?}", data.doc_id); }, Some(handler) => { handler.receive(data); }, } } pub fn ws(&self) -> Arc { self.ws.clone() } } #[tracing::instrument(level = "debug", skip(ws, handlers))] fn listen_ws_state_changed(ws: Arc, handlers: Arc>>) { let mut notify = ws.state_notify(); tokio::spawn(async move { loop { match notify.recv().await { Ok(state) => { handlers.iter().for_each(|handle| { handle.value().state_changed(&state); }); }, Err(e) => { log::error!("Websocket state notify error: {:?}", e); break; }, } } }); }