Browse Source

generic kv

appflowy 3 years ago
parent
commit
049b8828fe

+ 12 - 0
backend/Cargo.lock

@@ -405,6 +405,17 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "async-trait"
+version = "0.1.52"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "061a7acccaa286c011ddc30970520b98fa40e00c9d644633fb26b5fc63a265e3"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "atoi"
 version = "0.4.0"
@@ -446,6 +457,7 @@ dependencies = [
  "actix-web-actors",
  "anyhow",
  "async-stream",
+ "async-trait",
  "backend",
  "backend-service",
  "bcrypt",

+ 2 - 0
backend/Cargo.toml

@@ -24,6 +24,7 @@ bytes = "1"
 toml = "0.5.8"
 dashmap = "4.0"
 log = "0.4.14"
+async-trait = "0.1.52"
 
 # tracing
 tracing = { version = "0.1", features = ["log"] }
@@ -34,6 +35,7 @@ tracing-appender = "0.1"
 tracing-core = "0.1"
 tracing-log = { version = "0.1.1"}
 
+
 # serde
 serde_json = "1.0"
 serde = { version = "1.0", features = ["derive"] }

+ 1 - 1
backend/src/context.rs

@@ -5,7 +5,7 @@ use crate::services::{
 use actix::Addr;
 use actix_web::web::Data;
 
-use crate::services::document::{controller::make_document_ws_receiver, persistence::DocumentKVPersistence};
+use crate::services::document::{persistence::DocumentKVPersistence, ws_receiver::make_document_ws_receiver};
 use lib_ws::WSModule;
 use sqlx::PgPool;
 use std::sync::Arc;

+ 3 - 3
backend/src/services/core/view/controller.rs

@@ -2,7 +2,7 @@ use crate::{
     entities::logged_user::LoggedUser,
     services::{
         core::{trash::read_trash_ids, view::persistence::*},
-        document::persistence::{create_doc, delete_doc, DocumentKVPersistence},
+        document::persistence::{create_document, delete_document, DocumentKVPersistence},
     },
     util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
 };
@@ -59,7 +59,7 @@ pub(crate) async fn delete_view(
             .await
             .map_err(map_sqlx_error)?;
 
-        let _ = delete_doc(kv_store, view_id).await?;
+        let _ = delete_document(kv_store, view_id).await?;
     }
     Ok(())
 }
@@ -100,7 +100,7 @@ pub(crate) async fn create_view(
     create_doc_params.set_revisions(repeated_revision);
     create_doc_params.set_id(view.id.clone());
 
-    let _ = create_doc(&kv_store, create_doc_params).await?;
+    let _ = create_document(&kv_store, create_doc_params).await?;
 
     Ok(view)
 }

+ 1 - 1
backend/src/services/document/mod.rs

@@ -1,6 +1,6 @@
 #![allow(clippy::module_inception)]
 
-pub(crate) mod controller;
 pub(crate) mod persistence;
 pub(crate) mod router;
 pub(crate) mod ws_actor;
+pub(crate) mod ws_receiver;

+ 78 - 2
backend/src/services/document/persistence/kv_store.rs → backend/src/services/document/persistence.rs

@@ -1,12 +1,62 @@
 use crate::{
+    context::FlowyPersistence,
     services::kv::{KVStore, KeyValue},
     util::serde_ext::parse_from_bytes,
 };
-use backend_service::errors::ServerError;
+use anyhow::Context;
+use backend_service::errors::{internal_error, ServerError};
 use bytes::Bytes;
-use flowy_collaboration::protobuf::{RepeatedRevision, Revision};
+use flowy_collaboration::protobuf::{
+    CreateDocParams,
+    DocIdentifier,
+    DocumentInfo,
+    RepeatedRevision,
+    ResetDocumentParams,
+    Revision,
+};
+use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
 use protobuf::Message;
+use sqlx::PgPool;
 use std::sync::Arc;
+use uuid::Uuid;
+
+#[tracing::instrument(level = "debug", skip(kv_store), err)]
+pub(crate) async fn create_document(
+    kv_store: &Arc<DocumentKVPersistence>,
+    mut params: CreateDocParams,
+) -> Result<(), ServerError> {
+    let revisions = params.take_revisions().take_items();
+    let _ = kv_store.batch_set_revision(revisions.into()).await?;
+    Ok(())
+}
+
+#[tracing::instrument(level = "debug", skip(persistence), err)]
+pub(crate) async fn read_document(
+    persistence: &Arc<FlowyPersistence>,
+    params: DocIdentifier,
+) -> Result<DocumentInfo, ServerError> {
+    let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
+
+    let kv_store = persistence.kv_store();
+    let revisions = kv_store.batch_get_revisions(&params.doc_id, None).await?;
+    make_doc_from_revisions(&params.doc_id, revisions)
+}
+
+#[tracing::instrument(level = "debug", skip(kv_store, params), fields(delta), err)]
+pub async fn reset_document(
+    kv_store: &Arc<DocumentKVPersistence>,
+    params: ResetDocumentParams,
+) -> Result<(), ServerError> {
+    // TODO: Reset document requires atomic operation
+    // let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?;
+    todo!()
+}
+
+#[tracing::instrument(level = "debug", skip(kv_store), err)]
+pub(crate) async fn delete_document(kv_store: &Arc<DocumentKVPersistence>, doc_id: Uuid) -> Result<(), ServerError> {
+    let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?;
+    Ok(())
+}
 
 pub struct DocumentKVPersistence {
     inner: Arc<dyn KVStore>,
@@ -111,3 +161,29 @@ fn key_value_items_to_revisions(items: Vec<KeyValue>) -> RepeatedRevision {
 
 #[inline]
 fn make_revision_key(doc_id: &str, rev_id: i64) -> String { format!("{}:{}", doc_id, rev_id) }
+
+#[inline]
+fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result<DocumentInfo, ServerError> {
+    let revisions = revisions.take_items();
+    if revisions.is_empty() {
+        return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
+    }
+
+    let mut document_delta = RichTextDelta::new();
+    let mut base_rev_id = 0;
+    let mut rev_id = 0;
+    // TODO: generate delta from revision should be wrapped into function.
+    for revision in revisions {
+        base_rev_id = revision.base_rev_id;
+        rev_id = revision.rev_id;
+        let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
+        document_delta = document_delta.compose(&delta).map_err(internal_error)?;
+    }
+    let text = document_delta.to_json();
+    let mut document_info = DocumentInfo::new();
+    document_info.set_doc_id(doc_id.to_owned());
+    document_info.set_text(text);
+    document_info.set_base_rev_id(base_rev_id);
+    document_info.set_rev_id(rev_id);
+    Ok(document_info)
+}

+ 0 - 5
backend/src/services/document/persistence/mod.rs

@@ -1,5 +0,0 @@
-mod kv_store;
-mod postgres;
-
-pub use kv_store::*;
-pub use postgres::*;

+ 0 - 78
backend/src/services/document/persistence/postgres.rs

@@ -1,78 +0,0 @@
-use crate::{context::FlowyPersistence, services::document::persistence::DocumentKVPersistence};
-use anyhow::Context;
-use backend_service::errors::{internal_error, ServerError};
-use flowy_collaboration::protobuf::{
-    CreateDocParams,
-    DocIdentifier,
-    DocumentInfo,
-    RepeatedRevision,
-    ResetDocumentParams,
-};
-use lib_ot::{core::OperationTransformable, rich_text::RichTextDelta};
-use sqlx::PgPool;
-use std::sync::Arc;
-use uuid::Uuid;
-
-#[tracing::instrument(level = "debug", skip(kv_store), err)]
-pub(crate) async fn create_doc(
-    kv_store: &Arc<DocumentKVPersistence>,
-    mut params: CreateDocParams,
-) -> Result<(), ServerError> {
-    let revisions = params.take_revisions().take_items();
-    let _ = kv_store.batch_set_revision(revisions.into()).await?;
-    Ok(())
-}
-
-#[tracing::instrument(level = "debug", skip(persistence), err)]
-pub(crate) async fn read_doc(
-    persistence: &Arc<FlowyPersistence>,
-    params: DocIdentifier,
-) -> Result<DocumentInfo, ServerError> {
-    let _ = Uuid::parse_str(&params.doc_id).context("Parse document id to uuid failed")?;
-
-    let kv_store = persistence.kv_store();
-    let revisions = kv_store.batch_get_revisions(&params.doc_id, None).await?;
-    make_doc_from_revisions(&params.doc_id, revisions)
-}
-
-#[tracing::instrument(level = "debug", skip(_pool, _params), fields(delta), err)]
-pub async fn reset_document(_pool: &PgPool, _params: ResetDocumentParams) -> Result<(), ServerError> {
-    unimplemented!()
-}
-
-#[tracing::instrument(level = "debug", skip(kv_store), err)]
-pub(crate) async fn delete_doc(kv_store: &Arc<DocumentKVPersistence>, doc_id: Uuid) -> Result<(), ServerError> {
-    let _ = kv_store.batch_delete_revisions(&doc_id.to_string(), None).await?;
-    Ok(())
-}
-
-#[derive(Debug, Clone, sqlx::FromRow)]
-struct DocTable {
-    id: uuid::Uuid,
-    rev_id: i64,
-}
-
-fn make_doc_from_revisions(doc_id: &str, mut revisions: RepeatedRevision) -> Result<DocumentInfo, ServerError> {
-    let revisions = revisions.take_items();
-    if revisions.is_empty() {
-        return Err(ServerError::record_not_found().context(format!("{} not exist", doc_id)));
-    }
-    
-    let mut document_delta = RichTextDelta::new();
-    let mut base_rev_id = 0;
-    let mut rev_id = 0;
-    // TODO: generate delta from revision should be wrapped into function.
-    for revision in revisions {
-        base_rev_id = revision.base_rev_id;
-        rev_id = revision.rev_id;
-        let delta = RichTextDelta::from_bytes(revision.delta_data).map_err(internal_error)?;
-        document_delta = document_delta.compose(&delta).map_err(internal_error)?;
-    }
-    let text = document_delta.to_json();
-    let mut document_info = DocumentInfo::new();
-    document_info.set_doc_id(doc_id.to_owned());
-    document_info.set_text(text);
-    document_info.set_base_rev_id(base_rev_id);
-    document_info.set_rev_id(rev_id);
-    Ok(document_info)
-}

+ 11 - 7
backend/src/services/document/router.rs

@@ -1,6 +1,6 @@
 use crate::{
     context::FlowyPersistence,
-    services::document::persistence::{create_doc, read_doc},
+    services::document::persistence::{create_document, read_document, reset_document},
     util::serde_ext::parse_from_payload,
 };
 use actix_web::{
@@ -18,7 +18,7 @@ pub async fn create_document_handler(
 ) -> Result<HttpResponse, ServerError> {
     let params: CreateDocParams = parse_from_payload(payload).await?;
     let kv_store = persistence.kv_store();
-    let _ = create_doc(&kv_store, params).await?;
+    let _ = create_document(&kv_store, params).await?;
     Ok(FlowyResponse::success().into())
 }
 
@@ -28,13 +28,17 @@ pub async fn read_document_handler(
     persistence: Data<Arc<FlowyPersistence>>,
 ) -> Result<HttpResponse, ServerError> {
     let params: DocIdentifier = parse_from_payload(payload).await?;
-    let doc = read_doc(persistence.get_ref(), params).await?;
+    let doc = read_document(persistence.get_ref(), params).await?;
     let response = FlowyResponse::success().pb(doc)?;
     Ok(response.into())
 }
 
-pub async fn reset_document_handler(payload: Payload, _pool: Data<PgPool>) -> Result<HttpResponse, ServerError> {
-    let _params: ResetDocumentParams = parse_from_payload(payload).await?;
-    // Ok(FlowyResponse::success().into())
-    unimplemented!()
+pub async fn reset_document_handler(
+    payload: Payload,
+    persistence: Data<Arc<FlowyPersistence>>,
+) -> Result<HttpResponse, ServerError> {
+    let params: ResetDocumentParams = parse_from_payload(payload).await?;
+    let kv_store = persistence.kv_store();
+    let _ = reset_document(&kv_store, params).await?;
+    Ok(FlowyResponse::success().into())
 }

+ 12 - 12
backend/src/services/document/controller.rs → backend/src/services/document/ws_receiver.rs

@@ -1,6 +1,6 @@
 use crate::services::{
     document::{
-        persistence::{create_doc, read_doc},
+        persistence::{create_document, read_document},
         ws_actor::{DocumentWebSocketActor, WSActorMessage},
     },
     web_socket::{WSClientData, WebSocketReceiver},
@@ -16,7 +16,7 @@ use flowy_collaboration::{
     errors::CollaborateError,
     protobuf::DocIdentifier,
 };
-use lib_infra::future::FutureResultSend;
+use lib_infra::future::{BoxResultFuture, FutureResultSend};
 
 use flowy_collaboration::sync::{DocumentPersistence, ServerDocumentManager};
 use std::{
@@ -78,14 +78,14 @@ impl Debug for DocumentPersistenceImpl {
 }
 
 impl DocumentPersistence for DocumentPersistenceImpl {
-    fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError> {
+    fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
         let params = DocIdentifier {
             doc_id: doc_id.to_string(),
             ..Default::default()
         };
         let persistence = self.0.clone();
-        FutureResultSend::new(async move {
-            let mut pb_doc = read_doc(&persistence, params)
+        Box::pin(async move {
+            let mut pb_doc = read_document(&persistence, params)
                 .await
                 .map_err(server_error_to_collaborate_error)?;
             let doc = (&mut pb_doc)
@@ -95,23 +95,23 @@ impl DocumentPersistence for DocumentPersistenceImpl {
         })
     }
 
-    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError> {
+    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
         let kv_store = self.0.kv_store();
         let doc_id = doc_id.to_owned();
-        FutureResultSend::new(async move {
+        Box::pin(async move {
             let doc = DocumentInfo::from_revisions(&doc_id, revisions.clone())?;
             let doc_id = doc_id.to_owned();
             let revisions = RepeatedRevision::new(revisions);
             let params = CreateDocParams { id: doc_id, revisions };
             let pb_params: flowy_collaboration::protobuf::CreateDocParams = params.try_into().unwrap();
-            let _ = create_doc(&kv_store, pb_params)
+            let _ = create_document(&kv_store, pb_params)
                 .await
                 .map_err(server_error_to_collaborate_error)?;
             Ok(doc)
         })
     }
 
-    fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
+    fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
         let kv_store = self.0.kv_store();
         let doc_id = doc_id.to_owned();
         let f = || async move {
@@ -123,10 +123,10 @@ impl DocumentPersistence for DocumentPersistenceImpl {
             Ok(revisions)
         };
 
-        FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) })
+        Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
     }
 
-    fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError> {
+    fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
         let kv_store = self.0.kv_store();
         let doc_id = doc_id.to_owned();
         let f = || async move {
@@ -136,7 +136,7 @@ impl DocumentPersistence for DocumentPersistenceImpl {
             Ok(revisions)
         };
 
-        FutureResultSend::new(async move { f().await.map_err(server_error_to_collaborate_error) })
+        Box::pin(async move { f().await.map_err(server_error_to_collaborate_error) })
     }
 }
 

+ 143 - 184
backend/src/services/kv/kv.rs

@@ -1,12 +1,13 @@
 use crate::{
-    services::kv::{KVStore, KeyValue},
-    util::sqlx_ext::{map_sqlx_error, SqlBuilder},
+    services::kv::{KVAction, KVStore, KeyValue},
+    util::sqlx_ext::{map_sqlx_error, DBTransaction, SqlBuilder},
 };
-
 use anyhow::Context;
+use async_trait::async_trait;
 use backend_service::errors::ServerError;
 use bytes::Bytes;
-use lib_infra::future::FutureResultSend;
+use futures_core::future::BoxFuture;
+use lib_infra::future::{BoxResultFuture, FutureResultSend};
 use sql_builder::SqlBuilder as RawSqlBuilder;
 use sqlx::{
     postgres::{PgArguments, PgRow},
@@ -16,6 +17,7 @@ use sqlx::{
     Postgres,
     Row,
 };
+use std::{future::Future, pin::Pin};
 
 const KV_TABLE: &str = "kv_table";
 
@@ -23,221 +25,178 @@ pub(crate) struct PostgresKV {
     pub(crate) pg_pool: PgPool,
 }
 
-impl KVStore for PostgresKV {
-    fn get(&self, key: &str) -> FutureResultSend<Option<Bytes>, ServerError> {
-        let pg_pool = self.pg_pool.clone();
-        let id = key.to_string();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let (sql, args) = SqlBuilder::select(KV_TABLE)
-                .add_field("*")
-                .and_where_eq("id", &id)
-                .build()?;
+impl PostgresKV {
+    async fn transaction<F, O>(&self, f: F) -> Result<O, ServerError>
+    where
+        F: for<'a> FnOnce(&'a mut DBTransaction<'_>) -> BoxFuture<'a, Result<O, ServerError>>,
+    {
+        let mut transaction = self
+            .pg_pool
+            .begin()
+            .await
+            .context("[KV]:Failed to acquire a Postgres connection")?;
+
+        let result = f(&mut transaction).await;
+
+        transaction
+            .commit()
+            .await
+            .context("[KV]:Failed to commit SQL transaction.")?;
+
+        result
+    }
+}
 
-            let result = sqlx::query_as_with::<Postgres, KVTable, PgArguments>(&sql, args)
-                .fetch_one(&mut transaction)
-                .await;
+impl KVStore for PostgresKV {}
 
-            let result = match result {
-                Ok(val) => Ok(Some(Bytes::from(val.blob))),
-                Err(error) => match error {
-                    Error::RowNotFound => Ok(None),
-                    _ => Err(map_sqlx_error(error)),
-                },
-            };
+pub(crate) struct PostgresTransaction<'a> {
+    pub(crate) transaction: DBTransaction<'a>,
+}
 
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
+impl<'a> PostgresTransaction<'a> {}
 
-            result
+#[async_trait]
+impl KVAction for PostgresKV {
+    async fn get(&self, key: &str) -> Result<Option<Bytes>, ServerError> {
+        let id = key.to_string();
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let (sql, args) = SqlBuilder::select(KV_TABLE)
+                    .add_field("*")
+                    .and_where_eq("id", &id)
+                    .build()?;
+
+                let result = sqlx::query_as_with::<Postgres, KVTable, PgArguments>(&sql, args)
+                    .fetch_one(transaction)
+                    .await;
+
+                let result = match result {
+                    Ok(val) => Ok(Some(Bytes::from(val.blob))),
+                    Err(error) => match error {
+                        Error::RowNotFound => Ok(None),
+                        _ => Err(map_sqlx_error(error)),
+                    },
+                };
+                result
+            })
         })
+        .await
     }
 
-    fn set(&self, key: &str, bytes: Bytes) -> FutureResultSend<(), ServerError> {
+    async fn set(&self, key: &str, bytes: Bytes) -> Result<(), ServerError> {
         self.batch_set(vec![KeyValue {
             key: key.to_string(),
             value: bytes,
         }])
+        .await
     }
 
-    fn delete(&self, key: &str) -> FutureResultSend<(), ServerError> {
-        let pg_pool = self.pg_pool.clone();
+    async fn remove(&self, key: &str) -> Result<(), ServerError> {
         let id = key.to_string();
-
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let (sql, args) = SqlBuilder::delete(KV_TABLE).and_where_eq("id", &id).build()?;
-            let _ = sqlx::query_with(&sql, args)
-                .execute(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
-
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
-
-            Ok(())
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let (sql, args) = SqlBuilder::delete(KV_TABLE).and_where_eq("id", &id).build()?;
+                let _ = sqlx::query_with(&sql, args)
+                    .execute(transaction)
+                    .await
+                    .map_err(map_sqlx_error)?;
+                Ok(())
+            })
         })
+        .await
     }
 
-    fn batch_set(&self, kvs: Vec<KeyValue>) -> FutureResultSend<(), ServerError> {
-        let pg_pool = self.pg_pool.clone();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            SqlBuilder::create(KV_TABLE).add_field("id").add_field("blob");
-            let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
-            let m_builder = builder.field("id").field("blob");
-
-            let mut args = PgArguments::default();
-            kvs.iter().enumerate().for_each(|(index, _)| {
-                let index = index * 2 + 1;
-                m_builder.values(&[format!("${}", index), format!("${}", index + 1)]);
-            });
-
-            for kv in kvs {
-                args.add(kv.key);
-                args.add(kv.value.to_vec());
-            }
-
-            let sql = m_builder.sql()?;
-            let _ = sqlx::query_with(&sql, args)
-                .execute(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
-
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
-
-            Ok::<(), ServerError>(())
+    async fn batch_set(&self, kvs: Vec<KeyValue>) -> Result<(), ServerError> {
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let mut builder = RawSqlBuilder::insert_into(KV_TABLE);
+                let m_builder = builder.field("id").field("blob");
+
+                let mut args = PgArguments::default();
+                kvs.iter().enumerate().for_each(|(index, _)| {
+                    let index = index * 2 + 1;
+                    m_builder.values(&[format!("${}", index), format!("${}", index + 1)]);
+                });
+
+                for kv in kvs {
+                    args.add(kv.key);
+                    args.add(kv.value.to_vec());
+                }
+
+                let sql = m_builder.sql()?;
+                let _ = sqlx::query_with(&sql, args)
+                    .execute(transaction)
+                    .await
+                    .map_err(map_sqlx_error)?;
+
+                Ok::<(), ServerError>(())
+            })
         })
+        .await
     }
 
-    fn batch_get(&self, keys: Vec<String>) -> FutureResultSend<Vec<KeyValue>, ServerError> {
-        let pg_pool = self.pg_pool.clone();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let sql = RawSqlBuilder::select_from(KV_TABLE)
-                .field("id")
-                .field("blob")
-                .and_where_in_quoted("id", &keys)
-                .sql()?;
-
-            let rows = sqlx::query(&sql)
-                .fetch_all(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
-            let kvs = rows_to_key_values(rows);
-
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
-
-            Ok::<Vec<KeyValue>, ServerError>(kvs)
+    async fn batch_get(&self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError> {
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let sql = RawSqlBuilder::select_from(KV_TABLE)
+                    .field("id")
+                    .field("blob")
+                    .and_where_in_quoted("id", &keys)
+                    .sql()?;
+
+                let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?;
+                let kvs = rows_to_key_values(rows);
+                Ok::<Vec<KeyValue>, ServerError>(kvs)
+            })
         })
+        .await
     }
 
-    fn batch_get_start_with(&self, key: &str) -> FutureResultSend<Vec<KeyValue>, ServerError> {
-        let pg_pool = self.pg_pool.clone();
-        let prefix = key.to_owned();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let sql = RawSqlBuilder::select_from(KV_TABLE)
-                .field("id")
-                .field("blob")
-                .and_where_like_left("id", &prefix)
-                .sql()?;
-
-            let rows = sqlx::query(&sql)
-                .fetch_all(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
+    async fn batch_delete(&self, keys: Vec<String>) -> Result<(), ServerError> {
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let sql = RawSqlBuilder::delete_from(KV_TABLE).and_where_in("id", &keys).sql()?;
+                let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?;
 
-            let kvs = rows_to_key_values(rows);
-
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
-
-            Ok::<Vec<KeyValue>, ServerError>(kvs)
+                Ok::<(), ServerError>(())
+            })
         })
+        .await
     }
 
-    fn batch_delete(&self, keys: Vec<String>) -> FutureResultSend<(), ServerError> {
-        let pg_pool = self.pg_pool.clone();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let sql = RawSqlBuilder::delete_from(KV_TABLE).and_where_in("id", &keys).sql()?;
+    async fn batch_get_start_with(&self, key: &str) -> Result<Vec<KeyValue>, ServerError> {
+        let prefix = key.to_owned();
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let sql = RawSqlBuilder::select_from(KV_TABLE)
+                    .field("id")
+                    .field("blob")
+                    .and_where_like_left("id", &prefix)
+                    .sql()?;
 
-            let _ = sqlx::query(&sql)
-                .execute(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
+                let rows = sqlx::query(&sql).fetch_all(transaction).await.map_err(map_sqlx_error)?;
 
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
+                let kvs = rows_to_key_values(rows);
 
-            Ok::<(), ServerError>(())
+                Ok::<Vec<KeyValue>, ServerError>(kvs)
+            })
         })
+        .await
     }
 
-    fn batch_delete_key_start_with(&self, keyword: &str) -> FutureResultSend<(), ServerError> {
-        let pg_pool = self.pg_pool.clone();
+    async fn batch_delete_key_start_with(&self, keyword: &str) -> Result<(), ServerError> {
         let keyword = keyword.to_owned();
-        FutureResultSend::new(async move {
-            let mut transaction = pg_pool
-                .begin()
-                .await
-                .context("[KV]:Failed to acquire a Postgres connection")?;
-
-            let sql = RawSqlBuilder::delete_from(KV_TABLE)
-                .and_where_like_left("id", &keyword)
-                .sql()?;
-
-            let _ = sqlx::query(&sql)
-                .execute(&mut transaction)
-                .await
-                .map_err(map_sqlx_error)?;
-
-            transaction
-                .commit()
-                .await
-                .context("[KV]:Failed to commit SQL transaction.")?;
-
-            Ok::<(), ServerError>(())
+        self.transaction(|transaction| {
+            Box::pin(async move {
+                let sql = RawSqlBuilder::delete_from(KV_TABLE)
+                    .and_where_like_left("id", &keyword)
+                    .sql()?;
+
+                let _ = sqlx::query(&sql).execute(transaction).await.map_err(map_sqlx_error)?;
+                Ok::<(), ServerError>(())
+            })
         })
+        .await
     }
 }
 

+ 25 - 10
backend/src/services/kv/mod.rs

@@ -1,11 +1,13 @@
 #![allow(clippy::module_inception)]
 mod kv;
 
+use async_trait::async_trait;
 use bytes::Bytes;
+use futures_core::future::BoxFuture;
 pub(crate) use kv::*;
 
 use backend_service::errors::ServerError;
-use lib_infra::future::FutureResultSend;
+use lib_infra::future::{BoxResultFuture, FutureResultSend};
 
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub struct KeyValue {
@@ -13,14 +15,27 @@ pub struct KeyValue {
     pub value: Bytes,
 }
 
-pub trait KVStore: Send + Sync {
-    fn get(&self, key: &str) -> FutureResultSend<Option<Bytes>, ServerError>;
-    fn set(&self, key: &str, value: Bytes) -> FutureResultSend<(), ServerError>;
-    fn delete(&self, key: &str) -> FutureResultSend<(), ServerError>;
-    fn batch_set(&self, kvs: Vec<KeyValue>) -> FutureResultSend<(), ServerError>;
-    fn batch_get(&self, keys: Vec<String>) -> FutureResultSend<Vec<KeyValue>, ServerError>;
-    fn batch_get_start_with(&self, key: &str) -> FutureResultSend<Vec<KeyValue>, ServerError>;
+// https://rust-lang.github.io/async-book/07_workarounds/05_async_in_traits.html
+// Note that using these trait methods will result in a heap allocation
+// per-function-call. This is not a significant cost for the vast majority of
+// applications, but should be considered when deciding whether to use this
+// functionality in the public API of a low-level function that is expected to
+// be called millions of times a second.
+#[async_trait]
+pub trait KVStore: KVAction + Send + Sync {}
 
-    fn batch_delete(&self, keys: Vec<String>) -> FutureResultSend<(), ServerError>;
-    fn batch_delete_key_start_with(&self, keyword: &str) -> FutureResultSend<(), ServerError>;
+// pub trait KVTransaction
+
+#[async_trait]
+pub trait KVAction: Send + Sync {
+    async fn get(&self, key: &str) -> Result<Option<Bytes>, ServerError>;
+    async fn set(&self, key: &str, value: Bytes) -> Result<(), ServerError>;
+    async fn remove(&self, key: &str) -> Result<(), ServerError>;
+
+    async fn batch_set(&self, kvs: Vec<KeyValue>) -> Result<(), ServerError>;
+    async fn batch_get(&self, keys: Vec<String>) -> Result<Vec<KeyValue>, ServerError>;
+    async fn batch_delete(&self, keys: Vec<String>) -> Result<(), ServerError>;
+
+    async fn batch_get_start_with(&self, key: &str) -> Result<Vec<KeyValue>, ServerError>;
+    async fn batch_delete_key_start_with(&self, keyword: &str) -> Result<(), ServerError>;
 }

+ 1 - 1
backend/tests/api_test/kv_test.rs

@@ -23,7 +23,7 @@ async fn kv_delete_test() {
     let key = "1";
 
     let _ = kv.set(key, s1.clone().into()).await.unwrap();
-    let _ = kv.delete(key).await.unwrap();
+    let _ = kv.remove(key).await.unwrap();
     assert_eq!(kv.get(key).await.unwrap(), None);
 }
 

+ 31 - 23
backend/tests/document_test/edit_script.rs

@@ -1,7 +1,7 @@
 #![allow(clippy::all)]
 #![cfg_attr(rustfmt, rustfmt::skip)]
+use std::convert::TryInto;
 use actix_web::web::Data;
-use backend::services::doc::{crud::update_doc};
 use flowy_document::services::doc::edit::ClientDocEditor as ClientEditDocContext;
 use flowy_test::{helper::ViewTest, FlowySDKTest};
 use flowy_user::services::user::UserSession;
@@ -14,9 +14,11 @@ use crate::util::helper::{spawn_server, TestServer};
 use flowy_collaboration::{entities::doc::DocIdentifier, protobuf::ResetDocumentParams};
 use lib_ot::rich_text::{RichTextAttribute, RichTextDelta};
 use parking_lot::RwLock;
+use flowy_collaboration::entities::revision::RepeatedRevision;
 use lib_ot::core::Interval;
-use flowy_collaboration::core::sync::ServerDocManager;
-use flowy_net::services::ws::WsManager;
+
+use flowy_net::services::ws::FlowyWSConnect;
+use crate::util::helper::*;
 
 pub struct DocumentTest {
     server: TestServer,
@@ -30,7 +32,7 @@ pub enum DocScript {
     ClientOpenDoc,
     AssertClient(&'static str),
     AssertServer(&'static str, i64),
-    ServerSaveDocument(String, i64), // delta_json, rev_id
+    ServerSaveDocument(RepeatedRevision), // delta_json, rev_id
 }
 
 impl DocumentTest {
@@ -54,9 +56,8 @@ struct ScriptContext {
     client_edit_context: Option<Arc<ClientEditDocContext>>,
     client_sdk: FlowySDKTest,
     client_user_session: Arc<UserSession>,
-    ws_manager: Arc<WsManager>,
-    server_doc_manager: Arc<ServerDocManager>,
-    server_pg_pool: Data<PgPool>,
+    ws_conn: Arc<FlowyWSConnect>,
+    server: TestServer,
     doc_id: String,
 }
 
@@ -70,9 +71,8 @@ impl ScriptContext {
             client_edit_context: None,
             client_sdk,
             client_user_session: user_session,
-            ws_manager,
-            server_doc_manager: server.app_ctx.document_core.manager.clone(),
-            server_pg_pool: Data::new(server.pg_pool.clone()),
+            ws_conn: ws_manager,
+            server,
             doc_id,
         }
     }
@@ -97,7 +97,7 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
             match script {
                 DocScript::ClientConnectWS => {
                     // sleep(Duration::from_millis(300)).await;
-                    let ws_manager = context.read().ws_manager.clone();
+                    let ws_manager = context.read().ws_conn.clone();
                     let user_session = context.read().client_user_session.clone();
                     let token = user_session.token().unwrap();
                     let _ = ws_manager.start(token).await.unwrap();
@@ -123,16 +123,24 @@ async fn run_scripts(context: Arc<RwLock<ScriptContext>>, scripts: Vec<DocScript
                 },
                 DocScript::AssertServer(s, rev_id) => {
                     sleep(Duration::from_millis(100)).await;
+                    
+                    // let doc_identifier = DocIdentifier {
+                    //     doc_id
+                    // };
+                    // 
+                    // let doc = context.read().server.read_doc()
+                    
+                    
                     // let pg_pool = context.read().server_pg_pool.clone();
-                    let doc_manager = context.read().server_doc_manager.clone();
-                    let edit_doc = doc_manager.get(&doc_id).await.unwrap();
-                    let json = edit_doc.document_json().await.unwrap();
-                    assert_eq(s, &json);
-                    assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id);
+                    // let doc_manager = context.read().server_doc_manager.clone();
+                    // let edit_doc = doc_manager.get(&doc_id).await.unwrap();
+                    // let json = edit_doc.document_json().await.unwrap();
+                    // assert_eq(s, &json);
+                    // assert_eq!(edit_doc.rev_id().await.unwrap(), rev_id);
                 },
-                DocScript::ServerSaveDocument(json, rev_id) => {
-                    let pg_pool = context.read().server_pg_pool.clone();
-                    save_doc(&doc_id, json, rev_id, pg_pool).await;
+                DocScript::ServerSaveDocument(repeated_revision) => {
+                    let pg_pool = Data::new(context.read().server.pg_pool.clone());
+                    reset_doc(&doc_id, repeated_revision, pg_pool).await;
                 },
                 // DocScript::Sleep(sec) => {
                 //     sleep(Duration::from_secs(sec)).await;
@@ -166,10 +174,10 @@ async fn create_doc(flowy_test: &FlowySDKTest) -> String {
     view_test.view.id
 }
 
-async fn save_doc(doc_id: &str, json: String, rev_id: i64, pool: Data<PgPool>) {
+async fn reset_doc(doc_id: &str, repeated_revision: RepeatedRevision, pool: Data<PgPool>) {
+    let pb: flowy_collaboration::protobuf::RepeatedRevision = repeated_revision.try_into().unwrap();
     let mut params = ResetDocumentParams::new();
     params.set_doc_id(doc_id.to_owned());
-    params.set_data(json);
-    params.set_rev_id(rev_id);
-    let _ = update_doc(pool.get_ref(), params).await.unwrap();
+    params.set_revisions(pb);
+    // let _ = reset_document_handler(pool.get_ref(), params).await.unwrap();
 }

+ 2 - 2
backend/tests/document_test/edit_test.rs

@@ -1,5 +1,5 @@
-use crate::document::edit_script::{DocScript, DocumentTest};
-use flowy_collaboration::core::document::{Document, FlowyDoc};
+use crate::document_test::edit_script::{DocScript, DocumentTest};
+use flowy_collaboration::document::{Document, FlowyDoc};
 use lib_ot::{core::Interval, rich_text::RichTextAttribute};
 
 #[rustfmt::skip]

+ 2 - 2
backend/tests/document_test/mod.rs

@@ -1,2 +1,2 @@
-// mod edit_script;
-// mod edit_test;
+mod edit_script;
+mod edit_test;

+ 5 - 6
backend/tests/util/helper.rs

@@ -17,10 +17,9 @@ use sqlx::{Connection, Executor, PgConnection, PgPool};
 use uuid::Uuid;
 
 pub struct TestUserServer {
-    pub pg_pool: PgPool,
+    pub inner: TestServer,
     pub user_token: Option<String>,
     pub user_id: Option<String>,
-    pub client_server_config: ClientServerConfiguration,
 }
 
 impl TestUserServer {
@@ -176,12 +175,12 @@ impl TestUserServer {
         response
     }
 
-    pub fn http_addr(&self) -> String { self.client_server_config.base_url() }
+    pub fn http_addr(&self) -> String { self.inner.client_server_config.base_url() }
 
     pub fn ws_addr(&self) -> String {
         format!(
             "{}/{}",
-            self.client_server_config.ws_addr(),
+            self.inner.client_server_config.ws_addr(),
             self.user_token.as_ref().unwrap()
         )
     }
@@ -190,10 +189,9 @@ impl TestUserServer {
 impl std::convert::From<TestServer> for TestUserServer {
     fn from(server: TestServer) -> Self {
         TestUserServer {
-            pg_pool: server.pg_pool,
+            inner: server,
             user_token: None,
             user_id: None,
-            client_server_config: server.client_server_config,
         }
     }
 }
@@ -203,6 +201,7 @@ pub async fn spawn_user_server() -> TestUserServer {
     server
 }
 
+#[derive(Clone)]
 pub struct TestServer {
     pub pg_pool: PgPool,
     pub app_ctx: AppContext,

+ 1 - 1
frontend/rust-lib/flowy-test/tests/main.rs

@@ -1 +1 @@
-mod revision_test;
+// mod revision_test;

+ 8 - 8
frontend/rust-lib/flowy-virtual-net/src/mock/server.rs

@@ -2,7 +2,7 @@ use bytes::Bytes;
 use dashmap::DashMap;
 use flowy_collaboration::{entities::prelude::*, errors::CollaborateError, sync::*};
 // use flowy_net::services::ws::*;
-use lib_infra::future::FutureResultSend;
+use lib_infra::future::{BoxResultFuture, FutureResultSend};
 use lib_ws::{WSModule, WebSocketRawMessage};
 use std::{
     convert::TryInto,
@@ -61,10 +61,10 @@ impl std::default::Default for MockDocServerPersistence {
 }
 
 impl DocumentPersistence for MockDocServerPersistence {
-    fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError> {
+    fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError> {
         let inner = self.inner.clone();
         let doc_id = doc_id.to_owned();
-        FutureResultSend::new(async move {
+        Box::pin(async move {
             match inner.get(&doc_id) {
                 None => {
                     //
@@ -78,16 +78,16 @@ impl DocumentPersistence for MockDocServerPersistence {
         })
     }
 
-    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError> {
+    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError> {
         let doc_id = doc_id.to_owned();
-        FutureResultSend::new(async move { DocumentInfo::from_revisions(&doc_id, revisions) })
+        Box::pin(async move { DocumentInfo::from_revisions(&doc_id, revisions) })
     }
 
-    fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError> {
-        FutureResultSend::new(async move { Ok(vec![]) })
+    fn get_revisions(&self, _doc_id: &str, _rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError> {
+        Box::pin(async move { Ok(vec![]) })
     }
 
-    fn get_doc_revisions(&self, _doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError> { unimplemented!() }
+    fn get_doc_revisions(&self, _doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError> { unimplemented!() }
 }
 
 #[derive(Debug)]

+ 5 - 5
shared-lib/flowy-collaboration/src/sync/server.rs

@@ -12,7 +12,7 @@ use crate::{
 use async_stream::stream;
 use dashmap::DashMap;
 use futures::stream::StreamExt;
-use lib_infra::future::FutureResultSend;
+use lib_infra::future::{BoxResultFuture, FutureResultSend};
 use lib_ot::rich_text::RichTextDelta;
 use std::{convert::TryFrom, fmt::Debug, sync::Arc};
 use tokio::{
@@ -21,10 +21,10 @@ use tokio::{
 };
 
 pub trait DocumentPersistence: Send + Sync + Debug {
-    fn read_doc(&self, doc_id: &str) -> FutureResultSend<DocumentInfo, CollaborateError>;
-    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> FutureResultSend<DocumentInfo, CollaborateError>;
-    fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> FutureResultSend<Vec<Revision>, CollaborateError>;
-    fn get_doc_revisions(&self, doc_id: &str) -> FutureResultSend<Vec<Revision>, CollaborateError>;
+    fn read_doc(&self, doc_id: &str) -> BoxResultFuture<DocumentInfo, CollaborateError>;
+    fn create_doc(&self, doc_id: &str, revisions: Vec<Revision>) -> BoxResultFuture<DocumentInfo, CollaborateError>;
+    fn get_revisions(&self, doc_id: &str, rev_ids: Vec<i64>) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
+    fn get_doc_revisions(&self, doc_id: &str) -> BoxResultFuture<Vec<Revision>, CollaborateError>;
 }
 
 pub struct ServerDocumentManager {

+ 3 - 1
shared-lib/lib-infra/src/future.rs

@@ -1,4 +1,4 @@
-use futures_core::ready;
+use futures_core::{future::BoxFuture, ready};
 use pin_project::pin_project;
 use std::{
     fmt::Debug,
@@ -63,6 +63,8 @@ where
     }
 }
 
+pub type BoxResultFuture<'a, T, E> = BoxFuture<'a, Result<T, E>>;
+
 #[pin_project]
 pub struct FutureResultSend<T, E> {
     #[pin]