editor.rs 37 KB


  1. use crate::entities::*;
  2. use crate::notification::{send_notification, DatabaseNotification};
  3. use crate::services::cell::{AtomicCellDataCache, TypeCellData};
  4. use crate::services::database::DatabaseBlockEvent;
  5. use crate::services::database_view::notifier::DatabaseViewChangedNotifier;
  6. use crate::services::database_view::trait_impl::*;
  7. use crate::services::database_view::DatabaseViewChangedReceiverRunner;
  8. use crate::services::field::{RowSingleCellData, TypeOptionCellDataHandler};
  9. use crate::services::filter::{
  10. FilterChangeset, FilterController, FilterTaskHandler, FilterType, UpdatedFilterType,
  11. };
  12. use crate::services::group::{
  13. default_group_configuration, find_grouping_field, make_group_controller, Group,
  14. GroupConfigurationReader, GroupController, MoveGroupRowContext,
  15. };
  16. use crate::services::row::DatabaseBlockRowRevision;
  17. use crate::services::sort::{
  18. DeletedSortType, SortChangeset, SortController, SortTaskHandler, SortType,
  19. };
  20. use database_model::{
  21. gen_database_filter_id, gen_database_id, gen_database_sort_id, CalendarLayoutSetting,
  22. FieldRevision, FieldTypeRevision, FilterRevision, LayoutRevision, RowChangeset, RowRevision,
  23. SortRevision,
  24. };
  25. use flowy_client_sync::client_database::{
  26. make_database_view_operations, DatabaseViewRevisionChangeset, DatabaseViewRevisionPad,
  27. };
  28. use flowy_error::{FlowyError, FlowyResult};
  29. use flowy_revision::RevisionManager;
  30. use flowy_sqlite::ConnectionPool;
  31. use flowy_task::TaskDispatcher;
  32. use lib_infra::future::Fut;
  33. use nanoid::nanoid;
  34. use revision_model::Revision;
  35. use std::borrow::Cow;
  36. use std::collections::HashMap;
  37. use std::future::Future;
  38. use std::sync::Arc;
  39. use tokio::sync::{broadcast, RwLock};
  40. pub trait DatabaseViewData: Send + Sync + 'static {
  41. /// If the field_ids is None, then it will return all the field revisions
  42. fn get_field_revs(&self, field_ids: Option<Vec<String>>) -> Fut<Vec<Arc<FieldRevision>>>;
  43. /// Returns the field with the field_id
  44. fn get_field_rev(&self, field_id: &str) -> Fut<Option<Arc<FieldRevision>>>;
  45. fn get_primary_field_rev(&self) -> Fut<Option<Arc<FieldRevision>>>;
  46. /// Returns the index of the row with row_id
  47. fn index_of_row(&self, row_id: &str) -> Fut<Option<usize>>;
  48. /// Returns the `index` and `RowRevision` with row_id
  49. fn get_row_rev(&self, row_id: &str) -> Fut<Option<(usize, Arc<RowRevision>)>>;
  50. /// Returns all the rows that the block has. If the passed-in block_ids is None, then will return all the rows
  51. /// The relationship between the grid and the block is:
  52. /// A grid has a list of blocks
  53. /// A block has a list of rows
  54. /// A row has a list of cells
  55. ///
  56. fn get_row_revs(&self, block_ids: Option<Vec<String>>) -> Fut<Vec<Arc<RowRevision>>>;
  57. /// Get all the blocks that the current Grid has.
  58. /// One grid has a list of blocks
  59. fn get_blocks(&self) -> Fut<Vec<DatabaseBlockRowRevision>>;
  60. /// Returns a `TaskDispatcher` used to poll a `Task`
  61. fn get_task_scheduler(&self) -> Arc<RwLock<TaskDispatcher>>;
  62. fn get_type_option_cell_handler(
  63. &self,
  64. field_rev: &FieldRevision,
  65. field_type: &FieldType,
  66. ) -> Option<Box<dyn TypeOptionCellDataHandler>>;
  67. }
  68. pub struct DatabaseViewEditor {
  69. user_id: String,
  70. pub view_id: String,
  71. pad: Arc<RwLock<DatabaseViewRevisionPad>>,
  72. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  73. delegate: Arc<dyn DatabaseViewData>,
  74. group_controller: Arc<RwLock<Box<dyn GroupController>>>,
  75. filter_controller: Arc<FilterController>,
  76. sort_controller: Arc<RwLock<SortController>>,
  77. pub notifier: DatabaseViewChangedNotifier,
  78. }
  79. impl Drop for DatabaseViewEditor {
  80. fn drop(&mut self) {
  81. tracing::trace!("Drop {}", std::any::type_name::<Self>());
  82. }
  83. }
  84. impl DatabaseViewEditor {
  85. pub async fn from_pad(
  86. user_id: &str,
  87. delegate: Arc<dyn DatabaseViewData>,
  88. cell_data_cache: AtomicCellDataCache,
  89. rev_manager: RevisionManager<Arc<ConnectionPool>>,
  90. view_rev_pad: DatabaseViewRevisionPad,
  91. ) -> FlowyResult<Self> {
  92. let view_id = view_rev_pad.view_id.clone();
  93. let (notifier, _) = broadcast::channel(100);
  94. tokio::spawn(DatabaseViewChangedReceiverRunner(Some(notifier.subscribe())).run());
  95. let view_rev_pad = Arc::new(RwLock::new(view_rev_pad));
  96. let rev_manager = Arc::new(rev_manager);
  97. let group_controller = new_group_controller(
  98. user_id.to_owned(),
  99. view_id.clone(),
  100. view_rev_pad.clone(),
  101. rev_manager.clone(),
  102. delegate.clone(),
  103. )
  104. .await?;
  105. let user_id = user_id.to_owned();
  106. let group_controller = Arc::new(RwLock::new(group_controller));
  107. let filter_controller = make_filter_controller(
  108. &view_id,
  109. delegate.clone(),
  110. notifier.clone(),
  111. cell_data_cache.clone(),
  112. view_rev_pad.clone(),
  113. )
  114. .await;
  115. let sort_controller = make_sort_controller(
  116. &view_id,
  117. delegate.clone(),
  118. notifier.clone(),
  119. filter_controller.clone(),
  120. view_rev_pad.clone(),
  121. cell_data_cache,
  122. )
  123. .await;
  124. Ok(Self {
  125. pad: view_rev_pad,
  126. user_id,
  127. view_id,
  128. rev_manager,
  129. delegate,
  130. group_controller,
  131. filter_controller,
  132. sort_controller,
  133. notifier,
  134. })
  135. }
  136. #[tracing::instrument(level = "trace", skip_all, err)]
  137. pub async fn new(
  138. user_id: &str,
  139. token: &str,
  140. view_id: String,
  141. delegate: Arc<dyn DatabaseViewData>,
  142. cell_data_cache: AtomicCellDataCache,
  143. mut rev_manager: RevisionManager<Arc<ConnectionPool>>,
  144. ) -> FlowyResult<Self> {
  145. let cloud = Arc::new(DatabaseViewRevisionCloudService {
  146. token: token.to_owned(),
  147. });
  148. let view_rev_pad = match rev_manager
  149. .initialize::<DatabaseViewRevisionSerde>(Some(cloud))
  150. .await
  151. {
  152. Ok(pad) => pad,
  153. Err(err) => {
  154. // It shouldn't be here, because the snapshot should come to recue.
  155. tracing::error!("Deserialize database view revisions failed: {}", err);
  156. let (view, reset_revision) = generate_restore_view(&view_id).await;
  157. let _ = rev_manager.reset_object(vec![reset_revision]).await;
  158. view
  159. },
  160. };
  161. Self::from_pad(
  162. user_id,
  163. delegate,
  164. cell_data_cache,
  165. rev_manager,
  166. view_rev_pad,
  167. )
  168. .await
  169. }
  170. #[tracing::instrument(name = "close database view editor", level = "trace", skip_all)]
  171. pub async fn close(&self) {
  172. self.rev_manager.generate_snapshot().await;
  173. self.rev_manager.close().await;
  174. self.sort_controller.write().await.close().await;
  175. self.filter_controller.close().await;
  176. }
  177. pub async fn handle_block_event(&self, event: Cow<'_, DatabaseBlockEvent>) {
  178. let changeset = match event.into_owned() {
  179. DatabaseBlockEvent::InsertRow { block_id: _, row } => {
  180. //
  181. RowsChangesetPB::from_insert(self.view_id.clone(), vec![row])
  182. },
  183. DatabaseBlockEvent::UpdateRow { block_id: _, row } => {
  184. //
  185. RowsChangesetPB::from_update(self.view_id.clone(), vec![row])
  186. },
  187. DatabaseBlockEvent::DeleteRow {
  188. block_id: _,
  189. row_id,
  190. } => {
  191. //
  192. RowsChangesetPB::from_delete(self.view_id.clone(), vec![row_id])
  193. },
  194. DatabaseBlockEvent::Move {
  195. block_id: _,
  196. deleted_row_id,
  197. inserted_row,
  198. } => {
  199. //
  200. RowsChangesetPB::from_move(
  201. self.view_id.clone(),
  202. vec![deleted_row_id],
  203. vec![inserted_row],
  204. )
  205. },
  206. };
  207. send_notification(&self.view_id, DatabaseNotification::DidUpdateViewRows)
  208. .payload(changeset)
  209. .send();
  210. }
  211. pub async fn v_sort_rows(&self, rows: &mut Vec<Arc<RowRevision>>) {
  212. self.sort_controller.write().await.sort_rows(rows).await
  213. }
  214. pub async fn v_filter_rows(&self, _block_id: &str, rows: &mut Vec<Arc<RowRevision>>) {
  215. self.filter_controller.filter_row_revs(rows).await;
  216. }
  217. pub async fn v_duplicate_database_view(&self) -> FlowyResult<String> {
  218. let json_str = self.pad.read().await.json_str()?;
  219. Ok(json_str)
  220. }
  221. pub async fn v_will_create_row(&self, row_rev: &mut RowRevision, params: &CreateRowParams) {
  222. if params.group_id.is_none() {
  223. return;
  224. }
  225. let group_id = params.group_id.as_ref().unwrap();
  226. let _ = self
  227. .mut_group_controller(|group_controller, field_rev| {
  228. group_controller.will_create_row(row_rev, &field_rev, group_id);
  229. Ok(())
  230. })
  231. .await;
  232. }
  233. pub async fn v_did_create_row(&self, row_pb: &RowPB, params: &CreateRowParams) {
  234. // Send the group notification if the current view has groups
  235. match params.group_id.as_ref() {
  236. None => {},
  237. Some(group_id) => {
  238. let index = match params.start_row_id {
  239. None => Some(0),
  240. Some(_) => None,
  241. };
  242. self
  243. .group_controller
  244. .write()
  245. .await
  246. .did_create_row(row_pb, group_id);
  247. let inserted_row = InsertedRowPB {
  248. row: row_pb.clone(),
  249. index,
  250. is_new: true,
  251. };
  252. let changeset = GroupRowsNotificationPB::insert(group_id.clone(), vec![inserted_row]);
  253. self.notify_did_update_group_rows(changeset).await;
  254. },
  255. }
  256. }
  257. #[tracing::instrument(level = "trace", skip_all)]
  258. pub async fn v_did_delete_row(&self, row_rev: &RowRevision) {
  259. // Send the group notification if the current view has groups;
  260. let result = self
  261. .mut_group_controller(|group_controller, field_rev| {
  262. group_controller.did_delete_delete_row(row_rev, &field_rev)
  263. })
  264. .await;
  265. if let Some(result) = result {
  266. tracing::trace!("Delete row in view changeset: {:?}", result.row_changesets);
  267. for changeset in result.row_changesets {
  268. self.notify_did_update_group_rows(changeset).await;
  269. }
  270. }
  271. }
  272. pub async fn v_did_update_row(
  273. &self,
  274. old_row_rev: Option<Arc<RowRevision>>,
  275. row_rev: &RowRevision,
  276. ) {
  277. let result = self
  278. .mut_group_controller(|group_controller, field_rev| {
  279. Ok(group_controller.did_update_group_row(&old_row_rev, row_rev, &field_rev))
  280. })
  281. .await;
  282. if let Some(Ok(result)) = result {
  283. let mut changeset = GroupChangesetPB {
  284. view_id: self.view_id.clone(),
  285. ..Default::default()
  286. };
  287. if let Some(inserted_group) = result.inserted_group {
  288. tracing::trace!("Create group after editing the row: {:?}", inserted_group);
  289. changeset.inserted_groups.push(inserted_group);
  290. }
  291. if let Some(delete_group) = result.deleted_group {
  292. tracing::trace!("Delete group after editing the row: {:?}", delete_group);
  293. changeset.deleted_groups.push(delete_group.group_id);
  294. }
  295. self.notify_did_update_groups(changeset).await;
  296. tracing::trace!(
  297. "Group changesets after editing the row: {:?}",
  298. result.row_changesets
  299. );
  300. for changeset in result.row_changesets {
  301. self.notify_did_update_group_rows(changeset).await;
  302. }
  303. }
  304. let filter_controller = self.filter_controller.clone();
  305. let sort_controller = self.sort_controller.clone();
  306. let row_id = row_rev.id.clone();
  307. tokio::spawn(async move {
  308. filter_controller.did_receive_row_changed(&row_id).await;
  309. sort_controller
  310. .read()
  311. .await
  312. .did_receive_row_changed(&row_id)
  313. .await;
  314. });
  315. }
  316. pub async fn v_move_group_row(
  317. &self,
  318. row_rev: &RowRevision,
  319. row_changeset: &mut RowChangeset,
  320. to_group_id: &str,
  321. to_row_id: Option<String>,
  322. ) {
  323. let result = self
  324. .mut_group_controller(|group_controller, field_rev| {
  325. let move_row_context = MoveGroupRowContext {
  326. row_rev,
  327. row_changeset,
  328. field_rev: field_rev.as_ref(),
  329. to_group_id,
  330. to_row_id,
  331. };
  332. group_controller.move_group_row(move_row_context)
  333. })
  334. .await;
  335. if let Some(result) = result {
  336. let mut changeset = GroupChangesetPB {
  337. view_id: self.view_id.clone(),
  338. ..Default::default()
  339. };
  340. if let Some(delete_group) = result.deleted_group {
  341. tracing::info!("Delete group after moving the row: {:?}", delete_group);
  342. changeset.deleted_groups.push(delete_group.group_id);
  343. }
  344. self.notify_did_update_groups(changeset).await;
  345. for changeset in result.row_changesets {
  346. self.notify_did_update_group_rows(changeset).await;
  347. }
  348. }
  349. }
  350. /// Only call once after database view editor initialized
  351. #[tracing::instrument(level = "trace", skip(self))]
  352. pub async fn v_load_groups(&self) -> FlowyResult<Vec<GroupPB>> {
  353. let groups = self
  354. .group_controller
  355. .read()
  356. .await
  357. .groups()
  358. .into_iter()
  359. .cloned()
  360. .collect::<Vec<Group>>();
  361. tracing::trace!("Number of groups: {}", groups.len());
  362. Ok(groups.into_iter().map(GroupPB::from).collect())
  363. }
  364. #[tracing::instrument(level = "trace", skip(self))]
  365. pub async fn v_get_group(&self, group_id: &str) -> FlowyResult<GroupPB> {
  366. match self.group_controller.read().await.get_group(group_id) {
  367. None => Err(FlowyError::record_not_found().context("Can't find the group")),
  368. Some((_, group)) => Ok(GroupPB::from(group)),
  369. }
  370. }
  371. #[tracing::instrument(level = "trace", skip(self), err)]
  372. pub async fn v_move_group(&self, params: MoveGroupParams) -> FlowyResult<()> {
  373. self
  374. .group_controller
  375. .write()
  376. .await
  377. .move_group(&params.from_group_id, &params.to_group_id)?;
  378. match self
  379. .group_controller
  380. .read()
  381. .await
  382. .get_group(&params.from_group_id)
  383. {
  384. None => tracing::warn!("Can not find the group with id: {}", params.from_group_id),
  385. Some((index, group)) => {
  386. let inserted_group = InsertedGroupPB {
  387. group: GroupPB::from(group),
  388. index: index as i32,
  389. };
  390. let changeset = GroupChangesetPB {
  391. view_id: self.view_id.clone(),
  392. inserted_groups: vec![inserted_group],
  393. deleted_groups: vec![params.from_group_id.clone()],
  394. update_groups: vec![],
  395. initial_groups: vec![],
  396. };
  397. self.notify_did_update_groups(changeset).await;
  398. },
  399. }
  400. Ok(())
  401. }
  402. pub async fn group_id(&self) -> String {
  403. self.group_controller.read().await.field_id().to_string()
  404. }
  405. /// Initialize new group when grouping by a new field
  406. ///
  407. pub async fn v_initialize_new_group(&self, params: InsertGroupParams) -> FlowyResult<()> {
  408. if let Some(field_rev) = self.delegate.get_field_rev(&params.field_id).await {
  409. self
  410. .modify(|pad| {
  411. let configuration = default_group_configuration(&field_rev);
  412. let changeset = pad.insert_or_update_group_configuration(
  413. &params.field_id,
  414. &params.field_type_rev,
  415. configuration,
  416. )?;
  417. Ok(changeset)
  418. })
  419. .await?;
  420. }
  421. if self.group_controller.read().await.field_id() != params.field_id {
  422. self.v_update_group_setting(&params.field_id).await?;
  423. self.notify_did_update_setting().await;
  424. }
  425. Ok(())
  426. }
  427. pub async fn v_delete_group(&self, params: DeleteGroupParams) -> FlowyResult<()> {
  428. self
  429. .modify(|pad| {
  430. let changeset =
  431. pad.delete_group(&params.group_id, &params.field_id, &params.field_type_rev)?;
  432. Ok(changeset)
  433. })
  434. .await
  435. }
  436. pub async fn v_get_setting(&self) -> DatabaseViewSettingPB {
  437. let field_revs = self.delegate.get_field_revs(None).await;
  438. make_database_view_setting(&*self.pad.read().await, &field_revs)
  439. }
  440. pub async fn v_get_all_sorts(&self) -> Vec<Arc<SortRevision>> {
  441. let field_revs = self.delegate.get_field_revs(None).await;
  442. self.pad.read().await.get_all_sorts(&field_revs)
  443. }
  444. #[tracing::instrument(level = "trace", skip(self), err)]
  445. pub async fn v_insert_sort(&self, params: AlterSortParams) -> FlowyResult<SortRevision> {
  446. let sort_type = SortType::from(&params);
  447. let is_exist = params.sort_id.is_some();
  448. let sort_id = match params.sort_id {
  449. None => gen_database_sort_id(),
  450. Some(sort_id) => sort_id,
  451. };
  452. let sort_rev = SortRevision {
  453. id: sort_id,
  454. field_id: params.field_id.clone(),
  455. field_type: params.field_type,
  456. condition: params.condition.into(),
  457. };
  458. let mut sort_controller = self.sort_controller.write().await;
  459. let changeset = if is_exist {
  460. self
  461. .modify(|pad| {
  462. let changeset = pad.update_sort(&params.field_id, sort_rev.clone())?;
  463. Ok(changeset)
  464. })
  465. .await?;
  466. sort_controller
  467. .did_receive_changes(SortChangeset::from_update(sort_type))
  468. .await
  469. } else {
  470. self
  471. .modify(|pad| {
  472. let changeset = pad.insert_sort(&params.field_id, sort_rev.clone())?;
  473. Ok(changeset)
  474. })
  475. .await?;
  476. sort_controller
  477. .did_receive_changes(SortChangeset::from_insert(sort_type))
  478. .await
  479. };
  480. drop(sort_controller);
  481. self.notify_did_update_sort(changeset).await;
  482. Ok(sort_rev)
  483. }
  484. pub async fn v_delete_sort(&self, params: DeleteSortParams) -> FlowyResult<()> {
  485. let notification = self
  486. .sort_controller
  487. .write()
  488. .await
  489. .did_receive_changes(SortChangeset::from_delete(DeletedSortType::from(
  490. params.clone(),
  491. )))
  492. .await;
  493. let sort_type = params.sort_type;
  494. self
  495. .modify(|pad| {
  496. let changeset =
  497. pad.delete_sort(&params.sort_id, &sort_type.field_id, sort_type.field_type)?;
  498. Ok(changeset)
  499. })
  500. .await?;
  501. self.notify_did_update_sort(notification).await;
  502. Ok(())
  503. }
  504. pub async fn v_delete_all_sorts(&self) -> FlowyResult<()> {
  505. let all_sorts = self.v_get_all_sorts().await;
  506. // self.sort_controller.write().await.delete_all_sorts().await;
  507. self
  508. .modify(|pad| {
  509. let changeset = pad.delete_all_sorts()?;
  510. Ok(changeset)
  511. })
  512. .await?;
  513. let mut notification = SortChangesetNotificationPB::new(self.view_id.clone());
  514. notification.delete_sorts = all_sorts
  515. .into_iter()
  516. .map(|sort| SortPB::from(sort.as_ref()))
  517. .collect();
  518. self.notify_did_update_sort(notification).await;
  519. Ok(())
  520. }
  521. pub async fn v_get_all_filters(&self) -> Vec<Arc<FilterRevision>> {
  522. let field_revs = self.delegate.get_field_revs(None).await;
  523. self.pad.read().await.get_all_filters(&field_revs)
  524. }
  525. pub async fn v_get_filters(&self, filter_type: &FilterType) -> Vec<Arc<FilterRevision>> {
  526. let field_type_rev: FieldTypeRevision = filter_type.field_type.clone().into();
  527. self
  528. .pad
  529. .read()
  530. .await
  531. .get_filters(&filter_type.field_id, &field_type_rev)
  532. }
  533. #[tracing::instrument(level = "trace", skip(self), err)]
  534. pub async fn v_insert_filter(&self, params: AlterFilterParams) -> FlowyResult<()> {
  535. let filter_type = FilterType::from(&params);
  536. let is_exist = params.filter_id.is_some();
  537. let filter_id = match params.filter_id {
  538. None => gen_database_filter_id(),
  539. Some(filter_id) => filter_id,
  540. };
  541. let filter_rev = FilterRevision {
  542. id: filter_id.clone(),
  543. field_id: params.field_id.clone(),
  544. field_type: params.field_type,
  545. condition: params.condition,
  546. content: params.content,
  547. };
  548. let filter_controller = self.filter_controller.clone();
  549. let changeset = if is_exist {
  550. let old_filter_type = self
  551. .delegate
  552. .get_field_rev(&params.field_id)
  553. .await
  554. .map(|field| FilterType::from(&field));
  555. self
  556. .modify(|pad| {
  557. let changeset = pad.update_filter(&params.field_id, filter_rev)?;
  558. Ok(changeset)
  559. })
  560. .await?;
  561. filter_controller
  562. .did_receive_changes(FilterChangeset::from_update(UpdatedFilterType::new(
  563. old_filter_type,
  564. filter_type,
  565. )))
  566. .await
  567. } else {
  568. self
  569. .modify(|pad| {
  570. let changeset = pad.insert_filter(&params.field_id, filter_rev)?;
  571. Ok(changeset)
  572. })
  573. .await?;
  574. filter_controller
  575. .did_receive_changes(FilterChangeset::from_insert(filter_type))
  576. .await
  577. };
  578. drop(filter_controller);
  579. if let Some(changeset) = changeset {
  580. self.notify_did_update_filter(changeset).await;
  581. }
  582. Ok(())
  583. }
  584. #[tracing::instrument(level = "trace", skip(self), err)]
  585. pub async fn v_delete_filter(&self, params: DeleteFilterParams) -> FlowyResult<()> {
  586. let filter_type = params.filter_type;
  587. let changeset = self
  588. .filter_controller
  589. .did_receive_changes(FilterChangeset::from_delete(filter_type.clone()))
  590. .await;
  591. self
  592. .modify(|pad| {
  593. let changeset = pad.delete_filter(
  594. &params.filter_id,
  595. &filter_type.field_id,
  596. filter_type.field_type,
  597. )?;
  598. Ok(changeset)
  599. })
  600. .await?;
  601. if changeset.is_some() {
  602. self.notify_did_update_filter(changeset.unwrap()).await;
  603. }
  604. Ok(())
  605. }
  606. /// Returns the current calendar settings
  607. #[tracing::instrument(level = "debug", skip(self), err)]
  608. pub async fn v_get_layout_settings(
  609. &self,
  610. layout_ty: &LayoutRevision,
  611. ) -> FlowyResult<LayoutSettingParams> {
  612. let mut layout_setting = LayoutSettingParams::default();
  613. match layout_ty {
  614. LayoutRevision::Grid => {},
  615. LayoutRevision::Board => {},
  616. LayoutRevision::Calendar => {
  617. if let Some(calendar) = self
  618. .pad
  619. .read()
  620. .await
  621. .get_layout_setting::<CalendarLayoutSetting>(layout_ty)
  622. {
  623. // Check the field exist or not
  624. if let Some(field_rev) = self.delegate.get_field_rev(&calendar.layout_field_id).await {
  625. let field_type: FieldType = field_rev.ty.into();
  626. // Check the type of field is Datetime or not
  627. if field_type == FieldType::DateTime {
  628. layout_setting.calendar = Some(calendar);
  629. }
  630. }
  631. }
  632. },
  633. }
  634. tracing::debug!("{:?}", layout_setting);
  635. Ok(layout_setting)
  636. }
  637. /// Update the calendar settings and send the notification to refresh the UI
  638. pub async fn v_set_layout_settings(&self, params: LayoutSettingParams) -> FlowyResult<()> {
  639. // Maybe it needs no send notification to refresh the UI
  640. if let Some(new_calendar_setting) = params.calendar {
  641. if let Some(field_rev) = self
  642. .delegate
  643. .get_field_rev(&new_calendar_setting.layout_field_id)
  644. .await
  645. {
  646. let field_type: FieldType = field_rev.ty.into();
  647. if field_type != FieldType::DateTime {
  648. return Err(FlowyError::unexpect_calendar_field_type());
  649. }
  650. let layout_ty = LayoutRevision::Calendar;
  651. let old_calender_setting = self.v_get_layout_settings(&layout_ty).await?.calendar;
  652. self
  653. .modify(|pad| Ok(pad.set_layout_setting(&layout_ty, &new_calendar_setting)?))
  654. .await?;
  655. let new_field_id = new_calendar_setting.layout_field_id.clone();
  656. let layout_setting_pb: LayoutSettingPB = LayoutSettingParams {
  657. calendar: Some(new_calendar_setting),
  658. }
  659. .into();
  660. if let Some(old_calendar_setting) = old_calender_setting {
  661. // compare the new layout field id is equal to old layout field id
  662. // if not equal, send the DidSetNewLayoutField notification
  663. // if equal, send the DidUpdateLayoutSettings notification
  664. if old_calendar_setting.layout_field_id != new_field_id {
  665. send_notification(&self.view_id, DatabaseNotification::DidSetNewLayoutField)
  666. .payload(layout_setting_pb)
  667. .send();
  668. } else {
  669. send_notification(&self.view_id, DatabaseNotification::DidUpdateLayoutSettings)
  670. .payload(layout_setting_pb)
  671. .send();
  672. }
  673. } else {
  674. tracing::warn!("Calendar setting should not be empty")
  675. }
  676. }
  677. }
  678. Ok(())
  679. }
  680. #[tracing::instrument(level = "trace", skip_all, err)]
  681. pub async fn v_did_update_field_type_option(
  682. &self,
  683. field_id: &str,
  684. old_field_rev: Option<Arc<FieldRevision>>,
  685. ) -> FlowyResult<()> {
  686. if let Some(field_rev) = self.delegate.get_field_rev(field_id).await {
  687. let old = old_field_rev.map(|old_field_rev| FilterType::from(&old_field_rev));
  688. let new = FilterType::from(&field_rev);
  689. let filter_type = UpdatedFilterType::new(old, new);
  690. let filter_changeset = FilterChangeset::from_update(filter_type);
  691. self
  692. .sort_controller
  693. .read()
  694. .await
  695. .did_update_view_field_type_option(&field_rev)
  696. .await;
  697. let filter_controller = self.filter_controller.clone();
  698. let _ = tokio::spawn(async move {
  699. if let Some(notification) = filter_controller
  700. .did_receive_changes(filter_changeset)
  701. .await
  702. {
  703. send_notification(&notification.view_id, DatabaseNotification::DidUpdateFilter)
  704. .payload(notification)
  705. .send();
  706. }
  707. });
  708. }
  709. Ok(())
  710. }
  711. ///
  712. ///
  713. /// # Arguments
  714. ///
  715. /// * `field_id`:
  716. ///
  717. #[tracing::instrument(level = "debug", skip_all, err)]
  718. pub async fn v_update_group_setting(&self, field_id: &str) -> FlowyResult<()> {
  719. if let Some(field_rev) = self.delegate.get_field_rev(field_id).await {
  720. let row_revs = self.delegate.get_row_revs(None).await;
  721. let configuration_reader = GroupConfigurationReaderImpl {
  722. pad: self.pad.clone(),
  723. view_editor_delegate: self.delegate.clone(),
  724. };
  725. let new_group_controller = new_group_controller_with_field_rev(
  726. self.user_id.clone(),
  727. self.view_id.clone(),
  728. self.pad.clone(),
  729. self.rev_manager.clone(),
  730. field_rev,
  731. row_revs,
  732. configuration_reader,
  733. )
  734. .await?;
  735. let new_groups = new_group_controller
  736. .groups()
  737. .into_iter()
  738. .map(|group| GroupPB::from(group.clone()))
  739. .collect();
  740. *self.group_controller.write().await = new_group_controller;
  741. let changeset = GroupChangesetPB {
  742. view_id: self.view_id.clone(),
  743. initial_groups: new_groups,
  744. ..Default::default()
  745. };
  746. debug_assert!(!changeset.is_empty());
  747. if !changeset.is_empty() {
  748. send_notification(&changeset.view_id, DatabaseNotification::DidGroupByField)
  749. .payload(changeset)
  750. .send();
  751. }
  752. }
  753. Ok(())
  754. }
  755. pub(crate) async fn v_get_cells_for_field(
  756. &self,
  757. field_id: &str,
  758. ) -> FlowyResult<Vec<RowSingleCellData>> {
  759. get_cells_for_field(self.delegate.clone(), field_id).await
  760. }
  761. pub async fn v_get_calendar_event(&self, row_id: &str) -> Option<CalendarEventPB> {
  762. let layout_ty = LayoutRevision::Calendar;
  763. let calendar_setting = self
  764. .v_get_layout_settings(&layout_ty)
  765. .await
  766. .ok()?
  767. .calendar?;
  768. // Text
  769. let primary_field = self.delegate.get_primary_field_rev().await?;
  770. let text_cell = get_cell_for_row(self.delegate.clone(), &primary_field.id, row_id).await?;
  771. // Date
  772. let date_field = self
  773. .delegate
  774. .get_field_rev(&calendar_setting.layout_field_id)
  775. .await?;
  776. let date_cell = get_cell_for_row(self.delegate.clone(), &date_field.id, row_id).await?;
  777. let title = text_cell
  778. .into_text_field_cell_data()
  779. .unwrap_or_default()
  780. .into();
  781. let timestamp = date_cell
  782. .into_date_field_cell_data()
  783. .unwrap_or_default()
  784. .timestamp
  785. .unwrap_or_default();
  786. Some(CalendarEventPB {
  787. row_id: row_id.to_string(),
  788. date_field_id: date_field.id.clone(),
  789. title,
  790. timestamp,
  791. })
  792. }
  793. pub async fn v_get_all_calendar_events(&self) -> Option<Vec<CalendarEventPB>> {
  794. let layout_ty = LayoutRevision::Calendar;
  795. let calendar_setting = self
  796. .v_get_layout_settings(&layout_ty)
  797. .await
  798. .ok()?
  799. .calendar?;
  800. // Text
  801. let primary_field = self.delegate.get_primary_field_rev().await?;
  802. let text_cells = self.v_get_cells_for_field(&primary_field.id).await.ok()?;
  803. // Date
  804. let timestamp_by_row_id = self
  805. .v_get_cells_for_field(&calendar_setting.layout_field_id)
  806. .await
  807. .ok()?
  808. .into_iter()
  809. .map(|date_cell| {
  810. let row_id = date_cell.row_id.clone();
  811. // timestamp
  812. let timestamp = date_cell
  813. .into_date_field_cell_data()
  814. .map(|date_cell_data| date_cell_data.timestamp.unwrap_or_default())
  815. .unwrap_or_default();
  816. (row_id, timestamp)
  817. })
  818. .collect::<HashMap<String, i64>>();
  819. let mut events: Vec<CalendarEventPB> = vec![];
  820. for text_cell in text_cells {
  821. let row_id = text_cell.row_id.clone();
  822. let timestamp = timestamp_by_row_id
  823. .get(&row_id)
  824. .cloned()
  825. .unwrap_or_default();
  826. let title = text_cell
  827. .into_text_field_cell_data()
  828. .unwrap_or_default()
  829. .into();
  830. let event = CalendarEventPB {
  831. row_id,
  832. date_field_id: calendar_setting.layout_field_id.clone(),
  833. title,
  834. timestamp,
  835. };
  836. events.push(event);
  837. }
  838. Some(events)
  839. }
  840. async fn notify_did_update_setting(&self) {
  841. let setting = self.v_get_setting().await;
  842. send_notification(&self.view_id, DatabaseNotification::DidUpdateSettings)
  843. .payload(setting)
  844. .send();
  845. }
  846. pub async fn notify_did_update_group_rows(&self, payload: GroupRowsNotificationPB) {
  847. send_notification(&payload.group_id, DatabaseNotification::DidUpdateGroupRow)
  848. .payload(payload)
  849. .send();
  850. }
  851. pub async fn notify_did_update_filter(&self, notification: FilterChangesetNotificationPB) {
  852. send_notification(&notification.view_id, DatabaseNotification::DidUpdateFilter)
  853. .payload(notification)
  854. .send();
  855. }
  856. pub async fn notify_did_update_sort(&self, notification: SortChangesetNotificationPB) {
  857. if !notification.is_empty() {
  858. send_notification(&notification.view_id, DatabaseNotification::DidUpdateSort)
  859. .payload(notification)
  860. .send();
  861. }
  862. }
  863. async fn notify_did_update_groups(&self, changeset: GroupChangesetPB) {
  864. send_notification(&self.view_id, DatabaseNotification::DidUpdateGroups)
  865. .payload(changeset)
  866. .send();
  867. }
  868. async fn modify<F>(&self, f: F) -> FlowyResult<()>
  869. where
  870. F: for<'a> FnOnce(
  871. &'a mut DatabaseViewRevisionPad,
  872. ) -> FlowyResult<Option<DatabaseViewRevisionChangeset>>,
  873. {
  874. let mut write_guard = self.pad.write().await;
  875. match f(&mut write_guard)? {
  876. None => {},
  877. Some(change) => {
  878. apply_change(&self.user_id, self.rev_manager.clone(), change).await?;
  879. },
  880. }
  881. Ok(())
  882. }
  883. async fn mut_group_controller<F, T>(&self, f: F) -> Option<T>
  884. where
  885. F: FnOnce(&mut Box<dyn GroupController>, Arc<FieldRevision>) -> FlowyResult<T>,
  886. {
  887. let group_field_id = self.group_controller.read().await.field_id().to_owned();
  888. match self.delegate.get_field_rev(&group_field_id).await {
  889. None => None,
  890. Some(field_rev) => {
  891. let mut write_guard = self.group_controller.write().await;
  892. f(&mut write_guard, field_rev).ok()
  893. },
  894. }
  895. }
  896. #[allow(dead_code)]
  897. async fn async_mut_group_controller<F, O, T>(&self, f: F) -> Option<T>
  898. where
  899. F: FnOnce(Arc<RwLock<Box<dyn GroupController>>>, Arc<FieldRevision>) -> O,
  900. O: Future<Output = FlowyResult<T>> + Sync + 'static,
  901. {
  902. let group_field_id = self.group_controller.read().await.field_id().to_owned();
  903. match self.delegate.get_field_rev(&group_field_id).await {
  904. None => None,
  905. Some(field_rev) => {
  906. let _write_guard = self.group_controller.write().await;
  907. f(self.group_controller.clone(), field_rev).await.ok()
  908. },
  909. }
  910. }
  911. }
  912. pub(crate) async fn get_cell_for_row(
  913. delegate: Arc<dyn DatabaseViewData>,
  914. field_id: &str,
  915. row_id: &str,
  916. ) -> Option<RowSingleCellData> {
  917. let (_, row_rev) = delegate.get_row_rev(row_id).await?;
  918. let mut cells = get_cells_for_field_in_rows(delegate, field_id, vec![row_rev])
  919. .await
  920. .ok()?;
  921. if cells.is_empty() {
  922. None
  923. } else {
  924. assert_eq!(cells.len(), 1);
  925. Some(cells.remove(0))
  926. }
  927. }
  928. // Returns the list of cells corresponding to the given field.
  929. pub(crate) async fn get_cells_for_field(
  930. delegate: Arc<dyn DatabaseViewData>,
  931. field_id: &str,
  932. ) -> FlowyResult<Vec<RowSingleCellData>> {
  933. let row_revs = delegate.get_row_revs(None).await;
  934. get_cells_for_field_in_rows(delegate, field_id, row_revs).await
  935. }
  936. pub(crate) async fn get_cells_for_field_in_rows(
  937. delegate: Arc<dyn DatabaseViewData>,
  938. field_id: &str,
  939. row_revs: Vec<Arc<RowRevision>>,
  940. ) -> FlowyResult<Vec<RowSingleCellData>> {
  941. let field_rev = delegate.get_field_rev(field_id).await.unwrap();
  942. let field_type: FieldType = field_rev.ty.into();
  943. let mut cells = vec![];
  944. if let Some(handler) = delegate.get_type_option_cell_handler(&field_rev, &field_type) {
  945. for row_rev in row_revs {
  946. if let Some(cell_rev) = row_rev.cells.get(field_id) {
  947. if let Ok(type_cell_data) = TypeCellData::try_from(cell_rev) {
  948. if let Ok(cell_data) =
  949. handler.get_cell_data(type_cell_data.cell_str, &field_type, &field_rev)
  950. {
  951. cells.push(RowSingleCellData {
  952. row_id: row_rev.id.clone(),
  953. field_id: field_rev.id.clone(),
  954. field_type: field_type.clone(),
  955. cell_data,
  956. })
  957. }
  958. }
  959. }
  960. }
  961. }
  962. Ok(cells)
  963. }
  964. async fn new_group_controller(
  965. user_id: String,
  966. view_id: String,
  967. view_rev_pad: Arc<RwLock<DatabaseViewRevisionPad>>,
  968. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  969. delegate: Arc<dyn DatabaseViewData>,
  970. ) -> FlowyResult<Box<dyn GroupController>> {
  971. let configuration_reader = GroupConfigurationReaderImpl {
  972. pad: view_rev_pad.clone(),
  973. view_editor_delegate: delegate.clone(),
  974. };
  975. let field_revs = delegate.get_field_revs(None).await;
  976. let row_revs = delegate.get_row_revs(None).await;
  977. let layout = view_rev_pad.read().await.layout();
  978. // Read the group field or find a new group field
  979. let field_rev = configuration_reader
  980. .get_configuration()
  981. .await
  982. .and_then(|configuration| {
  983. field_revs
  984. .iter()
  985. .find(|field_rev| field_rev.id == configuration.field_id)
  986. .cloned()
  987. })
  988. .unwrap_or_else(|| find_grouping_field(&field_revs, &layout).unwrap());
  989. new_group_controller_with_field_rev(
  990. user_id,
  991. view_id,
  992. view_rev_pad,
  993. rev_manager,
  994. field_rev,
  995. row_revs,
  996. configuration_reader,
  997. )
  998. .await
  999. }
  1000. /// Returns a [GroupController]
  1001. ///
  1002. async fn new_group_controller_with_field_rev(
  1003. user_id: String,
  1004. view_id: String,
  1005. view_rev_pad: Arc<RwLock<DatabaseViewRevisionPad>>,
  1006. rev_manager: Arc<RevisionManager<Arc<ConnectionPool>>>,
  1007. grouping_field_rev: Arc<FieldRevision>,
  1008. row_revs: Vec<Arc<RowRevision>>,
  1009. configuration_reader: GroupConfigurationReaderImpl,
  1010. ) -> FlowyResult<Box<dyn GroupController>> {
  1011. let configuration_writer = GroupConfigurationWriterImpl {
  1012. user_id,
  1013. rev_manager,
  1014. view_pad: view_rev_pad,
  1015. };
  1016. make_group_controller(
  1017. view_id,
  1018. grouping_field_rev,
  1019. row_revs,
  1020. configuration_reader,
  1021. configuration_writer,
  1022. )
  1023. .await
  1024. }
  1025. async fn make_filter_controller(
  1026. view_id: &str,
  1027. delegate: Arc<dyn DatabaseViewData>,
  1028. notifier: DatabaseViewChangedNotifier,
  1029. cell_data_cache: AtomicCellDataCache,
  1030. pad: Arc<RwLock<DatabaseViewRevisionPad>>,
  1031. ) -> Arc<FilterController> {
  1032. let field_revs = delegate.get_field_revs(None).await;
  1033. let filter_revs = pad.read().await.get_all_filters(&field_revs);
  1034. let task_scheduler = delegate.get_task_scheduler();
  1035. let filter_delegate = DatabaseViewFilterDelegateImpl {
  1036. editor_delegate: delegate.clone(),
  1037. view_revision_pad: pad,
  1038. };
  1039. let handler_id = gen_handler_id();
  1040. let filter_controller = FilterController::new(
  1041. view_id,
  1042. &handler_id,
  1043. filter_delegate,
  1044. task_scheduler.clone(),
  1045. filter_revs,
  1046. cell_data_cache,
  1047. notifier,
  1048. )
  1049. .await;
  1050. let filter_controller = Arc::new(filter_controller);
  1051. task_scheduler
  1052. .write()
  1053. .await
  1054. .register_handler(FilterTaskHandler::new(
  1055. handler_id,
  1056. filter_controller.clone(),
  1057. ));
  1058. filter_controller
  1059. }
  1060. async fn make_sort_controller(
  1061. view_id: &str,
  1062. delegate: Arc<dyn DatabaseViewData>,
  1063. notifier: DatabaseViewChangedNotifier,
  1064. filter_controller: Arc<FilterController>,
  1065. pad: Arc<RwLock<DatabaseViewRevisionPad>>,
  1066. cell_data_cache: AtomicCellDataCache,
  1067. ) -> Arc<RwLock<SortController>> {
  1068. let handler_id = gen_handler_id();
  1069. let field_revs = delegate.get_field_revs(None).await;
  1070. let sorts = pad.read().await.get_all_sorts(&field_revs);
  1071. let sort_delegate = DatabaseViewSortDelegateImpl {
  1072. editor_delegate: delegate.clone(),
  1073. view_revision_pad: pad,
  1074. filter_controller,
  1075. };
  1076. let task_scheduler = delegate.get_task_scheduler();
  1077. let sort_controller = Arc::new(RwLock::new(SortController::new(
  1078. view_id,
  1079. &handler_id,
  1080. sorts,
  1081. sort_delegate,
  1082. task_scheduler.clone(),
  1083. cell_data_cache,
  1084. notifier,
  1085. )));
  1086. task_scheduler
  1087. .write()
  1088. .await
  1089. .register_handler(SortTaskHandler::new(handler_id, sort_controller.clone()));
  1090. sort_controller
  1091. }
  1092. fn gen_handler_id() -> String {
  1093. nanoid!(10)
  1094. }
  1095. async fn generate_restore_view(view_id: &str) -> (DatabaseViewRevisionPad, Revision) {
  1096. let database_id = gen_database_id();
  1097. let view = DatabaseViewRevisionPad::new(
  1098. database_id,
  1099. view_id.to_owned(),
  1100. "".to_string(),
  1101. LayoutRevision::Grid,
  1102. );
  1103. let bytes = make_database_view_operations(&view).json_bytes();
  1104. let reset_revision = Revision::initial_revision(view_id, bytes);
  1105. (view, reset_revision)
  1106. }
  1107. #[cfg(test)]
  1108. mod tests {
  1109. use flowy_client_sync::client_database::DatabaseOperations;
  1110. #[test]
  1111. fn test() {
  1112. let s1 = r#"[{"insert":"{\"view_id\":\"fTURELffPr\",\"grid_id\":\"fTURELffPr\",\"layout\":0,\"filters\":[],\"groups\":[]}"}]"#;
  1113. let _delta_1 = DatabaseOperations::from_json(s1).unwrap();
  1114. let s2 = r#"[{"retain":195},{"insert":"{\\\"group_id\\\":\\\"wD9i\\\",\\\"visible\\\":true},{\\\"group_id\\\":\\\"xZtv\\\",\\\"visible\\\":true},{\\\"group_id\\\":\\\"tFV2\\\",\\\"visible\\\":true}"},{"retain":10}]"#;
  1115. let _delta_2 = DatabaseOperations::from_json(s2).unwrap();
  1116. }
  1117. }