// manager.rs
  1. use crate::entities::LayoutTypePB;
  2. use crate::services::database::{
  3. make_database_block_rev_manager, DatabaseEditor, DatabaseRefIndexerQuery,
  4. DatabaseRevisionCloudService, DatabaseRevisionMergeable, DatabaseRevisionSerde,
  5. };
  6. use crate::services::database_view::{
  7. make_database_view_rev_manager, make_database_view_revision_pad, DatabaseViewEditor,
  8. };
  9. use crate::services::persistence::block_index::BlockRowIndexer;
  10. use crate::services::persistence::database_ref::{DatabaseInfo, DatabaseRef, DatabaseRefIndexer};
  11. use crate::services::persistence::kv::DatabaseKVPersistence;
  12. use crate::services::persistence::migration::DatabaseMigration;
  13. use crate::services::persistence::rev_sqlite::{
  14. SQLiteDatabaseRevisionPersistence, SQLiteDatabaseRevisionSnapshotPersistence,
  15. };
  16. use crate::services::persistence::DatabaseDBConnection;
  17. use std::collections::HashMap;
  18. use database_model::{
  19. gen_database_id, BuildDatabaseContext, DatabaseRevision, DatabaseViewRevision,
  20. };
  21. use flowy_client_sync::client_database::{
  22. make_database_block_operations, make_database_operations, make_database_view_operations,
  23. };
  24. use flowy_error::{FlowyError, FlowyResult};
  25. use flowy_revision::{
  26. RevisionManager, RevisionPersistence, RevisionPersistenceConfiguration, RevisionWebSocket,
  27. };
  28. use flowy_sqlite::ConnectionPool;
  29. use flowy_task::TaskDispatcher;
  30. use revision_model::Revision;
  31. use std::sync::Arc;
  32. use tokio::sync::RwLock;
  33. pub trait DatabaseUser: Send + Sync {
  34. fn user_id(&self) -> Result<String, FlowyError>;
  35. fn token(&self) -> Result<String, FlowyError>;
  36. fn db_pool(&self) -> Result<Arc<ConnectionPool>, FlowyError>;
  37. }
  38. pub struct DatabaseManager {
  39. editors_by_database_id: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  40. database_user: Arc<dyn DatabaseUser>,
  41. block_indexer: Arc<BlockRowIndexer>,
  42. database_ref_indexer: Arc<DatabaseRefIndexer>,
  43. #[allow(dead_code)]
  44. kv_persistence: Arc<DatabaseKVPersistence>,
  45. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  46. #[allow(dead_code)]
  47. migration: DatabaseMigration,
  48. }
  49. impl DatabaseManager {
  50. pub fn new(
  51. database_user: Arc<dyn DatabaseUser>,
  52. _rev_web_socket: Arc<dyn RevisionWebSocket>,
  53. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  54. database_db: Arc<dyn DatabaseDBConnection>,
  55. ) -> Self {
  56. let editors_by_database_id = RwLock::new(HashMap::new());
  57. let kv_persistence = Arc::new(DatabaseKVPersistence::new(database_db.clone()));
  58. let block_indexer = Arc::new(BlockRowIndexer::new(database_db.clone()));
  59. let database_ref_indexer = Arc::new(DatabaseRefIndexer::new(database_db.clone()));
  60. let migration = DatabaseMigration::new(
  61. database_user.clone(),
  62. database_db,
  63. database_ref_indexer.clone(),
  64. );
  65. Self {
  66. editors_by_database_id,
  67. database_user,
  68. kv_persistence,
  69. block_indexer,
  70. database_ref_indexer,
  71. task_scheduler,
  72. migration,
  73. }
  74. }
  75. pub async fn initialize_with_new_user(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
  76. self.migration.run(user_id).await?;
  77. Ok(())
  78. }
  79. pub async fn initialize(&self, user_id: &str, _token: &str) -> FlowyResult<()> {
  80. self.migration.run(user_id).await?;
  81. Ok(())
  82. }
  83. #[tracing::instrument(level = "debug", skip_all, err)]
  84. pub async fn create_database<T: AsRef<str>>(
  85. &self,
  86. database_id: &str,
  87. view_id: T,
  88. name: &str,
  89. revisions: Vec<Revision>,
  90. ) -> FlowyResult<()> {
  91. let db_pool = self.database_user.db_pool()?;
  92. let _ = self
  93. .database_ref_indexer
  94. .bind(database_id, view_id.as_ref(), true, name);
  95. let rev_manager = self.make_database_rev_manager(database_id, db_pool)?;
  96. rev_manager.reset_object(revisions).await?;
  97. Ok(())
  98. }
  99. #[tracing::instrument(level = "debug", skip_all, err)]
  100. async fn create_database_view<T: AsRef<str>>(
  101. &self,
  102. view_id: T,
  103. revisions: Vec<Revision>,
  104. ) -> FlowyResult<()> {
  105. let view_id = view_id.as_ref();
  106. let rev_manager = make_database_view_rev_manager(&self.database_user, view_id).await?;
  107. rev_manager.reset_object(revisions).await?;
  108. Ok(())
  109. }
  110. #[tracing::instrument(level = "debug", skip_all, err)]
  111. pub async fn create_database_block<T: AsRef<str>>(
  112. &self,
  113. block_id: T,
  114. revisions: Vec<Revision>,
  115. ) -> FlowyResult<()> {
  116. let block_id = block_id.as_ref();
  117. let rev_manager = make_database_block_rev_manager(&self.database_user, block_id)?;
  118. rev_manager.reset_object(revisions).await?;
  119. Ok(())
  120. }
  121. pub async fn open_database_view<T: AsRef<str>>(
  122. &self,
  123. view_id: T,
  124. ) -> FlowyResult<Arc<DatabaseEditor>> {
  125. let view_id = view_id.as_ref();
  126. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  127. self
  128. .get_or_create_database_editor(&database_info.database_id, view_id)
  129. .await
  130. }
  131. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  132. let view_id = view_id.as_ref();
  133. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  134. tracing::Span::current().record("database_id", &database_info.database_id);
  135. let mut should_remove_editor = false;
  136. if let Some(database_editor) = self
  137. .editors_by_database_id
  138. .write()
  139. .await
  140. .get(&database_info.database_id)
  141. {
  142. database_editor.close_view_editor(view_id).await;
  143. should_remove_editor = database_editor.number_of_ref_views().await == 0;
  144. if should_remove_editor {
  145. database_editor.dispose().await;
  146. }
  147. }
  148. if should_remove_editor {
  149. tracing::debug!("Close database base editor: {}", database_info.database_id);
  150. self
  151. .editors_by_database_id
  152. .write()
  153. .await
  154. .remove(&database_info.database_id);
  155. }
  156. Ok(())
  157. }
  158. // #[tracing::instrument(level = "debug", skip(self), err)]
  159. pub async fn get_database_editor(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  160. let database_info = self.database_ref_indexer.get_database_with_view(view_id)?;
  161. let database_editor = self
  162. .editors_by_database_id
  163. .read()
  164. .await
  165. .get(&database_info.database_id)
  166. .cloned();
  167. match database_editor {
  168. None => {
  169. // Drop the read_guard ASAP in case of the following read/write lock
  170. self.open_database_view(view_id).await
  171. },
  172. Some(editor) => Ok(editor),
  173. }
  174. }
  175. pub async fn get_databases(&self) -> FlowyResult<Vec<DatabaseInfo>> {
  176. self.database_ref_indexer.get_all_databases()
  177. }
  178. pub async fn get_database_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  179. self
  180. .database_ref_indexer
  181. .get_ref_views_with_database(database_id)
  182. }
  183. async fn get_or_create_database_editor(
  184. &self,
  185. database_id: &str,
  186. view_id: &str,
  187. ) -> FlowyResult<Arc<DatabaseEditor>> {
  188. if let Some(database_editor) = self.editors_by_database_id.read().await.get(database_id) {
  189. let user_id = self.database_user.user_id()?;
  190. let (view_pad, view_rev_manager) =
  191. make_database_view_revision_pad(view_id, self.database_user.clone()).await?;
  192. let view_editor = DatabaseViewEditor::from_pad(
  193. &user_id,
  194. database_editor.database_view_data.clone(),
  195. database_editor.cell_data_cache.clone(),
  196. view_rev_manager,
  197. view_pad,
  198. )
  199. .await?;
  200. database_editor.open_view_editor(view_editor).await;
  201. return Ok(database_editor.clone());
  202. }
  203. // Lock the database_editors
  204. let mut editors_by_database_id = self.editors_by_database_id.write().await;
  205. let db_pool = self.database_user.db_pool()?;
  206. let editor = self.make_database_rev_editor(view_id, db_pool).await?;
  207. editors_by_database_id.insert(database_id.to_string(), editor.clone());
  208. Ok(editor)
  209. }
  210. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  211. async fn make_database_rev_editor(
  212. &self,
  213. view_id: &str,
  214. pool: Arc<ConnectionPool>,
  215. ) -> Result<Arc<DatabaseEditor>, FlowyError> {
  216. let user = self.database_user.clone();
  217. tracing::debug!("Open database view: {}", view_id);
  218. let (base_view_pad, base_view_rev_manager) =
  219. make_database_view_revision_pad(view_id, user.clone()).await?;
  220. let mut database_id = base_view_pad.database_id.clone();
  221. tracing::debug!("Open database: {}", database_id);
  222. if database_id.is_empty() {
  223. // Before the database_id concept comes up, we used the view_id directly. So if
  224. // the database_id is empty, which means we can used the view_id. After the version 0.1.1,
  225. // we start to used the database_id that enables binding different views to the same database.
  226. database_id = view_id.to_owned();
  227. }
  228. let token = user.token()?;
  229. let cloud = Arc::new(DatabaseRevisionCloudService::new(token));
  230. let mut rev_manager = self.make_database_rev_manager(&database_id, pool.clone())?;
  231. let database_pad = Arc::new(RwLock::new(
  232. rev_manager
  233. .initialize::<DatabaseRevisionSerde>(Some(cloud))
  234. .await?,
  235. ));
  236. let user_id = user.user_id()?;
  237. let database_editor = DatabaseEditor::new(
  238. &database_id,
  239. user,
  240. database_pad,
  241. rev_manager,
  242. self.block_indexer.clone(),
  243. self.database_ref_indexer.clone(),
  244. self.task_scheduler.clone(),
  245. )
  246. .await?;
  247. let base_view_editor = DatabaseViewEditor::from_pad(
  248. &user_id,
  249. database_editor.database_view_data.clone(),
  250. database_editor.cell_data_cache.clone(),
  251. base_view_rev_manager,
  252. base_view_pad,
  253. )
  254. .await?;
  255. database_editor.open_view_editor(base_view_editor).await;
  256. Ok(database_editor)
  257. }
  258. #[tracing::instrument(level = "trace", skip(self, pool), err)]
  259. pub fn make_database_rev_manager(
  260. &self,
  261. database_id: &str,
  262. pool: Arc<ConnectionPool>,
  263. ) -> FlowyResult<RevisionManager<Arc<ConnectionPool>>> {
  264. let user_id = self.database_user.user_id()?;
  265. // Create revision persistence
  266. let disk_cache = SQLiteDatabaseRevisionPersistence::new(&user_id, pool.clone());
  267. let configuration = RevisionPersistenceConfiguration::new(6, false);
  268. let rev_persistence =
  269. RevisionPersistence::new(&user_id, database_id, disk_cache, configuration);
  270. // Create snapshot persistence
  271. const DATABASE_SP_PREFIX: &str = "grid";
  272. let snapshot_object_id = format!("{}:{}", DATABASE_SP_PREFIX, database_id);
  273. let snapshot_persistence =
  274. SQLiteDatabaseRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
  275. let rev_compress = DatabaseRevisionMergeable();
  276. let rev_manager = RevisionManager::new(
  277. &user_id,
  278. database_id,
  279. rev_persistence,
  280. rev_compress,
  281. snapshot_persistence,
  282. );
  283. Ok(rev_manager)
  284. }
  285. }
  286. pub async fn link_existing_database(
  287. view_id: &str,
  288. name: String,
  289. database_id: &str,
  290. layout: LayoutTypePB,
  291. database_manager: Arc<DatabaseManager>,
  292. ) -> FlowyResult<()> {
  293. tracing::trace!(
  294. "Link database view: {} with database: {}",
  295. view_id,
  296. database_id
  297. );
  298. let database_view_rev = DatabaseViewRevision::new(
  299. database_id.to_string(),
  300. view_id.to_owned(),
  301. false,
  302. name.clone(),
  303. layout.into(),
  304. );
  305. let database_view_ops = make_database_view_operations(&database_view_rev);
  306. let database_view_bytes = database_view_ops.json_bytes();
  307. let revision = Revision::initial_revision(view_id, database_view_bytes);
  308. database_manager
  309. .create_database_view(view_id, vec![revision])
  310. .await?;
  311. let _ = database_manager
  312. .database_ref_indexer
  313. .bind(database_id, view_id, false, &name);
  314. Ok(())
  315. }
  316. pub async fn create_new_database(
  317. view_id: &str,
  318. name: String,
  319. layout: LayoutTypePB,
  320. database_manager: Arc<DatabaseManager>,
  321. build_context: BuildDatabaseContext,
  322. ) -> FlowyResult<()> {
  323. let BuildDatabaseContext {
  324. field_revs,
  325. block_metas,
  326. blocks,
  327. database_view_data,
  328. } = build_context;
  329. for block_meta_data in &blocks {
  330. let block_id = &block_meta_data.block_id;
  331. // Indexing the block's rows
  332. block_meta_data.rows.iter().for_each(|row| {
  333. let _ = database_manager
  334. .block_indexer
  335. .insert(&row.block_id, &row.id);
  336. });
  337. // Create database's block
  338. let database_block_ops = make_database_block_operations(block_meta_data);
  339. let database_block_bytes = database_block_ops.json_bytes();
  340. let revision = Revision::initial_revision(block_id, database_block_bytes);
  341. database_manager
  342. .create_database_block(&block_id, vec![revision])
  343. .await?;
  344. }
  345. let database_id = gen_database_id();
  346. let database_rev = DatabaseRevision::from_build_context(&database_id, field_revs, block_metas);
  347. // Create database
  348. tracing::trace!("Create new database: {}", database_id);
  349. let database_ops = make_database_operations(&database_rev);
  350. let database_bytes = database_ops.json_bytes();
  351. let revision = Revision::initial_revision(&database_id, database_bytes);
  352. database_manager
  353. .create_database(&database_id, &view_id, &name, vec![revision])
  354. .await?;
  355. // Create database view
  356. tracing::trace!("Create new database view: {}", view_id);
  357. let database_view_rev = if database_view_data.is_empty() {
  358. DatabaseViewRevision::new(database_id, view_id.to_owned(), true, name, layout.into())
  359. } else {
  360. DatabaseViewRevision::from_json(database_view_data)?
  361. };
  362. let database_view_ops = make_database_view_operations(&database_view_rev);
  363. let database_view_bytes = database_view_ops.json_bytes();
  364. let revision = Revision::initial_revision(view_id, database_view_bytes);
  365. database_manager
  366. .create_database_view(view_id, vec![revision])
  367. .await?;
  368. Ok(())
  369. }
  370. impl DatabaseRefIndexerQuery for DatabaseRefIndexer {
  371. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  372. self.get_ref_views_with_database(database_id)
  373. }
  374. }
  375. impl DatabaseRefIndexerQuery for Arc<DatabaseRefIndexer> {
  376. fn get_ref_views(&self, database_id: &str) -> FlowyResult<Vec<DatabaseRef>> {
  377. (**self).get_ref_views(database_id)
  378. }
  379. }