database_editor.rs 34 KB


  1. use crate::entities::CellIdParams;
  2. use crate::entities::*;
  3. use crate::manager::DatabaseUser;
  4. use crate::notification::{send_notification, DatabaseNotification};
  5. use crate::services::cell::{
  6. apply_cell_data_changeset, get_type_cell_protobuf, stringify_cell_data, AnyTypeCache,
  7. AtomicCellDataCache, CellProtobufBlob, ToCellChangesetString, TypeCellData,
  8. };
  9. use crate::services::database::DatabaseBlocks;
  10. use crate::services::field::{
  11. default_type_option_builder_from_type, transform_type_option, type_option_builder_from_bytes,
  12. FieldBuilder, RowSingleCellData,
  13. };
  14. use crate::services::database::DatabaseViewDataImpl;
  15. use crate::services::database_view::{
  16. DatabaseViewChanged, DatabaseViewData, DatabaseViewEditor, DatabaseViews,
  17. };
  18. use crate::services::filter::FilterType;
  19. use crate::services::persistence::block_index::BlockRowIndexer;
  20. use crate::services::persistence::database_ref::DatabaseRef;
  21. use crate::services::row::{DatabaseBlockRow, DatabaseBlockRowRevision, RowRevisionBuilder};
  22. use bytes::Bytes;
  23. use database_model::*;
  24. use flowy_client_sync::client_database::{
  25. DatabaseRevisionChangeset, DatabaseRevisionPad, JsonDeserializer,
  26. };
  27. use flowy_client_sync::errors::{SyncError, SyncResult};
  28. use flowy_client_sync::make_operations_from_revisions;
  29. use flowy_error::{FlowyError, FlowyResult};
  30. use flowy_revision::{
  31. RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer,
  32. RevisionObjectSerializer,
  33. };
  34. use flowy_sqlite::ConnectionPool;
  35. use flowy_task::TaskDispatcher;
  36. use lib_infra::future::{to_fut, FutureResult};
  37. use lib_ot::core::EmptyAttributes;
  38. use revision_model::Revision;
  39. use std::collections::HashMap;
  40. use std::sync::Arc;
  41. use tokio::sync::{broadcast, RwLock};
  /// Lookup used by the editor to find the views that reference a database.
  ///
  /// Implementations must be `Send + Sync + 'static` so they can be stored
  /// behind an `Arc<dyn DatabaseRefIndexerQuery>`.
  pub trait DatabaseRefIndexerQuery: Send + Sync + 'static {
    /// Returns the [`DatabaseRef`] entries recorded for `database_id`.
    fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>>;
  }
  /// Editor for a single database: owns the revision pad (fields and block
  /// metadata), the row blocks, and the views opened on top of them.
  pub struct DatabaseEditor {
    /// Identifier of the database this editor works on.
    pub database_id: String,
    /// Lock-protected revision pad; read for field/block metadata and mutated
    /// through `modify`.
    database_pad: Arc<RwLock<DatabaseRevisionPad>>,
    /// Receives the local revisions produced by pad changes.
    rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
    /// The views opened on this database.
    database_views: Arc<DatabaseViews>,
    /// Row storage, organized in blocks.
    database_blocks: Arc<DatabaseBlocks>,
    /// Data provider handed to the views (built from the pad, the blocks, the
    /// task scheduler and the cell cache in `new`).
    pub database_view_data: Arc<dyn DatabaseViewData>,
    /// Cache passed to the cell encode/decode helpers.
    pub cell_data_cache: AtomicCellDataCache,
    /// Resolves the views that reference this database.
    database_ref_query: Arc<dyn DatabaseRefIndexerQuery>,
  }
  /// Emits a trace event when the editor is dropped.
  impl Drop for DatabaseEditor {
    fn drop(&mut self) {
      // NOTE(review): the message still says `DatabaseRevisionEditor` while the
      // type is named `DatabaseEditor` — presumably a leftover from a rename.
      tracing::trace!("Drop DatabaseRevisionEditor");
    }
  }
  60. impl DatabaseEditor {
  61. #[allow(clippy::too_many_arguments)]
  62. pub async fn new(
  63. database_id: &str,
  64. user: Arc<dyn DatabaseUser>,
  65. database_pad: Arc<RwLock<DatabaseRevisionPad>>,
  66. rev_manager: RevisionManager<Arc<ConnectionPool>>,
  67. persistence: Arc<BlockRowIndexer>,
  68. database_ref_query: Arc<dyn DatabaseRefIndexerQuery>,
  69. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  70. ) -> FlowyResult<Arc<Self>> {
  71. let rev_manager = Arc::new(rev_manager);
  72. let cell_data_cache = AnyTypeCache::<u64>::new();
  73. // Block manager
  74. let (block_event_tx, block_event_rx) = broadcast::channel(100);
  75. let block_meta_revs = database_pad.read().await.get_block_meta_revs();
  76. let database_blocks =
  77. Arc::new(DatabaseBlocks::new(&user, block_meta_revs, persistence, block_event_tx).await?);
  78. let database_view_data = Arc::new(DatabaseViewDataImpl {
  79. pad: database_pad.clone(),
  80. blocks: database_blocks.clone(),
  81. task_scheduler,
  82. cell_data_cache: cell_data_cache.clone(),
  83. });
  84. // View manager
  85. let database_views = DatabaseViews::new(
  86. user.clone(),
  87. database_view_data.clone(),
  88. cell_data_cache.clone(),
  89. block_event_rx,
  90. )
  91. .await?;
  92. let database_views = Arc::new(database_views);
  93. let editor = Arc::new(Self {
  94. database_id: database_id.to_owned(),
  95. database_pad,
  96. rev_manager,
  97. database_blocks,
  98. database_views,
  99. cell_data_cache,
  100. database_ref_query,
  101. database_view_data,
  102. });
  103. Ok(editor)
  104. }
  105. pub async fn open_view_editor(&self, view_editor: DatabaseViewEditor) {
  106. self.database_views.open(view_editor).await
  107. }
  108. #[tracing::instrument(name = "Close database editor view", level = "debug", skip_all)]
  109. pub async fn close_view_editor(&self, view_id: &str) {
  110. self.rev_manager.generate_snapshot().await;
  111. self.database_views.close(view_id).await;
  112. }
  113. pub async fn dispose(&self) {
  114. self.database_blocks.close().await;
  115. self.rev_manager.close().await;
  116. }
  117. pub async fn number_of_ref_views(&self) -> usize {
  118. self.database_views.number_of_views().await
  119. }
  120. /// Save the type-option data to disk and send a `DatabaseNotification::DidUpdateField` notification
  121. /// to dart side.
  122. ///
  123. /// It will do nothing if the passed-in type_option_data is empty
  124. /// # Arguments
  125. ///
  126. /// * `field_id`: the id of the field
  127. /// * `type_option_data`: the updated type-option data. The `type-option` data might be empty
  128. /// if there is no type-option config for that field. For example, the `RichTextTypeOptionPB`.
  129. ///
  130. pub async fn update_field_type_option(
  131. &self,
  132. view_id: &str,
  133. field_id: &str,
  134. type_option_data: Vec<u8>,
  135. old_field_rev: Option<Arc<FieldRevision>>,
  136. ) -> FlowyResult<()> {
  137. let result = self.get_field_rev(field_id).await;
  138. if result.is_none() {
  139. tracing::warn!("Can't find the field with id: {}", field_id);
  140. return Ok(());
  141. }
  142. let field_rev = result.unwrap();
  143. self
  144. .modify(|pad| {
  145. let changeset = pad.modify_field(field_id, |field| {
  146. let deserializer = TypeOptionJsonDeserializer(field_rev.ty.into());
  147. match deserializer.deserialize(type_option_data) {
  148. Ok(json_str) => {
  149. let field_type = field.ty;
  150. field.insert_type_option_str(&field_type, json_str);
  151. },
  152. Err(err) => {
  153. tracing::error!("Deserialize data to type option json failed: {}", err);
  154. },
  155. }
  156. Ok(Some(()))
  157. })?;
  158. Ok(changeset)
  159. })
  160. .await?;
  161. self
  162. .database_views
  163. .did_update_field_type_option(view_id, field_id, old_field_rev)
  164. .await?;
  165. self.notify_did_update_database_field(field_id).await?;
  166. Ok(())
  167. }
  168. pub async fn next_field_rev(&self, field_type: &FieldType) -> FlowyResult<FieldRevision> {
  169. let name = format!(
  170. "Property {}",
  171. self.database_pad.read().await.get_fields().len() + 1
  172. );
  173. let field_rev = FieldBuilder::from_field_type(field_type)
  174. .name(&name)
  175. .build();
  176. Ok(field_rev)
  177. }
  178. pub async fn create_new_field_rev(&self, field_rev: FieldRevision) -> FlowyResult<()> {
  179. let field_id = field_rev.id.clone();
  180. self
  181. .modify(|pad| Ok(pad.create_field_rev(field_rev, None)?))
  182. .await?;
  183. self.notify_did_insert_database_field(&field_id).await?;
  184. Ok(())
  185. }
  186. pub async fn create_new_field_rev_with_type_option(
  187. &self,
  188. field_type: &FieldType,
  189. type_option_data: Option<Vec<u8>>,
  190. ) -> FlowyResult<FieldRevision> {
  191. let mut field_rev = self.next_field_rev(field_type).await?;
  192. if let Some(type_option_data) = type_option_data {
  193. let type_option_builder = type_option_builder_from_bytes(type_option_data, field_type);
  194. field_rev.insert_type_option(type_option_builder.serializer());
  195. }
  196. self
  197. .modify(|pad| Ok(pad.create_field_rev(field_rev.clone(), None)?))
  198. .await?;
  199. self.notify_did_insert_database_field(&field_rev.id).await?;
  200. Ok(field_rev)
  201. }
  202. pub async fn contain_field(&self, field_id: &str) -> bool {
  203. self.database_pad.read().await.contain_field(field_id)
  204. }
  205. pub async fn update_field(&self, params: FieldChangesetParams) -> FlowyResult<()> {
  206. let field_id = params.field_id.clone();
  207. self
  208. .modify(|pad| {
  209. let changeset = pad.modify_field(&params.field_id, |field| {
  210. if let Some(name) = params.name {
  211. field.name = name;
  212. }
  213. if let Some(desc) = params.desc {
  214. field.desc = desc;
  215. }
  216. if let Some(field_type) = params.field_type {
  217. field.ty = field_type;
  218. }
  219. if let Some(frozen) = params.frozen {
  220. field.frozen = frozen;
  221. }
  222. if let Some(visibility) = params.visibility {
  223. field.visibility = visibility;
  224. }
  225. if let Some(width) = params.width {
  226. field.width = width;
  227. }
  228. Ok(Some(()))
  229. })?;
  230. Ok(changeset)
  231. })
  232. .await?;
  233. self.notify_did_update_database_field(&field_id).await?;
  234. Ok(())
  235. }
  236. pub async fn modify_field_rev<F>(&self, view_id: &str, field_id: &str, f: F) -> FlowyResult<()>
  237. where
  238. F: for<'a> FnOnce(&'a mut FieldRevision) -> FlowyResult<Option<()>>,
  239. {
  240. let mut is_changed = false;
  241. let old_field_rev = self.get_field_rev(field_id).await;
  242. self
  243. .modify(|pad| {
  244. let changeset = pad.modify_field(field_id, |field_rev| {
  245. f(field_rev).map_err(|e| SyncError::internal().context(e))
  246. })?;
  247. is_changed = changeset.is_some();
  248. Ok(changeset)
  249. })
  250. .await?;
  251. if is_changed {
  252. match self
  253. .database_views
  254. .did_update_field_type_option(view_id, field_id, old_field_rev)
  255. .await
  256. {
  257. Ok(_) => {},
  258. Err(e) => tracing::error!("View manager update field failed: {:?}", e),
  259. }
  260. self.notify_did_update_database_field(field_id).await?;
  261. }
  262. Ok(())
  263. }
  264. pub async fn delete_field(&self, field_id: &str) -> FlowyResult<()> {
  265. self
  266. .modify(|pad| Ok(pad.delete_field_rev(field_id)?))
  267. .await?;
  268. let field_order = FieldIdPB::from(field_id);
  269. let notified_changeset = DatabaseFieldChangesetPB::delete(&self.database_id, vec![field_order]);
  270. self.notify_did_update_database(notified_changeset).await?;
  271. Ok(())
  272. }
  273. pub async fn group_by_field(&self, view_id: &str, field_id: &str) -> FlowyResult<()> {
  274. self
  275. .database_views
  276. .group_by_field(view_id, field_id)
  277. .await?;
  278. Ok(())
  279. }
  280. /// Switch the field with id to a new field type.
  281. ///
  282. /// If the field type is not exist before, the default type-option data will be created.
  283. /// Each field type has its corresponding data, aka, the type-option data. Check out [this](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/frontend/grid#fieldtype)
  284. /// for more information
  285. ///
  286. /// # Arguments
  287. ///
  288. /// * `field_id`: the id of the field
  289. /// * `new_field_type`: the new field type of the field
  290. ///
  291. pub async fn switch_to_field_type(
  292. &self,
  293. field_id: &str,
  294. new_field_type: &FieldType,
  295. ) -> FlowyResult<()> {
  296. //
  297. let make_default_type_option = || -> String {
  298. return default_type_option_builder_from_type(new_field_type)
  299. .serializer()
  300. .json_str();
  301. };
  302. let type_option_transform = |old_field_type: FieldTypeRevision,
  303. old_type_option: Option<String>,
  304. new_type_option: String| {
  305. let old_field_type: FieldType = old_field_type.into();
  306. transform_type_option(
  307. &new_type_option,
  308. new_field_type,
  309. old_type_option,
  310. old_field_type,
  311. )
  312. };
  313. self
  314. .modify(|pad| {
  315. Ok(pad.switch_to_field(
  316. field_id,
  317. new_field_type.clone(),
  318. make_default_type_option,
  319. type_option_transform,
  320. )?)
  321. })
  322. .await?;
  323. self.notify_did_update_database_field(field_id).await?;
  324. Ok(())
  325. }
  326. pub async fn duplicate_field(&self, field_id: &str) -> FlowyResult<()> {
  327. let duplicated_field_id = gen_field_id();
  328. self
  329. .modify(|pad| Ok(pad.duplicate_field_rev(field_id, &duplicated_field_id)?))
  330. .await?;
  331. self
  332. .notify_did_insert_database_field(&duplicated_field_id)
  333. .await?;
  334. Ok(())
  335. }
  336. pub async fn get_field_rev(&self, field_id: &str) -> Option<Arc<FieldRevision>> {
  337. let field_rev = self
  338. .database_pad
  339. .read()
  340. .await
  341. .get_field_rev(field_id)?
  342. .1
  343. .clone();
  344. Some(field_rev)
  345. }
  346. pub async fn get_field_revs(
  347. &self,
  348. field_ids: Option<Vec<String>>,
  349. ) -> FlowyResult<Vec<Arc<FieldRevision>>> {
  350. if field_ids.is_none() {
  351. let field_revs = self.database_pad.read().await.get_field_revs(None)?;
  352. return Ok(field_revs);
  353. }
  354. let field_ids = field_ids.unwrap_or_default();
  355. let expected_len = field_ids.len();
  356. let field_revs = self
  357. .database_pad
  358. .read()
  359. .await
  360. .get_field_revs(Some(field_ids))?;
  361. if expected_len != 0 && field_revs.len() != expected_len {
  362. tracing::error!(
  363. "This is a bug. The len of the field_revs should equal to {}",
  364. expected_len
  365. );
  366. debug_assert!(field_revs.len() == expected_len);
  367. }
  368. Ok(field_revs)
  369. }
  370. pub async fn create_block(&self, block_meta_rev: DatabaseBlockMetaRevision) -> FlowyResult<()> {
  371. self
  372. .modify(|pad| Ok(pad.create_block_meta_rev(block_meta_rev)?))
  373. .await?;
  374. Ok(())
  375. }
  376. pub async fn update_block(
  377. &self,
  378. changeset: DatabaseBlockMetaRevisionChangeset,
  379. ) -> FlowyResult<()> {
  380. self
  381. .modify(|pad| Ok(pad.update_block_rev(changeset)?))
  382. .await?;
  383. Ok(())
  384. }
  385. pub async fn create_row(&self, params: CreateRowParams) -> FlowyResult<RowPB> {
  386. let mut row_rev = self.create_row_rev().await?;
  387. self
  388. .database_views
  389. .will_create_row(&mut row_rev, &params)
  390. .await;
  391. let row_pb = self
  392. .create_row_pb(row_rev, params.start_row_id.clone())
  393. .await?;
  394. self.database_views.did_create_row(&row_pb, &params).await;
  395. Ok(row_pb)
  396. }
  397. #[tracing::instrument(level = "trace", skip_all, err)]
  398. pub async fn move_group(&self, params: MoveGroupParams) -> FlowyResult<()> {
  399. self.database_views.move_group(params).await?;
  400. Ok(())
  401. }
  402. pub async fn insert_rows(&self, row_revs: Vec<RowRevision>) -> FlowyResult<Vec<RowPB>> {
  403. let block_id = self.block_id().await?;
  404. let mut rows_by_block_id: HashMap<String, Vec<RowRevision>> = HashMap::new();
  405. let mut row_orders = vec![];
  406. for row_rev in row_revs {
  407. row_orders.push(RowPB::from(&row_rev));
  408. rows_by_block_id
  409. .entry(block_id.clone())
  410. .or_insert_with(Vec::new)
  411. .push(row_rev);
  412. }
  413. let changesets = self.database_blocks.insert_row(rows_by_block_id).await?;
  414. for changeset in changesets {
  415. self.update_block(changeset).await?;
  416. }
  417. Ok(row_orders)
  418. }
  419. pub async fn update_row(&self, changeset: RowChangeset) -> FlowyResult<()> {
  420. let row_id = changeset.row_id.clone();
  421. let old_row = self.get_row_rev(&row_id).await?;
  422. self.database_blocks.update_row(changeset).await?;
  423. self.database_views.did_update_row(old_row, &row_id).await;
  424. Ok(())
  425. }
  426. /// Returns all the rows in this block.
  427. pub async fn get_row_pbs(&self, view_id: &str, block_id: &str) -> FlowyResult<Vec<RowPB>> {
  428. let rows = self.database_views.get_row_revs(view_id, block_id).await?;
  429. let rows = rows
  430. .into_iter()
  431. .map(|row_rev| RowPB::from(&row_rev))
  432. .collect();
  433. Ok(rows)
  434. }
  435. pub async fn get_all_row_revs(&self, view_id: &str) -> FlowyResult<Vec<Arc<RowRevision>>> {
  436. let mut all_rows = vec![];
  437. let blocks = self.database_blocks.get_blocks(None).await?;
  438. for block in blocks {
  439. let rows = self
  440. .database_views
  441. .get_row_revs(view_id, &block.block_id)
  442. .await?;
  443. all_rows.extend(rows);
  444. }
  445. Ok(all_rows)
  446. }
  447. pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<Arc<RowRevision>>> {
  448. match self.database_blocks.get_row_rev(row_id).await? {
  449. None => Ok(None),
  450. Some((_, row_rev)) => Ok(Some(row_rev)),
  451. }
  452. }
  453. pub async fn delete_row(&self, row_id: &str) -> FlowyResult<()> {
  454. let row_rev = self.database_blocks.delete_row(row_id).await?;
  455. tracing::trace!("Did delete row:{:?}", row_rev);
  456. if let Some(row_rev) = row_rev {
  457. self.database_views.did_delete_row(row_rev).await;
  458. }
  459. Ok(())
  460. }
  461. pub async fn subscribe_view_changed(
  462. &self,
  463. view_id: &str,
  464. ) -> FlowyResult<broadcast::Receiver<DatabaseViewChanged>> {
  465. self.database_views.subscribe_view_changed(view_id).await
  466. }
  /// Placeholder: currently ignores `_row_id`, performs no work and always
  /// returns `Ok(())`.
  pub async fn duplicate_row(&self, _row_id: &str) -> FlowyResult<()> {
    Ok(())
  }
  470. /// Returns the cell data that encoded in protobuf.
  471. pub async fn get_cell(&self, params: &CellIdParams) -> Option<CellPB> {
  472. let (field_type, cell_bytes) = self.get_type_cell_protobuf(params).await?;
  473. Some(CellPB::new(
  474. &params.field_id,
  475. &params.row_id,
  476. field_type,
  477. cell_bytes.to_vec(),
  478. ))
  479. }
  480. /// Returns a string that represents the current field_type's cell data.
  481. /// For example:
  482. /// Multi-Select: list of the option's name separated by a comma.
  483. /// Number: 123 => $123 if the currency set.
  484. /// Date: 1653609600 => May 27,2022
  485. ///
  486. pub async fn get_cell_display_str(&self, params: &CellIdParams) -> String {
  487. let display_str = || async {
  488. let field_rev = self.get_field_rev(&params.field_id).await?;
  489. let field_type: FieldType = field_rev.ty.into();
  490. let cell_rev = self
  491. .get_cell_rev(&params.row_id, &params.field_id)
  492. .await
  493. .ok()??;
  494. let type_cell_data: TypeCellData = cell_rev.try_into().ok()?;
  495. Some(stringify_cell_data(
  496. type_cell_data.cell_str,
  497. &field_type,
  498. &field_type,
  499. &field_rev,
  500. ))
  501. };
  502. display_str().await.unwrap_or_default()
  503. }
  504. pub async fn get_cell_protobuf(&self, params: &CellIdParams) -> Option<CellProtobufBlob> {
  505. let (_, cell_data) = self.get_type_cell_protobuf(params).await?;
  506. Some(cell_data)
  507. }
  508. async fn get_type_cell_protobuf(
  509. &self,
  510. params: &CellIdParams,
  511. ) -> Option<(FieldType, CellProtobufBlob)> {
  512. let field_rev = self.get_field_rev(&params.field_id).await?;
  513. let (_, row_rev) = self
  514. .database_blocks
  515. .get_row_rev(&params.row_id)
  516. .await
  517. .ok()??;
  518. let cell_rev = row_rev.cells.get(&params.field_id)?.clone();
  519. Some(get_type_cell_protobuf(
  520. cell_rev.type_cell_data,
  521. &field_rev,
  522. Some(self.cell_data_cache.clone()),
  523. ))
  524. }
  525. pub async fn get_cell_rev(
  526. &self,
  527. row_id: &str,
  528. field_id: &str,
  529. ) -> FlowyResult<Option<CellRevision>> {
  530. match self.database_blocks.get_row_rev(row_id).await? {
  531. None => Ok(None),
  532. Some((_, row_rev)) => {
  533. let cell_rev = row_rev.cells.get(field_id).cloned();
  534. Ok(cell_rev)
  535. },
  536. }
  537. }
  538. /// Returns the list of cells corresponding to the given field.
  539. pub async fn get_cells_for_field(
  540. &self,
  541. view_id: &str,
  542. field_id: &str,
  543. ) -> FlowyResult<Vec<RowSingleCellData>> {
  544. let view_editor = self.database_views.get_view_editor(view_id).await?;
  545. view_editor.v_get_cells_for_field(field_id).await
  546. }
  547. #[tracing::instrument(level = "trace", skip_all, err)]
  548. pub async fn update_cell_with_changeset<T: ToCellChangesetString>(
  549. &self,
  550. row_id: &str,
  551. field_id: &str,
  552. cell_changeset: T,
  553. ) -> FlowyResult<()> {
  554. match self.database_pad.read().await.get_field_rev(field_id) {
  555. None => {
  556. let msg = format!("Field with id:{} not found", &field_id);
  557. Err(FlowyError::internal().context(msg))
  558. },
  559. Some((_, field_rev)) => {
  560. tracing::trace!(
  561. "Cell changeset: id:{} / value:{:?}",
  562. &field_id,
  563. cell_changeset
  564. );
  565. let old_row_rev = self.get_row_rev(row_id).await?.clone();
  566. let cell_rev = self.get_cell_rev(row_id, field_id).await?;
  567. // Update the changeset.data property with the return value.
  568. let type_cell_data = apply_cell_data_changeset(
  569. cell_changeset,
  570. cell_rev,
  571. field_rev,
  572. Some(self.cell_data_cache.clone()),
  573. )?;
  574. let cell_changeset = CellChangesetPB {
  575. view_id: self.database_id.clone(),
  576. row_id: row_id.to_owned(),
  577. field_id: field_id.to_owned(),
  578. type_cell_data,
  579. };
  580. self.database_blocks.update_cell(cell_changeset).await?;
  581. self
  582. .database_views
  583. .did_update_row(old_row_rev, row_id)
  584. .await;
  585. Ok(())
  586. },
  587. }
  588. }
  589. #[tracing::instrument(level = "trace", skip_all, err)]
  590. pub async fn update_cell<T: ToCellChangesetString>(
  591. &self,
  592. row_id: String,
  593. field_id: String,
  594. cell_changeset: T,
  595. ) -> FlowyResult<()> {
  596. self
  597. .update_cell_with_changeset(&row_id, &field_id, cell_changeset)
  598. .await
  599. }
  600. pub async fn get_block_meta_revs(&self) -> FlowyResult<Vec<Arc<DatabaseBlockMetaRevision>>> {
  601. let block_meta_revs = self.database_pad.read().await.get_block_meta_revs();
  602. Ok(block_meta_revs)
  603. }
  604. pub async fn get_blocks(
  605. &self,
  606. block_ids: Option<Vec<String>>,
  607. ) -> FlowyResult<Vec<DatabaseBlockRowRevision>> {
  608. let block_ids = match block_ids {
  609. None => self
  610. .database_pad
  611. .read()
  612. .await
  613. .get_block_meta_revs()
  614. .iter()
  615. .map(|block_rev| block_rev.block_id.clone())
  616. .collect::<Vec<String>>(),
  617. Some(block_ids) => block_ids,
  618. };
  619. let blocks = self.database_blocks.get_blocks(Some(block_ids)).await?;
  620. Ok(blocks)
  621. }
  622. pub async fn delete_rows(&self, block_rows: Vec<DatabaseBlockRow>) -> FlowyResult<()> {
  623. let changesets = self.database_blocks.delete_rows(block_rows).await?;
  624. for changeset in changesets {
  625. self.update_block(changeset).await?;
  626. }
  627. Ok(())
  628. }
  629. pub async fn get_database(&self, view_id: &str) -> FlowyResult<DatabasePB> {
  630. let pad = self.database_pad.read().await;
  631. let fields = pad
  632. .get_field_revs(None)?
  633. .iter()
  634. .map(FieldIdPB::from)
  635. .collect();
  636. let mut all_rows = vec![];
  637. for block_rev in pad.get_block_meta_revs() {
  638. if let Ok(rows) = self.get_row_pbs(view_id, &block_rev.block_id).await {
  639. all_rows.extend(rows);
  640. }
  641. }
  642. Ok(DatabasePB {
  643. id: self.database_id.clone(),
  644. fields,
  645. rows: all_rows,
  646. })
  647. }
  648. pub async fn get_setting(&self, view_id: &str) -> FlowyResult<DatabaseViewSettingPB> {
  649. self.database_views.get_setting(view_id).await
  650. }
  651. pub async fn get_all_filters(&self, view_id: &str) -> FlowyResult<Vec<FilterPB>> {
  652. Ok(
  653. self
  654. .database_views
  655. .get_all_filters(view_id)
  656. .await?
  657. .into_iter()
  658. .map(|filter| FilterPB::from(filter.as_ref()))
  659. .collect(),
  660. )
  661. }
  662. pub async fn get_filters(
  663. &self,
  664. view_id: &str,
  665. filter_id: FilterType,
  666. ) -> FlowyResult<Vec<Arc<FilterRevision>>> {
  667. self.database_views.get_filters(view_id, &filter_id).await
  668. }
  669. pub async fn create_or_update_filter(&self, params: AlterFilterParams) -> FlowyResult<()> {
  670. self.database_views.create_or_update_filter(params).await?;
  671. Ok(())
  672. }
  673. pub async fn delete_filter(&self, params: DeleteFilterParams) -> FlowyResult<()> {
  674. self.database_views.delete_filter(params).await?;
  675. Ok(())
  676. }
  677. pub async fn get_all_sorts(&self, view_id: &str) -> FlowyResult<Vec<SortPB>> {
  678. Ok(
  679. self
  680. .database_views
  681. .get_all_sorts(view_id)
  682. .await?
  683. .into_iter()
  684. .map(|sort| SortPB::from(sort.as_ref()))
  685. .collect(),
  686. )
  687. }
  688. pub async fn delete_all_sorts(&self, view_id: &str) -> FlowyResult<()> {
  689. self.database_views.delete_all_sorts(view_id).await
  690. }
  691. pub async fn delete_sort(&self, params: DeleteSortParams) -> FlowyResult<()> {
  692. self.database_views.delete_sort(params).await?;
  693. Ok(())
  694. }
  695. pub async fn create_or_update_sort(&self, params: AlterSortParams) -> FlowyResult<SortRevision> {
  696. let sort_rev = self.database_views.create_or_update_sort(params).await?;
  697. Ok(sort_rev)
  698. }
  699. pub async fn insert_group(&self, params: InsertGroupParams) -> FlowyResult<()> {
  700. self.database_views.insert_or_update_group(params).await
  701. }
  702. pub async fn delete_group(&self, params: DeleteGroupParams) -> FlowyResult<()> {
  703. self.database_views.delete_group(params).await
  704. }
  705. pub async fn move_row(&self, params: MoveRowParams) -> FlowyResult<()> {
  706. let MoveRowParams {
  707. view_id: _,
  708. from_row_id,
  709. to_row_id,
  710. } = params;
  711. match self.database_blocks.get_row_rev(&from_row_id).await? {
  712. None => tracing::warn!("Move row failed, can not find the row:{}", from_row_id),
  713. Some((_, row_rev)) => {
  714. match (
  715. self.database_blocks.index_of_row(&from_row_id).await,
  716. self.database_blocks.index_of_row(&to_row_id).await,
  717. ) {
  718. (Some(from_index), Some(to_index)) => {
  719. tracing::trace!("Move row from {} to {}", from_index, to_index);
  720. self
  721. .database_blocks
  722. .move_row(row_rev.clone(), from_index, to_index)
  723. .await?;
  724. },
  725. (_, None) => tracing::warn!("Can not find the from row id: {}", from_row_id),
  726. (None, _) => tracing::warn!("Can not find the to row id: {}", to_row_id),
  727. }
  728. },
  729. }
  730. Ok(())
  731. }
  732. pub async fn move_group_row(&self, params: MoveGroupRowParams) -> FlowyResult<()> {
  733. let MoveGroupRowParams {
  734. view_id,
  735. from_row_id,
  736. to_group_id,
  737. to_row_id,
  738. } = params;
  739. match self.database_blocks.get_row_rev(&from_row_id).await? {
  740. None => tracing::warn!("Move row failed, can not find the row:{}", from_row_id),
  741. Some((_, row_rev)) => {
  742. let block_manager = self.database_blocks.clone();
  743. self
  744. .database_views
  745. .move_group_row(
  746. &view_id.clone(),
  747. row_rev,
  748. to_group_id,
  749. to_row_id.clone(),
  750. |row_changeset| {
  751. to_fut(async move {
  752. tracing::trace!("Row data changed: {:?}", row_changeset);
  753. let cell_changesets = row_changeset
  754. .cell_by_field_id
  755. .into_iter()
  756. .map(|(field_id, cell_rev)| CellChangesetPB {
  757. view_id: view_id.clone(),
  758. row_id: row_changeset.row_id.clone(),
  759. field_id,
  760. type_cell_data: cell_rev.type_cell_data,
  761. })
  762. .collect::<Vec<CellChangesetPB>>();
  763. for cell_changeset in cell_changesets {
  764. match block_manager.update_cell(cell_changeset).await {
  765. Ok(_) => {},
  766. Err(e) => tracing::error!("Apply cell changeset error:{:?}", e),
  767. }
  768. }
  769. })
  770. },
  771. )
  772. .await?;
  773. },
  774. }
  775. Ok(())
  776. }
  777. pub async fn move_field(&self, params: MoveFieldParams) -> FlowyResult<()> {
  778. let MoveFieldParams {
  779. view_id: _,
  780. field_id,
  781. from_index,
  782. to_index,
  783. } = params;
  784. self
  785. .modify(|pad| Ok(pad.move_field(&field_id, from_index as usize, to_index as usize)?))
  786. .await?;
  787. if let Some((index, field_rev)) = self.database_pad.read().await.get_field_rev(&field_id) {
  788. let delete_field_order = FieldIdPB::from(field_id);
  789. let insert_field = IndexFieldPB::from_field_rev(field_rev, index);
  790. let notified_changeset = DatabaseFieldChangesetPB {
  791. view_id: self.database_id.clone(),
  792. inserted_fields: vec![insert_field],
  793. deleted_fields: vec![delete_field_order],
  794. updated_fields: vec![],
  795. };
  796. self.notify_did_update_database(notified_changeset).await?;
  797. }
  798. Ok(())
  799. }
  800. pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult<BuildDatabaseContext> {
  801. let database_pad = self.database_pad.read().await;
  802. let database_view_data = self.database_views.duplicate_database_view(view_id).await?;
  803. let original_blocks = database_pad.get_block_meta_revs();
  804. let (duplicated_fields, duplicated_blocks) = database_pad.duplicate_database_block_meta().await;
  805. let mut blocks_meta_data = vec![];
  806. if original_blocks.len() == duplicated_blocks.len() {
  807. for (index, original_block_meta) in original_blocks.iter().enumerate() {
  808. let database_block_meta_editor = self
  809. .database_blocks
  810. .get_or_create_block_editor(&original_block_meta.block_id)
  811. .await?;
  812. let duplicated_block_id = &duplicated_blocks[index].block_id;
  813. tracing::trace!("Duplicate block:{} meta data", duplicated_block_id);
  814. let duplicated_block_meta_data = database_block_meta_editor
  815. .duplicate_block(duplicated_block_id)
  816. .await;
  817. blocks_meta_data.push(duplicated_block_meta_data);
  818. }
  819. } else {
  820. debug_assert_eq!(original_blocks.len(), duplicated_blocks.len());
  821. }
  822. drop(database_pad);
  823. Ok(BuildDatabaseContext {
  824. field_revs: duplicated_fields.into_iter().map(Arc::new).collect(),
  825. block_metas: duplicated_blocks,
  826. blocks: blocks_meta_data,
  827. database_view_data,
  828. })
  829. }
  830. #[tracing::instrument(level = "trace", skip_all, err)]
  831. pub async fn load_groups(&self, view_id: &str) -> FlowyResult<RepeatedGroupPB> {
  832. self.database_views.load_groups(view_id).await
  833. }
  834. async fn create_row_rev(&self) -> FlowyResult<RowRevision> {
  835. let field_revs = self.database_pad.read().await.get_field_revs(None)?;
  836. let block_id = self.block_id().await?;
  837. // insert empty row below the row whose id is upper_row_id
  838. let row_rev = RowRevisionBuilder::new(&block_id, field_revs).build();
  839. Ok(row_rev)
  840. }
  841. async fn create_row_pb(
  842. &self,
  843. row_rev: RowRevision,
  844. start_row_id: Option<String>,
  845. ) -> FlowyResult<RowPB> {
  846. let row_pb = RowPB::from(&row_rev);
  847. let block_id = row_rev.block_id.clone();
  848. // insert the row
  849. let row_count = self
  850. .database_blocks
  851. .create_row(row_rev, start_row_id)
  852. .await?;
  853. // update block row count
  854. let changeset = DatabaseBlockMetaRevisionChangeset::from_row_count(block_id, row_count);
  855. self.update_block(changeset).await?;
  856. Ok(row_pb)
  857. }
  858. async fn modify<F>(&self, f: F) -> FlowyResult<()>
  859. where
  860. F:
  861. for<'a> FnOnce(&'a mut DatabaseRevisionPad) -> FlowyResult<Option<DatabaseRevisionChangeset>>,
  862. {
  863. let mut write_guard = self.database_pad.write().await;
  864. if let Some(changeset) = f(&mut write_guard)? {
  865. self.apply_change(changeset).await?;
  866. }
  867. Ok(())
  868. }
  869. async fn apply_change(&self, change: DatabaseRevisionChangeset) -> FlowyResult<()> {
  870. let DatabaseRevisionChangeset {
  871. operations: delta,
  872. md5,
  873. } = change;
  874. let data = delta.json_bytes();
  875. let _ = self.rev_manager.add_local_revision(data, md5).await?;
  876. Ok(())
  877. }
  878. async fn block_id(&self) -> FlowyResult<String> {
  879. match self.database_pad.read().await.get_block_meta_revs().last() {
  880. None => Err(FlowyError::internal().context("There is no block in this database")),
  881. Some(database_block) => Ok(database_block.block_id.clone()),
  882. }
  883. }
  884. #[tracing::instrument(level = "trace", skip_all, err)]
  885. async fn notify_did_insert_database_field(&self, field_id: &str) -> FlowyResult<()> {
  886. if let Some((index, field_rev)) = self.database_pad.read().await.get_field_rev(field_id) {
  887. let index_field = IndexFieldPB::from_field_rev(field_rev, index);
  888. if let Ok(views) = self.database_ref_query.get_ref_views(&self.database_id) {
  889. for view in views {
  890. let notified_changeset =
  891. DatabaseFieldChangesetPB::insert(&view.view_id, vec![index_field.clone()]);
  892. self.notify_did_update_database(notified_changeset).await?;
  893. }
  894. }
  895. }
  896. Ok(())
  897. }
  898. #[tracing::instrument(level = "trace", skip_all, err)]
  899. async fn notify_did_update_database_field(&self, field_id: &str) -> FlowyResult<()> {
  900. if let Some((_, field_rev)) = self
  901. .database_pad
  902. .read()
  903. .await
  904. .get_field_rev(field_id)
  905. .map(|(index, field)| (index, field.clone()))
  906. {
  907. let updated_field = FieldPB::from(field_rev);
  908. let notified_changeset =
  909. DatabaseFieldChangesetPB::update(&self.database_id, vec![updated_field.clone()]);
  910. self.notify_did_update_database(notified_changeset).await?;
  911. send_notification(field_id, DatabaseNotification::DidUpdateField)
  912. .payload(updated_field)
  913. .send();
  914. }
  915. Ok(())
  916. }
  917. async fn notify_did_update_database(
  918. &self,
  919. changeset: DatabaseFieldChangesetPB,
  920. ) -> FlowyResult<()> {
  921. if let Ok(views) = self.database_ref_query.get_ref_views(&self.database_id) {
  922. for view in views {
  923. send_notification(&view.view_id, DatabaseNotification::DidUpdateFields)
  924. .payload(changeset.clone())
  925. .send();
  926. }
  927. }
  928. Ok(())
  929. }
  930. }
  /// Accessors compiled only when the `flowy_unit_test` feature is enabled.
  #[cfg(feature = "flowy_unit_test")]
  impl DatabaseEditor {
    /// Returns another handle to the editor's revision manager.
    pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
      Arc::clone(&self.rev_manager)
    }

    /// Returns another handle to the lock-protected database pad.
    pub fn database_pad(&self) -> Arc<RwLock<DatabaseRevisionPad>> {
      Arc::clone(&self.database_pad)
    }
  }
  940. pub struct DatabaseRevisionSerde();
  941. impl RevisionObjectDeserializer for DatabaseRevisionSerde {
  942. type Output = DatabaseRevisionPad;
  943. fn deserialize_revisions(
  944. _object_id: &str,
  945. revisions: Vec<Revision>,
  946. ) -> FlowyResult<Self::Output> {
  947. let pad = DatabaseRevisionPad::from_revisions(revisions)?;
  948. Ok(pad)
  949. }
  950. fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
  951. None
  952. }
  953. }
  954. impl RevisionObjectSerializer for DatabaseRevisionSerde {
  955. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  956. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  957. Ok(operations.json_bytes())
  958. }
  959. }
  /// Revision cloud service for databases; it holds the token it was created with.
  pub struct DatabaseRevisionCloudService {
    // Stored by `new` but not read by the constructor or anything else in this block,
    // hence the lint override.
    #[allow(dead_code)]
    token: String,
  }

  impl DatabaseRevisionCloudService {
    /// Creates a service that keeps `token`.
    pub fn new(token: String) -> Self {
      DatabaseRevisionCloudService { token }
    }
  }
  969. impl RevisionCloudService for DatabaseRevisionCloudService {
  970. #[tracing::instrument(level = "trace", skip(self))]
  971. fn fetch_object(
  972. &self,
  973. _user_id: &str,
  974. _object_id: &str,
  975. ) -> FutureResult<Vec<Revision>, FlowyError> {
  976. FutureResult::new(async move { Ok(vec![]) })
  977. }
  978. }
  979. pub struct DatabaseRevisionMergeable();
  980. impl RevisionMergeable for DatabaseRevisionMergeable {
  981. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  982. DatabaseRevisionSerde::combine_revisions(revisions)
  983. }
  984. }
  985. struct TypeOptionJsonDeserializer(FieldType);
  986. impl JsonDeserializer for TypeOptionJsonDeserializer {
  987. fn deserialize(&self, type_option_data: Vec<u8>) -> SyncResult<String> {
  988. // The type_option_data sent from Dart is serialized by protobuf.
  989. let builder = type_option_builder_from_bytes(type_option_data, &self.0);
  990. let json = builder.serializer().json_str();
  991. tracing::trace!("Deserialize type-option data to: {}", json);
  992. Ok(json)
  993. }
  994. }