manager.rs 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586
  1. use collab::preclude::{Collab, CollabBuilder};
  2. use collab_persistence::kv::rocks_kv::RocksCollabDB;
  3. use collab_plugins::disk_plugin::rocksdb::RocksdbDiskPlugin;
  4. use parking_lot::RwLock;
  5. use std::{collections::HashMap, sync::Arc};
  6. use flowy_error::{FlowyError, FlowyResult};
  7. use crate::{
  8. document::{Document, DocumentDataWrapper},
  9. entities::DocEventPB,
  10. notification::{send_notification, DocumentNotification},
  11. };
  12. pub trait DocumentUser: Send + Sync {
  13. fn user_id(&self) -> Result<i64, FlowyError>;
  14. fn token(&self) -> Result<String, FlowyError>; // unused now.
  15. fn kv_db(&self) -> Result<Arc<RocksCollabDB>, FlowyError>;
  16. }
  17. pub struct DocumentManager {
  18. documents: Arc<RwLock<HashMap<String, Arc<Document>>>>,
  19. user: Arc<dyn DocumentUser>,
  20. }
  21. // unsafe impl Send for DocumentManager {}
  22. // unsafe impl Sync for DocumentManager {}
  23. impl DocumentManager {
  24. pub fn new(user: Arc<dyn DocumentUser>) -> Self {
  25. Self {
  26. documents: Default::default(),
  27. user,
  28. }
  29. }
  30. pub fn create_document(
  31. &self,
  32. doc_id: String,
  33. data: DocumentDataWrapper,
  34. ) -> FlowyResult<Arc<Document>> {
  35. let collab = self.get_collab_for_doc_id(&doc_id)?;
  36. let document = Arc::new(Document::create_with_data(collab, data.0)?);
  37. self.documents.write().insert(doc_id, document.clone());
  38. Ok(document)
  39. }
  40. pub fn open_document(&self, doc_id: String) -> FlowyResult<Arc<Document>> {
  41. if let Some(doc) = self.documents.read().get(&doc_id) {
  42. return Ok(doc.clone());
  43. }
  44. tracing::debug!("open_document: {:?}", &doc_id);
  45. let collab = self.get_collab_for_doc_id(&doc_id)?;
  46. let document = Arc::new(Document::new(collab)?);
  47. let clone_doc_id = doc_id.clone();
  48. document
  49. .lock()
  50. .open(move |events, is_remote| {
  51. tracing::debug!("data_change: {:?}, from remote: {}", &events, is_remote);
  52. send_notification(&clone_doc_id, DocumentNotification::DidReceiveUpdate)
  53. .payload::<DocEventPB>((events, is_remote).into())
  54. .send();
  55. })
  56. .map_err(|err| FlowyError::internal().context(err))?;
  57. self.documents.write().insert(doc_id, document.clone());
  58. Ok(document)
  59. }
  60. pub fn close_document(&self, doc_id: String) -> FlowyResult<()> {
  61. self.documents.write().remove(&doc_id);
  62. Ok(())
  63. }
  64. fn get_collab_for_doc_id(&self, doc_id: &str) -> Result<Collab, FlowyError> {
  65. let uid = self.user.user_id()?;
  66. let kv_db = self.user.kv_db()?;
  67. let mut collab = CollabBuilder::new(uid, doc_id).build();
  68. let disk_plugin = Arc::new(
  69. RocksdbDiskPlugin::new(uid, kv_db).map_err(|err| FlowyError::internal().context(err))?,
  70. );
  71. collab.add_plugin(disk_plugin);
  72. collab.initial();
  73. Ok(collab)
  74. }
  75. }