소스 검색

config folder sync on backend

appflowy 3 년 전
부모
커밋
23c4924532

+ 32 - 15
backend/src/context.rs

@@ -6,13 +6,11 @@ use actix::Addr;
 use actix_web::web::Data;
 
 use crate::services::{
-    document::{
-        persistence::DocumentKVPersistence,
-        ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
-    },
-    folder::ws_receiver::make_folder_ws_receiver,
+    document::ws_receiver::{make_document_ws_receiver, HttpDocumentCloudPersistence},
+    folder::ws_receiver::{make_folder_ws_receiver, HttpFolderCloudPersistence},
+    kv::revision_kv::RevisionKVPersistence,
 };
-use flowy_collaboration::server_document::ServerDocumentManager;
+use flowy_collaboration::{server_document::ServerDocumentManager, server_folder::ServerFolderManager};
 use lib_ws::WSChannel;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -23,6 +21,7 @@ pub struct AppContext {
     pub persistence: Data<Arc<FlowyPersistence>>,
     pub ws_receivers: Data<WebSocketReceivers>,
     pub document_manager: Data<Arc<ServerDocumentManager>>,
+    pub folder_manager: Data<Arc<ServerFolderManager>>,
 }
 
 impl AppContext {
@@ -30,16 +29,22 @@ impl AppContext {
         let ws_server = Data::new(ws_server);
         let mut ws_receivers = WebSocketReceivers::new();
 
-        let kv_store = make_document_kv_store(pg_pool.clone());
-        let flowy_persistence = Arc::new(FlowyPersistence { pg_pool, kv_store });
+        let document_store = make_document_kv_store(pg_pool.clone());
+        let folder_store = make_folder_kv_store(pg_pool.clone());
+        let flowy_persistence = Arc::new(FlowyPersistence {
+            pg_pool,
+            document_store,
+            folder_store,
+        });
 
-        let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.kv_store()));
+        let document_persistence = Arc::new(HttpDocumentCloudPersistence(flowy_persistence.document_kv_store()));
         let document_manager = Arc::new(ServerDocumentManager::new(document_persistence));
-
         let document_ws_receiver = make_document_ws_receiver(flowy_persistence.clone(), document_manager.clone());
         ws_receivers.set(WSChannel::Document, document_ws_receiver);
 
-        let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone());
+        let folder_persistence = Arc::new(HttpFolderCloudPersistence(flowy_persistence.folder_kv_store()));
+        let folder_manager = Arc::new(ServerFolderManager::new(folder_persistence));
+        let folder_ws_receiver = make_folder_ws_receiver(flowy_persistence.clone(), folder_manager.clone());
         ws_receivers.set(WSChannel::Folder, folder_ws_receiver);
 
         AppContext {
@@ -47,23 +52,35 @@ impl AppContext {
             persistence: Data::new(flowy_persistence),
             ws_receivers: Data::new(ws_receivers),
             document_manager: Data::new(document_manager),
+            folder_manager: Data::new(folder_manager),
         }
     }
 }
 
-fn make_document_kv_store(pg_pool: PgPool) -> Arc<DocumentKVPersistence> {
+pub type DocumentRevisionKV = RevisionKVPersistence;
+pub type FolderRevisionKV = RevisionKVPersistence;
+
+fn make_document_kv_store(pg_pool: PgPool) -> Arc<DocumentRevisionKV> {
+    let kv_impl = Arc::new(PostgresKV { pg_pool });
+    Arc::new(DocumentRevisionKV::new(kv_impl))
+}
+
+fn make_folder_kv_store(pg_pool: PgPool) -> Arc<FolderRevisionKV> {
     let kv_impl = Arc::new(PostgresKV { pg_pool });
-    Arc::new(DocumentKVPersistence::new(kv_impl))
+    Arc::new(FolderRevisionKV::new(kv_impl))
 }
 
 #[derive(Clone)]
 pub struct FlowyPersistence {
     pg_pool: PgPool,
-    kv_store: Arc<DocumentKVPersistence>,
+    document_store: Arc<DocumentRevisionKV>,
+    folder_store: Arc<FolderRevisionKV>,
 }
 
 impl FlowyPersistence {
     pub fn pg_pool(&self) -> PgPool { self.pg_pool.clone() }
 
-    pub fn kv_store(&self) -> Arc<DocumentKVPersistence> { self.kv_store.clone() }
+    pub fn document_kv_store(&self) -> Arc<DocumentRevisionKV> { self.document_store.clone() }
+
+    pub fn folder_kv_store(&self) -> Arc<FolderRevisionKV> { self.folder_store.clone() }
 }

+ 15 - 133
backend/src/services/document/persistence.rs

@@ -1,44 +1,33 @@
-use crate::{
-    services::kv::{KVStore, KeyValue},
-    util::serde_ext::parse_from_bytes,
-};
 use anyhow::Context;
 use backend_service::errors::{internal_error, ServerError};
-use bytes::Bytes;
+
 use flowy_collaboration::{
-    protobuf::{
-        CreateDocParams,
-        DocumentId,
-        DocumentInfo,
-        RepeatedRevision as RepeatedRevisionPB,
-        ResetDocumentParams,
-        Revision as RevisionPB,
-    },
+    protobuf::{CreateDocParams, DocumentId, DocumentInfo, ResetDocumentParams},
     server_document::ServerDocumentManager,
     util::make_document_info_pb_from_revisions_pb,
 };
 
-use protobuf::Message;
+use crate::services::kv::revision_kv::RevisionKVPersistence;
 use std::sync::Arc;
 use uuid::Uuid;
 
-#[tracing::instrument(level = "debug", skip(kv_store, params), err)]
+#[tracing::instrument(level = "debug", skip(document_store, params), err)]
 pub(crate) async fn create_document(
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<RevisionKVPersistence>,
     mut params: CreateDocParams,
 ) -> Result<(), ServerError> {
     let revisions = params.take_revisions().take_items();
-    let _ = kv_store.set_revision(revisions.into()).await?;
+    let _ = document_store.set_revision(revisions.into()).await?;
     Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(kv_store), err)]
