Browse Source

fix artix deadlock while acquire the write lock of document recusively

appflowy 3 years ago
parent
commit
c41b35af4e

+ 2 - 0
backend/Cargo.toml

@@ -55,6 +55,8 @@ lazy_static = "1.4"
 tokio = { version = "1", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 parking_lot = "0.11"
 parking_lot = "0.11"
 md5 = "0.7.0"
 md5 = "0.7.0"
+futures-core = { version = "0.3", default-features = false }
+pin-project = "1.0.0"
 
 
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-user = { path = "../rust-lib/flowy-user" }
 flowy-workspace = { path = "../rust-lib/flowy-workspace" }
 flowy-workspace = { path = "../rust-lib/flowy-workspace" }

+ 1 - 1
backend/src/application.rs

@@ -133,7 +133,7 @@ fn user_scope() -> Scope {
 
 
 async fn init_app_context(configuration: &Settings) -> AppContext {
 async fn init_app_context(configuration: &Settings) -> AppContext {
     let _ = crate::service::log::Builder::new("flowy")
     let _ = crate::service::log::Builder::new("flowy")
-        .env_filter("Debug")
+        .env_filter("Trace")
         .build();
         .build();
     let pg_pool = get_connection_pool(&configuration.database)
     let pg_pool = get_connection_pool(&configuration.database)
         .await
         .await

+ 4 - 3
backend/src/service/doc/doc.rs

@@ -5,7 +5,7 @@ use crate::{
 };
 };
 use anyhow::Context;
 use anyhow::Context;
 use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams};
 use flowy_document::protobuf::{CreateDocParams, Doc, QueryDocParams, UpdateDocParams};
-use flowy_net::{errors::ServerError, response::FlowyResponse};
+use flowy_net::errors::ServerError;
 use sqlx::{postgres::PgArguments, PgPool, Postgres};
 use sqlx::{postgres::PgArguments, PgPool, Postgres};
 use uuid::Uuid;
 use uuid::Uuid;
 
 
@@ -50,10 +50,11 @@ pub(crate) async fn read_doc(pool: &PgPool, params: QueryDocParams) -> Result<Do
     Ok(doc)
     Ok(doc)
 }
 }
 
 
