Sfoglia il codice sorgente

compare md5 of document after applying remote delta

appflowy 3 anni fa
parent
commit
c23bf46916

+ 2 - 0
backend/Cargo.toml

@@ -54,11 +54,13 @@ sql-builder = "3.1.1"
 lazy_static = "1.4"
 tokio = { version = "1", features = ["full"] }
 parking_lot = "0.11"
+md5 = "0.7.0"
 
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-workspace = { path = "../rust-lib/flowy-workspace" }
 flowy-document = { path = "../rust-lib/flowy-document" }
 flowy-ws = { path = "../rust-lib/flowy-ws" }
+flowy-ot = { path = "../rust-lib/flowy-ot" }
 flowy-net = { path = "../rust-lib/flowy-net", features = ["http_server"] }
 
 ormx = { version = "0.7", features = ["postgres"]}

+ 2 - 5
backend/src/service/doc/doc.rs

@@ -23,10 +23,7 @@ pub(crate) async fn create_doc(
     Ok(())
 }
 
-pub(crate) async fn read_doc(
-    pool: &PgPool,
-    params: QueryDocParams,
-) -> Result<FlowyResponse, ServerError> {
+pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result<Doc, ServerError> {
     let doc_id = Uuid::parse_str(&params.doc_id)?;
     let mut transaction = pool
         .begin()
@@ -50,7 +47,7 @@ pub(crate) async fn read_doc(
         .await
         .context("Failed to commit SQL transaction to read doc.")?;
 
-    FlowyResponse::success().pb(doc)
+    Ok(doc)
 }
 
 pub(crate) async fn update_doc(

+ 3 - 1
backend/src/service/doc/router.rs

@@ -11,13 +11,15 @@ use crate::service::{
     doc::{read_doc, update_doc},
     util::parse_from_payload,
 };
+use flowy_net::response::FlowyResponse;
 
 pub async fn read_handler(
     payload: Payload,
     pool: Data<PgPool>,
 ) -> Result<HttpResponse, ServerError> {
     let params: QueryDocParams = parse_from_payload(payload).await?;
-    let response = read_doc(pool.get_ref(), params).await?;
+    let doc = read_doc(pool.get_ref(), params).await?;
+    let response = FlowyResponse::success().pb(doc)?;
     Ok(response.into())
 }
 

+ 88 - 16
backend/src/service/doc/ws_handler.rs

@@ -1,12 +1,14 @@
-use crate::service::{util::parse_from_bytes, ws::WsBizHandler};
+use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler};
 use actix_web::web::Data;
 use bytes::Bytes;
-use dashmap::DashMap;
+use dashmap::{mapref::one::Ref, DashMap};
 use flowy_document::{
-    protobuf::{Revision, WsDataType, WsDocumentData},
+    protobuf::{Doc, QueryDocParams, Revision, WsDataType, WsDocumentData},
     services::doc::Document,
 };
-use parking_lot::RwLock;
+use flowy_net::errors::{internal_error, ServerError};
+use flowy_ot::core::Delta;
+use parking_lot::{RawRwLock, RwLock};
 use protobuf::Message;
 use sqlx::PgPool;
 use std::sync::Arc;
@@ -22,33 +24,103 @@ use std::sync::Arc;
 //   WsDocumentData────▶WsMessage ────▶ Message  ─────▶WsMessage ─────▶WsDocumentData
 
 pub struct DocWsBizHandler {
+    inner: Arc<Inner>,
+}
+
+struct Inner {
     pg_pool: Data<PgPool>,
-    edit_docs: DashMap<String, Arc<RwLock<EditDoc>>>,
+    edited_docs: DashMap<String, Arc<RwLock<EditedDoc>>>,
 }
 
 impl DocWsBizHandler {
     pub fn new(pg_pool: Data<PgPool>) -> Self {
         Self {
-            edit_docs: DashMap::new(),
-            pg_pool,
+            inner: Arc::new(Inner {
+                edited_docs: DashMap::new(),
+                pg_pool,
+            }),
         }
     }
 }
 
+async fn handle_document_data(inner: Arc<Inner>, data: Bytes) -> Result<(), ServerError> {
+    let document_data: WsDocumentData = parse_from_bytes(&data)?;
+    match document_data.ty {
+        WsDataType::Command => {},
+        WsDataType::Delta => {
+            let revision: Revision = parse_from_bytes(&document_data.data).unwrap();
+            let edited_doc = get_edit_doc(inner, &revision.doc_id).await?;
+            let _ = edited_doc.write().apply_revision(revision)?;
+        },
+    }
+
+    Ok(())
+}
+
+async fn get_edit_doc(
+    inner: Arc<Inner>,
+    doc_id: &str,
+) -> Result<Arc<RwLock<EditedDoc>>, ServerError> {
+    let pg_pool = inner.pg_pool.clone();
+
+    if let Some(doc) = inner.edited_docs.get(doc_id) {
+        return Ok(doc.clone());
+    }
+
+    let params = QueryDocParams {
+        doc_id: doc_id.to_string(),
+        ..Default::default()
+    };
+
+    let doc = read_doc(pg_pool.get_ref(), params).await?;
+    let edited_doc = Arc::new(RwLock::new(EditedDoc::new(doc)?));
+    inner
+        .edited_docs
+        .insert(doc_id.to_string(), edited_doc.clone());
+    Ok(edited_doc)
+}
+
 impl WsBizHandler for DocWsBizHandler {
     fn receive_data(&self, data: Bytes) {
-        let document_data: WsDocumentData = parse_from_bytes(&data).unwrap();
-        match document_data.ty {
-            WsDataType::Command => {},
-            WsDataType::Delta => {
-                let revision: Revision = parse_from_bytes(&document_data.data).unwrap();
-                log::warn!("{:?}", revision);
-            },
-        }
+        let inner = self.inner.clone();
+        actix_rt::spawn(async {
+            let result = handle_document_data(inner, data).await;
+            match result {
+                Ok(_) => {},
+                Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
+            }
+        });
     }
 }
 
-pub struct EditDoc {
+struct EditedDoc {
     doc_id: String,
     document: Document,
 }
+
+impl EditedDoc {
+    fn new(doc: Doc) -> Result<Self, ServerError> {
+        let delta = Delta::from_bytes(doc.data).map_err(internal_error)?;
+        let document = Document::from_delta(delta);
+        Ok(Self {
+            doc_id: doc.id.clone(),
+            document,
+        })
+    }
+
+    fn apply_revision(&mut self, revision: Revision) -> Result<(), ServerError> {
+        let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?;
+        let _ = self
+            .document
+            .apply_delta(delta.clone())
+            .map_err(internal_error)?;
+
+        let json = self.document.to_json();
+        let md5 = format!("{:x}", md5::compute(json));
+        if md5 != revision.md5 {
+            log::error!("Document conflict after apply delta {}", delta)
+        }
+
+        Ok(())
+    }
+}

+ 4 - 4
rust-lib/dart-ffi/Cargo.toml

@@ -7,11 +7,11 @@ edition = "2018"
 [lib]
 name = "dart_ffi"
 # this value will change depending on the target os
-# for iOS it would be `cdylib`
-# for Macos it would be `cdylib`
+# for iOS it would be `rlib`
+# for Macos it would be `rlib`
 # for android it would be `c-dylib`
-# default cdylib
-crate-type = ["cdylib"]
+# default rlib
+crate-type = ["rlib"]
 
 
 [dependencies]

+ 7 - 3
rust-lib/flowy-document/src/services/doc/document/document.rs

@@ -53,10 +53,14 @@ impl Document {
 
     pub fn to_plain_string(&self) -> String { self.delta.apply("").unwrap() }
 
-    pub fn apply_delta(&mut self, data: Bytes) -> Result<(), DocError> {
+    pub fn apply_delta_data(&mut self, data: Bytes) -> Result<(), DocError> {
         let new_delta = Delta::from_bytes(data.to_vec())?;
-        log::debug!("Apply delta: {}", new_delta);
-        let _ = self.add_delta(&new_delta)?;
+        self.apply_delta(new_delta)
+    }
+
+    pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> {
+        log::debug!("Apply delta: {}", delta);
+        let _ = self.add_delta(&delta)?;
         log::debug!("Document: {}", self.to_json());
         Ok(())
     }

+ 3 - 1
rust-lib/flowy-document/src/services/doc/edit_context.rs

@@ -58,7 +58,9 @@ impl EditDocContext {
         let mut guard = self.document.write();
         let base_rev_id = self.rev_counter.value();
         let rev_id = self.rev_counter.next();
-        let _ = guard.apply_delta(data.clone())?;
+        let delta = Delta::from_bytes(data.to_vec())?;
+
+        let _ = guard.apply_delta(delta)?;
         let json = guard.to_json();
         drop(guard);
 

+ 7 - 0
rust-lib/flowy-net/src/errors.rs

@@ -47,6 +47,13 @@ impl ServerError {
     pub fn is_unauthorized(&self) -> bool { self.code == ErrorCode::UserUnauthorized }
 }
 
+pub fn internal_error<T>(e: T) -> ServerError
+where
+    T: std::fmt::Debug,
+{
+    ServerError::internal().context(e)
+}
+
 pub fn invalid_params<T: Debug>(e: T) -> ServerError { ServerError::params_invalid().context(e) }
 
 impl std::fmt::Display for ServerError {

+ 3 - 1
rust-lib/flowy-ot/src/core/operation/operation_serde.rs

@@ -82,7 +82,9 @@ impl<'de> Deserialize<'de> for Operation {
                 match operation {
                     None => Err(de::Error::missing_field("operation")),
                     Some(mut operation) => {
-                        operation.set_attributes(attributes.unwrap_or(Attributes::default()));
+                        if !operation.is_delete() {
+                            operation.set_attributes(attributes.unwrap_or(Attributes::default()));
+                        }
                         Ok(operation)
                     },
                 }