manager.rs 14 KB


  1. use crate::entities::LayoutTypePB;
  2. use crate::services::database::{
  3. make_database_block_rev_manager, DatabaseEditor, DatabaseRefIndexerQuery,
  4. DatabaseRevisionCloudService, DatabaseRevisionMergeable, DatabaseRevisionSerde,
  5. };
  6. use crate::services::database_view::{
  7. make_database_view_rev_manager, make_database_view_revision_pad, DatabaseViewEditor,
  8. };
  9. use crate::services::persistence::block_index::BlockRowIndexer;
  10. use crate::services::persistence::database_ref::{DatabaseInfo, DatabaseRefs, DatabaseViewRef};
  11. use crate::services::persistence::kv::DatabaseKVPersistence;
  12. use crate::services::persistence::migration::DatabaseMigration;
  13. use crate::services::persistence::rev_sqlite::{
  14. SQLiteDatabaseRevisionPersistence, SQLiteDatabaseRevisionSnapshotPersistence,
  15. };
  16. use crate::services::persistence::DatabaseDBConnection;
  17. use std::collections::HashMap;
  18. use database_model::{
  19. gen_database_id, BuildDatabaseContext, DatabaseRevision, DatabaseViewRevision,
  20. };
  21. use flowy_client_sync::client_database::{
  22. make_database_block_operations, make_database_operations, make_database_view_operations,
  23. };
  24. use flowy_error::{FlowyError, FlowyResult};
  25. use flowy_revision::{
  26. RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
  27. };
  28. use flowy_sqlite::ConnectionPool;
  29. use flowy_task::TaskDispatcher;
  30. use lib_infra::future::Fut;
  31. use revision_model::Revision;
  32. use std::sync::Arc;
  33. use tokio::sync::RwLock;
  34. pub trait DatabaseUser: Send + Sync {
  35. fn user_id(&self) -> Result<String, FlowyError>;
  36. fn token(&self) -> Result<String, FlowyError>;
  37. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  38. }
  39. pub struct DatabaseManager {
  40. editors_by_database_id: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  41. database_user: Arc<dyn DatabaseUser>,
  42. block_indexer: Arc<BlockRowIndexer>,
  43. database_refs: Arc<DatabaseRefs>,
  44. #[allow(dead_code)]
  45. kv_persistence: Arc<DatabaseKVPersistence>,
  46. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  47. #[allow(dead_code)]
  48. migration: DatabaseMigration,
  49. }
  50. impl DatabaseManager {
  51. pub fn new(
  52. database_user: Arc<dyn DatabaseUser>,
  53. _rev_web_socket: Arc<dyn RevisionWebSocket>,
  54. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  55. database_db: Arc<dyn DatabaseDBConnection>,
  56. ) -> Self {
  57. let editors_by_database_id = RwLock::new(HashMap::new());
  58. let kv_persistence = Arc::new(DatabaseKVPersistence::new(database_db.clone()));
  59. let block_indexer = Arc::new(BlockRowIndexer::new(database_db.clone()));
  60. let database_refs = Arc::new(DatabaseRefs::new(database_db));
  61. let migration = DatabaseMigration::new(database_user.clone(), database_refs.clone());
  62. Self {
  63. editors_by_database_id,
  64. database_user,
  65. kv_persistence,
  66. block_indexer,
  67. database_refs,
  68. task_scheduler,
  69. migration,
  70. }
  71. }
  72. pub async fn initialize_with_new_user(&self, _user_id: &str, _token: &str) -> FlowyResult<()> {
  73. Ok(())
  74. }
  75. pub async fn initialize(
  76. &self,
  77. user_id: &str,
  78. _token: &str,
  79. get_views_fn: Fut<Vec<(String, String, LayoutTypePB)>>,
  80. ) -> FlowyResult<()> {
  81. self.migration.run(user_id, get_views_fn).await?;
  82. Ok(())
  83. }
  84. #[tracing::instrument(level = "debug", skip_all, err)]
  85. pub async fn create_database<T: AsRef<str>>(
  86. &self,
  87. database_id: &str,
  88. view_id: T,
  89. name: &str,
  90. revisions: Vec<Revision>,
  91. ) -> FlowyResult<()> {
  92. let db_pool = self.database_user.db_pool()?;
  93. let _ = self
  94. .database_refs
  95. .bind(database_id, view_id.as_ref(), true, name);
  96. let rev_manager = self.make_database_rev_manager(database_id, db_pool)?;
  97. rev_manager.reset_object(revisions).await?;
  98. Ok(())
  99. }
  100. #[tracing::instrument(level = "debug", skip_all, err)]
  101. async fn create_database_view<T: AsRef<str>>(
  102. &self,
  103. view_id: T,
  104. revisions: Vec<Revision>,
  105. ) -> FlowyResult<()> {
  106. let view_id = view_id.as_ref();
  107. let user_id = self.database_user.user_id()?;
  108. let pool = self.database_user.db_pool()?;
  109. let rev_manager = make_database_view_rev_manager(&user_id, pool, view_id).await?;
  110. rev_manager.reset_object(revisions).await?;
  111. Ok(())
  112. }
  113. pub async fn create_database_block<T: AsRef<str>>(
  114. &self,
  115. block_id: T,
  116. revisions: Vec<Revision>,
  117. ) -> FlowyResult<()> {
  118. let block_id = block_id.as_ref();
  119. let rev_manager = make_database_block_rev_manager(&self.database_user, block_id)?;
  120. rev_manager.reset_object(revisions).await?;
  121. Ok(())
  122. }
  123. #[tracing::instrument(level = "trace", skip_all, err)]
  124. pub async fn open_database_view<T: AsRef<str>>(
  125. &self,
  126. view_id: T,
  127. ) -> FlowyResult<Arc<DatabaseEditor>> {
  128. let view_id = view_id.as_ref();
  129. let database_info = self.database_refs.get_database_with_view(view_id)?;
  130. self
  131. .get_or_create_database_editor(&database_info.database_id, view_id)
  132. .await
  133. }
  134. #[tracing::instrument(level = "debug", skip_all)]
  135. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  136. let view_id = view_id.as_ref();
  137. let database_info = self.database_refs.get_database_with_view(view_id)?;
  138. tracing::Span::current().record("database_id", &database_info.database_id);
  139. // Create a temporary reference database_editor in case of holding the write lock
  140. // of editors_by_database_id too long.
  141. let database_editor = self
  142. .editors_by_database_id
  143. .write()
  144. .await
  145. .remove(&database_info.database_id);
  146. if let Some(database_editor) = database_editor {
  147. database_editor.close_view_editor(view_id).await;
  148. if database_editor.number_of_ref_views().await == 0 {
  149. database_editor.dispose().await;
  150. } else {
  151. self
  152. .editors_by_database_id
  153. .write()
  154. .await
  155. .insert(database_info.database_id, database_editor);
  156. }
  157. }
  158. Ok(())
  159. }
  160. // #[tracing::instrument(level = "debug", skip(self), err)]
  161. pub async fn get_database_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  162. let database_info = self.database_refs.get_database_with_view(view_id)?;
  163. let database_editor = self
  164. .editors_by_database_id
  165. .read()
  166. .await
  167. .get(&database_info.database_id)
  168. .cloned();
  169. match database_editor {
  170. None => {
  171. // Drop the read_guard ASAP in case of the following read/write lock
  172. self.open_database_view(view_id).await
  173. },
  174. Some(editor) => Ok(editor),
  175. }
  176. }
  177. pub async fn get_databases(&self) -> FlowyResult<Vec<DatabaseInfo>> {
  178. self.database_refs.get_all_databases()
  179. }
  180. pub async fn get_database_ref_views(
  181. &self,
  182. database_id: &str,
  183. ) -> FlowyResult<Vec<DatabaseViewRef>> {
  184. self.database_refs.get_ref_views_with_database(database_id)
  185. }
  186. async fn get_or_create_database_editor(
  187. &self,
  188. database_id: &str,
  189. view_id: &str,
  190. ) -> FlowyResult<Arc<DatabaseEditor>> {
  191. let user = self.database_user.clone();
  192. let create_view_editor = |database_editor: Arc<DatabaseEditor>| async move {
  193. let user_id = user.user_id()?;
  194. let (view_pad, view_rev_manager) = make_database_view_revision_pad(view_id, user).await?;
  195. DatabaseViewEditor::from_pad(
  196. &user_id,
  197. database_editor.database_view_data.clone(),
  198. database_editor.cell_data_cache.clone(),
  199. view_rev_manager,
  200. view_pad,
  201. )
  202. .await
  203. };
  204. let database_editor = self
  205. .editors_by_database_id
  206. .read()
  207. .await
  208. .get(database_id)
  209. .cloned();
  210. match database_editor {
  211. None => {
  212. let mut editors_by_database_id = self.editors_by_database_id.write().await;
  213. let db_pool = self.database_user.db_pool()?;
  214. let database_editor = self.make_database_rev_editor(view_id, db_pool).await?;
  215. editors_by_database_id.insert(database_id.to_string(), database_editor.clone());
  216. Ok(database_editor)
  217. },
  218. Some(database_editor) => {
  219. let is_open = database_editor.is_view_open(view_id).await;
  220. if !is_open {
  221. let database_view_editor = create_view_editor(database_editor.clone()).await?;
  222. database_editor.open_view_editor(database_view_editor).await;
  223. }
  224. Ok(database_editor)
  225. },
  226. }
  227. }
  228. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  229. async fn make_database_rev_editor(
  230. &self,
  231. view_id: &str,
  232. pool: Arc<ConnectionPool>,
  233. ) -> Result<Arc<DatabaseEditor>, FlowyError> {
  234. let user = self.database_user.clone();
  235. let (base_view_pad, base_view_rev_manager) =
  236. make_database_view_revision_pad(view_id, user.clone()).await?;
  237. let mut database_id = base_view_pad.database_id.clone();
  238. tracing::debug!("Open database: {} with view: {}", database_id, view_id);
  239. if database_id.is_empty() {
  240. // Before the database_id concept comes up, we used the view_id directly. So if
  241. // the database_id is empty, which means we can used the view_id. After the version 0.1.1,
  242. // we start to used the database_id that enables binding different views to the same database.
  243. database_id = view_id.to_owned();
  244. }
  245. let token = user.token()?;
  246. let cloud = Arc::new(DatabaseRevisionCloudService::new(token));
  247. let mut rev_manager = self.make_database_rev_manager(&database_id, pool.clone())?;
  248. let database_pad = Arc::new(RwLock::new(
  249. rev_manager
  250. .initialize::<DatabaseRevisionSerde>(Some(cloud))
  251. .await?,
  252. ));
  253. let user_id = user.user_id()?;
  254. let database_editor = DatabaseEditor::new(
  255. &database_id,
  256. user,
  257. database_pad,
  258. rev_manager,
  259. self.block_indexer.clone(),
  260. self.database_refs.clone(),
  261. self.task_scheduler.clone(),
  262. )
  263. .await?;
  264. let base_view_editor = DatabaseViewEditor::from_pad(
  265. &user_id,
  266. database_editor.database_view_data.clone(),
  267. database_editor.cell_data_cache.clone(),
  268. base_view_rev_manager,
  269. base_view_pad,
  270. )
  271. .await?;
  272. database_editor.open_view_editor(base_view_editor).await;
  273. Ok(database_editor)
  274. }
  275. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  276. pub fn make_database_rev_manager(
  277. &self,
  278. database_id: &str,
  279. pool: Arc<ConnectionPool>,
  280. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  281. let user_id = self.database_user.user_id()?;
  282. // Create revision persistence
  283. let disk_cache = SQLiteDatabaseRevisionPersistence::new(&user_id, pool.clone());
  284. let configuration = RevisionPersistenceConfiguration::new(6, false);
  285. let rev_persistence =
  286. RevisionPersistence::new(&user_id, database_id, disk_cache, configuration);
  287. // Create snapshot persistence
  288. const DATABASE_SP_PREFIX: &str = "grid";
  289. let snapshot_object_id = format!("{}:{}", DATABASE_SP_PREFIX, database_id);
  290. let snapshot_persistence =
  291. SQLiteDatabaseRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  292. let rev_compress = DatabaseRevisionMergeable();
  293. let rev_manager = RevisionManager::new(
  294. &user_id,
  295. database_id,
  296. rev_persistence,
  297. rev_compress,
  298. snapshot_persistence,
  299. );
  300. Ok(rev_manager)
  301. }
  302. }
  303. pub async fn link_existing_database(
  304. view_id: &str,
  305. name: String,
  306. database_id: &str,
  307. layout: LayoutTypePB,
  308. database_manager: Arc<DatabaseManager>,
  309. ) -> FlowyResult<()> {
  310. tracing::trace!(
  311. "Link database view: {} with database: {}",
  312. view_id,
  313. database_id
  314. );
  315. let database_view_rev = DatabaseViewRevision::new(
  316. database_id.to_string(),
  317. view_id.to_owned(),
  318. false,
  319. name.clone(),
  320. layout.into(),
  321. );
  322. let database_view_ops = make_database_view_operations(&database_view_rev);
  323. let database_view_bytes = database_view_ops.json_bytes();
  324. let revision = Revision::initial_revision(view_id, database_view_bytes);
  325. database_manager
  326. .create_database_view(view_id, vec![revision])
  327. .await?;
  328. let _ = database_manager
  329. .database_refs
  330. .bind(database_id, view_id, false, &name);
  331. Ok(())
  332. }
  333. pub async fn create_new_database(
  334. view_id: &str,
  335. name: String,
  336. layout: LayoutTypePB,
  337. database_manager: Arc<DatabaseManager>,
  338. build_context: BuildDatabaseContext,
  339. ) -> FlowyResult<()> {
  340. let BuildDatabaseContext {
  341. field_revs,
  342. block_metas,
  343. blocks,
  344. database_view_data,
  345. layout_setting,
  346. } = build_context;
  347. for block_meta_data in &blocks {
  348. let block_id = &block_meta_data.block_id;
  349. // Indexing the block's rows
  350. block_meta_data.rows.iter().for_each(|row| {
  351. let _ = database_manager
  352. .block_indexer
  353. .insert(&row.block_id, &row.id);
  354. });
  355. // Create database's block
  356. let database_block_ops = make_database_block_operations(block_meta_data);
  357. let database_block_bytes = database_block_ops.json_bytes();
  358. let revision = Revision::initial_revision(block_id, database_block_bytes);
  359. database_manager
  360. .create_database_block(&block_id, vec![revision])
  361. .await?;
  362. }
  363. let database_id = gen_database_id();
  364. let database_rev = DatabaseRevision::from_build_context(&database_id, field_revs, block_metas);
  365. // Create database
  366. tracing::trace!("Create new database: {}", database_id);
  367. let database_ops = make_database_operations(&database_rev);
  368. let database_bytes = database_ops.json_bytes();
  369. let revision = Revision::initial_revision(&database_id, database_bytes);
  370. database_manager
  371. .create_database(&database_id, &view_id, &name, vec![revision])
  372. .await?;
  373. // Create database view
  374. tracing::trace!("Create new database view: {}", view_id);
  375. let mut database_view_rev = if database_view_data.is_empty() {
  376. DatabaseViewRevision::new(database_id, view_id.to_owned(), true, name, layout.into())
  377. } else {
  378. DatabaseViewRevision::from_json(database_view_data)?
  379. };
  380. database_view_rev.layout_settings = layout_setting;
  381. let database_view_ops = make_database_view_operations(&database_view_rev);
  382. let database_view_bytes = database_view_ops.json_bytes();
  383. let revision = Revision::initial_revision(view_id, database_view_bytes);
  384. database_manager
  385. .create_database_view(view_id, vec![revision])
  386. .await?;
  387. Ok(())
  388. }
  389. impl DatabaseRefIndexerQuery for DatabaseRefs {
  390. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
  391. self.get_ref_views_with_database(database_id)
  392. }
  393. }
  394. impl DatabaseRefIndexerQuery for Arc<DatabaseRefs> {
  395. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
  396. (**self).get_ref_views(database_id)
  397. }
  398. }