manager.rs 10 KB


  1. use std::collections::HashMap;
  2. use std::ops::Deref;
  3. use std::sync::Arc;
  4. use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
  5. use appflowy_integrate::{CollabPersistenceConfig, RocksCollabDB};
  6. use collab::core::collab::MutexCollab;
  7. use collab_database::database::DatabaseData;
  8. use collab_database::user::{DatabaseCollabBuilder, UserDatabase as InnerUserDatabase};
  9. use collab_database::views::{CreateDatabaseParams, CreateViewParams, DatabaseLayout};
  10. use parking_lot::Mutex;
  11. use tokio::sync::RwLock;
  12. use flowy_error::{internal_error, FlowyError, FlowyResult};
  13. use flowy_task::TaskDispatcher;
  14. use crate::deps::{DatabaseCloudService, DatabaseUser2};
  15. use crate::entities::{
  16. DatabaseDescriptionPB, DatabaseLayoutPB, DatabaseSnapshotPB, RepeatedDatabaseDescriptionPB,
  17. };
  18. use crate::services::database::{DatabaseEditor, MutexDatabase};
  19. use crate::services::database_view::DatabaseLayoutDepsResolver;
  20. use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
  21. pub struct DatabaseManager2 {
  22. user: Arc<dyn DatabaseUser2>,
  23. user_database: UserDatabase,
  24. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  25. editors: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  26. collab_builder: Arc<AppFlowyCollabBuilder>,
  27. cloud_service: Arc<dyn DatabaseCloudService>,
  28. }
  29. impl DatabaseManager2 {
  30. pub fn new(
  31. database_user: Arc<dyn DatabaseUser2>,
  32. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  33. collab_builder: Arc<AppFlowyCollabBuilder>,
  34. cloud_service: Arc<dyn DatabaseCloudService>,
  35. ) -> Self {
  36. Self {
  37. user: database_user,
  38. user_database: UserDatabase::default(),
  39. task_scheduler,
  40. editors: Default::default(),
  41. collab_builder,
  42. cloud_service,
  43. }
  44. }
  45. pub async fn initialize(&self, user_id: i64) -> FlowyResult<()> {
  46. let config = CollabPersistenceConfig::new().snapshot_per_update(10);
  47. let db = self.user.collab_db()?;
  48. *self.user_database.lock() = Some(InnerUserDatabase::new(
  49. user_id,
  50. db,
  51. config,
  52. UserDatabaseCollabBuilderImpl(self.collab_builder.clone()),
  53. ));
  54. // do nothing
  55. Ok(())
  56. }
  57. pub async fn initialize_with_new_user(&self, user_id: i64, _token: &str) -> FlowyResult<()> {
  58. self.initialize(user_id).await?;
  59. Ok(())
  60. }
  61. pub async fn get_all_databases_description(&self) -> RepeatedDatabaseDescriptionPB {
  62. let databases_description = self.with_user_database(vec![], |database| {
  63. database
  64. .get_all_databases()
  65. .into_iter()
  66. .map(DatabaseDescriptionPB::from)
  67. .collect()
  68. });
  69. RepeatedDatabaseDescriptionPB {
  70. items: databases_description,
  71. }
  72. }
  73. pub async fn get_database_with_view_id(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  74. let database_id = self.get_database_id_with_view_id(view_id).await?;
  75. self.get_database(&database_id).await
  76. }
  77. pub async fn get_database_id_with_view_id(&self, view_id: &str) -> FlowyResult<String> {
  78. let database_id = self.with_user_database(Err(FlowyError::internal()), |database| {
  79. database
  80. .get_database_id_with_view_id(view_id)
  81. .ok_or_else(FlowyError::record_not_found)
  82. })?;
  83. Ok(database_id)
  84. }
  85. pub async fn get_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  86. if let Some(editor) = self.editors.read().await.get(database_id) {
  87. return Ok(editor.clone());
  88. }
  89. self.open_database(database_id).await
  90. }
  91. pub async fn open_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  92. tracing::trace!("create new editor for database {}", database_id);
  93. let mut editors = self.editors.write().await;
  94. let database = MutexDatabase::new(self.with_user_database(
  95. Err(FlowyError::record_not_found()),
  96. |database| {
  97. database
  98. .get_database(database_id)
  99. .ok_or_else(FlowyError::record_not_found)
  100. },
  101. )?);
  102. let editor = Arc::new(DatabaseEditor::new(database, self.task_scheduler.clone()).await?);
  103. editors.insert(database_id.to_string(), editor.clone());
  104. Ok(editor)
  105. }
  106. #[tracing::instrument(level = "debug", skip_all)]
  107. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  108. // TODO(natan): defer closing the database if the sync is not finished
  109. let view_id = view_id.as_ref();
  110. let database_id = self.with_user_database(None, |databases| {
  111. let database_id = databases.get_database_id_with_view_id(view_id);
  112. if database_id.is_some() {
  113. databases.close_database(database_id.as_ref().unwrap());
  114. }
  115. database_id
  116. });
  117. if let Some(database_id) = database_id {
  118. let mut editors = self.editors.write().await;
  119. if let Some(editor) = editors.get(&database_id) {
  120. if editor.close_view_editor(view_id).await {
  121. editor.close().await;
  122. editors.remove(&database_id);
  123. }
  124. }
  125. }
  126. Ok(())
  127. }
  128. pub async fn delete_database_view(&self, view_id: &str) -> FlowyResult<()> {
  129. let database = self.get_database_with_view_id(view_id).await?;
  130. let _ = database.delete_database_view(view_id).await?;
  131. Ok(())
  132. }
  133. pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult<Vec<u8>> {
  134. let database_data = self.with_user_database(Err(FlowyError::internal()), |database| {
  135. let data = database.get_database_duplicated_data(view_id)?;
  136. let json_bytes = data.to_json_bytes()?;
  137. Ok(json_bytes)
  138. })?;
  139. Ok(database_data)
  140. }
  141. /// Create a new database with the given data that can be deserialized to [DatabaseData].
  142. #[tracing::instrument(level = "trace", skip_all, err)]
  143. pub async fn create_database_with_database_data(
  144. &self,
  145. view_id: &str,
  146. data: Vec<u8>,
  147. ) -> FlowyResult<()> {
  148. let mut database_data = DatabaseData::from_json_bytes(data)?;
  149. database_data.view.id = view_id.to_string();
  150. self.with_user_database(
  151. Err(FlowyError::internal().context("Create database with data failed")),
  152. |database| {
  153. let database = database.create_database_with_data(database_data)?;
  154. Ok(database)
  155. },
  156. )?;
  157. Ok(())
  158. }
  159. pub async fn create_database_with_params(&self, params: CreateDatabaseParams) -> FlowyResult<()> {
  160. let _ = self.with_user_database(
  161. Err(FlowyError::internal().context("Create database with params failed")),
  162. |user_database| {
  163. let database = user_database.create_database(params)?;
  164. Ok(database)
  165. },
  166. )?;
  167. Ok(())
  168. }
  169. /// A linked view is a view that is linked to existing database.
  170. #[tracing::instrument(level = "trace", skip(self), err)]
  171. pub async fn create_linked_view(
  172. &self,
  173. name: String,
  174. layout: DatabaseLayout,
  175. database_id: String,
  176. database_view_id: String,
  177. ) -> FlowyResult<()> {
  178. self.with_user_database(
  179. Err(FlowyError::internal().context("Create database view failed")),
  180. |user_database| {
  181. let mut params = CreateViewParams::new(database_id.clone(), database_view_id, name, layout);
  182. if let Some(database) = user_database.get_database(&database_id) {
  183. if let Some((field, layout_setting)) = DatabaseLayoutDepsResolver::new(database, layout)
  184. .resolve_deps_when_create_database_linked_view()
  185. {
  186. params = params
  187. .with_deps_fields(vec![field])
  188. .with_layout_setting(layout_setting);
  189. }
  190. };
  191. user_database.create_database_linked_view(params)?;
  192. Ok(())
  193. },
  194. )?;
  195. Ok(())
  196. }
  197. pub async fn import_csv(
  198. &self,
  199. view_id: String,
  200. content: String,
  201. format: CSVFormat,
  202. ) -> FlowyResult<ImportResult> {
  203. let params = tokio::task::spawn_blocking(move || {
  204. CSVImporter.import_csv_from_string(view_id, content, format)
  205. })
  206. .await
  207. .map_err(internal_error)??;
  208. let result = ImportResult {
  209. database_id: params.database_id.clone(),
  210. view_id: params.view_id.clone(),
  211. };
  212. self.create_database_with_params(params).await?;
  213. Ok(result)
  214. }
  215. // will implement soon
  216. pub async fn import_csv_from_file(
  217. &self,
  218. _file_path: String,
  219. _format: CSVFormat,
  220. ) -> FlowyResult<()> {
  221. Ok(())
  222. }
  223. pub async fn export_csv(&self, view_id: &str, style: CSVFormat) -> FlowyResult<String> {
  224. let database = self.get_database_with_view_id(view_id).await?;
  225. database.export_csv(style).await
  226. }
  227. pub async fn update_database_layout(
  228. &self,
  229. view_id: &str,
  230. layout: DatabaseLayoutPB,
  231. ) -> FlowyResult<()> {
  232. let database = self.get_database_with_view_id(view_id).await?;
  233. database.update_view_layout(view_id, layout.into()).await
  234. }
  235. pub async fn get_database_snapshots(
  236. &self,
  237. view_id: &str,
  238. ) -> FlowyResult<Vec<DatabaseSnapshotPB>> {
  239. let database_id = self.get_database_id_with_view_id(view_id).await?;
  240. let mut snapshots = vec![];
  241. if let Some(snapshot) = self
  242. .cloud_service
  243. .get_database_latest_snapshot(&database_id)
  244. .await?
  245. .map(|snapshot| DatabaseSnapshotPB {
  246. snapshot_id: snapshot.snapshot_id,
  247. snapshot_desc: "".to_string(),
  248. created_at: snapshot.created_at,
  249. data: snapshot.data,
  250. })
  251. {
  252. snapshots.push(snapshot);
  253. }
  254. Ok(snapshots)
  255. }
  256. fn with_user_database<F, Output>(&self, default_value: Output, f: F) -> Output
  257. where
  258. F: FnOnce(&InnerUserDatabase) -> Output,
  259. {
  260. let database = self.user_database.lock();
  261. match &*database {
  262. None => default_value,
  263. Some(folder) => f(folder),
  264. }
  265. }
  266. /// Only expose this method for testing
  267. #[cfg(debug_assertions)]
  268. pub fn get_cloud_service(&self) -> &Arc<dyn DatabaseCloudService> {
  269. &self.cloud_service
  270. }
  271. }
  272. #[derive(Clone, Default)]
  273. pub struct UserDatabase(Arc<Mutex<Option<InnerUserDatabase>>>);
  274. impl Deref for UserDatabase {
  275. type Target = Arc<Mutex<Option<InnerUserDatabase>>>;
  276. fn deref(&self) -> &Self::Target {
  277. &self.0
  278. }
  279. }
  280. unsafe impl Sync for UserDatabase {}
  281. unsafe impl Send for UserDatabase {}
  282. struct UserDatabaseCollabBuilderImpl(Arc<AppFlowyCollabBuilder>);
  283. impl DatabaseCollabBuilder for UserDatabaseCollabBuilderImpl {
  284. fn build_with_config(
  285. &self,
  286. uid: i64,
  287. object_id: &str,
  288. object_name: &str,
  289. db: Arc<RocksCollabDB>,
  290. config: &CollabPersistenceConfig,
  291. ) -> Arc<MutexCollab> {
  292. self
  293. .0
  294. .build_with_config(uid, object_id, object_name, db, config)
  295. }
  296. }