+#[tracing::instrument(level = "debug", skip(document_store), err)]
 pub async fn read_document(
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<RevisionKVPersistence>,
     params: DocumentId,
 ) -> Result<DocumentInfo, ServerError> {
     let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
-    let revisions = kv_store.get_revisions(&params.doc_id, None).await?;
+    let revisions = document_store.get_revisions(&params.doc_id, None).await?;
     match make_document_info_pb_from_revisions_pb(&params.doc_id, revisions) {
         Ok(Some(document_info)) => Ok(document_info),
         Ok(None) => Err(ServerError::record_not_found().context(format!("{} not exist", params.doc_id))),
@@ -63,119 +52,12 @@ pub async fn reset_document(
     Ok(())
 }
 
-#[tracing::instrument(level = "debug", skip(kv_store), err)]
-pub(crate) async fn delete_document(kv_store: &Arc<DocumentKVPersistence>, doc_id: Uuid) -> Result<(), ServerError> {
+#[tracing::instrument(level = "debug", skip(document_store), err)]
+pub(crate) async fn delete_document(
+    document_store: &Arc<RevisionKVPersistence>,
+    doc_id: Uuid,
+) -> Result<(), ServerError> {
     // TODO: delete revisions may cause time issue. Maybe delete asynchronously?
-    let _ = kv_store.delete_revisions(&doc_id.to_string(), None).await?;
+    let _ = document_store.delete_revisions(&doc_id.to_string(), None).await?;
     Ok(())
 }
-
-pub struct DocumentKVPersistence {
-    inner: Arc<KVStore>,
-}
-
-impl std::ops::Deref for DocumentKVPersistence {
-    type Target = Arc<KVStore>;
-
-    fn deref(&self) -> &Self::Target { &self.inner }
-}
-
-impl std::ops::DerefMut for DocumentKVPersistence {
-    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
-}
-
-impl DocumentKVPersistence {
-    pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { DocumentKVPersistence { inner: kv_store } }
-
-    pub(crate) async fn set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
-        let items = revisions_to_key_value_items(revisions)?;
-        self.inner
-            .transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
-            .await
-    }
-
-    pub(crate) async fn get_revisions<T: Into<Option<Vec<i64>>>>(
-        &self,
-        doc_id: &str,
-        rev_ids: T,
-    ) -> Result<RepeatedRevisionPB, ServerError> {
-        let rev_ids = rev_ids.into();
-        let items = match rev_ids {
-            None => {
-                let doc_id = doc_id.to_owned();
-                self.inner
-                    .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&doc_id).await }))
-                    .await?
-            },
-            Some(rev_ids) => {
-                let keys = rev_ids
-                    .into_iter()
-                    .map(|rev_id| make_revision_key(doc_id, rev_id))
-                    .collect::<Vec<String>>();
-
-                self.inner
-                    .transaction(|mut t| Box::pin(async move { t.batch_get(keys).await }))
-                    .await?
-            },
-        };
-
-        Ok(key_value_items_to_revisions(items))
-    }
-
-    pub(crate) async fn delete_revisions<T: Into<Option<Vec<i64>>>>(
-        &self,
-        doc_id: &str,
-        rev_ids: T,
-    ) -> Result<(), ServerError> {
-        match rev_ids.into() {
-            None => {
-                let doc_id = doc_id.to_owned();
-                self.inner
-                    .transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&doc_id).await }))
-                    .await
-            },
-            Some(rev_ids) => {
-                let keys = rev_ids
-                    .into_iter()
-                    .map(|rev_id| make_revision_key(doc_id, rev_id))
-                    .collect::<Vec<String>>();
-
-                self.inner
-                    .transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await }))
-                    .await
-            },
-        }
-    }
-}
-
-#[inline]
-pub fn revisions_to_key_value_items(revisions: Vec<RevisionPB>) -> Result<Vec<KeyValue>, ServerError> {
-    let mut items = vec![];
-    for revision in revisions {
-        let key = make_revision_key(&revision.object_id, revision.rev_id);
-
-        if revision.delta_data.is_empty() {
-            return Err(ServerError::internal().context("The delta_data of RevisionPB should not be empty"));
-        }
-
-        let value = Bytes::from(revision.write_to_bytes().unwrap());
-        items.push(KeyValue { key, value });
-    }
-    Ok(items)
-}
-
-#[inline]
-fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevisionPB {
-    let mut revisions = items
-        .into_iter()
-        .filter_map(|kv| parse_from_bytes::<RevisionPB>(&kv.value).ok())
-        .collect::<Vec<RevisionPB>>();
-
-    revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
-    let mut repeated_revision = RepeatedRevisionPB::new();
-    repeated_revision.set_items(revisions.into());
-    repeated_revision
-}
-
-#[inline]
-fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }

