grid_editor.rs 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. use crate::manager::GridUser;
  2. use crate::services::grid_meta_editor::GridBlockMetaEditorManager;
  3. use crate::services::kv_persistence::{GridKVPersistence, KVTransaction};
  4. use bytes::Bytes;
  5. use flowy_collaboration::client_grid::{GridChange, GridMetaPad};
  6. use flowy_collaboration::entities::revision::Revision;
  7. use flowy_collaboration::util::make_delta_from_revisions;
  8. use flowy_error::{FlowyError, FlowyResult};
  9. use flowy_grid_data_model::entities::{
  10. CellMetaChangeset, Field, FieldChangeset, Grid, GridBlock, GridBlockChangeset, RepeatedFieldOrder,
  11. RepeatedRowOrder, Row, RowMeta, RowMetaChangeset,
  12. };
  13. use std::collections::HashMap;
  14. use crate::services::row::{
  15. make_row_by_row_id, make_rows, row_meta_from_context, CreateRowContext, CreateRowContextBuilder,
  16. };
  17. use flowy_sync::{RevisionCloudService, RevisionCompactor, RevisionManager, RevisionObjectBuilder};
  18. use lib_infra::future::FutureResult;
  19. use lib_ot::core::PlainTextAttributes;
  20. use std::sync::Arc;
  21. use tokio::sync::RwLock;
  22. pub struct ClientGridEditor {
  23. grid_id: String,
  24. user: Arc<dyn GridUser>,
  25. grid_meta_pad: Arc<RwLock<GridMetaPad>>,
  26. rev_manager: Arc<RevisionManager>,
  27. block_meta_manager: Arc<GridBlockMetaEditorManager>,
  28. kv_persistence: Arc<GridKVPersistence>,
  29. }
  30. impl ClientGridEditor {
  31. pub async fn new(
  32. grid_id: &str,
  33. user: Arc<dyn GridUser>,
  34. mut rev_manager: RevisionManager,
  35. kv_persistence: Arc<GridKVPersistence>,
  36. ) -> FlowyResult<Arc<Self>> {
  37. let token = user.token()?;
  38. let cloud = Arc::new(GridRevisionCloudService { token });
  39. let grid_pad = rev_manager.load::<GridPadBuilder>(Some(cloud)).await?;
  40. let rev_manager = Arc::new(rev_manager);
  41. let grid_meta_pad = Arc::new(RwLock::new(grid_pad));
  42. let block_meta_manager =
  43. Arc::new(GridBlockMetaEditorManager::new(&user, grid_meta_pad.read().await.get_blocks().clone()).await?);
  44. Ok(Arc::new(Self {
  45. grid_id: grid_id.to_owned(),
  46. user,
  47. grid_meta_pad,
  48. rev_manager,
  49. block_meta_manager,
  50. kv_persistence,
  51. }))
  52. }
  53. pub async fn create_field(&self, field: Field) -> FlowyResult<()> {
  54. let _ = self.modify(|grid| Ok(grid.create_field(field)?)).await?;
  55. Ok(())
  56. }
  57. pub async fn update_field(&self, change: FieldChangeset) -> FlowyResult<()> {
  58. let _ = self.modify(|grid| Ok(grid.update_field(change)?)).await?;
  59. Ok(())
  60. }
  61. pub async fn delete_field(&self, field_id: &str) -> FlowyResult<()> {
  62. let _ = self.modify(|grid| Ok(grid.delete_field(field_id)?)).await?;
  63. Ok(())
  64. }
  65. pub async fn create_block(&self, grid_block: GridBlock) -> FlowyResult<()> {
  66. let _ = self.modify(|grid| Ok(grid.create_block(grid_block)?)).await?;
  67. Ok(())
  68. }
  69. pub async fn update_block(&self, changeset: GridBlockChangeset) -> FlowyResult<()> {
  70. let _ = self.modify(|grid| Ok(grid.update_block(changeset)?)).await?;
  71. Ok(())
  72. }
  73. pub async fn create_row(&self) -> FlowyResult<()> {
  74. let fields = self.grid_meta_pad.read().await.get_fields(None)?;
  75. let block_id = self.last_block_id().await?;
  76. let row = row_meta_from_context(&block_id, CreateRowContextBuilder::new(&fields).build());
  77. let row_count = self.block_meta_manager.create_row(row).await?;
  78. let changeset = GridBlockChangeset::from_row_count(&block_id, row_count);
  79. let _ = self.update_block(changeset).await?;
  80. Ok(())
  81. }
  82. pub async fn insert_rows(&self, contexts: Vec<CreateRowContext>) -> FlowyResult<()> {
  83. let block_id = self.last_block_id().await?;
  84. let mut rows_by_block_id: HashMap<String, Vec<RowMeta>> = HashMap::new();
  85. for ctx in contexts {
  86. let row_meta = row_meta_from_context(&block_id, ctx);
  87. rows_by_block_id
  88. .entry(block_id.clone())
  89. .or_insert(Vec::new())
  90. .push(row_meta);
  91. }
  92. let changesets = self.block_meta_manager.insert_row(rows_by_block_id).await?;
  93. for changeset in changesets {
  94. let _ = self.update_block(changeset).await?;
  95. }
  96. Ok(())
  97. }
  98. pub async fn update_row(&self, changeset: RowMetaChangeset) -> FlowyResult<()> {
  99. self.block_meta_manager.update_row(changeset).await
  100. }
  101. pub async fn update_cell(&self, changeset: CellMetaChangeset) -> FlowyResult<()> {
  102. let row_changeset: RowMetaChangeset = changeset.into();
  103. self.update_row(row_changeset).await
  104. }
  105. pub async fn get_rows(&self, row_orders: Option<RepeatedRowOrder>) -> FlowyResult<Vec<Row>> {
  106. let row_metas = self.get_row_metas(&row_orders).await?;
  107. let fields = self.grid_meta_pad.read().await.get_fields(None)?;
  108. match row_orders {
  109. None => Ok(make_rows(&fields, row_metas)),
  110. Some(row_orders) => {
  111. let mut row_map: HashMap<String, Row> = make_row_by_row_id(&fields, row_metas);
  112. let rows = row_orders
  113. .iter()
  114. .flat_map(|row_order| row_map.remove(&row_order.row_id))
  115. .collect::<Vec<_>>();
  116. Ok(rows)
  117. }
  118. }
  119. }
  120. pub async fn get_row_metas(&self, row_orders: &Option<RepeatedRowOrder>) -> FlowyResult<Vec<RowMeta>> {
  121. match row_orders {
  122. None => {
  123. let grid_blocks = self.grid_meta_pad.read().await.get_blocks();
  124. let row_metas = self.block_meta_manager.get_all_rows(grid_blocks).await?;
  125. Ok(row_metas)
  126. }
  127. Some(row_orders) => {
  128. let row_metas = self.block_meta_manager.get_rows(row_orders).await?;
  129. Ok(row_metas)
  130. }
  131. }
  132. }
  133. pub async fn delete_rows(&self, row_ids: Vec<String>) -> FlowyResult<()> {
  134. let changesets = self.block_meta_manager.delete_rows(row_ids).await?;
  135. for changeset in changesets {
  136. let _ = self.update_block(changeset).await?;
  137. }
  138. Ok(())
  139. }
  140. pub async fn grid_data(&self) -> Grid {
  141. todo!()
  142. }
  143. pub async fn get_fields(&self, field_orders: Option<RepeatedFieldOrder>) -> FlowyResult<Vec<Field>> {
  144. let fields = self.grid_meta_pad.read().await.get_fields(field_orders)?;
  145. Ok(fields)
  146. }
  147. pub async fn get_blocks(&self) -> FlowyResult<Vec<GridBlock>> {
  148. let grid_blocks = self.grid_meta_pad.read().await.get_blocks();
  149. Ok(grid_blocks)
  150. }
  151. pub async fn delta_str(&self) -> String {
  152. self.grid_meta_pad.read().await.delta_str()
  153. }
  154. async fn modify<F>(&self, f: F) -> FlowyResult<()>
  155. where
  156. F: for<'a> FnOnce(&'a mut GridMetaPad) -> FlowyResult<Option<GridChange>>,
  157. {
  158. let mut write_guard = self.grid_meta_pad.write().await;
  159. match f(&mut *write_guard)? {
  160. None => {}
  161. Some(change) => {
  162. let _ = self.apply_change(change).await?;
  163. }
  164. }
  165. Ok(())
  166. }
  167. async fn apply_change(&self, change: GridChange) -> FlowyResult<()> {
  168. let GridChange { delta, md5 } = change;
  169. let user_id = self.user.user_id()?;
  170. let (base_rev_id, rev_id) = self.rev_manager.next_rev_id_pair();
  171. let delta_data = delta.to_bytes();
  172. let revision = Revision::new(
  173. &self.rev_manager.object_id,
  174. base_rev_id,
  175. rev_id,
  176. delta_data,
  177. &user_id,
  178. md5,
  179. );
  180. let _ = self
  181. .rev_manager
  182. .add_local_revision(&revision, Box::new(GridRevisionCompactor()))
  183. .await?;
  184. Ok(())
  185. }
  186. async fn last_block_id(&self) -> FlowyResult<String> {
  187. match self.grid_meta_pad.read().await.get_blocks().last() {
  188. None => Err(FlowyError::internal().context("There is no grid block in this grid")),
  189. Some(grid_block) => Ok(grid_block.id.clone()),
  190. }
  191. }
  192. }
  193. #[cfg(feature = "flowy_unit_test")]
  194. impl ClientGridEditor {
  195. pub fn rev_manager(&self) -> Arc<RevisionManager> {
  196. self.rev_manager.clone()
  197. }
  198. }
  199. pub struct GridPadBuilder();
  200. impl RevisionObjectBuilder for GridPadBuilder {
  201. type Output = GridMetaPad;
  202. fn build_object(object_id: &str, revisions: Vec<Revision>) -> FlowyResult<Self::Output> {
  203. let pad = GridMetaPad::from_revisions(object_id, revisions)?;
  204. Ok(pad)
  205. }
  206. }
  207. struct GridRevisionCloudService {
  208. #[allow(dead_code)]
  209. token: String,
  210. }
  211. impl RevisionCloudService for GridRevisionCloudService {
  212. #[tracing::instrument(level = "trace", skip(self))]
  213. fn fetch_object(&self, _user_id: &str, _object_id: &str) -> FutureResult<Vec<Revision>, FlowyError> {
  214. FutureResult::new(async move { Ok(vec![]) })
  215. }
  216. }
  217. struct GridRevisionCompactor();
  218. impl RevisionCompactor for GridRevisionCompactor {
  219. fn bytes_from_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  220. let delta = make_delta_from_revisions::<PlainTextAttributes>(revisions)?;
  221. Ok(delta.to_bytes())
  222. }
  223. }