// manager.rs
  1. use crate::services::grid_editor::ClientGridEditor;
  2. use crate::services::kv_persistence::GridKVPersistence;
  3. use bytes::Bytes;
  4. use dashmap::DashMap;
  5. use flowy_database::ConnectionPool;
  6. use flowy_error::{FlowyError, FlowyResult};
  7. use flowy_grid_data_model::entities::{BuildGridContext, GridMeta};
  8. use flowy_revision::disk::{SQLiteGridBlockMetaRevisionPersistence, SQLiteGridRevisionPersistence};
  9. use flowy_revision::{RevisionManager, RevisionPersistence, RevisionWebSocket};
  10. use flowy_sync::client_grid::{make_block_meta_delta, make_grid_delta};
  11. use flowy_sync::entities::revision::{RepeatedRevision, Revision};
  12. use std::sync::Arc;
  13. use tokio::sync::RwLock;
  14. pub trait GridUser: Send + Sync {
  15. fn user_id(&self) -> Result<String, FlowyError>;
  16. fn token(&self) -> Result<String, FlowyError>;
  17. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  18. }
  19. pub struct GridManager {
  20. editor_map: Arc<GridEditorMap>,
  21. grid_user: Arc<dyn GridUser>,
  22. kv_persistence: Arc<RwLock<Option<Arc<GridKVPersistence>>>>,
  23. }
  24. impl GridManager {
  25. pub fn new(grid_user: Arc<dyn GridUser>, _rev_web_socket: Arc<dyn RevisionWebSocket>) -> Self {
  26. let grid_editors = Arc::new(GridEditorMap::new());
  27. // kv_persistence will be initialized after first access.
  28. // See get_kv_persistence function below
  29. let kv_persistence = Arc::new(RwLock::new(None));
  30. Self {
  31. editor_map: grid_editors,
  32. grid_user,
  33. kv_persistence,
  34. }
  35. }
  36. #[tracing::instrument(level = "debug", skip_all, err)]
  37. pub async fn create_grid<T: AsRef<str>>(&self, grid_id: T, revisions: RepeatedRevision) -> FlowyResult<()> {
  38. let grid_id = grid_id.as_ref();
  39. let db_pool = self.grid_user.db_pool()?;
  40. let rev_manager = self.make_grid_rev_manager(grid_id, db_pool)?;
  41. let _ = rev_manager.reset_object(revisions).await?;
  42. Ok(())
  43. }
  44. #[tracing::instrument(level = "debug", skip_all, err)]
  45. pub async fn create_grid_block_meta<T: AsRef<str>>(
  46. &self,
  47. block_id: T,
  48. revisions: RepeatedRevision,
  49. ) -> FlowyResult<()> {
  50. let block_id = block_id.as_ref();
  51. let db_pool = self.grid_user.db_pool()?;
  52. let rev_manager = self.make_grid_block_meta_rev_manager(block_id, db_pool)?;
  53. let _ = rev_manager.reset_object(revisions).await?;
  54. Ok(())
  55. }
  56. #[tracing::instrument(level = "debug", skip_all, fields(grid_id), err)]
  57. pub async fn open_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<Arc<ClientGridEditor>> {
  58. let grid_id = grid_id.as_ref();
  59. tracing::Span::current().record("grid_id", &grid_id);
  60. self.get_or_create_grid_editor(grid_id).await
  61. }
  62. #[tracing::instrument(level = "trace", skip_all, fields(grid_id), err)]
  63. pub fn close_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<()> {
  64. let grid_id = grid_id.as_ref();
  65. tracing::Span::current().record("grid_id", &grid_id);
  66. self.editor_map.remove(grid_id);
  67. Ok(())
  68. }
  69. #[tracing::instrument(level = "debug", skip(self, grid_id), fields(doc_id), err)]
  70. pub fn delete_grid<T: AsRef<str>>(&self, grid_id: T) -> FlowyResult<()> {
  71. let grid_id = grid_id.as_ref();
  72. tracing::Span::current().record("grid_id", &grid_id);
  73. self.editor_map.remove(grid_id);
  74. Ok(())
  75. }
  76. // #[tracing::instrument(level = "debug", skip(self), err)]
  77. pub fn get_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<ClientGridEditor>> {
  78. match self.editor_map.get(grid_id) {
  79. None => Err(FlowyError::internal().context("Should call open_grid function first")),
  80. Some(editor) => Ok(editor),
  81. }
  82. }
  83. async fn get_or_create_grid_editor(&self, grid_id: &str) -> FlowyResult<Arc<ClientGridEditor>> {
  84. match self.editor_map.get(grid_id) {
  85. None => {
  86. tracing::trace!("Create grid editor with id: {}", grid_id);
  87. let db_pool = self.grid_user.db_pool()?;
  88. let editor = self.make_grid_editor(grid_id, db_pool).await?;
  89. self.editor_map.insert(grid_id, &editor);
  90. Ok(editor)
  91. }
  92. Some(editor) => Ok(editor),
  93. }
  94. }
  95. async fn make_grid_editor(
  96. &self,
  97. grid_id: &str,
  98. pool: Arc<ConnectionPool>,
  99. ) -> Result<Arc<ClientGridEditor>, FlowyError> {
  100. let user = self.grid_user.clone();
  101. let rev_manager = self.make_grid_rev_manager(grid_id, pool.clone())?;
  102. let grid_editor = ClientGridEditor::new(grid_id, user, rev_manager).await?;
  103. Ok(grid_editor)
  104. }
  105. pub fn make_grid_rev_manager(&self, grid_id: &str, pool: Arc<ConnectionPool>) -> FlowyResult<RevisionManager> {
  106. let user_id = self.grid_user.user_id()?;
  107. let disk_cache = Arc::new(SQLiteGridRevisionPersistence::new(&user_id, pool));
  108. let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
  109. let rev_manager = RevisionManager::new(&user_id, grid_id, rev_persistence);
  110. Ok(rev_manager)
  111. }
  112. fn make_grid_block_meta_rev_manager(
  113. &self,
  114. block_d: &str,
  115. pool: Arc<ConnectionPool>,
  116. ) -> FlowyResult<RevisionManager> {
  117. let user_id = self.grid_user.user_id()?;
  118. let disk_cache = Arc::new(SQLiteGridBlockMetaRevisionPersistence::new(&user_id, pool));
  119. let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, block_d, disk_cache));
  120. let rev_manager = RevisionManager::new(&user_id, block_d, rev_persistence);
  121. Ok(rev_manager)
  122. }
  123. #[allow(dead_code)]
  124. async fn get_kv_persistence(&self) -> FlowyResult<Arc<GridKVPersistence>> {
  125. let read_guard = self.kv_persistence.read().await;
  126. if read_guard.is_some() {
  127. return Ok(read_guard.clone().unwrap());
  128. }
  129. drop(read_guard);
  130. let pool = self.grid_user.db_pool()?;
  131. let kv_persistence = Arc::new(GridKVPersistence::new(pool));
  132. *self.kv_persistence.write().await = Some(kv_persistence.clone());
  133. Ok(kv_persistence)
  134. }
  135. }
  136. pub struct GridEditorMap {
  137. inner: DashMap<String, Arc<ClientGridEditor>>,
  138. }
  139. impl GridEditorMap {
  140. fn new() -> Self {
  141. Self { inner: DashMap::new() }
  142. }
  143. pub(crate) fn insert(&self, grid_id: &str, grid_editor: &Arc<ClientGridEditor>) {
  144. if self.inner.contains_key(grid_id) {
  145. tracing::warn!("Grid:{} already exists in cache", grid_id);
  146. }
  147. self.inner.insert(grid_id.to_string(), grid_editor.clone());
  148. }
  149. pub(crate) fn get(&self, grid_id: &str) -> Option<Arc<ClientGridEditor>> {
  150. Some(self.inner.get(grid_id)?.clone())
  151. }
  152. pub(crate) fn remove(&self, grid_id: &str) {
  153. self.inner.remove(grid_id);
  154. }
  155. }
  156. pub async fn make_grid_view_data(
  157. user_id: &str,
  158. view_id: &str,
  159. grid_manager: Arc<GridManager>,
  160. build_context: BuildGridContext,
  161. ) -> FlowyResult<Bytes> {
  162. let block_id = build_context.block_metas.block_id.clone();
  163. let grid_meta = GridMeta {
  164. grid_id: view_id.to_string(),
  165. fields: build_context.field_metas,
  166. block_metas: vec![build_context.block_metas],
  167. };
  168. let grid_meta_delta = make_grid_delta(&grid_meta);
  169. let grid_delta_data = grid_meta_delta.to_delta_bytes();
  170. let repeated_revision: RepeatedRevision =
  171. Revision::initial_revision(user_id, view_id, grid_delta_data.clone()).into();
  172. let _ = grid_manager.create_grid(view_id, repeated_revision).await?;
  173. let grid_block_meta_delta = make_block_meta_delta(&build_context.block_meta_data);
  174. let block_meta_delta_data = grid_block_meta_delta.to_delta_bytes();
  175. let repeated_revision: RepeatedRevision =
  176. Revision::initial_revision(user_id, &block_id, block_meta_delta_data).into();
  177. let _ = grid_manager
  178. .create_grid_block_meta(&block_id, repeated_revision)
  179. .await?;
  180. Ok(grid_delta_data)
  181. }