mod.rs 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. mod migration;
  2. pub mod rev_sqlite;
  3. pub mod version_1;
  4. mod version_2;
  5. use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence;
  6. use crate::{
  7. event_map::WorkspaceDatabase,
  8. manager::FolderId,
  9. services::{folder_editor::FolderEditor, persistence::migration::FolderMigration},
  10. };
  11. use flowy_client_sync::client_folder::{FolderOperationsBuilder, FolderPad};
  12. use flowy_database::ConnectionPool;
  13. use flowy_error::{FlowyError, FlowyResult};
  14. use flowy_revision_persistence::{RevisionDiskCache, RevisionState, SyncRecord};
  15. use folder_model::{AppRevision, TrashRevision, ViewRevision, WorkspaceRevision};
  16. use revision_model::Revision;
  17. use std::sync::Arc;
  18. use tokio::sync::RwLock;
  19. pub use version_1::{app_sql::*, trash_sql::*, v1_impl::V1Transaction, view_sql::*, workspace_sql::*};
  20. pub trait FolderPersistenceTransaction {
  21. fn create_workspace(&self, user_id: &str, workspace_rev: WorkspaceRevision) -> FlowyResult<()>;
  22. fn read_workspaces(&self, user_id: &str, workspace_id: Option<String>) -> FlowyResult<Vec<WorkspaceRevision>>;
  23. fn update_workspace(&self, changeset: WorkspaceChangeset) -> FlowyResult<()>;
  24. fn delete_workspace(&self, workspace_id: &str) -> FlowyResult<()>;
  25. fn create_app(&self, app_rev: AppRevision) -> FlowyResult<()>;
  26. fn update_app(&self, changeset: AppChangeset) -> FlowyResult<()>;
  27. fn read_app(&self, app_id: &str) -> FlowyResult<AppRevision>;
  28. fn read_workspace_apps(&self, workspace_id: &str) -> FlowyResult<Vec<AppRevision>>;
  29. fn delete_app(&self, app_id: &str) -> FlowyResult<AppRevision>;
  30. fn move_app(&self, app_id: &str, from: usize, to: usize) -> FlowyResult<()>;
  31. fn create_view(&self, view_rev: ViewRevision) -> FlowyResult<()>;
  32. fn read_view(&self, view_id: &str) -> FlowyResult<ViewRevision>;
  33. fn read_views(&self, belong_to_id: &str) -> FlowyResult<Vec<ViewRevision>>;
  34. fn update_view(&self, changeset: ViewChangeset) -> FlowyResult<()>;
  35. fn delete_view(&self, view_id: &str) -> FlowyResult<ViewRevision>;
  36. fn move_view(&self, view_id: &str, from: usize, to: usize) -> FlowyResult<()>;
  37. fn create_trash(&self, trashes: Vec<TrashRevision>) -> FlowyResult<()>;
  38. fn read_trash(&self, trash_id: Option<String>) -> FlowyResult<Vec<TrashRevision>>;
  39. fn delete_trash(&self, trash_ids: Option<Vec<String>>) -> FlowyResult<()>;
  40. }
  41. pub struct FolderPersistence {
  42. database: Arc<dyn WorkspaceDatabase>,
  43. folder_editor: Arc<RwLock<Option<Arc<FolderEditor>>>>,
  44. }
  45. impl FolderPersistence {
  46. pub fn new(database: Arc<dyn WorkspaceDatabase>, folder_editor: Arc<RwLock<Option<Arc<FolderEditor>>>>) -> Self {
  47. Self {
  48. database,
  49. folder_editor,
  50. }
  51. }
  52. #[deprecated(
  53. since = "0.0.3",
  54. note = "please use `begin_transaction` instead, this interface will be removed in the future"
  55. )]
  56. #[allow(dead_code)]
  57. pub fn begin_transaction_v_1<F, O>(&self, f: F) -> FlowyResult<O>
  58. where
  59. F: for<'a> FnOnce(Box<dyn FolderPersistenceTransaction + 'a>) -> FlowyResult<O>,
  60. {
  61. //[[immediate_transaction]]
  62. // https://sqlite.org/lang_transaction.html
  63. // IMMEDIATE cause the database connection to start a new write immediately,
  64. // without waiting for a write statement. The BEGIN IMMEDIATE might fail
  65. // with SQLITE_BUSY if another write transaction is already active on another
  66. // database connection.
  67. //
  68. // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
  69. // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
  70. // other journaling modes, EXCLUSIVE prevents other database connections from
  71. // reading the database while the transaction is underway.
  72. let conn = self.database.db_connection()?;
  73. conn.immediate_transaction::<_, FlowyError, _>(|| f(Box::new(V1Transaction(&conn))))
  74. }
  75. pub async fn begin_transaction<F, O>(&self, f: F) -> FlowyResult<O>
  76. where
  77. F: FnOnce(Arc<dyn FolderPersistenceTransaction>) -> FlowyResult<O>,
  78. {
  79. match self.folder_editor.read().await.clone() {
  80. None => Err(FlowyError::internal().context("FolderEditor should be initialized after user login in.")),
  81. Some(editor) => f(editor),
  82. }
  83. }
  84. pub fn db_pool(&self) -> FlowyResult<Arc<ConnectionPool>> {
  85. self.database.db_pool()
  86. }
  87. pub async fn initialize(&self, user_id: &str, folder_id: &FolderId) -> FlowyResult<()> {
  88. let migrations = FolderMigration::new(user_id, self.database.clone());
  89. if let Some(migrated_folder) = migrations.run_v1_migration()? {
  90. self.save_folder(user_id, folder_id, migrated_folder).await?;
  91. }
  92. migrations.run_v2_migration(folder_id).await?;
  93. migrations.run_v3_migration(folder_id).await?;
  94. Ok(())
  95. }
  96. pub async fn save_folder(&self, user_id: &str, folder_id: &FolderId, folder: FolderPad) -> FlowyResult<()> {
  97. let pool = self.database.db_pool()?;
  98. let json = folder.to_json()?;
  99. let delta_data = FolderOperationsBuilder::new().insert(&json).build().json_bytes();
  100. let revision = Revision::initial_revision(folder_id.as_ref(), delta_data);
  101. let record = SyncRecord {
  102. revision,
  103. state: RevisionState::Sync,
  104. write_to_disk: true,
  105. };
  106. let disk_cache = make_folder_revision_disk_cache(user_id, pool);
  107. disk_cache.delete_and_insert_records(folder_id.as_ref(), None, vec![record])
  108. }
  109. }
  110. pub fn make_folder_revision_disk_cache(
  111. user_id: &str,
  112. pool: Arc<ConnectionPool>,
  113. ) -> Arc<dyn RevisionDiskCache<Arc<ConnectionPool>, Error = FlowyError>> {
  114. Arc::new(SQLiteFolderRevisionPersistence::new(user_id, pool))
  115. }