+#[tracing::instrument(level = "debug", skip(pool, params), err)]
 pub(crate) async fn update_doc(
 pub(crate) async fn update_doc(
     pool: &PgPool,
     pool: &PgPool,
     mut params: UpdateDocParams,
     mut params: UpdateDocParams,
-) -> Result<FlowyResponse, ServerError> {
+) -> Result<(), ServerError> {
     let doc_id = Uuid::parse_str(&params.doc_id)?;
     let doc_id = Uuid::parse_str(&params.doc_id)?;
     let mut transaction = pool
     let mut transaction = pool
         .begin()
         .begin()
@@ -77,7 +78,7 @@ pub(crate) async fn update_doc(
         .await
         .await
         .context("Failed to commit SQL transaction to update doc.")?;
         .context("Failed to commit SQL transaction to update doc.")?;
 
 
-    Ok(FlowyResponse::success())
+    Ok(())
 }
 }
 
 
 pub(crate) async fn delete_doc(
 pub(crate) async fn delete_doc(

+ 59 - 0
backend/src/service/doc/edit_doc.rs

@@ -0,0 +1,59 @@
+use crate::service::doc::update_doc;
+use actix_web::web::Data;
+use flowy_document::{
+    protobuf::{Doc, Revision, UpdateDocParams},
+    services::doc::Document,
+};
+use flowy_net::errors::{internal_error, ServerError};
+use flowy_ot::core::Delta;
+use parking_lot::RwLock;
+use sqlx::PgPool;
+use std::{sync::Arc, time::Duration};
+
+pub(crate) struct EditDoc {
+    doc_id: String,
+    document: Arc<RwLock<Document>>,
+    pg_pool: Data<PgPool>,
+}
+
+impl EditDoc {
+    pub(crate) fn new(doc: Doc, pg_pool: Data<PgPool>) -> Result<Self, ServerError> {
+        let delta = Delta::from_bytes(doc.data).map_err(internal_error)?;
+        let document = Arc::new(RwLock::new(Document::from_delta(delta)));
+        Ok(Self {
+            doc_id: doc.id.clone(),
+            document,
+            pg_pool,
+        })
+    }
+
+    #[tracing::instrument(level = "debug", skip(self, revision))]
+    pub(crate) async fn apply_revision(&self, revision: Revision) -> Result<(), ServerError> {
+        let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?;
+        match self.document.try_write_for(Duration::from_millis(300)) {
+            None => {
+                log::error!("Failed to acquire write lock of document");
+            },
+            Some(mut w) => {
+                let _ = w.apply_delta(delta).map_err(internal_error)?;
+            },
+        }
+
+        let md5 = format!("{:x}", md5::compute(self.document.read().to_json()));
+        if md5 != revision.md5 {
+            log::warn!("Document md5 not match")
+        }
+
+        let mut params = UpdateDocParams::new();
+        params.set_doc_id(self.doc_id.clone());
+        params.set_data(self.document.read().to_bytes());
+        match update_doc(self.pg_pool.get_ref(), params).await {
+            Ok(_) => {},
+            Err(e) => {
+                log::error!("Save doc data failed: {:?}", e);
+            },
+        }
+
+        Ok(())
+    }
+}

+ 1 - 0
backend/src/service/doc/mod.rs

@@ -1,4 +1,5 @@
 mod doc;
 mod doc;
+mod edit_doc;
 pub mod router;
 pub mod router;
 mod sql_builder;
 mod sql_builder;
 pub mod ws_handler;
 pub mod ws_handler;

+ 2 - 2
backend/src/service/doc/router.rs

@@ -28,6 +28,6 @@ pub async fn update_handler(
     pool: Data<PgPool>,
     pool: Data<PgPool>,
 ) -> Result<HttpResponse, ServerError> {
 ) -> Result<HttpResponse, ServerError> {
     let params: UpdateDocParams = parse_from_payload(payload).await?;
     let params: UpdateDocParams = parse_from_payload(payload).await?;
-    let response = update_doc(pool.get_ref(), params).await?;
-    Ok(response.into())
+    let _ = update_doc(pool.get_ref(), params).await?;
+    Ok(FlowyResponse::success().into())
 }
 }

+ 49 - 86
backend/src/service/doc/ws_handler.rs

@@ -1,90 +1,34 @@
+use super::edit_doc::EditDoc;
 use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler};
 use crate::service::{doc::read_doc, util::parse_from_bytes, ws::WsBizHandler};
 use actix_web::web::Data;
 use actix_web::web::Data;
 use bytes::Bytes;
 use bytes::Bytes;
-use dashmap::{mapref::one::Ref, DashMap};
 use flowy_document::{
 use flowy_document::{
-    protobuf::{Doc, QueryDocParams, Revision, WsDataType, WsDocumentData},
+    protobuf::{QueryDocParams, Revision, WsDataType, WsDocumentData},
     services::doc::Document,
     services::doc::Document,
 };
 };
-use flowy_net::errors::{internal_error, ServerError};
-use flowy_ot::core::Delta;
-use parking_lot::{RawRwLock, RwLock};
+use flowy_net::errors::ServerError;
+use parking_lot::{Mutex, RwLock, RwLockUpgradableReadGuard};
 use protobuf::Message;
 use protobuf::Message;
 use sqlx::PgPool;
 use sqlx::PgPool;
-use std::sync::Arc;
-
-#[rustfmt::skip]
-//
-//                 Frontend              │                 Backend
-//
-// ┌──────────┐        ┌──────────┐      │     ┌─────────┐            ┌───────────────┐
-// │  user 1  │───────▶│WsManager │───────────▶│ws_client│───────────▶│DocWsBizHandler│
-// └──────────┘        └──────────┘      │     └─────────┘            └───────────────┘
-//
-//   WsDocumentData────▶WsMessage ────▶ Message  ─────▶WsMessage ─────▶WsDocumentData
+use std::{collections::HashMap, sync::Arc};
 
 
 pub struct DocWsBizHandler {
 pub struct DocWsBizHandler {
     inner: Arc<Inner>,
     inner: Arc<Inner>,
 }
 }
 
 
-struct Inner {
-    pg_pool: Data<PgPool>,
-    edited_docs: DashMap<String, Arc<RwLock<EditedDoc>>>,
-}
-
 impl DocWsBizHandler {
 impl DocWsBizHandler {
     pub fn new(pg_pool: Data<PgPool>) -> Self {
     pub fn new(pg_pool: Data<PgPool>) -> Self {
         Self {
         Self {
-            inner: Arc::new(Inner {
-                edited_docs: DashMap::new(),
-                pg_pool,
-            }),
+            inner: Arc::new(Inner::new(pg_pool)),
         }
         }
     }
     }
 }
 }
 
 
-async fn handle_document_data(inner: Arc<Inner>, data: Bytes) -> Result<(), ServerError> {
-    let document_data: WsDocumentData = parse_from_bytes(&data)?;
-    match document_data.ty {
-        WsDataType::Command => {},
-        WsDataType::Delta => {
-            let revision: Revision = parse_from_bytes(&document_data.data).unwrap();
-            let edited_doc = get_edit_doc(inner, &revision.doc_id).await?;
-            let _ = edited_doc.write().apply_revision(revision)?;
-        },
-    }
-
-    Ok(())
-}
-
-async fn get_edit_doc(
-    inner: Arc<Inner>,
-    doc_id: &str,
-) -> Result<Arc<RwLock<EditedDoc>>, ServerError> {
-    let pg_pool = inner.pg_pool.clone();
-
-    if let Some(doc) = inner.edited_docs.get(doc_id) {
-        return Ok(doc.clone());
-    }
-
-    let params = QueryDocParams {
-        doc_id: doc_id.to_string(),
-        ..Default::default()
-    };
-
-    let doc = read_doc(pg_pool.get_ref(), params).await?;
-    let edited_doc = Arc::new(RwLock::new(EditedDoc::new(doc)?));
-    inner
-        .edited_docs
-        .insert(doc_id.to_string(), edited_doc.clone());
-    Ok(edited_doc)
-}
-
 impl WsBizHandler for DocWsBizHandler {
 impl WsBizHandler for DocWsBizHandler {
     fn receive_data(&self, data: Bytes) {
     fn receive_data(&self, data: Bytes) {
         let inner = self.inner.clone();
         let inner = self.inner.clone();
-        actix_rt::spawn(async {
-            let result = handle_document_data(inner, data).await;
+        actix_rt::spawn(async move {
+            let result = inner.handle(data).await;
             match result {
             match result {
                 Ok(_) => {},
                 Ok(_) => {},
                 Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
                 Err(e) => log::error!("WsBizHandler handle data error: {:?}", e),
@@ -93,34 +37,53 @@ impl WsBizHandler for DocWsBizHandler {
     }
     }
 }
 }
 
 
-struct EditedDoc {
-    doc_id: String,
-    document: Document,
+struct Inner {
+    pg_pool: Data<PgPool>,
+    edit_docs: RwLock<HashMap<String, Arc<EditDoc>>>,
 }
 }
 
 
-impl EditedDoc {
-    fn new(doc: Doc) -> Result<Self, ServerError> {
-        let delta = Delta::from_bytes(doc.data).map_err(internal_error)?;
-        let document = Document::from_delta(delta);
-        Ok(Self {
-            doc_id: doc.id.clone(),
-            document,
-        })
+impl Inner {
+    fn new(pg_pool: Data<PgPool>) -> Self {
+        Self {
+            pg_pool,
+            edit_docs: RwLock::new(HashMap::new()),
+        }
     }
     }
 
 
-    fn apply_revision(&mut self, revision: Revision) -> Result<(), ServerError> {
-        let delta = Delta::from_bytes(revision.delta).map_err(internal_error)?;
-        let _ = self
-            .document
-            .apply_delta(delta.clone())
-            .map_err(internal_error)?;
-
-        let json = self.document.to_json();
-        let md5 = format!("{:x}", md5::compute(json));
-        if md5 != revision.md5 {
-            log::error!("Document conflict after apply delta {}", delta)
+    async fn handle(&self, data: Bytes) -> Result<(), ServerError> {
+        let document_data: WsDocumentData = parse_from_bytes(&data)?;
+
+        match document_data.ty {
+            WsDataType::Command => {},
+            WsDataType::Delta => {
+                let revision: Revision = parse_from_bytes(&document_data.data)?;
+                let edited_doc = self.get_edit_doc(&revision.doc_id).await?;
+                tokio::spawn(async move {
+                    edited_doc.apply_revision(revision).await.unwrap();
+                });
+            },
         }
         }
 
 
         Ok(())
         Ok(())
     }
     }
+
+    async fn get_edit_doc(&self, doc_id: &str) -> Result<Arc<EditDoc>, ServerError> {
+        // Opti: using lock free map instead?
+        let edit_docs = self.edit_docs.upgradable_read();
+        if let Some(doc) = edit_docs.get(doc_id) {
+            return Ok(doc.clone());
+        } else {
+            let mut edit_docs = RwLockUpgradableReadGuard::upgrade(edit_docs);
+            let pg_pool = self.pg_pool.clone();
+            let params = QueryDocParams {
+                doc_id: doc_id.to_string(),
+                ..Default::default()
+            };
+
+            let doc = read_doc(pg_pool.get_ref(), params).await?;
+            let edit_doc = Arc::new(EditDoc::new(doc, self.pg_pool.clone())?);
+            edit_docs.insert(doc_id.to_string(), edit_doc.clone());
+            Ok(edit_doc)
+        }
+    }
 }
 }

+ 1 - 1
backend/src/service/log/mod.rs

@@ -28,7 +28,7 @@ impl Builder {
         let env_filter = EnvFilter::new(self.env_filter);
         let env_filter = EnvFilter::new(self.env_filter);
         let subscriber = tracing_subscriber::fmt()
         let subscriber = tracing_subscriber::fmt()
             .with_target(true)
             .with_target(true)
-            .with_max_level(tracing::Level::DEBUG)
+            .with_max_level(tracing::Level::TRACE)
             .with_writer(std::io::stderr)
             .with_writer(std::io::stderr)
             .with_thread_ids(false)
             .with_thread_ids(false)
             .compact()
             .compact()

+ 2 - 2
rust-lib/flowy-document/src/services/doc/document/document.rs

@@ -59,9 +59,9 @@ impl Document {
     }
     }
 
 
     pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> {
     pub fn apply_delta(&mut self, delta: Delta) -> Result<(), DocError> {
-        log::debug!("Apply delta: {}", delta);
+        log::trace!("Apply delta: {}", delta);
         let _ = self.add_delta(&delta)?;
         let _ = self.add_delta(&delta)?;
-        log::debug!("Document: {}", self.to_json());
+        log::trace!("Document: {}", self.to_json());
         Ok(())
         Ok(())
     }
     }
 
 

+ 3 - 0
rust-lib/flowy-infra/src/lib.rs

@@ -9,6 +9,9 @@ pub mod future;
 pub mod kv;
 pub mod kv;
 mod protobuf;
 mod protobuf;
 
 
+#[macro_use]
+pub mod macros;
+
 #[allow(dead_code)]
 #[allow(dead_code)]
 pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }
 pub fn uuid() -> String { uuid::Uuid::new_v4().to_string() }
 
 

+ 8 - 0
rust-lib/flowy-infra/src/macros.rs

@@ -0,0 +1,8 @@
+#[macro_export]
+macro_rules! dispatch_future {
+    ($fut:expr) => {
+        ClosureFuture {
+            fut: Box::pin(async move { $fut.await }),
+        }
+    };
+}