use crate::{ errors::DocError, services::{ open_doc::{DocId, OpenedDoc, OpenedDocPersistence}, ws::WsManager, }, }; use bytes::Bytes; use dashmap::DashMap; use flowy_database::ConnectionPool; use flowy_ot::{core::Delta, errors::OTError}; use parking_lot::RwLock; use std::{convert::TryInto, fmt::Debug, sync::Arc}; pub(crate) struct OpenedDocManager { doc_map: DashMap>, ws_manager: Arc>, persistence: Arc, } impl OpenedDocManager { pub(crate) fn new(ws_manager: Arc>, persistence: Arc) -> Self { Self { doc_map: DashMap::new(), ws_manager, persistence, } } #[tracing::instrument(level = "debug", skip(self, data), err)] pub(crate) fn open(&self, id: T, data: D) -> Result<(), DocError> where T: Into + Debug, D: TryInto, { let doc = Arc::new(OpenedDoc::new( id.into(), data.try_into()?, self.persistence.clone(), self.ws_manager.read().sender.clone(), )); self.ws_manager.write().register_handler(doc.id.as_ref(), doc.clone()); self.doc_map.insert(doc.id.clone(), doc.clone()); Ok(()) } pub(crate) fn is_opened(&self, id: T) -> bool where T: Into, { let doc_id = id.into(); self.doc_map.get(&doc_id).is_some() } #[tracing::instrument(level = "debug", skip(self, changeset, pool), err)] pub(crate) async fn apply_changeset(&self, id: T, changeset: Bytes, pool: Arc) -> Result<(), DocError> where T: Into + Debug, { let id = id.into(); match self.doc_map.get(&id) { None => Err(doc_not_found()), Some(doc) => { let _ = doc.apply_delta(changeset, pool)?; Ok(()) }, } } pub(crate) async fn read_doc(&self, id: T) -> Result, DocError> where T: Into + Clone, { if !self.is_opened(id.clone()) { return Err(doc_not_found()); } let doc_id = id.into(); let doc = self.doc_map.get(&doc_id).unwrap(); Ok(doc.data()) } pub(crate) fn close(&self, id: T) -> Result<(), DocError> where T: Into, { let doc_id = id.into(); self.doc_map.remove(&doc_id); self.ws_manager.write().remove_handler(doc_id.as_ref()); Ok(()) } } fn doc_not_found() -> DocError { DocError::not_found().context("Doc is close or you should call open first") }