block_manager.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. use crate::dart_notification::{send_dart_notification, GridNotification};
  2. use crate::entities::{CellChangesetPB, GridBlockChangesetPB, InsertedRowPB, RowPB};
  3. use crate::manager::GridUser;
  4. use crate::services::block_editor::{GridBlockRevisionCompress, GridBlockRevisionEditor};
  5. use crate::services::persistence::block_index::BlockIndexCache;
  6. use crate::services::persistence::rev_sqlite::SQLiteGridBlockRevisionPersistence;
  7. use crate::services::row::{block_from_row_orders, make_row_from_row_rev, GridBlockSnapshot};
  8. use dashmap::DashMap;
  9. use flowy_database::ConnectionPool;
  10. use flowy_error::FlowyResult;
  11. use flowy_grid_data_model::revision::{
  12. GridBlockMetaRevision, GridBlockMetaRevisionChangeset, RowChangeset, RowRevision,
  13. };
  14. use flowy_revision::{
  15. RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, SQLiteRevisionSnapshotPersistence,
  16. };
  17. use std::borrow::Cow;
  18. use std::collections::HashMap;
  19. use std::sync::Arc;
  20. type BlockId = String;
  21. pub(crate) struct GridBlockManager {
  22. user: Arc<dyn GridUser>,
  23. persistence: Arc<BlockIndexCache>,
  24. block_editors: DashMap<BlockId, Arc<GridBlockRevisionEditor>>,
  25. }
  26. impl GridBlockManager {
  27. pub(crate) async fn new(
  28. user: &Arc<dyn GridUser>,
  29. block_meta_revs: Vec<Arc<GridBlockMetaRevision>>,
  30. persistence: Arc<BlockIndexCache>,
  31. ) -> FlowyResult<Self> {
  32. let block_editors = make_block_editors(user, block_meta_revs).await?;
  33. let user = user.clone();
  34. let manager = Self {
  35. user,
  36. block_editors,
  37. persistence,
  38. };
  39. Ok(manager)
  40. }
  41. // #[tracing::instrument(level = "trace", skip(self))]
  42. pub(crate) async fn get_block_editor(&self, block_id: &str) -> FlowyResult<Arc<GridBlockRevisionEditor>> {
  43. debug_assert!(!block_id.is_empty());
  44. match self.block_editors.get(block_id) {
  45. None => {
  46. tracing::error!("This is a fatal error, block with id:{} is not exist", block_id);
  47. let editor = Arc::new(make_grid_block_editor(&self.user, block_id).await?);
  48. self.block_editors.insert(block_id.to_owned(), editor.clone());
  49. Ok(editor)
  50. }
  51. Some(editor) => Ok(editor.clone()),
  52. }
  53. }
  54. pub(crate) async fn get_editor_from_row_id(&self, row_id: &str) -> FlowyResult<Arc<GridBlockRevisionEditor>> {
  55. let block_id = self.persistence.get_block_id(row_id)?;
  56. self.get_block_editor(&block_id).await
  57. }
  58. #[tracing::instrument(level = "trace", skip(self, start_row_id), err)]
  59. pub(crate) async fn create_row(&self, row_rev: RowRevision, start_row_id: Option<String>) -> FlowyResult<i32> {
  60. let block_id = row_rev.block_id.clone();
  61. let _ = self.persistence.insert(&row_rev.block_id, &row_rev.id)?;
  62. let editor = self.get_block_editor(&row_rev.block_id).await?;
  63. let mut index_row_order = InsertedRowPB::from(&row_rev);
  64. let (row_count, row_index) = editor.create_row(row_rev, start_row_id).await?;
  65. index_row_order.index = row_index;
  66. let changeset = GridBlockChangesetPB::insert(block_id.clone(), vec![index_row_order]);
  67. let _ = self.notify_did_update_block(&block_id, changeset).await?;
  68. Ok(row_count)
  69. }
  70. pub(crate) async fn insert_row(
  71. &self,
  72. rows_by_block_id: HashMap<String, Vec<RowRevision>>,
  73. ) -> FlowyResult<Vec<GridBlockMetaRevisionChangeset>> {
  74. let mut changesets = vec![];
  75. for (block_id, row_revs) in rows_by_block_id {
  76. let mut inserted_row_orders = vec![];
  77. let editor = self.get_block_editor(&block_id).await?;
  78. let mut row_count = 0;
  79. for row in row_revs {
  80. let _ = self.persistence.insert(&row.block_id, &row.id)?;
  81. let mut row_order = InsertedRowPB::from(&row);
  82. let (count, index) = editor.create_row(row, None).await?;
  83. row_count = count;
  84. row_order.index = index;
  85. inserted_row_orders.push(row_order);
  86. }
  87. changesets.push(GridBlockMetaRevisionChangeset::from_row_count(
  88. block_id.clone(),
  89. row_count,
  90. ));
  91. let _ = self
  92. .notify_did_update_block(
  93. &block_id,
  94. GridBlockChangesetPB::insert(block_id.clone(), inserted_row_orders),
  95. )
  96. .await?;
  97. }
  98. Ok(changesets)
  99. }
  100. pub async fn update_row(&self, changeset: RowChangeset) -> FlowyResult<()> {
  101. let editor = self.get_editor_from_row_id(&changeset.row_id).await?;
  102. let _ = editor.update_row(changeset.clone()).await?;
  103. match editor.get_row_rev(&changeset.row_id).await? {
  104. None => tracing::error!("Update row failed, can't find the row with id: {}", changeset.row_id),
  105. Some(row_rev) => {
  106. let row_pb = make_row_from_row_rev(row_rev.clone());
  107. let block_order_changeset = GridBlockChangesetPB::update(&editor.block_id, vec![row_pb]);
  108. let _ = self
  109. .notify_did_update_block(&editor.block_id, block_order_changeset)
  110. .await?;
  111. }
  112. }
  113. Ok(())
  114. }
  115. #[tracing::instrument(level = "trace", skip_all, err)]
  116. pub async fn delete_row(&self, row_id: &str) -> FlowyResult<Option<Arc<RowRevision>>> {
  117. let row_id = row_id.to_owned();
  118. let block_id = self.persistence.get_block_id(&row_id)?;
  119. let editor = self.get_block_editor(&block_id).await?;
  120. match editor.get_row_rev(&row_id).await? {
  121. None => Ok(None),
  122. Some(row_rev) => {
  123. let _ = editor.delete_rows(vec![Cow::Borrowed(&row_id)]).await?;
  124. let _ = self
  125. .notify_did_update_block(
  126. &block_id,
  127. GridBlockChangesetPB::delete(&block_id, vec![row_rev.id.clone()]),
  128. )
  129. .await?;
  130. Ok(Some(row_rev))
  131. }
  132. }
  133. }
  134. pub(crate) async fn delete_rows(&self, row_orders: Vec<RowPB>) -> FlowyResult<Vec<GridBlockMetaRevisionChangeset>> {
  135. let mut changesets = vec![];
  136. for grid_block in block_from_row_orders(row_orders) {
  137. let editor = self.get_block_editor(&grid_block.id).await?;
  138. let row_ids = grid_block
  139. .rows
  140. .into_iter()
  141. .map(|row_info| Cow::Owned(row_info.row_id().to_owned()))
  142. .collect::<Vec<Cow<String>>>();
  143. let row_count = editor.delete_rows(row_ids).await?;
  144. let changeset = GridBlockMetaRevisionChangeset::from_row_count(grid_block.id.clone(), row_count);
  145. changesets.push(changeset);
  146. }
  147. Ok(changesets)
  148. }
  149. // This function will be moved to GridViewRevisionEditor
  150. pub(crate) async fn move_row(&self, row_rev: Arc<RowRevision>, from: usize, to: usize) -> FlowyResult<()> {
  151. let editor = self.get_editor_from_row_id(&row_rev.id).await?;
  152. let _ = editor.move_row(&row_rev.id, from, to).await?;
  153. let delete_row_id = row_rev.id.clone();
  154. let insert_row = InsertedRowPB {
  155. index: Some(to as i32),
  156. row: make_row_from_row_rev(row_rev),
  157. is_new: false,
  158. };
  159. let notified_changeset = GridBlockChangesetPB {
  160. block_id: editor.block_id.clone(),
  161. inserted_rows: vec![insert_row],
  162. deleted_rows: vec![delete_row_id],
  163. ..Default::default()
  164. };
  165. let _ = self
  166. .notify_did_update_block(&editor.block_id, notified_changeset)
  167. .await?;
  168. Ok(())
  169. }
  170. // This function will be moved to GridViewRevisionEditor.
  171. pub async fn index_of_row(&self, row_id: &str) -> Option<usize> {
  172. match self.get_editor_from_row_id(row_id).await {
  173. Ok(editor) => editor.index_of_row(row_id).await,
  174. Err(_) => None,
  175. }
  176. }
  177. pub async fn update_cell(&self, changeset: CellChangesetPB) -> FlowyResult<()> {
  178. let row_changeset: RowChangeset = changeset.clone().into();
  179. let _ = self.update_row(row_changeset).await?;
  180. self.notify_did_update_cell(changeset).await?;
  181. Ok(())
  182. }
  183. pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<Arc<RowRevision>>> {
  184. let editor = self.get_editor_from_row_id(row_id).await?;
  185. let row_ids = vec![Cow::Borrowed(row_id)];
  186. let mut row_revs = editor.get_row_revs(Some(row_ids)).await?;
  187. if row_revs.is_empty() {
  188. Ok(None)
  189. } else {
  190. Ok(row_revs.pop())
  191. }
  192. }
  193. pub async fn get_row_orders(&self, block_id: &str) -> FlowyResult<Vec<RowPB>> {
  194. let editor = self.get_block_editor(block_id).await?;
  195. editor.get_row_infos::<&str>(None).await
  196. }
  197. pub(crate) async fn get_block_snapshots(
  198. &self,
  199. block_ids: Option<Vec<String>>,
  200. ) -> FlowyResult<Vec<GridBlockSnapshot>> {
  201. let mut snapshots = vec![];
  202. match block_ids {
  203. None => {
  204. for iter in self.block_editors.iter() {
  205. let editor = iter.value();
  206. let block_id = editor.block_id.clone();
  207. let row_revs = editor.get_row_revs::<&str>(None).await?;
  208. snapshots.push(GridBlockSnapshot { block_id, row_revs });
  209. }
  210. }
  211. Some(block_ids) => {
  212. for block_id in block_ids {
  213. let editor = self.get_block_editor(&block_id).await?;
  214. let row_revs = editor.get_row_revs::<&str>(None).await?;
  215. snapshots.push(GridBlockSnapshot { block_id, row_revs });
  216. }
  217. }
  218. }
  219. Ok(snapshots)
  220. }
  221. async fn notify_did_update_block(&self, block_id: &str, changeset: GridBlockChangesetPB) -> FlowyResult<()> {
  222. send_dart_notification(block_id, GridNotification::DidUpdateGridBlock)
  223. .payload(changeset)
  224. .send();
  225. Ok(())
  226. }
  227. async fn notify_did_update_cell(&self, changeset: CellChangesetPB) -> FlowyResult<()> {
  228. let id = format!("{}:{}", changeset.row_id, changeset.field_id);
  229. send_dart_notification(&id, GridNotification::DidUpdateCell).send();
  230. Ok(())
  231. }
  232. }
  233. /// Initialize each block editor
  234. async fn make_block_editors(
  235. user: &Arc<dyn GridUser>,
  236. block_meta_revs: Vec<Arc<GridBlockMetaRevision>>,
  237. ) -> FlowyResult<DashMap<String, Arc<GridBlockRevisionEditor>>> {
  238. let editor_map = DashMap::new();
  239. for block_meta_rev in block_meta_revs {
  240. let editor = make_grid_block_editor(user, &block_meta_rev.block_id).await?;
  241. editor_map.insert(block_meta_rev.block_id.clone(), Arc::new(editor));
  242. }
  243. Ok(editor_map)
  244. }
  245. async fn make_grid_block_editor(user: &Arc<dyn GridUser>, block_id: &str) -> FlowyResult<GridBlockRevisionEditor> {
  246. tracing::trace!("Open block:{} editor", block_id);
  247. let token = user.token()?;
  248. let user_id = user.user_id()?;
  249. let rev_manager = make_grid_block_rev_manager(user, block_id)?;
  250. GridBlockRevisionEditor::new(&user_id, &token, block_id, rev_manager).await
  251. }
  252. pub fn make_grid_block_rev_manager(
  253. user: &Arc<dyn GridUser>,
  254. block_id: &str,
  255. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  256. let user_id = user.user_id()?;
  257. let pool = user.db_pool()?;
  258. let disk_cache = SQLiteGridBlockRevisionPersistence::new(&user_id, pool.clone());
  259. let configuration = RevisionPersistenceConfiguration::new(4, false);
  260. let rev_persistence = RevisionPersistence::new(&user_id, block_id, disk_cache, configuration);
  261. let rev_compactor = GridBlockRevisionCompress();
  262. let snapshot_persistence = SQLiteRevisionSnapshotPersistence::new(block_id, pool);
  263. let rev_manager = RevisionManager::new(&user_id, block_id, rev_persistence, rev_compactor, snapshot_persistence);
  264. Ok(rev_manager)
  265. }