database_editor.rs 36 KB


  1. use crate::entities::CellIdParams;
  2. use crate::entities::*;
  3. use crate::manager::DatabaseUser;
  4. use crate::notification::{send_notification, DatabaseNotification};
  5. use crate::services::cell::{
  6. apply_cell_data_changeset, get_type_cell_protobuf, stringify_cell_data, AnyTypeCache,
  7. AtomicCellDataCache, CellProtobufBlob, ToCellChangesetString, TypeCellData,
  8. };
  9. use crate::services::database::DatabaseBlocks;
  10. use crate::services::field::{
  11. default_type_option_builder_from_type, transform_type_option, type_option_builder_from_bytes,
  12. FieldBuilder, RowSingleCellData,
  13. };
  14. use crate::services::database::DatabaseViewDataImpl;
  15. use crate::services::database_view::{
  16. DatabaseViewChanged, DatabaseViewData, DatabaseViewEditor, DatabaseViews,
  17. };
  18. use crate::services::filter::FilterType;
  19. use crate::services::persistence::block_index::BlockRowIndexer;
  20. use crate::services::persistence::database_ref::DatabaseViewRef;
  21. use crate::services::row::{DatabaseBlockRow, DatabaseBlockRowRevision, RowRevisionBuilder};
  22. use bytes::Bytes;
  23. use database_model::*;
  24. use flowy_client_sync::client_database::{
  25. DatabaseRevisionChangeset, DatabaseRevisionPad, JsonDeserializer,
  26. };
  27. use flowy_client_sync::errors::{SyncError, SyncResult};
  28. use flowy_client_sync::make_operations_from_revisions;
  29. use flowy_error::{FlowyError, FlowyResult};
  30. use flowy_revision::{
  31. RevisionCloudService, RevisionManager, RevisionMergeable, RevisionObjectDeserializer,
  32. RevisionObjectSerializer,
  33. };
  34. use flowy_sqlite::ConnectionPool;
  35. use flowy_task::TaskDispatcher;
  36. use lib_infra::future::{to_fut, FutureResult};
  37. use lib_ot::core::EmptyAttributes;
  38. use revision_model::Revision;
  39. use std::collections::HashMap;
  40. use std::sync::Arc;
  41. use tokio::sync::{broadcast, RwLock};
  42. pub trait DatabaseRefIndexerQuery: Send + Sync + 'static {
  43. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>>;
  44. }
  45. pub struct DatabaseEditor {
  46. pub database_id: String,
  47. database_pad: Arc<RwLock<DatabaseRevisionPad>>,
  48. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  49. database_views: Arc<DatabaseViews>,
  50. database_blocks: Arc<DatabaseBlocks>,
  51. pub database_view_data: Arc<dyn DatabaseViewData>,
  52. pub cell_data_cache: AtomicCellDataCache,
  53. database_ref_query: Arc<dyn DatabaseRefIndexerQuery>,
  54. }
  55. impl Drop for DatabaseEditor {
  56. fn drop(&mut self) {
  57. tracing::trace!("Drop DatabaseRevisionEditor");
  58. }
  59. }
  60. impl DatabaseEditor {
  61. #[allow(clippy::too_many_arguments)]
  62. pub async fn new(
  63. database_id: &str,
  64. user: Arc<dyn DatabaseUser>,
  65. database_pad: Arc<RwLock<DatabaseRevisionPad>>,
  66. rev_manager: RevisionManager<Arc<ConnectionPool>>,
  67. persistence: Arc<BlockRowIndexer>,
  68. database_ref_query: Arc<dyn DatabaseRefIndexerQuery>,
  69. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  70. ) -> FlowyResult<Arc<Self>> {
  71. let rev_manager = Arc::new(rev_manager);
  72. let cell_data_cache = AnyTypeCache::<u64>::new();
  73. // Block manager
  74. let (block_event_tx, block_event_rx) = broadcast::channel(100);
  75. let block_meta_revs = database_pad.read().await.get_block_meta_revs();
  76. let database_blocks =
  77. Arc::new(DatabaseBlocks::new(&user, block_meta_revs, persistence, block_event_tx).await?);
  78. let database_view_data = Arc::new(DatabaseViewDataImpl {
  79. pad: database_pad.clone(),
  80. blocks: database_blocks.clone(),
  81. task_scheduler,
  82. cell_data_cache: cell_data_cache.clone(),
  83. });
  84. // View manager
  85. let database_views = DatabaseViews::new(
  86. user.clone(),
  87. database_view_data.clone(),
  88. cell_data_cache.clone(),
  89. block_event_rx,
  90. )
  91. .await?;
  92. let database_views = Arc::new(database_views);
  93. let editor = Arc::new(Self {
  94. database_id: database_id.to_owned(),
  95. database_pad,
  96. rev_manager,
  97. database_blocks,
  98. database_views,
  99. cell_data_cache,
  100. database_ref_query,
  101. database_view_data,
  102. });
  103. Ok(editor)
  104. }
  105. pub async fn open_view_editor(&self, view_editor: DatabaseViewEditor) {
  106. self.database_views.open(view_editor).await
  107. }
  108. #[tracing::instrument(level = "debug", skip_all)]
  109. pub async fn close_view_editor(&self, view_id: &str) {
  110. self.database_views.close(view_id).await;
  111. }
  112. pub async fn dispose(&self) {
  113. self.rev_manager.generate_snapshot().await;
  114. self.database_blocks.close().await;
  115. self.rev_manager.close().await;
  116. }
  117. pub async fn number_of_ref_views(&self) -> usize {
  118. self.database_views.number_of_views().await
  119. }
  120. pub async fn is_view_open(&self, view_id: &str) -> bool {
  121. self.database_views.is_view_exist(view_id).await
  122. }
  123. /// Save the type-option data to disk and send a `DatabaseNotification::DidUpdateField` notification
  124. /// to dart side.
  125. ///
  126. /// It will do nothing if the passed-in type_option_data is empty
  127. /// # Arguments
  128. ///
  129. /// * `field_id`: the id of the field
  130. /// * `type_option_data`: the updated type-option data. The `type-option` data might be empty
  131. /// if there is no type-option config for that field. For example, the `RichTextTypeOptionPB`.
  132. ///
  133. pub async fn update_field_type_option(
  134. &self,
  135. view_id: &str,
  136. field_id: &str,
  137. type_option_data: Vec<u8>,
  138. old_field_rev: Option<Arc<FieldRevision>>,
  139. ) -> FlowyResult<()> {
  140. let result = self.get_field_rev(field_id).await;
  141. if result.is_none() {
  142. tracing::warn!("Can't find the field with id: {}", field_id);
  143. return Ok(());
  144. }
  145. let field_rev = result.unwrap();
  146. self
  147. .modify(|pad| {
  148. let changeset = pad.modify_field(field_id, |field| {
  149. let deserializer = TypeOptionJsonDeserializer(field_rev.ty.into());
  150. match deserializer.deserialize(type_option_data) {
  151. Ok(json_str) => {
  152. let field_type = field.ty;
  153. field.insert_type_option_str(&field_type, json_str);
  154. },
  155. Err(err) => {
  156. tracing::error!("Deserialize data to type option json failed: {}", err);
  157. },
  158. }
  159. Ok(Some(()))
  160. })?;
  161. Ok(changeset)
  162. })
  163. .await?;
  164. self
  165. .database_views
  166. .did_update_field_type_option(view_id, field_id, old_field_rev)
  167. .await?;
  168. self.notify_did_update_database_field(field_id).await?;
  169. Ok(())
  170. }
  171. pub async fn next_field_rev(&self, field_type: &FieldType) -> FlowyResult<FieldRevision> {
  172. let name = format!(
  173. "Property {}",
  174. self.database_pad.read().await.get_fields().len() + 1
  175. );
  176. let field_rev = FieldBuilder::from_field_type(field_type)
  177. .name(&name)
  178. .build();
  179. Ok(field_rev)
  180. }
  181. pub async fn create_new_field_rev(&self, field_rev: FieldRevision) -> FlowyResult<()> {
  182. let field_id = field_rev.id.clone();
  183. self
  184. .modify(|pad| Ok(pad.create_field_rev(field_rev, None)?))
  185. .await?;
  186. self.notify_did_insert_database_field(&field_id).await?;
  187. Ok(())
  188. }
  189. pub async fn create_new_field_rev_with_type_option(
  190. &self,
  191. field_type: &FieldType,
  192. type_option_data: Option<Vec<u8>>,
  193. ) -> FlowyResult<FieldRevision> {
  194. let mut field_rev = self.next_field_rev(field_type).await?;
  195. if let Some(type_option_data) = type_option_data {
  196. let type_option_builder = type_option_builder_from_bytes(type_option_data, field_type);
  197. field_rev.insert_type_option(type_option_builder.serializer());
  198. }
  199. self
  200. .modify(|pad| Ok(pad.create_field_rev(field_rev.clone(), None)?))
  201. .await?;
  202. self.notify_did_insert_database_field(&field_rev.id).await?;
  203. Ok(field_rev)
  204. }
  205. pub async fn contain_field(&self, field_id: &str) -> bool {
  206. self.database_pad.read().await.contain_field(field_id)
  207. }
  208. pub async fn update_field(&self, params: FieldChangesetParams) -> FlowyResult<()> {
  209. let field_id = params.field_id.clone();
  210. self
  211. .modify(|pad| {
  212. let changeset = pad.modify_field(&params.field_id, |field| {
  213. if let Some(name) = params.name {
  214. field.name = name;
  215. }
  216. if let Some(desc) = params.desc {
  217. field.desc = desc;
  218. }
  219. if let Some(field_type) = params.field_type {
  220. field.ty = field_type;
  221. }
  222. if let Some(frozen) = params.frozen {
  223. field.frozen = frozen;
  224. }
  225. if let Some(visibility) = params.visibility {
  226. field.visibility = visibility;
  227. }
  228. if let Some(width) = params.width {
  229. field.width = width;
  230. }
  231. Ok(Some(()))
  232. })?;
  233. Ok(changeset)
  234. })
  235. .await?;
  236. self.notify_did_update_database_field(&field_id).await?;
  237. Ok(())
  238. }
  239. pub async fn modify_field_rev<F>(&self, view_id: &str, field_id: &str, f: F) -> FlowyResult<()>
  240. where
  241. F: for<'a> FnOnce(&'a mut FieldRevision) -> FlowyResult<Option<()>>,
  242. {
  243. let mut is_changed = false;
  244. let old_field_rev = self.get_field_rev(field_id).await;
  245. self
  246. .modify(|pad| {
  247. let changeset = pad.modify_field(field_id, |field_rev| {
  248. f(field_rev).map_err(|e| SyncError::internal().context(e))
  249. })?;
  250. is_changed = changeset.is_some();
  251. Ok(changeset)
  252. })
  253. .await?;
  254. if is_changed {
  255. match self
  256. .database_views
  257. .did_update_field_type_option(view_id, field_id, old_field_rev)
  258. .await
  259. {
  260. Ok(_) => {},
  261. Err(e) => tracing::error!("View manager update field failed: {:?}", e),
  262. }
  263. self.notify_did_update_database_field(field_id).await?;
  264. }
  265. Ok(())
  266. }
  267. pub async fn delete_field(&self, field_id: &str) -> FlowyResult<()> {
  268. self
  269. .modify(|pad| Ok(pad.delete_field_rev(field_id)?))
  270. .await?;
  271. let field_order = FieldIdPB::from(field_id);
  272. let notified_changeset = DatabaseFieldChangesetPB::delete(&self.database_id, vec![field_order]);
  273. self.notify_did_update_database(notified_changeset).await?;
  274. Ok(())
  275. }
  276. pub async fn group_by_field(&self, view_id: &str, field_id: &str) -> FlowyResult<()> {
  277. self
  278. .database_views
  279. .group_by_field(view_id, field_id)
  280. .await?;
  281. Ok(())
  282. }
  283. /// Switch the field with id to a new field type.
  284. ///
  285. /// If the field type is not exist before, the default type-option data will be created.
  286. /// Each field type has its corresponding data, aka, the type-option data. Check out [this](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/frontend/grid#fieldtype)
  287. /// for more information
  288. ///
  289. /// # Arguments
  290. ///
  291. /// * `field_id`: the id of the field
  292. /// * `new_field_type`: the new field type of the field
  293. ///
  294. pub async fn switch_to_field_type(
  295. &self,
  296. field_id: &str,
  297. new_field_type: &FieldType,
  298. ) -> FlowyResult<()> {
  299. //
  300. let make_default_type_option = || -> String {
  301. return default_type_option_builder_from_type(new_field_type)
  302. .serializer()
  303. .json_str();
  304. };
  305. let type_option_transform = |old_field_type: FieldTypeRevision,
  306. old_type_option: Option<String>,
  307. new_type_option: String| {
  308. let old_field_type: FieldType = old_field_type.into();
  309. transform_type_option(
  310. &new_type_option,
  311. new_field_type,
  312. old_type_option,
  313. old_field_type,
  314. )
  315. };
  316. self
  317. .modify(|pad| {
  318. Ok(pad.switch_to_field(
  319. field_id,
  320. new_field_type.clone(),
  321. make_default_type_option,
  322. type_option_transform,
  323. )?)
  324. })
  325. .await?;
  326. self.notify_did_update_database_field(field_id).await?;
  327. Ok(())
  328. }
  329. pub async fn duplicate_field(&self, field_id: &str) -> FlowyResult<()> {
  330. let duplicated_field_id = gen_field_id();
  331. self
  332. .modify(|pad| Ok(pad.duplicate_field_rev(field_id, &duplicated_field_id)?))
  333. .await?;
  334. self
  335. .notify_did_insert_database_field(&duplicated_field_id)
  336. .await?;
  337. Ok(())
  338. }
  339. pub async fn get_field_rev(&self, field_id: &str) -> Option<Arc<FieldRevision>> {
  340. let field_rev = self
  341. .database_pad
  342. .read()
  343. .await
  344. .get_field_rev(field_id)?
  345. .1
  346. .clone();
  347. Some(field_rev)
  348. }
  349. pub async fn get_field_revs(
  350. &self,
  351. field_ids: Option<Vec<String>>,
  352. ) -> FlowyResult<Vec<Arc<FieldRevision>>> {
  353. if field_ids.is_none() {
  354. let field_revs = self.database_pad.read().await.get_field_revs(None)?;
  355. return Ok(field_revs);
  356. }
  357. let field_ids = field_ids.unwrap_or_default();
  358. let expected_len = field_ids.len();
  359. let field_revs = self
  360. .database_pad
  361. .read()
  362. .await
  363. .get_field_revs(Some(field_ids))?;
  364. if expected_len != 0 && field_revs.len() != expected_len {
  365. tracing::error!(
  366. "This is a bug. The len of the field_revs should equal to {}",
  367. expected_len
  368. );
  369. debug_assert!(field_revs.len() == expected_len);
  370. }
  371. Ok(field_revs)
  372. }
  373. pub async fn create_block(&self, block_meta_rev: DatabaseBlockMetaRevision) -> FlowyResult<()> {
  374. self
  375. .modify(|pad| Ok(pad.create_block_meta_rev(block_meta_rev)?))
  376. .await?;
  377. Ok(())
  378. }
  379. pub async fn update_block(
  380. &self,
  381. changeset: DatabaseBlockMetaRevisionChangeset,
  382. ) -> FlowyResult<()> {
  383. self
  384. .modify(|pad| Ok(pad.update_block_rev(changeset)?))
  385. .await?;
  386. Ok(())
  387. }
  388. pub async fn create_row(&self, params: CreateRowParams) -> FlowyResult<RowPB> {
  389. let mut row_rev = self
  390. .create_row_rev(params.cell_data_by_field_id.clone())
  391. .await?;
  392. self
  393. .database_views
  394. .will_create_row(&mut row_rev, &params)
  395. .await;
  396. let row_pb = self
  397. .create_row_pb(row_rev, params.start_row_id.clone())
  398. .await?;
  399. self.database_views.did_create_row(&row_pb, &params).await;
  400. Ok(row_pb)
  401. }
  402. #[tracing::instrument(level = "trace", skip_all, err)]
  403. pub async fn move_group(&self, params: MoveGroupParams) -> FlowyResult<()> {
  404. self.database_views.move_group(params).await?;
  405. Ok(())
  406. }
  407. pub async fn insert_rows(&self, row_revs: Vec<RowRevision>) -> FlowyResult<Vec<RowPB>> {
  408. let block_id = self.block_id().await?;
  409. let mut rows_by_block_id: HashMap<String, Vec<RowRevision>> = HashMap::new();
  410. let mut row_orders = vec![];
  411. for row_rev in row_revs {
  412. row_orders.push(RowPB::from(&row_rev));
  413. rows_by_block_id
  414. .entry(block_id.clone())
  415. .or_insert_with(Vec::new)
  416. .push(row_rev);
  417. }
  418. let changesets = self.database_blocks.insert_row(rows_by_block_id).await?;
  419. for changeset in changesets {
  420. self.update_block(changeset).await?;
  421. }
  422. Ok(row_orders)
  423. }
  424. pub async fn update_row(&self, changeset: RowChangeset) -> FlowyResult<()> {
  425. let row_id = changeset.row_id.clone();
  426. let old_row = self.get_row_rev(&row_id).await?;
  427. self.database_blocks.update_row(changeset).await?;
  428. self.database_views.did_update_row(old_row, &row_id).await;
  429. Ok(())
  430. }
  431. /// Returns all the rows in this block.
  432. pub async fn get_row_pbs(&self, view_id: &str, block_id: &str) -> FlowyResult<Vec<RowPB>> {
  433. let rows = self.database_views.get_row_revs(view_id, block_id).await?;
  434. let rows = rows
  435. .into_iter()
  436. .map(|row_rev| RowPB::from(&row_rev))
  437. .collect();
  438. Ok(rows)
  439. }
  440. pub async fn get_all_row_revs(&self, view_id: &str) -> FlowyResult<Vec<Arc<RowRevision>>> {
  441. let mut all_rows = vec![];
  442. let blocks = self.database_blocks.get_blocks(None).await?;
  443. for block in blocks {
  444. let rows = self
  445. .database_views
  446. .get_row_revs(view_id, &block.block_id)
  447. .await?;
  448. all_rows.extend(rows);
  449. }
  450. Ok(all_rows)
  451. }
  452. pub async fn get_row_rev(&self, row_id: &str) -> FlowyResult<Option<Arc<RowRevision>>> {
  453. match self.database_blocks.get_row_rev(row_id).await? {
  454. None => Ok(None),
  455. Some((_, row_rev)) => Ok(Some(row_rev)),
  456. }
  457. }
  458. pub async fn delete_row(&self, row_id: &str) -> FlowyResult<()> {
  459. let row_rev = self.database_blocks.delete_row(row_id).await?;
  460. tracing::trace!("Did delete row:{:?}", row_rev);
  461. if let Some(row_rev) = row_rev {
  462. self.database_views.did_delete_row(row_rev).await;
  463. }
  464. Ok(())
  465. }
  466. pub async fn subscribe_view_changed(
  467. &self,
  468. view_id: &str,
  469. ) -> FlowyResult<broadcast::Receiver<DatabaseViewChanged>> {
  470. self.database_views.subscribe_view_changed(view_id).await
  471. }
  472. pub async fn duplicate_row(&self, _row_id: &str) -> FlowyResult<()> {
  473. Ok(())
  474. }
  475. /// Returns the cell data that encoded in protobuf.
  476. pub async fn get_cell(&self, params: &CellIdParams) -> Option<CellPB> {
  477. let (field_type, cell_bytes) = self.get_type_cell_protobuf(params).await?;
  478. Some(CellPB::new(
  479. &params.field_id,
  480. &params.row_id,
  481. field_type,
  482. cell_bytes.to_vec(),
  483. ))
  484. }
  485. /// Returns a string that represents the current field_type's cell data.
  486. /// For example:
  487. /// Multi-Select: list of the option's name separated by a comma.
  488. /// Number: 123 => $123 if the currency set.
  489. /// Date: 1653609600 => May 27,2022
  490. ///
  491. pub async fn get_cell_display_str(&self, params: &CellIdParams) -> String {
  492. let display_str = || async {
  493. let field_rev = self.get_field_rev(&params.field_id).await?;
  494. let field_type: FieldType = field_rev.ty.into();
  495. let cell_rev = self
  496. .get_cell_rev(&params.row_id, &params.field_id)
  497. .await
  498. .ok()??;
  499. let type_cell_data: TypeCellData = cell_rev.try_into().ok()?;
  500. Some(stringify_cell_data(
  501. type_cell_data.cell_str,
  502. &field_type,
  503. &field_type,
  504. &field_rev,
  505. ))
  506. };
  507. display_str().await.unwrap_or_default()
  508. }
  509. pub async fn get_cell_protobuf(&self, params: &CellIdParams) -> Option<CellProtobufBlob> {
  510. let (_, cell_data) = self.get_type_cell_protobuf(params).await?;
  511. Some(cell_data)
  512. }
  513. async fn get_type_cell_protobuf(
  514. &self,
  515. params: &CellIdParams,
  516. ) -> Option<(FieldType, CellProtobufBlob)> {
  517. let field_rev = self.get_field_rev(&params.field_id).await?;
  518. let (_, row_rev) = self
  519. .database_blocks
  520. .get_row_rev(&params.row_id)
  521. .await
  522. .ok()??;
  523. let cell_rev = row_rev.cells.get(&params.field_id)?.clone();
  524. Some(get_type_cell_protobuf(
  525. cell_rev.type_cell_data,
  526. &field_rev,
  527. Some(self.cell_data_cache.clone()),
  528. ))
  529. }
  530. pub async fn get_cell_rev(
  531. &self,
  532. row_id: &str,
  533. field_id: &str,
  534. ) -> FlowyResult<Option<CellRevision>> {
  535. match self.database_blocks.get_row_rev(row_id).await? {
  536. None => Ok(None),
  537. Some((_, row_rev)) => {
  538. let cell_rev = row_rev.cells.get(field_id).cloned();
  539. Ok(cell_rev)
  540. },
  541. }
  542. }
  543. /// Returns the list of cells corresponding to the given field.
  544. pub async fn get_cells_for_field(
  545. &self,
  546. view_id: &str,
  547. field_id: &str,
  548. ) -> FlowyResult<Vec<RowSingleCellData>> {
  549. let view_editor = self.database_views.get_view_editor(view_id).await?;
  550. view_editor.v_get_cells_for_field(field_id).await
  551. }
  552. #[tracing::instrument(level = "trace", skip_all, err)]
  553. pub async fn update_cell_with_changeset<T: ToCellChangesetString>(
  554. &self,
  555. row_id: &str,
  556. field_id: &str,
  557. cell_changeset: T,
  558. ) -> FlowyResult<()> {
  559. match self.database_pad.read().await.get_field_rev(field_id) {
  560. None => {
  561. let msg = format!("Field with id:{} not found", &field_id);
  562. Err(FlowyError::internal().context(msg))
  563. },
  564. Some((_, field_rev)) => {
  565. tracing::trace!(
  566. "Cell changeset: id:{} / value:{:?}",
  567. &field_id,
  568. cell_changeset
  569. );
  570. let old_row_rev = self.get_row_rev(row_id).await?.clone();
  571. let cell_rev = self.get_cell_rev(row_id, field_id).await?;
  572. // Update the changeset.data property with the return value.
  573. let type_cell_data = apply_cell_data_changeset(
  574. cell_changeset,
  575. cell_rev,
  576. field_rev,
  577. Some(self.cell_data_cache.clone()),
  578. )?;
  579. let cell_changeset = CellChangesetPB {
  580. view_id: self.database_id.clone(),
  581. row_id: row_id.to_owned(),
  582. field_id: field_id.to_owned(),
  583. type_cell_data,
  584. };
  585. self.database_blocks.update_cell(cell_changeset).await?;
  586. self
  587. .database_views
  588. .did_update_row(old_row_rev, row_id)
  589. .await;
  590. Ok(())
  591. },
  592. }
  593. }
  594. #[tracing::instrument(level = "trace", skip_all, err)]
  595. pub async fn update_cell<T: ToCellChangesetString>(
  596. &self,
  597. row_id: String,
  598. field_id: String,
  599. cell_changeset: T,
  600. ) -> FlowyResult<()> {
  601. self
  602. .update_cell_with_changeset(&row_id, &field_id, cell_changeset)
  603. .await
  604. }
  605. pub async fn get_block_meta_revs(&self) -> FlowyResult<Vec<Arc<DatabaseBlockMetaRevision>>> {
  606. let block_meta_revs = self.database_pad.read().await.get_block_meta_revs();
  607. Ok(block_meta_revs)
  608. }
  609. pub async fn get_blocks(
  610. &self,
  611. block_ids: Option<Vec<String>>,
  612. ) -> FlowyResult<Vec<DatabaseBlockRowRevision>> {
  613. let block_ids = match block_ids {
  614. None => self
  615. .database_pad
  616. .read()
  617. .await
  618. .get_block_meta_revs()
  619. .iter()
  620. .map(|block_rev| block_rev.block_id.clone())
  621. .collect::<Vec<String>>(),
  622. Some(block_ids) => block_ids,
  623. };
  624. let blocks = self.database_blocks.get_blocks(Some(block_ids)).await?;
  625. Ok(blocks)
  626. }
  627. pub async fn delete_rows(&self, block_rows: Vec<DatabaseBlockRow>) -> FlowyResult<()> {
  628. let changesets = self.database_blocks.delete_rows(block_rows).await?;
  629. for changeset in changesets {
  630. self.update_block(changeset).await?;
  631. }
  632. Ok(())
  633. }
  634. pub async fn get_database(&self, view_id: &str) -> FlowyResult<DatabasePB> {
  635. let pad = self.database_pad.read().await;
  636. let fields = pad
  637. .get_field_revs(None)?
  638. .iter()
  639. .map(FieldIdPB::from)
  640. .collect();
  641. let mut all_rows = vec![];
  642. for block_rev in pad.get_block_meta_revs() {
  643. if let Ok(rows) = self.get_row_pbs(view_id, &block_rev.block_id).await {
  644. all_rows.extend(rows);
  645. }
  646. }
  647. Ok(DatabasePB {
  648. id: self.database_id.clone(),
  649. fields,
  650. rows: all_rows,
  651. })
  652. }
  653. pub async fn get_setting(&self, view_id: &str) -> FlowyResult<DatabaseViewSettingPB> {
  654. self.database_views.get_setting(view_id).await
  655. }
  656. pub async fn get_all_filters(&self, view_id: &str) -> FlowyResult<Vec<FilterPB>> {
  657. Ok(
  658. self
  659. .database_views
  660. .get_all_filters(view_id)
  661. .await?
  662. .into_iter()
  663. .map(|filter| FilterPB::from(filter.as_ref()))
  664. .collect(),
  665. )
  666. }
  667. pub async fn get_filters(
  668. &self,
  669. view_id: &str,
  670. filter_id: FilterType,
  671. ) -> FlowyResult<Vec<Arc<FilterRevision>>> {
  672. self.database_views.get_filters(view_id, &filter_id).await
  673. }
  674. pub async fn create_or_update_filter(&self, params: AlterFilterParams) -> FlowyResult<()> {
  675. self.database_views.create_or_update_filter(params).await?;
  676. Ok(())
  677. }
  678. pub async fn delete_filter(&self, params: DeleteFilterParams) -> FlowyResult<()> {
  679. self.database_views.delete_filter(params).await?;
  680. Ok(())
  681. }
  682. pub async fn get_all_sorts(&self, view_id: &str) -> FlowyResult<Vec<SortPB>> {
  683. Ok(
  684. self
  685. .database_views
  686. .get_all_sorts(view_id)
  687. .await?
  688. .into_iter()
  689. .map(|sort| SortPB::from(sort.as_ref()))
  690. .collect(),
  691. )
  692. }
  693. pub async fn delete_all_sorts(&self, view_id: &str) -> FlowyResult<()> {
  694. self.database_views.delete_all_sorts(view_id).await
  695. }
  696. pub async fn delete_sort(&self, params: DeleteSortParams) -> FlowyResult<()> {
  697. self.database_views.delete_sort(params).await?;
  698. Ok(())
  699. }
  700. pub async fn create_or_update_sort(&self, params: AlterSortParams) -> FlowyResult<SortRevision> {
  701. let sort_rev = self.database_views.create_or_update_sort(params).await?;
  702. Ok(sort_rev)
  703. }
  704. pub async fn insert_group(&self, params: InsertGroupParams) -> FlowyResult<()> {
  705. self.database_views.insert_or_update_group(params).await
  706. }
  707. pub async fn delete_group(&self, params: DeleteGroupParams) -> FlowyResult<()> {
  708. self.database_views.delete_group(params).await
  709. }
  710. pub async fn move_row(&self, params: MoveRowParams) -> FlowyResult<()> {
  711. let MoveRowParams {
  712. view_id: _,
  713. from_row_id,
  714. to_row_id,
  715. } = params;
  716. match self.database_blocks.get_row_rev(&from_row_id).await? {
  717. None => tracing::warn!("Move row failed, can not find the row:{}", from_row_id),
  718. Some((_, row_rev)) => {
  719. match (
  720. self.database_blocks.index_of_row(&from_row_id).await,
  721. self.database_blocks.index_of_row(&to_row_id).await,
  722. ) {
  723. (Some(from_index), Some(to_index)) => {
  724. tracing::trace!("Move row from {} to {}", from_index, to_index);
  725. self
  726. .database_blocks
  727. .move_row(row_rev.clone(), from_index, to_index)
  728. .await?;
  729. },
  730. (_, None) => tracing::warn!("Can not find the from row id: {}", from_row_id),
  731. (None, _) => tracing::warn!("Can not find the to row id: {}", to_row_id),
  732. }
  733. },
  734. }
  735. Ok(())
  736. }
  737. pub async fn move_group_row(&self, params: MoveGroupRowParams) -> FlowyResult<()> {
  738. let MoveGroupRowParams {
  739. view_id,
  740. from_row_id,
  741. to_group_id,
  742. to_row_id,
  743. } = params;
  744. match self.database_blocks.get_row_rev(&from_row_id).await? {
  745. None => tracing::warn!("Move row failed, can not find the row:{}", from_row_id),
  746. Some((_, row_rev)) => {
  747. let block_manager = self.database_blocks.clone();
  748. self
  749. .database_views
  750. .move_group_row(
  751. &view_id.clone(),
  752. row_rev,
  753. to_group_id,
  754. to_row_id.clone(),
  755. |row_changeset| {
  756. to_fut(async move {
  757. tracing::trace!("Row data changed: {:?}", row_changeset);
  758. let cell_changesets = row_changeset
  759. .cell_by_field_id
  760. .into_iter()
  761. .map(|(field_id, cell_rev)| CellChangesetPB {
  762. view_id: view_id.clone(),
  763. row_id: row_changeset.row_id.clone(),
  764. field_id,
  765. type_cell_data: cell_rev.type_cell_data,
  766. })
  767. .collect::<Vec<CellChangesetPB>>();
  768. for cell_changeset in cell_changesets {
  769. match block_manager.update_cell(cell_changeset).await {
  770. Ok(_) => {},
  771. Err(e) => tracing::error!("Apply cell changeset error:{:?}", e),
  772. }
  773. }
  774. })
  775. },
  776. )
  777. .await?;
  778. },
  779. }
  780. Ok(())
  781. }
  782. pub async fn move_field(&self, params: MoveFieldParams) -> FlowyResult<()> {
  783. let MoveFieldParams {
  784. view_id: _,
  785. field_id,
  786. from_index,
  787. to_index,
  788. } = params;
  789. self
  790. .modify(|pad| Ok(pad.move_field(&field_id, from_index as usize, to_index as usize)?))
  791. .await?;
  792. if let Some((index, field_rev)) = self.database_pad.read().await.get_field_rev(&field_id) {
  793. let delete_field_order = FieldIdPB::from(field_id);
  794. let insert_field = IndexFieldPB::from_field_rev(field_rev, index);
  795. let notified_changeset = DatabaseFieldChangesetPB {
  796. view_id: self.database_id.clone(),
  797. inserted_fields: vec![insert_field],
  798. deleted_fields: vec![delete_field_order],
  799. updated_fields: vec![],
  800. };
  801. self.notify_did_update_database(notified_changeset).await?;
  802. }
  803. Ok(())
  804. }
  805. pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult<BuildDatabaseContext> {
  806. let database_pad = self.database_pad.read().await;
  807. let database_view_data = self.database_views.duplicate_database_view(view_id).await?;
  808. let original_blocks = database_pad.get_block_meta_revs();
  809. let (duplicated_fields, duplicated_blocks) = database_pad.duplicate_database_block_meta().await;
  810. let mut blocks_meta_data = vec![];
  811. if original_blocks.len() == duplicated_blocks.len() {
  812. for (index, original_block_meta) in original_blocks.iter().enumerate() {
  813. let database_block_meta_editor = self
  814. .database_blocks
  815. .get_or_create_block_editor(&original_block_meta.block_id)
  816. .await?;
  817. let duplicated_block_id = &duplicated_blocks[index].block_id;
  818. tracing::trace!("Duplicate block:{} meta data", duplicated_block_id);
  819. let duplicated_block_meta_data = database_block_meta_editor
  820. .duplicate_block(duplicated_block_id)
  821. .await;
  822. blocks_meta_data.push(duplicated_block_meta_data);
  823. }
  824. } else {
  825. debug_assert_eq!(original_blocks.len(), duplicated_blocks.len());
  826. }
  827. drop(database_pad);
  828. Ok(BuildDatabaseContext {
  829. field_revs: duplicated_fields.into_iter().map(Arc::new).collect(),
  830. block_metas: duplicated_blocks,
  831. blocks: blocks_meta_data,
  832. layout_setting: Default::default(),
  833. database_view_data,
  834. })
  835. }
  836. #[tracing::instrument(level = "trace", skip_all, err)]
  837. pub async fn load_groups(&self, view_id: &str) -> FlowyResult<RepeatedGroupPB> {
  838. self.database_views.load_groups(view_id).await
  839. }
  840. #[tracing::instrument(level = "trace", skip_all, err)]
  841. pub async fn get_group(&self, view_id: &str, group_id: &str) -> FlowyResult<GroupPB> {
  842. self.database_views.get_group(view_id, group_id).await
  843. }
  844. pub async fn get_layout_setting<T: Into<LayoutRevision>>(
  845. &self,
  846. view_id: &str,
  847. layout_ty: T,
  848. ) -> FlowyResult<LayoutSettingParams> {
  849. let layout_ty = layout_ty.into();
  850. self
  851. .database_views
  852. .get_layout_setting(view_id, &layout_ty)
  853. .await
  854. }
  855. pub async fn set_layout_setting(
  856. &self,
  857. view_id: &str,
  858. layout_setting: LayoutSettingParams,
  859. ) -> FlowyResult<()> {
  860. self
  861. .database_views
  862. .set_layout_setting(view_id, layout_setting)
  863. .await
  864. }
  865. pub async fn get_all_calendar_events(&self, view_id: &str) -> Vec<CalendarEventPB> {
  866. match self.database_views.get_view_editor(view_id).await {
  867. Ok(view_editor) => view_editor
  868. .v_get_all_calendar_events()
  869. .await
  870. .unwrap_or_default(),
  871. Err(err) => {
  872. tracing::error!("Get calendar event failed: {}", err);
  873. vec![]
  874. },
  875. }
  876. }
  877. #[tracing::instrument(level = "trace", skip(self))]
  878. pub async fn get_calendar_event(&self, view_id: &str, row_id: &str) -> Option<CalendarEventPB> {
  879. let view_editor = self.database_views.get_view_editor(view_id).await.ok()?;
  880. view_editor.v_get_calendar_event(row_id).await
  881. }
  882. async fn create_row_rev(
  883. &self,
  884. cell_data_by_field_id: Option<HashMap<String, String>>,
  885. ) -> FlowyResult<RowRevision> {
  886. let field_revs = self.database_pad.read().await.get_field_revs(None)?;
  887. let block_id = self.block_id().await?;
  888. // insert empty row below the row whose id is upper_row_id
  889. let builder = match cell_data_by_field_id {
  890. None => RowRevisionBuilder::new(&block_id, field_revs),
  891. Some(cell_data_by_field_id) => {
  892. RowRevisionBuilder::new_with_data(&block_id, field_revs, cell_data_by_field_id)
  893. },
  894. };
  895. let row_rev = builder.build();
  896. Ok(row_rev)
  897. }
  898. async fn create_row_pb(
  899. &self,
  900. row_rev: RowRevision,
  901. start_row_id: Option<String>,
  902. ) -> FlowyResult<RowPB> {
  903. let row_pb = RowPB::from(&row_rev);
  904. let block_id = row_rev.block_id.clone();
  905. // insert the row
  906. let row_count = self
  907. .database_blocks
  908. .create_row(row_rev, start_row_id)
  909. .await?;
  910. // update block row count
  911. let changeset = DatabaseBlockMetaRevisionChangeset::from_row_count(block_id, row_count);
  912. self.update_block(changeset).await?;
  913. Ok(row_pb)
  914. }
  915. async fn modify<F>(&self, f: F) -> FlowyResult<()>
  916. where
  917. F:
  918. for<'a> FnOnce(&'a mut DatabaseRevisionPad) -> FlowyResult<Option<DatabaseRevisionChangeset>>,
  919. {
  920. let mut write_guard = self.database_pad.write().await;
  921. if let Some(changeset) = f(&mut write_guard)? {
  922. self.apply_change(changeset).await?;
  923. }
  924. Ok(())
  925. }
  926. async fn apply_change(&self, change: DatabaseRevisionChangeset) -> FlowyResult<()> {
  927. let DatabaseRevisionChangeset {
  928. operations: delta,
  929. md5,
  930. } = change;
  931. let data = delta.json_bytes();
  932. let _ = self.rev_manager.add_local_revision(data, md5).await?;
  933. Ok(())
  934. }
  935. async fn block_id(&self) -> FlowyResult<String> {
  936. match self.database_pad.read().await.get_block_meta_revs().last() {
  937. None => Err(FlowyError::internal().context("There is no block in this database")),
  938. Some(database_block) => Ok(database_block.block_id.clone()),
  939. }
  940. }
  941. #[tracing::instrument(level = "trace", skip_all, err)]
  942. async fn notify_did_insert_database_field(&self, field_id: &str) -> FlowyResult<()> {
  943. if let Some((index, field_rev)) = self.database_pad.read().await.get_field_rev(field_id) {
  944. let index_field = IndexFieldPB::from_field_rev(field_rev, index);
  945. if let Ok(views) = self.database_ref_query.get_ref_views(&self.database_id) {
  946. for view in views {
  947. let notified_changeset =
  948. DatabaseFieldChangesetPB::insert(&view.view_id, vec![index_field.clone()]);
  949. self.notify_did_update_database(notified_changeset).await?;
  950. }
  951. }
  952. }
  953. Ok(())
  954. }
  955. #[tracing::instrument(level = "trace", skip_all, err)]
  956. async fn notify_did_update_database_field(&self, field_id: &str) -> FlowyResult<()> {
  957. if let Some((_, field_rev)) = self
  958. .database_pad
  959. .read()
  960. .await
  961. .get_field_rev(field_id)
  962. .map(|(index, field)| (index, field.clone()))
  963. {
  964. let updated_field = FieldPB::from(field_rev);
  965. let notified_changeset =
  966. DatabaseFieldChangesetPB::update(&self.database_id, vec![updated_field.clone()]);
  967. self.notify_did_update_database(notified_changeset).await?;
  968. send_notification(field_id, DatabaseNotification::DidUpdateField)
  969. .payload(updated_field)
  970. .send();
  971. }
  972. Ok(())
  973. }
  974. async fn notify_did_update_database(
  975. &self,
  976. changeset: DatabaseFieldChangesetPB,
  977. ) -> FlowyResult<()> {
  978. if let Ok(views) = self.database_ref_query.get_ref_views(&self.database_id) {
  979. for view in views {
  980. send_notification(&view.view_id, DatabaseNotification::DidUpdateFields)
  981. .payload(changeset.clone())
  982. .send();
  983. }
  984. }
  985. Ok(())
  986. }
  987. }
  988. #[cfg(feature = "flowy_unit_test")]
  989. impl DatabaseEditor {
  990. pub fn rev_manager(&self) -> Arc<RevisionManager<Arc<ConnectionPool>>> {
  991. self.rev_manager.clone()
  992. }
  993. pub fn database_pad(&self) -> Arc<RwLock<DatabaseRevisionPad>> {
  994. self.database_pad.clone()
  995. }
  996. }
  997. pub struct DatabaseRevisionSerde();
  998. impl RevisionObjectDeserializer for DatabaseRevisionSerde {
  999. type Output = DatabaseRevisionPad;
  1000. fn deserialize_revisions(
  1001. _object_id: &str,
  1002. revisions: Vec<Revision>,
  1003. ) -> FlowyResult<Self::Output> {
  1004. let pad = DatabaseRevisionPad::from_revisions(revisions)?;
  1005. Ok(pad)
  1006. }
  1007. fn recover_from_revisions(_revisions: Vec<Revision>) -> Option<(Self::Output, i64)> {
  1008. None
  1009. }
  1010. }
  1011. impl RevisionObjectSerializer for DatabaseRevisionSerde {
  1012. fn combine_revisions(revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  1013. let operations = make_operations_from_revisions::<EmptyAttributes>(revisions)?;
  1014. Ok(operations.json_bytes())
  1015. }
  1016. }
  1017. pub struct DatabaseRevisionCloudService {
  1018. #[allow(dead_code)]
  1019. token: String,
  1020. }
  1021. impl DatabaseRevisionCloudService {
  1022. pub fn new(token: String) -> Self {
  1023. Self { token }
  1024. }
  1025. }
  1026. impl RevisionCloudService for DatabaseRevisionCloudService {
  1027. #[tracing::instrument(level = "trace", skip(self))]
  1028. fn fetch_object(
  1029. &self,
  1030. _user_id: &str,
  1031. _object_id: &str,
  1032. ) -> FutureResult<Vec<Revision>, FlowyError> {
  1033. FutureResult::new(async move { Ok(vec![]) })
  1034. }
  1035. }
  1036. pub struct DatabaseRevisionMergeable();
  1037. impl RevisionMergeable for DatabaseRevisionMergeable {
  1038. fn combine_revisions(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes> {
  1039. DatabaseRevisionSerde::combine_revisions(revisions)
  1040. }
  1041. }
  1042. struct TypeOptionJsonDeserializer(FieldType);
  1043. impl JsonDeserializer for TypeOptionJsonDeserializer {
  1044. fn deserialize(&self, type_option_data: Vec<u8>) -> SyncResult<String> {
  1045. // The type_option_data sent from Dart is serialized by protobuf.
  1046. let builder = type_option_builder_from_bytes(type_option_data, &self.0);
  1047. let json = builder.serializer().json_str();
  1048. tracing::trace!("Deserialize type-option data to: {}", json);
  1049. Ok(json)
  1050. }
  1051. }