manager.rs 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. use std::sync::Weak;
  2. use std::{collections::HashMap, sync::Arc};
  3. use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
  4. use appflowy_integrate::{CollabType, RocksCollabDB};
  5. use collab::core::collab::MutexCollab;
  6. use collab_document::blocks::DocumentData;
  7. use collab_document::document::Document;
  8. use collab_document::document_data::default_document_data;
  9. use collab_document::YrsDocAction;
  10. use parking_lot::RwLock;
  11. use flowy_document_deps::cloud::DocumentCloudService;
  12. use flowy_error::{internal_error, FlowyError, FlowyResult};
  13. use crate::document::MutexDocument;
  14. use crate::entities::DocumentSnapshotPB;
  15. pub trait DocumentUser: Send + Sync {
  16. fn user_id(&self) -> Result<i64, FlowyError>;
  17. fn token(&self) -> Result<Option<String>, FlowyError>; // unused now.
  18. fn collab_db(&self, uid: i64) -> Result<Weak<RocksCollabDB>, FlowyError>;
  19. }
  20. pub struct DocumentManager {
  21. pub user: Arc<dyn DocumentUser>,
  22. collab_builder: Arc<AppFlowyCollabBuilder>,
  23. documents: Arc<RwLock<HashMap<String, Arc<MutexDocument>>>>,
  24. #[allow(dead_code)]
  25. cloud_service: Arc<dyn DocumentCloudService>,
  26. }
  27. impl DocumentManager {
  28. pub fn new(
  29. user: Arc<dyn DocumentUser>,
  30. collab_builder: Arc<AppFlowyCollabBuilder>,
  31. cloud_service: Arc<dyn DocumentCloudService>,
  32. ) -> Self {
  33. Self {
  34. user,
  35. collab_builder,
  36. documents: Default::default(),
  37. cloud_service,
  38. }
  39. }
  40. pub async fn initialize(&self, _uid: i64, _workspace_id: String) -> FlowyResult<()> {
  41. self.documents.write().clear();
  42. Ok(())
  43. }
  44. pub async fn initialize_with_new_user(&self, uid: i64, workspace_id: String) -> FlowyResult<()> {
  45. self.initialize(uid, workspace_id).await?;
  46. Ok(())
  47. }
  48. /// Create a new document.
  49. ///
  50. /// if the document already exists, return the existing document.
  51. /// if the data is None, will create a document with default data.
  52. pub fn create_document(
  53. &self,
  54. uid: i64,
  55. doc_id: &str,
  56. data: Option<DocumentData>,
  57. ) -> FlowyResult<Arc<MutexDocument>> {
  58. tracing::trace!("create a document: {:?}", doc_id);
  59. let collab = self.collab_for_document(uid, doc_id, vec![])?;
  60. let data = data.unwrap_or_else(default_document_data);
  61. let document = Arc::new(MutexDocument::create_with_data(collab, data)?);
  62. Ok(document)
  63. }
  64. /// Return the document
  65. #[tracing::instrument(level = "debug", skip_all)]
  66. pub async fn get_document(&self, doc_id: &str) -> FlowyResult<Arc<MutexDocument>> {
  67. if let Some(doc) = self.documents.read().get(doc_id) {
  68. return Ok(doc.clone());
  69. }
  70. let mut updates = vec![];
  71. if !self.is_doc_exist(doc_id)? {
  72. // Try to get the document from the cloud service
  73. updates = self.cloud_service.get_document_updates(doc_id).await?;
  74. }
  75. let uid = self.user.user_id()?;
  76. let db = self.user.collab_db(uid)?;
  77. let collab = self
  78. .collab_builder
  79. .build(uid, doc_id, CollabType::Document, updates, db)?;
  80. let document = Arc::new(MutexDocument::open(doc_id, collab)?);
  81. // save the document to the memory and read it from the memory if we open the same document again.
  82. // and we don't want to subscribe to the document changes if we open the same document again.
  83. self
  84. .documents
  85. .write()
  86. .insert(doc_id.to_string(), document.clone());
  87. Ok(document)
  88. }
  89. pub async fn get_document_data(&self, doc_id: &str) -> FlowyResult<DocumentData> {
  90. let mut updates = vec![];
  91. if !self.is_doc_exist(doc_id)? {
  92. if let Ok(document_updates) = self.cloud_service.get_document_updates(doc_id).await {
  93. updates = document_updates;
  94. } else {
  95. return Err(FlowyError::collab_not_sync());
  96. }
  97. }
  98. let uid = self.user.user_id()?;
  99. let collab = self.collab_for_document(uid, doc_id, updates)?;
  100. Document::open(collab)?
  101. .get_document_data()
  102. .map_err(internal_error)
  103. }
  104. pub fn close_document(&self, doc_id: &str) -> FlowyResult<()> {
  105. self.documents.write().remove(doc_id);
  106. Ok(())
  107. }
  108. pub fn delete_document(&self, doc_id: &str) -> FlowyResult<()> {
  109. let uid = self.user.user_id()?;
  110. if let Some(db) = self.user.collab_db(uid)?.upgrade() {
  111. let _ = db.with_write_txn(|txn| {
  112. txn.delete_doc(uid, &doc_id)?;
  113. Ok(())
  114. });
  115. self.documents.write().remove(doc_id);
  116. }
  117. Ok(())
  118. }
  119. /// Return the list of snapshots of the document.
  120. pub async fn get_document_snapshots(
  121. &self,
  122. document_id: &str,
  123. limit: usize,
  124. ) -> FlowyResult<Vec<DocumentSnapshotPB>> {
  125. let snapshots = self
  126. .cloud_service
  127. .get_document_snapshots(document_id, limit)
  128. .await?
  129. .into_iter()
  130. .map(|snapshot| DocumentSnapshotPB {
  131. snapshot_id: snapshot.snapshot_id,
  132. snapshot_desc: "".to_string(),
  133. created_at: snapshot.created_at,
  134. data: snapshot.data,
  135. })
  136. .collect::<Vec<_>>();
  137. Ok(snapshots)
  138. }
  139. fn collab_for_document(
  140. &self,
  141. uid: i64,
  142. doc_id: &str,
  143. updates: Vec<Vec<u8>>,
  144. ) -> FlowyResult<Arc<MutexCollab>> {
  145. let db = self.user.collab_db(uid)?;
  146. let collab = self
  147. .collab_builder
  148. .build(uid, doc_id, CollabType::Document, updates, db)?;
  149. Ok(collab)
  150. }
  151. fn is_doc_exist(&self, doc_id: &str) -> FlowyResult<bool> {
  152. let uid = self.user.user_id()?;
  153. if let Some(collab_db) = self.user.collab_db(uid)?.upgrade() {
  154. let read_txn = collab_db.read_txn();
  155. Ok(read_txn.is_exist(uid, doc_id))
  156. } else {
  157. Ok(false)
  158. }
  159. }
  160. /// Only expose this method for testing
  161. #[cfg(debug_assertions)]
  162. pub fn get_cloud_service(&self) -> &Arc<dyn DocumentCloudService> {
  163. &self.cloud_service
  164. }
  165. }