+ 2 - 2
backend/src/services/document/router.rs

@@ -23,7 +23,7 @@ pub async fn create_document_handler(
     persistence: Data<Arc<FlowyPersistence>>,
 ) -> Result<HttpResponse, ServerError> {
     let params: CreateDocParamsPB = parse_from_payload(payload).await?;
-    let kv_store = persistence.kv_store();
+    let kv_store = persistence.document_kv_store();
     let _ = create_document(&kv_store, params).await?;
     Ok(FlowyResponse::success().into())
 }
@@ -34,7 +34,7 @@ pub async fn read_document_handler(
     persistence: Data<Arc<FlowyPersistence>>,
 ) -> Result<HttpResponse, ServerError> {
     let params: DocumentIdPB = parse_from_payload(payload).await?;
-    let kv_store = persistence.kv_store();
+    let kv_store = persistence.document_kv_store();
     let doc = read_document(&kv_store, params).await?;
     let response = FlowyResponse::success().pb(doc)?;
     Ok(response.into())

+ 22 - 29
backend/src/services/document/ws_receiver.rs

@@ -1,10 +1,11 @@
 use crate::{
-    context::FlowyPersistence,
+    context::{DocumentRevisionKV, FlowyPersistence},
     services::{
         document::{
-            persistence::{create_document, read_document, revisions_to_key_value_items, DocumentKVPersistence},
+            persistence::{create_document, read_document},
             ws_actor::{DocumentWSActorMessage, DocumentWebSocketActor},
         },
+        kv::revision_kv::revisions_to_key_value_items,
         web_socket::{WSClientData, WebSocketReceiver},
     },
 };
@@ -76,9 +77,9 @@ impl WebSocketReceiver for DocumentWebSocketReceiver {
     }
 }
 
-pub struct HttpDocumentCloudPersistence(pub Arc<DocumentKVPersistence>);
+pub struct HttpDocumentCloudPersistence(pub Arc<DocumentRevisionKV>);
 impl Debug for HttpDocumentCloudPersistence {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("DocumentPersistenceImpl") }
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("HttpDocumentCloudPersistence") }
 }
 
 impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
@@ -87,11 +88,11 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
             doc_id: doc_id.to_string(),
             ..Default::default()
         };
