manager.rs 14 KB


  1. use crate::entities::LayoutTypePB;
  2. use crate::services::database::{
  3. make_database_block_rev_manager, DatabaseEditor, DatabaseRefIndexerQuery,
  4. DatabaseRevisionCloudService, DatabaseRevisionMergeable, DatabaseRevisionSerde,
  5. };
  6. use crate::services::database_view::{
  7. make_database_view_rev_manager, make_database_view_revision_pad, DatabaseViewEditor,
  8. };
  9. use crate::services::persistence::block_index::BlockRowIndexer;
  10. use crate::services::persistence::database_ref::{DatabaseInfo, DatabaseRef, DatabaseRefIndexer};
  11. use crate::services::persistence::kv::DatabaseKVPersistence;
  12. use crate::services::persistence::migration::DatabaseMigration;
  13. use crate::services::persistence::rev_sqlite::{
  14. SQLiteDatabaseRevisionPersistence, SQLiteDatabaseRevisionSnapshotPersistence,
  15. };
  16. use crate::services::persistence::DatabaseDBConnection;
  17. use std::collections::HashMap;
  18. use database_model::{
  19. gen_database_id, BuildDatabaseContext, DatabaseRevision, DatabaseViewRevision,
  20. };
  21. use flowy_client_sync::client_database::{
  22. make_database_block_operations, make_database_operations, make_database_view_operations,
  23. };
  24. use flowy_error::{FlowyError, FlowyResult};
  25. use flowy_revision::{
  26. RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
  27. };
  28. use flowy_sqlite::ConnectionPool;
  29. use flowy_task::TaskDispatcher;
  30. use revision_model::Revision;
  31. use std::sync::Arc;
  32. use tokio::sync::RwLock;
  33. pub trait DatabaseUser: Send + Sync {
  34. fn user_id(&self) -> Result<String, FlowyError>;
  35. fn token(&self) -> Result<String, FlowyError>;
  36. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  37. }
  38. pub struct DatabaseManager {
  39. editors_by_database_id: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  40. database_user: Arc<dyn DatabaseUser>,
  41. block_indexer: Arc<BlockRowIndexer>,
  42. database_ref_indexer: Arc<DatabaseRefIndexer>,
  43. #[allow(dead_code)]
  44. kv_persistence: Arc<DatabaseKVPersistence>,
  45. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  46. #[allow(dead_code)]
  47. migration: DatabaseMigration,
  48. }
  49. impl DatabaseManager {
  50. pub fn new(
  51. database_user: Arc<dyn DatabaseUser>,
  52. _rev_web_socket: Arc<dyn RevisionWebSocket>,
  53. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  54. database_db: Arc<dyn DatabaseDBConnection>,
  55. ) -> Self {
  56. let editors_by_database_id = RwLock::new(HashMap::new());
  57. let kv_persistence = Arc::new(DatabaseKVPersistence::new(database_db.clone()));
  58. let block_indexer = Arc::new(BlockRowIndexer::new(database_db.clone()));
  59. let database_ref_indexer = Arc::new(DatabaseRefIndexer::new(database_db.clone()));
  60. let migration = DatabaseMigration::new(
  61. database_user.clone(),
  62. database_db,
  63. database_ref_indexer.clone(),
  64. );
  65. Self {
  66. editors_by_database_id,
  67. database_user,
  68. kv_persistence,
  69. block_indexer,
  70. database_ref_indexer,
  71. task_scheduler,
  72. migration,
  73. }
  74. }
  75. pub async fn initialize_with_new_user(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
  76. self.migration.run(user_id).await?;
  77. Ok(())
  78. }
  79. pub async fn initialize(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
  80. self.migration.run(user_id).await?;
  81. Ok(())
  82. }
  83. #[tracing::instrument(level = "debug", skip_all, err)]
  84. pub async fn create_database<T: AsRef<str>>(
  85. &self,
  86. database_id: &str,
  87. view_id: T,
  88. name: &str,
  89. revisions: Vec<Revision>,
  90. ) -> FlowyResult<()> {
  91. let db_pool = self.database_user.db_pool()?;
  92. let _ = self
  93. .database_ref_indexer
  94. .bind(database_id, view_id.as_ref(), true, name);
  95. let rev_manager = self.make_database_rev_manager(database_id, db_pool)?;
  96. rev_manager.reset_object(revisions).await?;
  97. Ok(())
  98. }
  99. #[tracing::instrument(level = "debug", skip_all, err)]
  100. async fn create_database_view<T: AsRef<str>>(
  101. &self,
  102. view_id: T,
  103. revisions: Vec<Revision>,
  104. ) -> FlowyResult<()> {
  105. let view_id = view_id.as_ref();
  106. let rev_manager = make_database_view_rev_manager(&self.database_user, view_id).await?;
  107. rev_manager.reset_object(revisions).await?;
  108. Ok(())
  109. }
  110. pub async fn create_database_block<T: AsRef<str>>(
  111. &self,
  112. block_id: T,
  113. revisions: Vec<Revision>,
  114. ) -> FlowyResult<()> {
  115. let block_id = block_id.as_ref();
  116. let rev_manager = make_database_block_rev_manager(&self.database_user, block_id)?;
  117. rev_manager.reset_object(revisions).await?;
  118. Ok(())
  119. }
  120. pub async fn open_database_view<T: AsRef<str>>(
  121. &self,
  122. view_id: T,
  123. ) -> FlowyResult<Arc<DatabaseEditor>> {
  124. let view_id = view_id.as_ref();
  125. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  126. self
  127. .get_or_create_database_editor(&database_info.database_id, view_id)
  128. .await
  129. }
  130. #[tracing::instrument(level = "debug", skip_all)]
  131. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  132. let view_id = view_id.as_ref();
  133. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  134. tracing::Span::current().record("database_id", &database_info.database_id);
  135. // Create a temporary reference database_editor in case of holding the write lock
  136. // of editors_by_database_id too long.
  137. let database_editor = self
  138. .editors_by_database_id
  139. .write()
  140. .await
  141. .remove(&database_info.database_id);
  142. if let Some(database_editor) = database_editor {
  143. database_editor.close_view_editor(view_id).await;
  144. if database_editor.number_of_ref_views().await == 0 {
  145. database_editor.dispose().await;
  146. } else {
  147. self
  148. .editors_by_database_id
  149. .write()
  150. .await
  151. .insert(database_info.database_id, database_editor);
  152. }
  153. }
  154. Ok(())
  155. }
  156. // #[tracing::instrument(level = "debug", skip(self), err)]
  157. pub async fn get_database_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  158. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  159. let database_editor = self
  160. .editors_by_database_id
  161. .read()
  162. .await
  163. .get(&database_info.database_id)
  164. .cloned();
  165. match database_editor {
  166. None => {
  167. // Drop the read_guard ASAP in case of the following read/write lock
  168. self.open_database_view(view_id).await
  169. },
  170. Some(editor) => Ok(editor),
  171. }
  172. }
  173. pub async fn get_databases(&self) -> FlowyResult<Vec<DatabaseInfo>> {
  174. self.database_ref_indexer.get_all_databases()
  175. }
  176. pub async fn get_database_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  177. self
  178. .database_ref_indexer
  179. .get_ref_views_with_database(database_id)
  180. }
  181. async fn get_or_create_database_editor(
  182. &self,
  183. database_id: &str,
  184. view_id: &str,
  185. ) -> FlowyResult<Arc<DatabaseEditor>> {
  186. let user = self.database_user.clone();
  187. let create_view_editor = |database_editor: Arc<DatabaseEditor>| async move {
  188. let user_id = user.user_id()?;
  189. let (view_pad, view_rev_manager) = make_database_view_revision_pad(view_id, user).await?;
  190. DatabaseViewEditor::from_pad(
  191. &user_id,
  192. database_editor.database_view_data.clone(),
  193. database_editor.cell_data_cache.clone(),
  194. view_rev_manager,
  195. view_pad,
  196. )
  197. .await
  198. };
  199. let database_editor = self
  200. .editors_by_database_id
  201. .read()
  202. .await
  203. .get(database_id)
  204. .cloned();
  205. match database_editor {
  206. None => {
  207. let mut editors_by_database_id = self.editors_by_database_id.write().await;
  208. let db_pool = self.database_user.db_pool()?;
  209. let database_editor = self.make_database_rev_editor(view_id, db_pool).await?;
  210. editors_by_database_id.insert(database_id.to_string(), database_editor.clone());
  211. Ok(database_editor)
  212. },
  213. Some(database_editor) => {
  214. let is_open = database_editor.is_view_open(view_id).await;
  215. if !is_open {
  216. let database_view_editor = create_view_editor(database_editor.clone()).await?;
  217. database_editor.open_view_editor(database_view_editor).await;
  218. }
  219. Ok(database_editor)
  220. },
  221. }
  222. }
  223. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  224. async fn make_database_rev_editor(
  225. &self,
  226. view_id: &str,
  227. pool: Arc<ConnectionPool>,
  228. ) -> Result<Arc<DatabaseEditor>, FlowyError> {
  229. let user = self.database_user.clone();
  230. let (base_view_pad, base_view_rev_manager) =
  231. make_database_view_revision_pad(view_id, user.clone()).await?;
  232. let mut database_id = base_view_pad.database_id.clone();
  233. tracing::debug!("Open database: {} with view: {}", database_id, view_id);
  234. if database_id.is_empty() {
  235. // Before the database_id concept comes up, we used the view_id directly. So if
  236. // the database_id is empty, which means we can used the view_id. After the version 0.1.1,
  237. // we start to used the database_id that enables binding different views to the same database.
  238. database_id = view_id.to_owned();
  239. }
  240. let token = user.token()?;
  241. let cloud = Arc::new(DatabaseRevisionCloudService::new(token));
  242. let mut rev_manager = self.make_database_rev_manager(&database_id, pool.clone())?;
  243. let database_pad = Arc::new(RwLock::new(
  244. rev_manager
  245. .initialize::<DatabaseRevisionSerde>(Some(cloud))
  246. .await?,
  247. ));
  248. let user_id = user.user_id()?;
  249. let database_editor = DatabaseEditor::new(
  250. &database_id,
  251. user,
  252. database_pad,
  253. rev_manager,
  254. self.block_indexer.clone(),
  255. self.database_ref_indexer.clone(),
  256. self.task_scheduler.clone(),
  257. )
  258. .await?;
  259. let base_view_editor = DatabaseViewEditor::from_pad(
  260. &user_id,
  261. database_editor.database_view_data.clone(),
  262. database_editor.cell_data_cache.clone(),
  263. base_view_rev_manager,
  264. base_view_pad,
  265. )
  266. .await?;
  267. database_editor.open_view_editor(base_view_editor).await;
  268. Ok(database_editor)
  269. }
  270. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  271. pub fn make_database_rev_manager(
  272. &self,
  273. database_id: &str,
  274. pool: Arc<ConnectionPool>,
  275. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  276. let user_id = self.database_user.user_id()?;
  277. // Create revision persistence
  278. let disk_cache = SQLiteDatabaseRevisionPersistence::new(&user_id, pool.clone());
  279. let configuration = RevisionPersistenceConfiguration::new(6, false);
  280. let rev_persistence =
  281. RevisionPersistence::new(&user_id, database_id, disk_cache, configuration);
  282. // Create snapshot persistence
  283. const DATABASE_SP_PREFIX: &str = "grid";
  284. let snapshot_object_id = format!("{}:{}", DATABASE_SP_PREFIX, database_id);
  285. let snapshot_persistence =
  286. SQLiteDatabaseRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  287. let rev_compress = DatabaseRevisionMergeable();
  288. let rev_manager = RevisionManager::new(
  289. &user_id,
  290. database_id,
  291. rev_persistence,
  292. rev_compress,
  293. snapshot_persistence,
  294. );
  295. Ok(rev_manager)
  296. }
  297. }
  298. pub async fn link_existing_database(
  299. view_id: &str,
  300. name: String,
  301. database_id: &str,
  302. layout: LayoutTypePB,
  303. database_manager: Arc<DatabaseManager>,
  304. ) -> FlowyResult<()> {
  305. tracing::trace!(
  306. "Link database view: {} with database: {}",
  307. view_id,
  308. database_id
  309. );
  310. let database_view_rev = DatabaseViewRevision::new(
  311. database_id.to_string(),
  312. view_id.to_owned(),
  313. false,
  314. name.clone(),
  315. layout.into(),
  316. );
  317. let database_view_ops = make_database_view_operations(&database_view_rev);
  318. let database_view_bytes = database_view_ops.json_bytes();
  319. let revision = Revision::initial_revision(view_id, database_view_bytes);
  320. database_manager
  321. .create_database_view(view_id, vec![revision])
  322. .await?;
  323. let _ = database_manager
  324. .database_ref_indexer
  325. .bind(database_id, view_id, false, &name);
  326. Ok(())
  327. }
  328. pub async fn create_new_database(
  329. view_id: &str,
  330. name: String,
  331. layout: LayoutTypePB,
  332. database_manager: Arc<DatabaseManager>,
  333. build_context: BuildDatabaseContext,
  334. ) -> FlowyResult<()> {
  335. let BuildDatabaseContext {
  336. field_revs,
  337. block_metas,
  338. blocks,
  339. database_view_data,
  340. layout_setting,
  341. } = build_context;
  342. for block_meta_data in &blocks {
  343. let block_id = &block_meta_data.block_id;
  344. // Indexing the block's rows
  345. block_meta_data.rows.iter().for_each(|row| {
  346. let _ = database_manager
  347. .block_indexer
  348. .insert(&row.block_id, &row.id);
  349. });
  350. // Create database's block
  351. let database_block_ops = make_database_block_operations(block_meta_data);
  352. let database_block_bytes = database_block_ops.json_bytes();
  353. let revision = Revision::initial_revision(block_id, database_block_bytes);
  354. database_manager
  355. .create_database_block(&block_id, vec![revision])
  356. .await?;
  357. }
  358. let database_id = gen_database_id();
  359. let database_rev = DatabaseRevision::from_build_context(&database_id, field_revs, block_metas);
  360. // Create database
  361. tracing::trace!("Create new database: {}", database_id);
  362. let database_ops = make_database_operations(&database_rev);
  363. let database_bytes = database_ops.json_bytes();
  364. let revision = Revision::initial_revision(&database_id, database_bytes);
  365. database_manager
  366. .create_database(&database_id, &view_id, &name, vec![revision])
  367. .await?;
  368. // Create database view
  369. tracing::trace!("Create new database view: {}", view_id);
  370. let mut database_view_rev = if database_view_data.is_empty() {
  371. DatabaseViewRevision::new(database_id, view_id.to_owned(), true, name, layout.into())
  372. } else {
  373. DatabaseViewRevision::from_json(database_view_data)?
  374. };
  375. tracing::trace!("Initial calendar layout setting: {:?}", layout_setting);
  376. database_view_rev.layout_settings = layout_setting;
  377. let database_view_ops = make_database_view_operations(&database_view_rev);
  378. let database_view_bytes = database_view_ops.json_bytes();
  379. let revision = Revision::initial_revision(view_id, database_view_bytes);
  380. database_manager
  381. .create_database_view(view_id, vec![revision])
  382. .await?;
  383. Ok(())
  384. }
  385. impl DatabaseRefIndexerQuery for DatabaseRefIndexer {
  386. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  387. self.get_ref_views_with_database(database_id)
  388. }
  389. }
  390. impl DatabaseRefIndexerQuery for Arc<DatabaseRefIndexer> {
  391. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  392. (**self).get_ref_views(database_id)
  393. }
  394. }