Forráskód Böngészése

chore: save folder snapshot with custom config (#1670)

Nathan.fooo 2 éve
szülő
commit
a0b5f09b06

+ 2 - 0
frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/down.sql

@@ -0,0 +1,2 @@
+-- This file should undo anything in `up.sql`
+DROP TABLE folder_rev_snapshot;

+ 9 - 0
frontend/rust-lib/flowy-database/migrations/2023-01-06-032859_folder_rev_snapshot/up.sql

@@ -0,0 +1,9 @@
+-- Your SQL goes here
+CREATE TABLE folder_rev_snapshot (
+   snapshot_id TEXT NOT NULL PRIMARY KEY DEFAULT '',
+   object_id TEXT NOT NULL DEFAULT '',
+   rev_id BIGINT NOT NULL DEFAULT 0,
+   base_rev_id BIGINT NOT NULL DEFAULT 0,
+   timestamp BIGINT NOT NULL DEFAULT 0,
+   data BLOB NOT NULL DEFAULT (x'')
+);

+ 27 - 17
frontend/rust-lib/flowy-database/src/schema.rs

@@ -1,6 +1,4 @@
-// @generated automatically by Diesel CLI.
-
-diesel::table! {
+table! {
     app_table (id) {
         id -> Text,
         workspace_id -> Text,
@@ -15,7 +13,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     document_rev_table (id) {
         id -> Integer,
         document_id -> Text,
@@ -26,14 +24,25 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
+    folder_rev_snapshot (snapshot_id) {
+        snapshot_id -> Text,
+        object_id -> Text,
+        rev_id -> BigInt,
+        base_rev_id -> BigInt,
+        timestamp -> BigInt,
+        data -> Binary,
+    }
+}
+
+table! {
     grid_block_index_table (row_id) {
         row_id -> Text,
         block_id -> Text,
     }
 }
 
-diesel::table! {
+table! {
     grid_meta_rev_table (id) {
         id -> Integer,
         object_id -> Text,
@@ -44,7 +53,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     grid_rev_snapshot (snapshot_id) {
         snapshot_id -> Text,
         object_id -> Text,
@@ -55,7 +64,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     grid_rev_table (id) {
         id -> Integer,
         object_id -> Text,
@@ -66,7 +75,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     grid_view_rev_table (id) {
         id -> Integer,
         object_id -> Text,
@@ -77,14 +86,14 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     kv_table (key) {
         key -> Text,
         value -> Binary,
     }
 }
 
-diesel::table! {
+table! {
     rev_snapshot (id) {
         id -> Integer,
         object_id -> Text,
@@ -93,7 +102,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     rev_table (id) {
         id -> Integer,
         doc_id -> Text,
@@ -105,7 +114,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     trash_table (id) {
         id -> Text,
         name -> Text,
@@ -116,7 +125,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     user_table (id) {
         id -> Text,
         name -> Text,
@@ -127,7 +136,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     view_table (id) {
         id -> Text,
         belong_to_id -> Text,
@@ -143,7 +152,7 @@ diesel::table! {
     }
 }
 
-diesel::table! {
+table! {
     workspace_table (id) {
         id -> Text,
         name -> Text,
@@ -155,9 +164,10 @@ diesel::table! {
     }
 }
 
-diesel::allow_tables_to_appear_in_same_query!(
+allow_tables_to_appear_in_same_query!(
     app_table,
     document_rev_table,
+    folder_rev_snapshot,
     grid_block_index_table,
     grid_meta_rev_table,
     grid_rev_snapshot,

+ 7 - 2
frontend/rust-lib/flowy-folder/src/manager.rs

@@ -23,7 +23,9 @@ use lazy_static::lazy_static;
 use lib_infra::future::FutureResult;
 
 use crate::services::clear_current_workspace;
-use crate::services::persistence::rev_sqlite::SQLiteFolderRevisionPersistence;
+use crate::services::persistence::rev_sqlite::{
+    SQLiteFolderRevisionPersistence, SQLiteFolderRevisionSnapshotPersistence,
+};
 use flowy_http_model::ws_data::ServerRevisionWSData;
 use flowy_sync::client_folder::FolderPad;
 use std::convert::TryFrom;
@@ -174,12 +176,15 @@ impl FolderManager {
         let configuration = RevisionPersistenceConfiguration::new(100, false);
         let rev_persistence = RevisionPersistence::new(user_id, object_id, disk_cache, configuration);
         let rev_compactor = FolderRevisionMergeable();
+
+        let snapshot_object_id = format!("folder:{}", object_id);
+        let snapshot_persistence = SQLiteFolderRevisionSnapshotPersistence::new(&snapshot_object_id, pool);
         let rev_manager = RevisionManager::new(
             user_id,
             folder_id.as_ref(),
             rev_persistence,
             rev_compactor,
-            PhantomSnapshotPersistence(),
+            snapshot_persistence,
         );
 
         let folder_editor = FolderEditor::new(user_id, &folder_id, token, rev_manager, self.web_socket.clone()).await?;

+ 96 - 0
frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/folder_snapshot_sqlite_impl.rs

@@ -0,0 +1,96 @@
+use bytes::Bytes;
+use flowy_database::{
+    prelude::*,
+    schema::{folder_rev_snapshot, folder_rev_snapshot::dsl},
+    ConnectionPool,
+};
+use flowy_error::{internal_error, FlowyResult};
+use flowy_revision::{RevisionSnapshot, RevisionSnapshotDiskCache};
+use lib_infra::util::timestamp;
+use std::sync::Arc;
+
+pub struct SQLiteFolderRevisionSnapshotPersistence {
+    object_id: String,
+    pool: Arc<ConnectionPool>,
+}
+
+impl SQLiteFolderRevisionSnapshotPersistence {
+    pub fn new(object_id: &str, pool: Arc<ConnectionPool>) -> Self {
+        Self {
+            object_id: object_id.to_string(),
+            pool,
+        }
+    }
+
+    fn gen_snapshot_id(&self, rev_id: i64) -> String {
+        format!("{}:{}", self.object_id, rev_id)
+    }
+}
+
+impl RevisionSnapshotDiskCache for SQLiteFolderRevisionSnapshotPersistence {
+    fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
+        (current_rev_id - start_rev_id) >= 2
+    }
+
+    fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let snapshot_id = self.gen_snapshot_id(rev_id);
+        let timestamp = timestamp();
+        let record = (
+            dsl::snapshot_id.eq(&snapshot_id),
+            dsl::object_id.eq(&self.object_id),
+            dsl::rev_id.eq(rev_id),
+            dsl::base_rev_id.eq(rev_id),
+            dsl::timestamp.eq(timestamp),
+            dsl::data.eq(data),
+        );
+        let _ = insert_or_ignore_into(dsl::folder_rev_snapshot)
+            .values(record)
+            .execute(&*conn)?;
+        Ok(())
+    }
+
+    fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let snapshot_id = self.gen_snapshot_id(rev_id);
+        let record = dsl::folder_rev_snapshot
+            .filter(dsl::snapshot_id.eq(&snapshot_id))
+            .first::<FolderSnapshotRecord>(&*conn)?;
+
+        Ok(Some(record.into()))
+    }
+
+    fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>> {
+        let conn = self.pool.get().map_err(internal_error)?;
+        let latest_record = dsl::folder_rev_snapshot
+            .filter(dsl::object_id.eq(&self.object_id))
+            .order(dsl::rev_id.desc())
+            // .select(max(dsl::rev_id))
+            // .select((dsl::id, dsl::object_id, dsl::rev_id, dsl::data))
+            .first::<FolderSnapshotRecord>(&*conn)?;
+        Ok(Some(latest_record.into()))
+    }
+}
+
+#[derive(PartialEq, Clone, Debug, Queryable, Identifiable, Insertable, Associations)]
+#[table_name = "folder_rev_snapshot"]
+#[primary_key("snapshot_id")]
+struct FolderSnapshotRecord {
+    snapshot_id: String,
+    object_id: String,
+    rev_id: i64,
+    base_rev_id: i64,
+    timestamp: i64,
+    data: Vec<u8>,
+}
+
+impl std::convert::From<FolderSnapshotRecord> for RevisionSnapshot {
+    fn from(record: FolderSnapshotRecord) -> Self {
+        RevisionSnapshot {
+            rev_id: record.rev_id,
+            base_rev_id: record.base_rev_id,
+            timestamp: record.timestamp,
+            data: Bytes::from(record.data),
+        }
+    }
+}

+ 3 - 0
frontend/rust-lib/flowy-folder/src/services/persistence/rev_sqlite/mod.rs

@@ -1,2 +1,5 @@
 mod folder_rev_sqlite;
+mod folder_snapshot_sqlite_impl;
+
 pub use folder_rev_sqlite::*;
+pub use folder_snapshot_sqlite_impl::*;

+ 0 - 32
frontend/rust-lib/flowy-grid/src/services/persistence/rev_sqlite/grid_snapshot_sqlite_impl.rs

@@ -115,35 +115,3 @@ impl std::convert::From<GridSnapshotRecord> for RevisionSnapshot {
         }
     }
 }
-
-// pub(crate) fn get_latest_rev_id_from(rev_ids: Vec<i64>, anchor: i64) -> Option<i64> {
-//     let mut target_rev_id = None;
-//     let mut old_step: Option<i64> = None;
-//     for rev_id in rev_ids {
-//         let step = (rev_id - anchor).abs();
-//         if let Some(old_step) = &mut old_step {
-//             if *old_step > step {
-//                 *old_step = step;
-//                 target_rev_id = Some(rev_id);
-//             }
-//         } else {
-//             old_step = Some(step);
-//             target_rev_id = Some(rev_id);
-//         }
-//     }
-//     target_rev_id
-// }
-
-// #[cfg(test)]
-// mod tests {
-//     use crate::services::persistence::rev_sqlite::get_latest_rev_id_from;
-//
-//     #[test]
-//     fn test_latest_rev_id() {
-//         let ids = vec![1, 2, 3, 4, 5, 6];
-//         for (anchor, expected_value) in vec![(3, 3), (7, 6), (1, 1)] {
-//             let value = get_latest_rev_id_from(ids.clone(), anchor).unwrap();
-//             assert_eq!(value, expected_value);
-//         }
-//     }
-// }

+ 17 - 8
frontend/rust-lib/flowy-revision/src/rev_snapshot.rs

@@ -10,14 +10,21 @@ use std::sync::atomic::Ordering::SeqCst;
 use std::sync::Arc;
 
 pub trait RevisionSnapshotDiskCache: Send + Sync {
+    fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
+        (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION
+    }
+
     fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
+
     fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>>;
+
     fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>>;
 }
 
 /// Do nothing but just used to clam the rust compiler about the generic parameter `SP` of `RevisionManager`
 ///  
 pub struct PhantomSnapshotPersistence();
+
 impl RevisionSnapshotDiskCache for PhantomSnapshotPersistence {
     fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
         Ok(())
@@ -37,7 +44,7 @@ const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
 pub struct RevisionSnapshotController<Connection> {
     user_id: String,
     object_id: String,
-    disk_cache: Arc<dyn RevisionSnapshotDiskCache>,
+    rev_snapshot_persistence: Arc<dyn RevisionSnapshotDiskCache>,
     rev_id_counter: Arc<RevIdCounter>,
     rev_persistence: Arc<RevisionPersistence<Connection>>,
     rev_compress: Arc<dyn RevisionMergeable>,
@@ -63,7 +70,7 @@ where
         Self {
             user_id: user_id.to_string(),
             object_id: object_id.to_string(),
-            disk_cache,
+            rev_snapshot_persistence: disk_cache,
             rev_id_counter,
             start_rev_id: AtomicI64::new(0),
             rev_persistence: revision_persistence,
@@ -73,7 +80,7 @@ where
 
     pub async fn generate_snapshot(&self) {
         if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
-            if let Err(e) = self.disk_cache.write_snapshot(rev_id, bytes.to_vec()) {
+            if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) {
                 tracing::error!("Save snapshot failed: {}", e);
             }
         }
@@ -85,7 +92,7 @@ where
         B: RevisionObjectDeserializer,
     {
         tracing::trace!("Try to find if {} has snapshot", self.object_id);
-        let snapshot = self.disk_cache.read_last_snapshot().ok()??;
+        let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
         let snapshot_rev_id = snapshot.rev_id;
         let revision = Revision::new(
             &self.object_id,
@@ -115,10 +122,12 @@ where
         if current_rev_id <= start_rev_id {
             return;
         }
-
-        if (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION {
+        if self
+            .rev_snapshot_persistence
+            .should_generate_snapshot_from_range(start_rev_id, current_rev_id)
+        {
             if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
-                let disk_cache = self.disk_cache.clone();
+                let disk_cache = self.rev_snapshot_persistence.clone();
                 tokio::spawn(async move {
                     let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec());
                 });
@@ -161,7 +170,7 @@ impl<Connection> std::ops::Deref for RevisionSnapshotController<Connection> {
     type Target = Arc<dyn RevisionSnapshotDiskCache>;
 
     fn deref(&self) -> &Self::Target {
-        &self.disk_cache
+        &self.rev_snapshot_persistence
     }
 }