manager.rs 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. use crate::{
  2. errors::DocError,
  3. services::{
  4. open_doc::{DocId, OpenedDoc, OpenedDocPersistence},
  5. ws::WsManager,
  6. },
  7. };
  8. use bytes::Bytes;
  9. use dashmap::DashMap;
  10. use flowy_database::ConnectionPool;
  11. use flowy_ot::{core::Delta, errors::OTError};
  12. use parking_lot::RwLock;
  13. use std::{convert::TryInto, fmt::Debug, sync::Arc};
  14. pub(crate) struct OpenedDocManager {
  15. doc_map: DashMap<DocId, Arc<OpenedDoc>>,
  16. ws_manager: Arc<RwLock<WsManager>>,
  17. persistence: Arc<dyn OpenedDocPersistence>,
  18. }
  19. impl OpenedDocManager {
  20. pub(crate) fn new(ws_manager: Arc<RwLock<WsManager>>, persistence: Arc<dyn OpenedDocPersistence>) -> Self {
  21. Self {
  22. doc_map: DashMap::new(),
  23. ws_manager,
  24. persistence,
  25. }
  26. }
  27. #[tracing::instrument(level = "debug", skip(self, data), err)]
  28. pub(crate) fn open<T, D>(&self, id: T, data: D) -> Result<(), DocError>
  29. where
  30. T: Into<DocId> + Debug,
  31. D: TryInto<Delta, Error = OTError>,
  32. {
  33. let doc = Arc::new(OpenedDoc::new(
  34. id.into(),
  35. data.try_into()?,
  36. self.persistence.clone(),
  37. self.ws_manager.read().sender.clone(),
  38. ));
  39. self.ws_manager.write().register_handler(doc.id.as_ref(), doc.clone());
  40. self.doc_map.insert(doc.id.clone(), doc.clone());
  41. Ok(())
  42. }
  43. pub(crate) fn is_opened<T>(&self, id: T) -> bool
  44. where
  45. T: Into<DocId>,
  46. {
  47. let doc_id = id.into();
  48. self.doc_map.get(&doc_id).is_some()
  49. }
  50. #[tracing::instrument(level = "debug", skip(self, changeset, pool), err)]
  51. pub(crate) async fn apply_changeset<T>(&self, id: T, changeset: Bytes, pool: Arc<ConnectionPool>) -> Result<(), DocError>
  52. where
  53. T: Into<DocId> + Debug,
  54. {
  55. let id = id.into();
  56. match self.doc_map.get(&id) {
  57. None => Err(doc_not_found()),
  58. Some(doc) => {
  59. let _ = doc.apply_delta(changeset, pool)?;
  60. Ok(())
  61. },
  62. }
  63. }
  64. pub(crate) async fn read_doc<T>(&self, id: T) -> Result<Vec<u8>, DocError>
  65. where
  66. T: Into<DocId> + Clone,
  67. {
  68. if !self.is_opened(id.clone()) {
  69. return Err(doc_not_found());
  70. }
  71. let doc_id = id.into();
  72. let doc = self.doc_map.get(&doc_id).unwrap();
  73. Ok(doc.data())
  74. }
  75. pub(crate) fn close<T>(&self, id: T) -> Result<(), DocError>
  76. where
  77. T: Into<DocId>,
  78. {
  79. let doc_id = id.into();
  80. self.doc_map.remove(&doc_id);
  81. self.ws_manager.write().remove_handler(doc_id.as_ref());
  82. Ok(())
  83. }
  84. }
  85. fn doc_not_found() -> DocError { DocError::not_found().context("Doc is close or you should call open first") }