manager.rs 8.4 KB


  1. use std::collections::HashMap;
  2. use std::ops::Deref;
  3. use std::sync::Arc;
  4. use appflowy_integrate::collab_builder::AppFlowyCollabBuilder;
  5. use appflowy_integrate::{CollabPersistenceConfig, RocksCollabDB};
  6. use collab::core::collab::MutexCollab;
  7. use collab_database::database::DatabaseData;
  8. use collab_database::user::{DatabaseCollabBuilder, UserDatabase as InnerUserDatabase};
  9. use collab_database::views::{CreateDatabaseParams, CreateViewParams};
  10. use parking_lot::Mutex;
  11. use tokio::sync::RwLock;
  12. use flowy_error::{internal_error, FlowyError, FlowyResult};
  13. use flowy_task::TaskDispatcher;
  14. use crate::entities::{DatabaseDescriptionPB, DatabaseLayoutPB, RepeatedDatabaseDescriptionPB};
  15. use crate::services::database::{DatabaseEditor, MutexDatabase};
  16. use crate::services::share::csv::{CSVFormat, CSVImporter, ImportResult};
  17. pub trait DatabaseUser2: Send + Sync {
  18. fn user_id(&self) -> Result<i64, FlowyError>;
  19. fn token(&self) -> Result<Option<String>, FlowyError>;
  20. fn collab_db(&self) -> Result<Arc<RocksCollabDB>, FlowyError>;
  21. }
  22. pub struct DatabaseManager2 {
  23. user: Arc<dyn DatabaseUser2>,
  24. user_database: UserDatabase,
  25. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  26. editors: RwLock<HashMap<String, Arc<DatabaseEditor>>>,
  27. collab_builder: Arc<AppFlowyCollabBuilder>,
  28. }
  29. impl DatabaseManager2 {
  30. pub fn new(
  31. database_user: Arc<dyn DatabaseUser2>,
  32. task_scheduler: Arc<RwLock<TaskDispatcher>>,
  33. collab_builder: Arc<AppFlowyCollabBuilder>,
  34. ) -> Self {
  35. Self {
  36. user: database_user,
  37. user_database: UserDatabase::default(),
  38. task_scheduler,
  39. editors: Default::default(),
  40. collab_builder,
  41. }
  42. }
  43. pub async fn initialize(&self, user_id: i64) -> FlowyResult<()> {
  44. let config = CollabPersistenceConfig::new().snapshot_per_update(10);
  45. let db = self.user.collab_db()?;
  46. *self.user_database.lock() = Some(InnerUserDatabase::new(
  47. user_id,
  48. db,
  49. config,
  50. UserDatabaseCollabBuilderImpl(self.collab_builder.clone()),
  51. ));
  52. // do nothing
  53. Ok(())
  54. }
  55. pub async fn initialize_with_new_user(&self, user_id: i64, _token: &str) -> FlowyResult<()> {
  56. self.initialize(user_id).await?;
  57. Ok(())
  58. }
  59. pub async fn get_all_databases_description(&self) -> RepeatedDatabaseDescriptionPB {
  60. let databases_description = self.with_user_database(vec![], |database| {
  61. database
  62. .get_all_databases()
  63. .into_iter()
  64. .map(DatabaseDescriptionPB::from)
  65. .collect()
  66. });
  67. RepeatedDatabaseDescriptionPB {
  68. items: databases_description,
  69. }
  70. }
  71. pub async fn get_database_with_view_id(&self, view_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  72. let database_id = self.get_database_id_with_view_id(view_id).await?;
  73. self.get_database(&database_id).await
  74. }
  75. pub async fn get_database_id_with_view_id(&self, view_id: &str) -> FlowyResult<String> {
  76. let database_id = self.with_user_database(Err(FlowyError::internal()), |database| {
  77. database
  78. .get_database_id_with_view_id(view_id)
  79. .ok_or_else(FlowyError::record_not_found)
  80. })?;
  81. Ok(database_id)
  82. }
  83. pub async fn get_database(&self, database_id: &str) -> FlowyResult<Arc<DatabaseEditor>> {
  84. if let Some(editor) = self.editors.read().await.get(database_id) {
  85. return Ok(editor.clone());
  86. }
  87. tracing::trace!("create new editor for database {}", database_id);
  88. let mut editors = self.editors.write().await;
  89. let database = MutexDatabase::new(self.with_user_database(
  90. Err(FlowyError::record_not_found()),
  91. |database| {
  92. database
  93. .get_database(database_id)
  94. .ok_or_else(FlowyError::record_not_found)
  95. },
  96. )?);
  97. let editor = Arc::new(DatabaseEditor::new(database, self.task_scheduler.clone()).await?);
  98. editors.insert(database_id.to_string(), editor.clone());
  99. Ok(editor)
  100. }
  101. #[tracing::instrument(level = "debug", skip_all)]
  102. pub async fn close_database_view<T: AsRef<str>>(&self, view_id: T) -> FlowyResult<()> {
  103. let view_id = view_id.as_ref();
  104. let database_id = self.with_user_database(None, |database| {
  105. database.get_database_id_with_view_id(view_id)
  106. });
  107. if let Some(database_id) = database_id {
  108. let mut editors = self.editors.write().await;
  109. if let Some(editor) = editors.get(&database_id) {
  110. if editor.close_view_editor(view_id).await {
  111. editor.close().await;
  112. editors.remove(&database_id);
  113. }
  114. }
  115. }
  116. Ok(())
  117. }
  118. pub async fn delete_database_view(&self, view_id: &str) -> FlowyResult<()> {
  119. let database = self.get_database_with_view_id(view_id).await?;
  120. let _ = database.delete_database_view(view_id).await?;
  121. Ok(())
  122. }
  123. pub async fn duplicate_database(&self, view_id: &str) -> FlowyResult<Vec<u8>> {
  124. let database_data = self.with_user_database(Err(FlowyError::internal()), |database| {
  125. let data = database.get_database_duplicated_data(view_id)?;
  126. let json_bytes = data.to_json_bytes()?;
  127. Ok(json_bytes)
  128. })?;
  129. Ok(database_data)
  130. }
  131. #[tracing::instrument(level = "trace", skip_all, err)]
  132. pub async fn create_database_with_database_data(
  133. &self,
  134. view_id: &str,
  135. data: Vec<u8>,
  136. ) -> FlowyResult<()> {
  137. let mut database_data = DatabaseData::from_json_bytes(data)?;
  138. database_data.view.id = view_id.to_string();
  139. self.with_user_database(
  140. Err(FlowyError::internal().context("Create database with data failed")),
  141. |database| {
  142. let database = database.create_database_with_data(database_data)?;
  143. Ok(database)
  144. },
  145. )?;
  146. Ok(())
  147. }
  148. pub async fn create_database_with_params(&self, params: CreateDatabaseParams) -> FlowyResult<()> {
  149. let _ = self.with_user_database(
  150. Err(FlowyError::internal().context("Create database with params failed")),
  151. |user_database| {
  152. let database = user_database.create_database(params)?;
  153. Ok(database)
  154. },
  155. )?;
  156. Ok(())
  157. }
  158. #[tracing::instrument(level = "trace", skip(self), err)]
  159. pub async fn create_linked_view(
  160. &self,
  161. name: String,
  162. layout: DatabaseLayoutPB,
  163. database_id: String,
  164. database_view_id: String,
  165. ) -> FlowyResult<()> {
  166. self.with_user_database(
  167. Err(FlowyError::internal().context("Create database view failed")),
  168. |user_database| {
  169. let params = CreateViewParams::new(database_id, database_view_id, name, layout.into());
  170. user_database.create_database_linked_view(params)?;
  171. Ok(())
  172. },
  173. )?;
  174. Ok(())
  175. }
  176. pub async fn import_csv(
  177. &self,
  178. view_id: String,
  179. content: String,
  180. format: CSVFormat,
  181. ) -> FlowyResult<ImportResult> {
  182. let params = tokio::task::spawn_blocking(move || {
  183. CSVImporter.import_csv_from_string(view_id, content, format)
  184. })
  185. .await
  186. .map_err(internal_error)??;
  187. let result = ImportResult {
  188. database_id: params.database_id.clone(),
  189. view_id: params.view_id.clone(),
  190. };
  191. self.create_database_with_params(params).await?;
  192. Ok(result)
  193. }
  194. // will implement soon
  195. pub async fn import_csv_from_file(
  196. &self,
  197. _file_path: String,
  198. _format: CSVFormat,
  199. ) -> FlowyResult<()> {
  200. Ok(())
  201. }
  202. pub async fn export_csv(&self, view_id: &str, style: CSVFormat) -> FlowyResult<String> {
  203. let database = self.get_database_with_view_id(view_id).await?;
  204. database.export_csv(style).await
  205. }
  206. pub async fn update_database_layout(
  207. &self,
  208. view_id: &str,
  209. layout: DatabaseLayoutPB,
  210. ) -> FlowyResult<()> {
  211. let database = self.get_database_with_view_id(view_id).await?;
  212. database.update_view_layout(view_id, layout.into()).await
  213. }
  214. fn with_user_database<F, Output>(&self, default_value: Output, f: F) -> Output
  215. where
  216. F: FnOnce(&InnerUserDatabase) -> Output,
  217. {
  218. let database = self.user_database.lock();
  219. match &*database {
  220. None => default_value,
  221. Some(folder) => f(folder),
  222. }
  223. }
  224. }
  225. #[derive(Clone, Default)]
  226. pub struct UserDatabase(Arc<Mutex<Option<InnerUserDatabase>>>);
  227. impl Deref for UserDatabase {
  228. type Target = Arc<Mutex<Option<InnerUserDatabase>>>;
  229. fn deref(&self) -> &Self::Target {
  230. &self.0
  231. }
  232. }
  233. unsafe impl Sync for UserDatabase {}
  234. unsafe impl Send for UserDatabase {}
  235. struct UserDatabaseCollabBuilderImpl(Arc<AppFlowyCollabBuilder>);
  236. impl DatabaseCollabBuilder for UserDatabaseCollabBuilderImpl {
  237. fn build_with_config(
  238. &self,
  239. uid: i64,
  240. object_id: &str,
  241. object_name: &str,
  242. db: Arc<RocksCollabDB>,
  243. config: &CollabPersistenceConfig,
  244. ) -> Arc<MutexCollab> {
  245. self
  246. .0
  247. .build_with_config(uid, object_id, object_name, db, config)
  248. }
  249. }