-        let kv_store = self.0.clone();
+        let document_store = self.0.clone();
         Box::pin(async move {
-            let mut pb_doc = read_document(&kv_store, params)
+            let mut pb_doc = read_document(&document_store, params)
                 .await
-                .map_err(server_error_to_collaborate_error)?;
+                .map_err(|e| e.to_collaborate_error())?;
             let doc = (&mut pb_doc)
                 .try_into()
                 .map_err(|e| CollaborateError::internal().context(e))?;
@@ -104,7 +105,7 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
         doc_id: &str,
         repeated_revision: RepeatedRevisionPB,
     ) -> BoxResultFuture<Option<DocumentInfo>, CollaborateError> {
-        let kv_store = self.0.clone();
+        let document_store = self.0.clone();
         let doc_id = doc_id.to_owned();
         Box::pin(async move {
             let document_info = make_document_info_from_revisions_pb(&doc_id, repeated_revision.clone())?;
@@ -112,9 +113,9 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
             let mut params = CreateDocParamsPB::new();
             params.set_id(doc_id);
             params.set_revisions(repeated_revision);
-            let _ = create_document(&kv_store, params)
+            let _ = create_document(&document_store, params)
                 .await
-                .map_err(server_error_to_collaborate_error)?;
+                .map_err(|e| e.to_collaborate_error())?;
             Ok(document_info)
         })
     }
@@ -124,28 +125,28 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
         doc_id: &str,
         rev_ids: Option<Vec<i64>>,
     ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
-        let kv_store = self.0.clone();
+        let document_store = self.0.clone();
         let doc_id = doc_id.to_owned();
         let f = || async move {
-            let mut repeated_revision = kv_store.get_revisions(&doc_id, rev_ids).await?;
-            Ok(repeated_revision.take_items().into())
+            let mut repeated_revision = document_store.get_revisions(&doc_id, rev_ids).await?;
+            Ok::<Vec<RevisionPB>, ServerError>(repeated_revision.take_items().into())
         };
 
-        Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
+        Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
     }
 
     fn save_document_revisions(
         &self,
         mut repeated_revision: RepeatedRevisionPB,
     ) -> BoxResultFuture<(), CollaborateError> {
-        let kv_store = self.0.clone();
+        let document_store = self.0.clone();
         let f = || async move {
             let revisions = repeated_revision.take_items().into();
-            let _ = kv_store.set_revision(revisions).await?;
-            Ok(())
+            let _ = document_store.set_revision(revisions).await?;
+            Ok::<(), ServerError>(())
         };
 
-        Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
+        Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
     }
 
     fn reset_document(
@@ -153,10 +154,10 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
         doc_id: &str,
         mut repeated_revision: RepeatedRevisionPB,
     ) -> BoxResultFuture<(), CollaborateError> {
-        let kv_store = self.0.clone();
+        let document_store = self.0.clone();
         let doc_id = doc_id.to_owned();
         let f = || async move {
-            kv_store
+            document_store
                 .transaction(|mut transaction| {
                     Box::pin(async move {
                         let _ = transaction.batch_delete_key_start_with(&doc_id).await?;
@@ -167,14 +168,6 @@ impl DocumentCloudPersistence for HttpDocumentCloudPersistence {
                 })
                 .await
         };
-        Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
-    }
-}
-
-fn server_error_to_collaborate_error(error: ServerError) -> CollaborateError {
-    if error.is_record_not_found() {
-        CollaborateError::record_not_found()
-    } else {
-        CollaborateError::internal().context(error)
+        Box::pin(async move { f().await.map_err(|e| e.to_collaborate_error()) })
     }
 }

+ 1 - 1
backend/src/services/folder/trash/router.rs

@@ -48,7 +48,7 @@ pub async fn delete_handler(
     logged_user: LoggedUser,
 ) -> Result<HttpResponse, ServerError> {
     let pool = persistence.pg_pool();
-    let kv_store = persistence.kv_store();
+    let kv_store = persistence.document_kv_store();
     let params: RepeatedTrashId = parse_from_payload(payload).await?;
     let mut transaction = pool
         .begin()

+ 14 - 16
backend/src/services/folder/trash/trash.rs

@@ -1,12 +1,10 @@
 use crate::{
+    context::DocumentRevisionKV,
     entities::logged_user::LoggedUser,
-    services::{
-        document::persistence::DocumentKVPersistence,
-        folder::{
-            app::controller::{delete_app, read_app_table},
-            trash::persistence::{TrashTable, TRASH_TABLE},
-            view::{delete_view, read_view_table},
-        },
+    services::folder::{
+        app::controller::{delete_app, read_app_table},
+        trash::persistence::{TrashTable, TRASH_TABLE},
+        view::{delete_view, read_view_table},
     },
     util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
 };
@@ -39,10 +37,10 @@ pub(crate) async fn create_trash(
     Ok(())
 }
 
-#[tracing::instrument(skip(transaction, kv_store, user), fields(delete_rows), err)]
+#[tracing::instrument(skip(transaction, document_store, user), fields(delete_rows), err)]
 pub(crate) async fn delete_all_trash(
     transaction: &mut DBTransaction<'_>,
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<DocumentRevisionKV>,
     user: &LoggedUser,
 ) -> Result<(), ServerError> {
     let (sql, args) = SqlBuilder::select(TRASH_TABLE)
@@ -57,7 +55,7 @@ pub(crate) async fn delete_all_trash(
         .collect::<Vec<(Uuid, i32)>>();
     tracing::Span::current().record("delete_rows", &format!("{:?}", rows).as_str());
     let affected_row_count = rows.len();
-    let _ = delete_trash_associate_targets(transaction as &mut DBTransaction<'_>, kv_store, rows).await?;
+    let _ = delete_trash_associate_targets(transaction as &mut DBTransaction<'_>, document_store, rows).await?;
 
     let (sql, args) = SqlBuilder::delete(TRASH_TABLE)
         .and_where_eq("user_id", &user.user_id)
@@ -72,10 +70,10 @@ pub(crate) async fn delete_all_trash(
     Ok(())
 }
 
-#[tracing::instrument(skip(transaction, kv_store), err)]
+#[tracing::instrument(skip(transaction, document_store), err)]
 pub(crate) async fn delete_trash(
     transaction: &mut DBTransaction<'_>,
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<DocumentRevisionKV>,
     records: Vec<(Uuid, i32)>,
 ) -> Result<(), ServerError> {
     for (trash_id, _) in records {
@@ -92,7 +90,7 @@ pub(crate) async fn delete_trash(
 
         let _ = delete_trash_associate_targets(
             transaction as &mut DBTransaction<'_>,
-            kv_store,
+            document_store,
             vec![(trash_table.id, trash_table.ty)],
         )
         .await?;
@@ -107,10 +105,10 @@ pub(crate) async fn delete_trash(
     Ok(())
 }
 
-#[tracing::instrument(skip(transaction, kv_store, targets), err)]
+#[tracing::instrument(skip(transaction, document_store, targets), err)]
 async fn delete_trash_associate_targets(
     transaction: &mut DBTransaction<'_>,
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<DocumentRevisionKV>,
     targets: Vec<(Uuid, i32)>,
 ) -> Result<(), ServerError> {
     for (id, ty) in targets {
@@ -119,7 +117,7 @@ async fn delete_trash_associate_targets(
             Some(ty) => match ty {
                 TrashType::Unknown => {},
                 TrashType::View => {
-                    let _ = delete_view(transaction as &mut DBTransaction<'_>, kv_store, vec![id]).await;
+                    let _ = delete_view(transaction as &mut DBTransaction<'_>, document_store, vec![id]).await;
                 },
                 TrashType::App => {
                     let _ = delete_app(transaction as &mut DBTransaction<'_>, id).await;

+ 8 - 7
backend/src/services/folder/view/controller.rs

@@ -1,13 +1,14 @@
 use crate::{
     entities::logged_user::LoggedUser,
     services::{
-        document::persistence::{create_document, delete_document, DocumentKVPersistence},
+        document::persistence::{create_document, delete_document},
         folder::{trash::read_trash_ids, view::persistence::*},
     },
     util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
 };
 use backend_service::errors::{invalid_params, ServerError};
 
+use crate::context::DocumentRevisionKV;
 use chrono::Utc;
 use flowy_collaboration::{
     client_document::default::initial_delta,
@@ -48,10 +49,10 @@ pub(crate) async fn update_view(
     Ok(())
 }
 
-#[tracing::instrument(skip(transaction, kv_store), err)]
+#[tracing::instrument(skip(transaction, document_store), err)]
 pub(crate) async fn delete_view(
     transaction: &mut DBTransaction<'_>,
-    kv_store: &Arc<DocumentKVPersistence>,
+    document_store: &Arc<DocumentRevisionKV>,
     view_ids: Vec<Uuid>,
 ) -> Result<(), ServerError> {
     for view_id in view_ids {
@@ -61,15 +62,15 @@ pub(crate) async fn delete_view(
             .await
             .map_err(map_sqlx_error)?;
 
-        let _ = delete_document(kv_store, view_id).await?;
+        let _ = delete_document(document_store, view_id).await?;
     }
     Ok(())
 }
 
-#[tracing::instrument(name = "create_view", level = "debug", skip(transaction, kv_store), err)]
+#[tracing::instrument(name = "create_view", level = "debug", skip(transaction, document_store), err)]
 pub(crate) async fn create_view(
     transaction: &mut DBTransaction<'_>,
-    kv_store: Arc<DocumentKVPersistence>,
+    document_store: Arc<DocumentRevisionKV>,
     params: CreateViewParamsPB,
     user_id: &str,
 ) -> Result<ViewPB, ServerError> {
@@ -98,7 +99,7 @@ pub(crate) async fn create_view(
     let mut create_doc_params = CreateDocParamsPB::new();
     create_doc_params.set_revisions(repeated_revision.try_into().unwrap());
     create_doc_params.set_id(view.id.clone());
-    let _ = create_document(&kv_store, create_doc_params).await?;
+    let _ = create_document(&document_store, create_doc_params).await?;
 
     Ok(view)
 }

+ 2 - 2
backend/src/services/folder/view/router.rs

@@ -37,7 +37,7 @@ pub async fn create_handler(
     user: LoggedUser,
 ) -> Result<HttpResponse, ServerError> {
     let params: CreateViewParamsPB = parse_from_payload(payload).await?;
-    let kv_store = persistence.kv_store();
+    let kv_store = persistence.document_kv_store();
     let pool = persistence.pg_pool();
     let mut transaction = pool
         .begin()
@@ -114,7 +114,7 @@ pub async fn delete_handler(
 ) -> Result<HttpResponse, ServerError> {
     let params: QueryViewRequestPB = parse_from_payload(payload).await?;
     let pool = persistence.pg_pool();
-    let kv_store = persistence.kv_store();
+    let kv_store = persistence.document_kv_store();
     let view_ids = check_view_ids(params.view_ids.to_vec())?;
     let mut transaction = pool
         .begin()

+ 15 - 4
backend/src/services/folder/ws_actor.rs

@@ -12,6 +12,7 @@ use flowy_collaboration::{
         ClientRevisionWSData as ClientRevisionWSDataPB,
         ClientRevisionWSDataType as ClientRevisionWSDataTypePB,
     },
+    server_folder::ServerFolderManager,
     synchronizer::{RevisionSyncResponse, RevisionUser},
 };
 use futures::stream::StreamExt;
@@ -29,12 +30,14 @@ pub enum FolderWSActorMessage {
 
 pub struct FolderWebSocketActor {
     receiver: Option<mpsc::Receiver<FolderWSActorMessage>>,
+    folder_manager: Arc<ServerFolderManager>,
 }
 
 impl FolderWebSocketActor {
-    pub fn new(receiver: mpsc::Receiver<FolderWSActorMessage>) -> Self {
+    pub fn new(receiver: mpsc::Receiver<FolderWSActorMessage>, folder_manager: Arc<ServerFolderManager>) -> Self {
         Self {
             receiver: Some(receiver),
+            folder_manager,
         }
     }
 
@@ -79,13 +82,21 @@ impl FolderWebSocketActor {
             folder_client_data.ty
         );
 
-        let _user = Arc::new(FolderRevisionUser { user, socket });
+        let user = Arc::new(FolderRevisionUser { user, socket });
         match &folder_client_data.ty {
             ClientRevisionWSDataTypePB::ClientPushRev => {
-                todo!()
+                let _ = self
+                    .folder_manager
+                    .handle_client_revisions(user, folder_client_data)
+                    .await
+                    .map_err(internal_error)?;
             },
             ClientRevisionWSDataTypePB::ClientPing => {
-                todo!()
+                let _ = self
+                    .folder_manager
+                    .handle_client_ping(user, folder_client_data)
+                    .await
+                    .map_err(internal_error)?;
             },
         }
         Ok(())

+ 111 - 2
backend/src/services/folder/ws_receiver.rs

@@ -5,13 +5,26 @@ use crate::{
         web_socket::{WSClientData, WebSocketReceiver},
     },
 };
+use std::fmt::{Debug, Formatter};
 
+use crate::{context::FolderRevisionKV, services::kv::revision_kv::revisions_to_key_value_items};
+use flowy_collaboration::{
+    entities::folder_info::FolderInfo,
+    errors::CollaborateError,
+    protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB},
+    server_folder::{FolderCloudPersistence, ServerFolderManager},
+    util::make_folder_from_revisions_pb,
+};
+use lib_infra::future::BoxResultFuture;
 use std::sync::Arc;
 use tokio::sync::{mpsc, oneshot};
 
-pub fn make_folder_ws_receiver(persistence: Arc<FlowyPersistence>) -> Arc<FolderWebSocketReceiver> {
+pub fn make_folder_ws_receiver(
+    persistence: Arc<FlowyPersistence>,
+    folder_manager: Arc<ServerFolderManager>,
+) -> Arc<FolderWebSocketReceiver> {
     let (ws_sender, rx) = tokio::sync::mpsc::channel(1000);
-    let actor = FolderWebSocketActor::new(rx);
+    let actor = FolderWebSocketActor::new(rx, folder_manager);
     tokio::task::spawn(actor.run());
     Arc::new(FolderWebSocketReceiver::new(persistence, ws_sender))
 }
@@ -51,3 +64,99 @@ impl WebSocketReceiver for FolderWebSocketReceiver {
         });
     }
 }
+
+pub struct HttpFolderCloudPersistence(pub Arc<FolderRevisionKV>);
+impl Debug for HttpFolderCloudPersistence {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.write_str("HttpFolderCloudPersistence") }
+}
+
+impl FolderCloudPersistence for HttpFolderCloudPersistence {
+    fn read_folder(&self, _user_id: &str, folder_id: &str) -> BoxResultFuture<FolderInfo, CollaborateError> {
+        let folder_store = self.0.clone();
+        let folder_id = folder_id.to_owned();
+        Box::pin(async move {
+            let revisions = folder_store
+                .get_revisions(&folder_id, None)
+                .await
+                .map_err(|e| e.to_collaborate_error())?;
+            match make_folder_from_revisions_pb(&folder_id, revisions)? {
+                Some(folder_info) => Ok(folder_info),
+                None => Err(CollaborateError::record_not_found().context(format!("{} not exist", folder_id))),
+            }
+        })
+    }
+
+    fn create_folder(
+        &self,
+        _user_id: &str,
+        folder_id: &str,
+        mut repeated_revision: RepeatedRevisionPB,
+    ) -> BoxResultFuture<Option<FolderInfo>, CollaborateError> {
+        let folder_store = self.0.clone();
+        let folder_id = folder_id.to_owned();
+        Box::pin(async move {
+            let folder_info = make_folder_from_revisions_pb(&folder_id, repeated_revision.clone())?;
+            let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
+            let _ = folder_store
+                .set_revision(revisions)
+                .await
+                .map_err(|e| e.to_collaborate_error())?;
+            Ok(folder_info)
+        })
+    }
+
+    fn save_folder_revisions(
+        &self,
+        mut repeated_revision: RepeatedRevisionPB,
+    ) -> BoxResultFuture<(), CollaborateError> {
+        let folder_store = self.0.clone();
+        Box::pin(async move {
+            let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
+            let _ = folder_store
+                .set_revision(revisions)
+                .await
+                .map_err(|e| e.to_collaborate_error())?;
+            Ok(())
+        })
+    }
+
+    fn read_folder_revisions(
+        &self,
+        folder_id: &str,
+        rev_ids: Option<Vec<i64>>,
+    ) -> BoxResultFuture<Vec<RevisionPB>, CollaborateError> {
+        let folder_store = self.0.clone();
+        let folder_id = folder_id.to_owned();
+        Box::pin(async move {
+            let mut repeated_revision = folder_store
+                .get_revisions(&folder_id, rev_ids)
+                .await
+                .map_err(|e| e.to_collaborate_error())?;
+            let revisions: Vec<RevisionPB> = repeated_revision.take_items().into();
+            Ok(revisions)
+        })
+    }
+
+    fn reset_folder(
+        &self,
+        folder_id: &str,
+        mut repeated_revision: RepeatedRevisionPB,
+    ) -> BoxResultFuture<(), CollaborateError> {
+        let folder_store = self.0.clone();
+        let folder_id = folder_id.to_owned();
+        Box::pin(async move {
+            let _ = folder_store
+                .transaction(|mut transaction| {
+                    Box::pin(async move {
+                        let _ = transaction.batch_delete_key_start_with(&folder_id).await?;
+                        let items = revisions_to_key_value_items(repeated_revision.take_items().into())?;
+                        let _ = transaction.batch_set(items).await?;
+                        Ok(())
+                    })
+                })
+                .await
+                .map_err(|e| e.to_collaborate_error())?;
+            Ok(())
+        })
+    }
+}

+ 1 - 0
backend/src/services/kv/mod.rs

@@ -1,5 +1,6 @@
 #![allow(clippy::module_inception)]
 mod kv;
+pub mod revision_kv;
 
 use async_trait::async_trait;
 use bytes::Bytes;

+ 120 - 0
backend/src/services/kv/revision_kv.rs

@@ -0,0 +1,120 @@
+use crate::{
+    services::kv::{KVStore, KeyValue},
+    util::serde_ext::parse_from_bytes,
+};
+use backend_service::errors::ServerError;
+use bytes::Bytes;
+use flowy_collaboration::protobuf::{RepeatedRevision as RepeatedRevisionPB, Revision as RevisionPB};
+
+use protobuf::Message;
+use std::sync::Arc;
+
+pub struct RevisionKVPersistence {
+    inner: Arc<KVStore>,
+}
+
+impl std::ops::Deref for RevisionKVPersistence {
+    type Target = Arc<KVStore>;
+
+    fn deref(&self) -> &Self::Target { &self.inner }
+}
+
+impl std::ops::DerefMut for RevisionKVPersistence {
+    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.inner }
+}
+
+impl RevisionKVPersistence {
+    pub(crate) fn new(kv_store: Arc<KVStore>) -> Self { RevisionKVPersistence { inner: kv_store } }
+
+    pub(crate) async fn set_revision(&self, revisions: Vec<RevisionPB>) -> Result<(), ServerError> {
+        let items = revisions_to_key_value_items(revisions)?;
+        self.inner
+            .transaction(|mut t| Box::pin(async move { t.batch_set(items).await }))
+            .await
+    }
+
+    pub(crate) async fn get_revisions<T: Into<Option<Vec<i64>>>>(
+        &self,
+        object_id: &str,
+        rev_ids: T,
+    ) -> Result<RepeatedRevisionPB, ServerError> {
+        let rev_ids = rev_ids.into();
+        let items = match rev_ids {
+            None => {
+                let object_id = object_id.to_owned();
+                self.inner
+                    .transaction(|mut t| Box::pin(async move { t.batch_get_start_with(&object_id).await }))
+                    .await?
+            },
+            Some(rev_ids) => {
+                let keys = rev_ids
+                    .into_iter()
+                    .map(|rev_id| make_revision_key(object_id, rev_id))
+                    .collect::<Vec<String>>();
+
+                self.inner
+                    .transaction(|mut t| Box::pin(async move { t.batch_get(keys).await }))
+                    .await?
+            },
+        };
+
+        Ok(key_value_items_to_revisions(items))
+    }
+
+    pub(crate) async fn delete_revisions<T: Into<Option<Vec<i64>>>>(
+        &self,
+        object_id: &str,
+        rev_ids: T,
+    ) -> Result<(), ServerError> {
+        match rev_ids.into() {
+            None => {
+                let object_id = object_id.to_owned();
+                self.inner
+                    .transaction(|mut t| Box::pin(async move { t.batch_delete_key_start_with(&object_id).await }))
+                    .await
+            },
+            Some(rev_ids) => {
+                let keys = rev_ids
+                    .into_iter()
+                    .map(|rev_id| make_revision_key(object_id, rev_id))
+                    .collect::<Vec<String>>();
+
+                self.inner
+                    .transaction(|mut t| Box::pin(async move { t.batch_delete(keys).await }))
+                    .await
+            },
+        }
+    }
+}
+
+#[inline]
+pub fn revisions_to_key_value_items(revisions: Vec<RevisionPB>) -> Result<Vec<KeyValue>, ServerError> {
+    let mut items = vec![];
+    for revision in revisions {
+        let key = make_revision_key(&revision.object_id, revision.rev_id);
+
+        if revision.delta_data.is_empty() {
+            return Err(ServerError::internal().context("The delta_data of RevisionPB should not be empty"));
+        }
+
+        let value = Bytes::from(revision.write_to_bytes().unwrap());
+        items.push(KeyValue { key, value });
+    }
+    Ok(items)
+}
+
+#[inline]
+fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevisionPB {
+    let mut revisions = items
+        .into_iter()
+        .filter_map(|kv| parse_from_bytes::<RevisionPB>(&kv.value).ok())
+        .collect::<Vec<RevisionPB>>();
+
+    revisions.sort_by(|a, b| a.rev_id.cmp(&b.rev_id));
+    let mut repeated_revision = RepeatedRevisionPB::new();
+    repeated_revision.set_items(revisions.into());
+    repeated_revision
+}
+
+#[inline]
+fn make_revision_key(object_id: &str, rev_id: i64) -> String { format!("{}:{}", object_id, rev_id) }

+ 4 - 4
backend/tests/api_test/kv_test.rs

@@ -5,7 +5,7 @@ use std::str;
 #[actix_rt::test]
 async fn kv_set_test() {
     let server = spawn_server().await;
-    let kv = server.app_ctx.persistence.kv_store();
+    let kv = server.app_ctx.persistence.document_kv_store();
     let s1 = "123".to_string();
     let key = "1";
 
@@ -18,7 +18,7 @@ async fn kv_set_test() {
 #[actix_rt::test]
 async fn kv_delete_test() {
     let server = spawn_server().await;
-    let kv = server.app_ctx.persistence.kv_store();
+    let kv = server.app_ctx.persistence.document_kv_store();
     let s1 = "123".to_string();
     let key = "1";
 
@@ -30,7 +30,7 @@ async fn kv_delete_test() {
 #[actix_rt::test]
 async fn kv_batch_set_test() {
     let server = spawn_server().await;
-    let kv = server.app_ctx.persistence.kv_store();
+    let kv = server.app_ctx.persistence.document_kv_store();
     let kvs = vec![
         KeyValue {
             key: "1".to_string(),
@@ -54,7 +54,7 @@ async fn kv_batch_set_test() {
 #[actix_rt::test]
 async fn kv_batch_get_start_with_test() {
     let server = spawn_server().await;
-    let kv = server.app_ctx.persistence.kv_store();
+    let kv = server.app_ctx.persistence.document_kv_store();
     let kvs = vec![
         KeyValue {
             key: "abc:1".to_string(),

+ 1 - 1
backend/tests/document_test/edit_script.rs

@@ -113,7 +113,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
                 },
                 DocScript::AssertServer(s, rev_id) => {
                     sleep(Duration::from_millis(2000)).await;
-                    let persistence = Data::new(context.read().server.app_ctx.persistence.kv_store());
+                    let persistence = Data::new(context.read().server.app_ctx.persistence.document_kv_store());
                     let doc_identifier: DocumentIdPB = DocumentId {
                         doc_id
                     }.try_into().unwrap();

+ 2 - 2
frontend/app_flowy/lib/workspace/infrastructure/repos/trash_repo.dart

@@ -33,11 +33,11 @@ class TrashRepo {
   }
 
   Future<Either<Unit, FlowyError>> restoreAll() {
-    return WorkspaceEventRestoreAll().send();
+    return WorkspaceEventRestoreAllTrash().send();
   }
 
   Future<Either<Unit, FlowyError>> deleteAll() {
-    return WorkspaceEventDeleteAll().send();
+    return WorkspaceEventDeleteAllTrash().send();
   }
 }
 

+ 1 - 0
frontend/app_flowy/packages/flowy_sdk/lib/dispatch/dispatch.dart

@@ -4,6 +4,7 @@ import 'package:flowy_log/flowy_log.dart';
 // ignore: unnecessary_import
 import 'package:flowy_sdk/protobuf/dart-ffi/ffi_response.pb.dart';
 import 'package:flowy_sdk/protobuf/dart-ffi/ffi_request.pb.dart';
+import 'package:flowy_sdk/protobuf/flowy-collaboration/document_info.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-error/errors.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-net/event.pb.dart';
 import 'package:flowy_sdk/protobuf/flowy-net/network_state.pb.dart';

+ 9 - 1
shared-lib/backend-service/src/errors.rs

@@ -5,7 +5,7 @@ use serde_repr::*;
 use std::{fmt, fmt::Debug};
 
 pub type Result<T> = std::result::Result<T, ServerError>;
-
+use flowy_collaboration::errors::CollaborateError;
 #[derive(thiserror::Error, Debug, Serialize, Deserialize, Clone)]
 pub struct ServerError {
     pub code: ErrorCode,
@@ -47,6 +47,14 @@ impl ServerError {
     pub fn is_record_not_found(&self) -> bool { self.code == ErrorCode::RecordNotFound }
 
     pub fn is_unauthorized(&self) -> bool { self.code == ErrorCode::UserUnauthorized }
+
+    pub fn to_collaborate_error(&self) -> CollaborateError {
+        if self.is_record_not_found() {
+            CollaborateError::record_not_found()
+        } else {
+            CollaborateError::internal().context(self.msg.clone())
+        }
+    }
 }
 
 pub fn internal_error<T>(e: T) -> ServerError