12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- use crate::{entities::ws::WsDocumentData, errors::DocError};
- use bytes::Bytes;
- use dashmap::DashMap;
- use flowy_ws::WsState;
- use std::{collections::HashMap, convert::TryInto, sync::Arc};
- use tokio::sync::broadcast::error::RecvError;
- pub(crate) trait WsDocumentHandler: Send + Sync {
- fn receive(&self, data: WsDocumentData);
- fn state_changed(&self, state: &WsState);
- }
- pub type WsStateReceiver = tokio::sync::broadcast::Receiver<WsState>;
- pub trait DocumentWebSocket: Send + Sync {
- fn send(&self, data: WsDocumentData) -> Result<(), DocError>;
- fn state_notify(&self) -> WsStateReceiver;
- }
- pub struct WsDocumentManager {
- ws: Arc<dyn DocumentWebSocket>,
- // key: the document id
- handlers: Arc<DashMap<String, Arc<dyn WsDocumentHandler>>>,
- }
- impl WsDocumentManager {
- pub fn new(ws: Arc<dyn DocumentWebSocket>) -> Self {
- let handlers: Arc<DashMap<String, Arc<dyn WsDocumentHandler>>> = Arc::new(DashMap::new());
- listen_ws_state_changed(ws.clone(), handlers.clone());
- Self { ws, handlers }
- }
- pub(crate) fn register_handler(&self, id: &str, handler: Arc<dyn WsDocumentHandler>) {
- if self.handlers.contains_key(id) {
- log::error!("Duplicate handler registered for {:?}", id);
- }
- self.handlers.insert(id.to_string(), handler);
- }
- pub(crate) fn remove_handler(&self, id: &str) { self.handlers.remove(id); }
- pub fn handle_ws_data(&self, data: Bytes) {
- let data: WsDocumentData = data.try_into().unwrap();
- match self.handlers.get(&data.doc_id) {
- None => {
- log::error!("Can't find any source handler for {:?}", data.doc_id);
- },
- Some(handler) => {
- handler.receive(data);
- },
- }
- }
- pub fn ws(&self) -> Arc<dyn DocumentWebSocket> { self.ws.clone() }
- }
- #[tracing::instrument(level = "debug", skip(ws, handlers))]
- fn listen_ws_state_changed(ws: Arc<dyn DocumentWebSocket>, handlers: Arc<DashMap<String, Arc<dyn WsDocumentHandler>>>) {
- let mut notify = ws.state_notify();
- tokio::spawn(async move {
- loop {
- match notify.recv().await {
- Ok(state) => {
- handlers.iter().for_each(|handle| {
- handle.value().state_changed(&state);
- });
- },
- Err(e) => {
- log::error!("Websocket state notify error: {:?}", e);
- break;
- },
- }
- }
- });
- }
|