manager.rs 14 KB


  1. use crate::entities::DatabaseLayoutPB;
  2. use crate::services::database::{
  3. make_database_block_rev_manager, DatabaseEditor, DatabaseRefIndexerQuery,
  4. DatabaseRevisionCloudService, DatabaseRevisionMergeable, DatabaseRevisionSerde,
  5. };
  6. use crate::services::database_view::{
  7. make_database_view_rev_manager, make_database_view_revision_pad, DatabaseViewEditor,
  8. };
  9. use crate::services::persistence::block_index::BlockRowIndexer;
  10. use crate::services::persistence::database_ref::{DatabaseInfo, DatabaseRefs, DatabaseViewRef};
  11. use crate::services::persistence::kv::DatabaseKVPersistence;
  12. use crate::services::persistence::migration::DatabaseMigration;
  13. use crate::services::persistence::rev_sqlite::{
  14. SQLiteDatabaseRevisionPersistence, SQLiteDatabaseRevisionSnapshotPersistence,
  15. };
  16. use crate::services::persistence::DatabaseDBConnection;
  17. use std::collections::HashMap;
  18. use database_model::{
  19. gen_database_id, BuildDatabaseContext, DatabaseRevision, DatabaseViewRevision,
  20. };
  21. use flowy_client_sync::client_database::{
  22. make_database_block_operations, make_database_operations, make_database_view_operations,
  23. };
  24. use flowy_error::{FlowyError, FlowyResult};
  25. use flowy_revision::{
  26. RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
  27. };
  28. use flowy_sqlite::ConnectionPool;
  29. use flowy_task::TaskDispatcher;
  30. use lib_infra::future::Fut;
  31. use revision_model::Revision;
  32. use std::sync::Arc;
  33. use tokio::sync::RwLock;
  34. pub trait DatabaseUser: Send + Sync {
  35. fn user_id(&self) -> Result<i64, FlowyError>;
  36. fn token(&self) -> Result<String, FlowyError>;
  37. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  38. }
  39. pub struct DatabaseManager {
  40. editors_by_database_id: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  41. database_user: Arc<dyn DatabaseUser>,
  42. block_indexer: Arc<BlockRowIndexer>,
  43. database_refs: Arc<DatabaseRefs>,
  44. #[allow(dead_code)]
  45. kv_persistence: Arc<DatabaseKVPersistence>,
  46. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  47. #[allow(dead_code)]
  48. migration: DatabaseMigration,
  49. }
  50. impl DatabaseManager {
  51. pub fn new(
  52. database_user: Arc<dyn DatabaseUser>,
  53. _rev_web_socket: Arc<dyn RevisionWebSocket>,
  54. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  55. database_db: Arc<dyn DatabaseDBConnection>,
  56. ) -> Self {
  57. let editors_by_database_id = RwLock::new(HashMap::new());
  58. let kv_persistence = Arc::new(DatabaseKVPersistence::new(database_db.clone()));
  59. let block_indexer = Arc::new(BlockRowIndexer::new(database_db.clone()));
  60. let database_refs = Arc::new(DatabaseRefs::new(database_db));
  61. let migration = DatabaseMigration::new(database_user.clone(), database_refs.clone());
  62. Self {
  63. editors_by_database_id,
  64. database_user,
  65. kv_persistence,
  66. block_indexer,
  67. database_refs,
  68. task_scheduler,
  69. migration,
  70. }
  71. }
  72. pub async fn initialize_with_new_user(&self, _user_id: i64, _token: &str) -> FlowyResult<()> {
  73. Ok(())
  74. }
  75. pub async fn initialize(
  76. &self,
  77. user_id: i64,
  78. _token: &str,
  79. get_views_fn: Fut<Vec<(String, String, DatabaseLayoutPB)>>,
  80. ) -> FlowyResult<()> {
  81. self.migration.run(user_id, get_views_fn).await?;
  82. Ok(())
  83. }
  84. #[tracing::instrument(level = "debug", skip_all, err)]
  85. pub async fn create_database<T: AsRef<str>>(
  86. &self,
  87. database_id: &str,
  88. view_id: T,
  89. name: &str,
  90. revisions: Vec<Revision>,
  91. ) -> FlowyResult<()> {
  92. let db_pool = self.database_user.db_pool()?;
  93. let _ = self
  94. .database_refs
  95. .bind(database_id, view_id.as_ref(), true, name);
  96. let rev_manager = self.make_database_rev_manager(database_id, db_pool)?;
  97. rev_manager.reset_object(revisions).await?;
  98. Ok(())
  99. }
  100. #[tracing::instrument(level = "debug", skip_all, err)]
  101. async fn create_database_view<T: AsRef<str>>(
  102. &self,
  103. view_id: T,
  104. revisions: Vec<Revision>,
  105. ) -> FlowyResult<()> {
  106. let view_id = view_id.as_ref();
  107. let pool = self.database_user.db_pool()?;
  108. let rev_manager = make_database_view_rev_manager(pool, view_id).await?;
  109. rev_manager.reset_object(revisions).await?;
  110. Ok(())
  111. }
  112. pub async fn create_database_block<T: AsRef<str>>(
  113. &self,
  114. block_id: T,
  115. revisions: Vec<Revision>,
  116. ) -> FlowyResult<()> {
  117. let block_id = block_id.as_ref();
  118. let rev_manager = make_database_block_rev_manager(&self.database_user, block_id)?;
  119. rev_manager.reset_object(revisions).await?;
  120. Ok(())
  121. }
  122. #[tracing::instrument(level = "trace", skip_all, err)]
  123. pub async fn open_database_view<T: AsRef<str>>(
  124. &self,
  125. view_id: T,
  126. ) -> FlowyResult<Arc<DatabaseEditor>> {
  127. let view_id = view_id.as_ref();
  128. let database_info = self.database_refs.get_database_with_view(view_id)?;
  129. self
  130. .get_or_create_database_editor(&database_info.database_id, view_id)
  131. .await
  132. }
  133. #[tracing::instrument(level = "debug", skip_all)]
  134. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  135. let view_id = view_id.as_ref();
  136. let database_info = self.database_refs.get_database_with_view(view_id)?;
  137. tracing::Span::current().record("database_id", &database_info.database_id);
  138. // Create a temporary reference database_editor in case of holding the write lock
  139. // of editors_by_database_id too long.
  140. let database_editor = self
  141. .editors_by_database_id
  142. .write()
  143. .await
  144. .remove(&database_info.database_id);
  145. if let Some(database_editor) = database_editor {
  146. database_editor.close_view_editor(view_id).await;
  147. if database_editor.number_of_ref_views().await == 0 {
  148. database_editor.dispose().await;
  149. } else {
  150. self
  151. .editors_by_database_id
  152. .write()
  153. .await
  154. .insert(database_info.database_id, database_editor);
  155. }
  156. }
  157. Ok(())
  158. }
  159. // #[tracing::instrument(level = "debug", skip(self), err)]
  160. pub async fn get_database_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  161. let database_info = self.database_refs.get_database_with_view(view_id)?;
  162. let database_editor = self
  163. .editors_by_database_id
  164. .read()
  165. .await
  166. .get(&database_info.database_id)
  167. .cloned();
  168. match database_editor {
  169. None => {
  170. // Drop the read_guard ASAP in case of the following read/write lock
  171. self.open_database_view(view_id).await
  172. },
  173. Some(editor) => Ok(editor),
  174. }
  175. }
  176. pub async fn get_databases(&self) -> FlowyResult<Vec<DatabaseInfo>> {
  177. self.database_refs.get_all_databases()
  178. }
  179. pub async fn get_database_ref_views(
  180. &self,
  181. database_id: &str,
  182. ) -> FlowyResult<Vec<DatabaseViewRef>> {
  183. self.database_refs.get_ref_views_with_database(database_id)
  184. }
  185. async fn get_or_create_database_editor(
  186. &self,
  187. database_id: &str,
  188. view_id: &str,
  189. ) -> FlowyResult<Arc<DatabaseEditor>> {
  190. let user = self.database_user.clone();
  191. let create_view_editor = |database_editor: Arc<DatabaseEditor>| async move {
  192. let (view_pad, view_rev_manager) = make_database_view_revision_pad(view_id, user).await?;
  193. DatabaseViewEditor::from_pad(
  194. database_editor.database_view_data.clone(),
  195. database_editor.cell_data_cache.clone(),
  196. view_rev_manager,
  197. view_pad,
  198. )
  199. .await
  200. };
  201. let database_editor = self
  202. .editors_by_database_id
  203. .read()
  204. .await
  205. .get(database_id)
  206. .cloned();
  207. match database_editor {
  208. None => {
  209. let mut editors_by_database_id = self.editors_by_database_id.write().await;
  210. let db_pool = self.database_user.db_pool()?;
  211. let database_editor = self.make_database_rev_editor(view_id, db_pool).await?;
  212. editors_by_database_id.insert(database_id.to_string(), database_editor.clone());
  213. Ok(database_editor)
  214. },
  215. Some(database_editor) => {
  216. let is_open = database_editor.is_view_open(view_id).await;
  217. if !is_open {
  218. let database_view_editor = create_view_editor(database_editor.clone()).await?;
  219. database_editor.open_view_editor(database_view_editor).await;
  220. }
  221. Ok(database_editor)
  222. },
  223. }
  224. }
  225. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  226. async fn make_database_rev_editor(
  227. &self,
  228. view_id: &str,
  229. pool: Arc<ConnectionPool>,
  230. ) -> Result<Arc<DatabaseEditor>, FlowyError> {
  231. let user = self.database_user.clone();
  232. let (base_view_pad, base_view_rev_manager) =
  233. make_database_view_revision_pad(view_id, user.clone()).await?;
  234. let mut database_id = base_view_pad.database_id.clone();
  235. tracing::debug!("Open database: {} with view: {}", database_id, view_id);
  236. if database_id.is_empty() {
  237. // Before the database_id concept comes up, we used the view_id directly. So if
  238. // the database_id is empty, which means we can used the view_id. After the version 0.1.1,
  239. // we start to used the database_id that enables binding different views to the same database.
  240. database_id = view_id.to_owned();
  241. }
  242. let token = user.token()?;
  243. let cloud = Arc::new(DatabaseRevisionCloudService::new(token));
  244. let mut rev_manager = self.make_database_rev_manager(&database_id, pool.clone())?;
  245. let database_pad = Arc::new(RwLock::new(
  246. rev_manager
  247. .initialize::<DatabaseRevisionSerde>(Some(cloud))
  248. .await?,
  249. ));
  250. let database_editor = DatabaseEditor::new(
  251. &database_id,
  252. user,
  253. database_pad,
  254. rev_manager,
  255. self.block_indexer.clone(),
  256. self.database_refs.clone(),
  257. self.task_scheduler.clone(),
  258. )
  259. .await?;
  260. let base_view_editor = DatabaseViewEditor::from_pad(
  261. database_editor.database_view_data.clone(),
  262. database_editor.cell_data_cache.clone(),
  263. base_view_rev_manager,
  264. base_view_pad,
  265. )
  266. .await?;
  267. database_editor.open_view_editor(base_view_editor).await;
  268. Ok(database_editor)
  269. }
  270. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  271. pub fn make_database_rev_manager(
  272. &self,
  273. database_id: &str,
  274. pool: Arc<ConnectionPool>,
  275. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  276. // Create revision persistence
  277. let disk_cache = SQLiteDatabaseRevisionPersistence::new(pool.clone());
  278. let configuration = RevisionPersistenceConfiguration::new(6, false);
  279. let rev_persistence = RevisionPersistence::new(database_id, disk_cache, configuration);
  280. // Create snapshot persistence
  281. const DATABASE_SP_PREFIX: &str = "grid";
  282. let snapshot_object_id = format!("{}:{}", DATABASE_SP_PREFIX, database_id);
  283. let snapshot_persistence =
  284. SQLiteDatabaseRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  285. let rev_compress = DatabaseRevisionMergeable();
  286. let rev_manager = RevisionManager::new(
  287. database_id,
  288. rev_persistence,
  289. rev_compress,
  290. snapshot_persistence,
  291. );
  292. Ok(rev_manager)
  293. }
  294. }
  295. pub async fn link_existing_database(
  296. view_id: &str,
  297. name: String,
  298. database_id: &str,
  299. layout: DatabaseLayoutPB,
  300. database_manager: Arc<DatabaseManager>,
  301. ) -> FlowyResult<()> {
  302. tracing::trace!(
  303. "Link database view: {} with database: {}",
  304. view_id,
  305. database_id
  306. );
  307. let database_view_rev = DatabaseViewRevision::new(
  308. database_id.to_string(),
  309. view_id.to_owned(),
  310. false,
  311. name.clone(),
  312. layout.into(),
  313. );
  314. let database_view_ops = make_database_view_operations(&database_view_rev);
  315. let database_view_bytes = database_view_ops.json_bytes();
  316. let revision = Revision::initial_revision(view_id, database_view_bytes);
  317. database_manager
  318. .create_database_view(view_id, vec![revision])
  319. .await?;
  320. let _ = database_manager
  321. .database_refs
  322. .bind(database_id, view_id, false, &name);
  323. Ok(())
  324. }
  325. pub async fn create_new_database(
  326. view_id: &str,
  327. name: String,
  328. layout: DatabaseLayoutPB,
  329. database_manager: Arc<DatabaseManager>,
  330. build_context: BuildDatabaseContext,
  331. ) -> FlowyResult<()> {
  332. let BuildDatabaseContext {
  333. field_revs,
  334. block_metas,
  335. blocks,
  336. database_view_data,
  337. layout_setting,
  338. } = build_context;
  339. for block_meta_data in &blocks {
  340. let block_id = &block_meta_data.block_id;
  341. // Indexing the block's rows
  342. block_meta_data.rows.iter().for_each(|row| {
  343. let _ = database_manager
  344. .block_indexer
  345. .insert(&row.block_id, &row.id);
  346. });
  347. // Create database's block
  348. let database_block_ops = make_database_block_operations(block_meta_data);
  349. let database_block_bytes = database_block_ops.json_bytes();
  350. let revision = Revision::initial_revision(block_id, database_block_bytes);
  351. database_manager
  352. .create_database_block(&block_id, vec![revision])
  353. .await?;
  354. }
  355. let database_id = gen_database_id();
  356. let database_rev = DatabaseRevision::from_build_context(&database_id, field_revs, block_metas);
  357. // Create database
  358. tracing::trace!("Create new database: {}", database_id);
  359. let database_ops = make_database_operations(&database_rev);
  360. let database_bytes = database_ops.json_bytes();
  361. let revision = Revision::initial_revision(&database_id, database_bytes);
  362. database_manager
  363. .create_database(&database_id, &view_id, &name, vec![revision])
  364. .await?;
  365. // Create database view
  366. tracing::trace!("Create new database view: {}", view_id);
  367. let mut database_view_rev = if database_view_data.is_empty() {
  368. DatabaseViewRevision::new(database_id, view_id.to_owned(), true, name, layout.into())
  369. } else {
  370. DatabaseViewRevision::from_json(database_view_data)?
  371. };
  372. database_view_rev.layout_settings = layout_setting;
  373. let database_view_ops = make_database_view_operations(&database_view_rev);
  374. let database_view_bytes = database_view_ops.json_bytes();
  375. let revision = Revision::initial_revision(view_id, database_view_bytes);
  376. database_manager
  377. .create_database_view(view_id, vec![revision])
  378. .await?;
  379. Ok(())
  380. }
  381. impl DatabaseRefIndexerQuery for DatabaseRefs {
  382. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
  383. self.get_ref_views_with_database(database_id)
  384. }
  385. }
  386. impl DatabaseRefIndexerQuery for Arc<DatabaseRefs> {
  387. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseViewRef>> {
  388. (**self).get_ref_views(database_id)
  389. }
  390. }