editor_manager.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. use crate::entities::{
  2. AlterFilterParams, AlterSortParams, CreateRowParams, DatabaseViewSettingPB, DeleteFilterParams,
  3. DeleteGroupParams, DeleteSortParams, InsertGroupParams, MoveGroupParams, RepeatedGroupPB, RowPB,
  4. };
  5. use crate::manager::DatabaseUser;
  6. use crate::services::cell::AtomicCellDataCache;
  7. use crate::services::database::DatabaseBlockEvent;
  8. use crate::services::database_view::notifier::*;
  9. use crate::services::database_view::trait_impl::{
  10. DatabaseViewRevisionMergeable, DatabaseViewRevisionSerde,
  11. };
  12. use crate::services::database_view::{DatabaseViewData, DatabaseViewEditor};
  13. use crate::services::filter::FilterType;
  14. use crate::services::persistence::rev_sqlite::{
  15. SQLiteDatabaseRevisionSnapshotPersistence, SQLiteDatabaseViewRevisionPersistence,
  16. };
  17. use database_model::{FieldRevision, FilterRevision, RowChangeset, RowRevision, SortRevision};
  18. use flowy_client_sync::client_database::DatabaseViewRevisionPad;
  19. use flowy_error::FlowyResult;
  20. use flowy_revision::{RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration};
  21. use flowy_sqlite::ConnectionPool;
  22. use lib_infra::future::Fut;
  23. use std::borrow::Cow;
  24. use std::collections::HashMap;
  25. use std::sync::Arc;
  26. use tokio::sync::{broadcast, RwLock};
  27. /// It's used to manager the list of views that reference to the same database.
  28. pub struct DatabaseViews {
  29. user: Arc<dyn DatabaseUser>,
  30. delegate: Arc<dyn DatabaseViewData>,
  31. view_editors: Arc<RwLock<HashMap<String, Arc<DatabaseViewEditor>>>>,
  32. cell_data_cache: AtomicCellDataCache,
  33. }
  34. impl DatabaseViews {
  35. pub async fn new(
  36. user: Arc<dyn DatabaseUser>,
  37. delegate: Arc<dyn DatabaseViewData>,
  38. cell_data_cache: AtomicCellDataCache,
  39. block_event_rx: broadcast::Receiver<DatabaseBlockEvent>,
  40. ) -> FlowyResult<Self> {
  41. let view_editors = Arc::new(RwLock::new(HashMap::default()));
  42. listen_on_database_block_event(block_event_rx, view_editors.clone());
  43. Ok(Self {
  44. user,
  45. delegate,
  46. view_editors,
  47. cell_data_cache,
  48. })
  49. }
  50. pub async fn open(&self, view_editor: DatabaseViewEditor) {
  51. let view_id = view_editor.view_id.clone();
  52. self
  53. .view_editors
  54. .write()
  55. .await
  56. .insert(view_id, Arc::new(view_editor));
  57. }
  58. pub async fn close(&self, view_id: &str) {
  59. if let Ok(mut view_editors) = self.view_editors.try_write() {
  60. if let Some(view_editor) = view_editors.remove(view_id) {
  61. view_editor.close().await;
  62. }
  63. } else {
  64. tracing::error!("Try to get the lock of view_editors failed");
  65. }
  66. }
  67. pub async fn number_of_views(&self) -> usize {
  68. self.view_editors.read().await.values().len()
  69. }
  70. pub async fn subscribe_view_changed(
  71. &self,
  72. view_id: &str,
  73. ) -> FlowyResult<broadcast::Receiver<DatabaseViewChanged>> {
  74. Ok(self.get_view_editor(view_id).await?.notifier.subscribe())
  75. }
  76. pub async fn get_row_revs(
  77. &self,
  78. view_id: &str,
  79. block_id: &str,
  80. ) -> FlowyResult<Vec<Arc<RowRevision>>> {
  81. let mut row_revs = self
  82. .delegate
  83. .get_row_revs(Some(vec![block_id.to_owned()]))
  84. .await;
  85. if let Ok(view_editor) = self.get_view_editor(view_id).await {
  86. view_editor.v_filter_rows(block_id, &mut row_revs).await;
  87. view_editor.v_sort_rows(&mut row_revs).await;
  88. }
  89. Ok(row_revs)
  90. }
  91. pub async fn duplicate_database_view(&self, view_id: &str) -> FlowyResult<String> {
  92. let editor = self.get_view_editor(view_id).await?;
  93. let view_data = editor.v_duplicate_data().await?;
  94. Ok(view_data)
  95. }
  96. /// When the row was created, we may need to modify the [RowRevision] according to the [CreateRowParams].
  97. pub async fn will_create_row(&self, row_rev: &mut RowRevision, params: &CreateRowParams) {
  98. for view_editor in self.view_editors.read().await.values() {
  99. view_editor.v_will_create_row(row_rev, params).await;
  100. }
  101. }
  102. /// Notify the view that the row was created. For the moment, the view is just sending notifications.
  103. pub async fn did_create_row(&self, row_pb: &RowPB, params: &CreateRowParams) {
  104. for view_editor in self.view_editors.read().await.values() {
  105. view_editor.v_did_create_row(row_pb, params).await;
  106. }
  107. }
  108. /// Insert/Delete the group's row if the corresponding cell data was changed.
  109. pub async fn did_update_row(&self, old_row_rev: Option<Arc<RowRevision>>, row_id: &str) {
  110. match self.delegate.get_row_rev(row_id).await {
  111. None => {
  112. tracing::warn!("Can not find the row in grid view");
  113. },
  114. Some((_, row_rev)) => {
  115. for view_editor in self.view_editors.read().await.values() {
  116. view_editor
  117. .v_did_update_row(old_row_rev.clone(), &row_rev)
  118. .await;
  119. }
  120. },
  121. }
  122. }
  123. pub async fn group_by_field(&self, view_id: &str, field_id: &str) -> FlowyResult<()> {
  124. let view_editor = self.get_view_editor(view_id).await?;
  125. view_editor.v_update_group_setting(field_id).await?;
  126. Ok(())
  127. }
  128. pub async fn did_delete_row(&self, row_rev: Arc<RowRevision>) {
  129. for view_editor in self.view_editors.read().await.values() {
  130. view_editor.v_did_delete_row(&row_rev).await;
  131. }
  132. }
  133. pub async fn get_setting(&self, view_id: &str) -> FlowyResult<DatabaseViewSettingPB> {
  134. let view_editor = self.get_view_editor(view_id).await?;
  135. Ok(view_editor.v_get_setting().await)
  136. }
  137. pub async fn get_all_filters(&self, view_id: &str) -> FlowyResult<Vec<Arc<FilterRevision>>> {
  138. let view_editor = self.get_view_editor(view_id).await?;
  139. Ok(view_editor.v_get_all_filters().await)
  140. }
  141. pub async fn get_filters(
  142. &self,
  143. view_id: &str,
  144. filter_id: &FilterType,
  145. ) -> FlowyResult<Vec<Arc<FilterRevision>>> {
  146. let view_editor = self.get_view_editor(view_id).await?;
  147. Ok(view_editor.v_get_filters(filter_id).await)
  148. }
  149. pub async fn create_or_update_filter(&self, params: AlterFilterParams) -> FlowyResult<()> {
  150. let view_editor = self.get_view_editor(&params.view_id).await?;
  151. view_editor.v_insert_filter(params).await
  152. }
  153. pub async fn delete_filter(&self, params: DeleteFilterParams) -> FlowyResult<()> {
  154. let view_editor = self.get_view_editor(&params.view_id).await?;
  155. view_editor.v_delete_filter(params).await
  156. }
  157. pub async fn get_all_sorts(&self, view_id: &str) -> FlowyResult<Vec<Arc<SortRevision>>> {
  158. let view_editor = self.get_view_editor(view_id).await?;
  159. Ok(view_editor.v_get_all_sorts().await)
  160. }
  161. pub async fn create_or_update_sort(&self, params: AlterSortParams) -> FlowyResult<SortRevision> {
  162. let view_editor = self.get_view_editor(&params.view_id).await?;
  163. view_editor.v_insert_sort(params).await
  164. }
  165. pub async fn delete_all_sorts(&self, view_id: &str) -> FlowyResult<()> {
  166. let view_editor = self.get_view_editor(view_id).await?;
  167. view_editor.v_delete_all_sorts().await
  168. }
  169. pub async fn delete_sort(&self, params: DeleteSortParams) -> FlowyResult<()> {
  170. let view_editor = self.get_view_editor(&params.view_id).await?;
  171. view_editor.v_delete_sort(params).await
  172. }
  173. pub async fn load_groups(&self, view_id: &str) -> FlowyResult<RepeatedGroupPB> {
  174. let view_editor = self.get_view_editor(view_id).await?;
  175. let groups = view_editor.v_load_groups().await?;
  176. Ok(RepeatedGroupPB { items: groups })
  177. }
  178. pub async fn insert_or_update_group(&self, params: InsertGroupParams) -> FlowyResult<()> {
  179. let view_editor = self.get_view_editor(&params.view_id).await?;
  180. view_editor.v_initialize_new_group(params).await
  181. }
  182. pub async fn delete_group(&self, params: DeleteGroupParams) -> FlowyResult<()> {
  183. let view_editor = self.get_view_editor(&params.view_id).await?;
  184. view_editor.v_delete_group(params).await
  185. }
  186. pub async fn move_group(&self, params: MoveGroupParams) -> FlowyResult<()> {
  187. let view_editor = self.get_view_editor(&params.view_id).await?;
  188. view_editor.v_move_group(params).await?;
  189. Ok(())
  190. }
  191. /// It may generate a RowChangeset when the Row was moved from one group to another.
  192. /// The return value, [RowChangeset], contains the changes made by the groups.
  193. ///
  194. pub async fn move_group_row(
  195. &self,
  196. view_id: &str,
  197. row_rev: Arc<RowRevision>,
  198. to_group_id: String,
  199. to_row_id: Option<String>,
  200. recv_row_changeset: impl FnOnce(RowChangeset) -> Fut<()>,
  201. ) -> FlowyResult<()> {
  202. let mut row_changeset = RowChangeset::new(row_rev.id.clone());
  203. let view_editor = self.get_view_editor(view_id).await?;
  204. view_editor
  205. .v_move_group_row(
  206. &row_rev,
  207. &mut row_changeset,
  208. &to_group_id,
  209. to_row_id.clone(),
  210. )
  211. .await;
  212. if !row_changeset.is_empty() {
  213. recv_row_changeset(row_changeset).await;
  214. }
  215. Ok(())
  216. }
  217. /// Notifies the view's field type-option data is changed
  218. /// For the moment, only the groups will be generated after the type-option data changed. A
  219. /// [FieldRevision] has a property named type_options contains a list of type-option data.
  220. /// # Arguments
  221. ///
  222. /// * `field_id`: the id of the field in current view
  223. ///
  224. #[tracing::instrument(level = "debug", skip(self, old_field_rev), err)]
  225. pub async fn did_update_field_type_option(
  226. &self,
  227. view_id: &str,
  228. field_id: &str,
  229. old_field_rev: Option<Arc<FieldRevision>>,
  230. ) -> FlowyResult<()> {
  231. let view_editor = self.get_view_editor(view_id).await?;
  232. // If the id of the grouping field is equal to the updated field's id, then we need to
  233. // update the group setting
  234. if view_editor.group_id().await == field_id {
  235. view_editor.v_update_group_setting(field_id).await?;
  236. }
  237. view_editor
  238. .v_did_update_field_type_option(field_id, old_field_rev)
  239. .await?;
  240. Ok(())
  241. }
  242. pub async fn get_view_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseViewEditor>> {
  243. debug_assert!(!view_id.is_empty());
  244. if let Some(editor) = self.view_editors.read().await.get(view_id) {
  245. return Ok(editor.clone());
  246. }
  247. tracing::trace!("{:p} create view:{} editor", self, view_id);
  248. let mut view_editors = self.view_editors.write().await;
  249. let editor = Arc::new(self.make_view_editor(view_id).await?);
  250. view_editors.insert(view_id.to_owned(), editor.clone());
  251. Ok(editor)
  252. }
  253. async fn make_view_editor(&self, view_id: &str) -> FlowyResult<DatabaseViewEditor> {
  254. let rev_manager = make_database_view_rev_manager(&self.user, view_id).await?;
  255. let user_id = self.user.user_id()?;
  256. let token = self.user.token()?;
  257. let view_id = view_id.to_owned();
  258. DatabaseViewEditor::new(
  259. &user_id,
  260. &token,
  261. view_id,
  262. self.delegate.clone(),
  263. self.cell_data_cache.clone(),
  264. rev_manager,
  265. )
  266. .await
  267. }
  268. }
  269. #[tracing::instrument(level = "trace", skip(user), err)]
  270. pub async fn make_database_view_revision_pad(
  271. view_id: &str,
  272. user: Arc<dyn DatabaseUser>,
  273. ) -> FlowyResult<(
  274. DatabaseViewRevisionPad,
  275. RevisionManager<Arc<ConnectionPool>>,
  276. )> {
  277. let mut rev_manager = make_database_view_rev_manager(&user, view_id).await?;
  278. let view_rev_pad = rev_manager
  279. .initialize::<DatabaseViewRevisionSerde>(None)
  280. .await?;
  281. Ok((view_rev_pad, rev_manager))
  282. }
  283. pub async fn make_database_view_rev_manager(
  284. user: &Arc<dyn DatabaseUser>,
  285. view_id: &str,
  286. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  287. let user_id = user.user_id()?;
  288. // Create revision persistence
  289. let pool = user.db_pool()?;
  290. let disk_cache = SQLiteDatabaseViewRevisionPersistence::new(&user_id, pool.clone());
  291. let configuration = RevisionPersistenceConfiguration::new(2, false);
  292. let rev_persistence = RevisionPersistence::new(&user_id, view_id, disk_cache, configuration);
  293. // Create snapshot persistence
  294. const DATABASE_VIEW_SP_PREFIX: &str = "grid_view";
  295. let snapshot_object_id = format!("{}:{}", DATABASE_VIEW_SP_PREFIX, view_id);
  296. let snapshot_persistence =
  297. SQLiteDatabaseRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  298. let rev_compress = DatabaseViewRevisionMergeable();
  299. Ok(RevisionManager::new(
  300. &user_id,
  301. view_id,
  302. rev_persistence,
  303. rev_compress,
  304. snapshot_persistence,
  305. ))
  306. }
  307. fn listen_on_database_block_event(
  308. mut block_event_rx: broadcast::Receiver<DatabaseBlockEvent>,
  309. view_editors: Arc<RwLock<HashMap<String, Arc<DatabaseViewEditor>>>>,
  310. ) {
  311. tokio::spawn(async move {
  312. loop {
  313. while let Ok(event) = block_event_rx.recv().await {
  314. let read_guard = view_editors.read().await;
  315. let view_editors = read_guard.values();
  316. let event = if view_editors.len() == 1 {
  317. Cow::Owned(event)
  318. } else {
  319. Cow::Borrowed(&event)
  320. };
  321. for view_editor in view_editors {
  322. view_editor.handle_block_event(event.clone()).await;
  323. }
  324. }
  325. }
  326. });
  327. }