Переглянути джерело

fix: realtime event parser (#3231)

* chore: decrypt realtime event

* fix: realtime event parser
Nathan.fooo 1 рік тому
батько
коміт
c1bba7e48b

+ 0 - 1
frontend/appflowy_flutter/lib/user/application/supabase_realtime.dart

@@ -70,7 +70,6 @@ class SupbaseRealtimeService {
           (payload, [ref]) {
             try {
               final jsonStr = jsonEncode(payload);
-              Log.info("Realtime payload: $jsonStr");
               final pb = RealtimePayloadPB.create()..jsonStr = jsonStr;
               UserEventPushRealtimeEvent(pb).send();
             } catch (e) {

+ 10 - 8
frontend/appflowy_tauri/src-tauri/Cargo.toml

@@ -34,14 +34,15 @@ default = ["custom-protocol"]
 custom-protocol = ["tauri/custom-protocol"]
 
 [patch.crates-io]
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-persistence = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
 
 #collab = { path = "../../../../AppFlowy-Collab/collab" }
 #collab-folder = { path = "../../../../AppFlowy-Collab/collab-folder" }
@@ -50,6 +51,7 @@ collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c
 #appflowy-integrate = { path = "../../../../AppFlowy-Collab/appflowy-integrate" }
 #collab-plugins = { path = "../../../../AppFlowy-Collab/collab-plugins" }
 #collab-user = { path = "../../../../AppFlowy-Collab/collab-user" }
+#collab-define = { path = "../../../../AppFlowy-Collab/collab-define" }
 
 
 

+ 21 - 11
frontend/rust-lib/Cargo.lock

@@ -120,7 +120,7 @@ checksum = "3b13c32d80ecc7ab747b80c3784bce54ee8a7a0cc4fbda9bf4cda2cf6fe90854"
 [[package]]
 name = "appflowy-integrate"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "collab",
@@ -611,7 +611,7 @@ dependencies = [
 [[package]]
 name = "collab"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "bytes",
@@ -629,7 +629,7 @@ dependencies = [
 [[package]]
 name = "collab-client-ws"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "bytes",
  "collab-sync",
@@ -647,7 +647,7 @@ dependencies = [
 [[package]]
 name = "collab-database"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "async-trait",
@@ -671,10 +671,18 @@ dependencies = [
  "uuid",
 ]
 
+[[package]]
+name = "collab-define"
+version = "0.1.0"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
+dependencies = [
+ "uuid",
+]
+
 [[package]]
 name = "collab-derive"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "proc-macro2",
  "quote",
@@ -686,7 +694,7 @@ dependencies = [
 [[package]]
 name = "collab-document"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "collab",
@@ -705,7 +713,7 @@ dependencies = [
 [[package]]
 name = "collab-folder"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "chrono",
@@ -725,7 +733,7 @@ dependencies = [
 [[package]]
 name = "collab-persistence"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "bincode",
  "chrono",
@@ -745,12 +753,13 @@ dependencies = [
 [[package]]
 name = "collab-plugins"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "async-trait",
  "collab",
  "collab-client-ws",
+ "collab-define",
  "collab-persistence",
  "collab-sync",
  "futures-util",
@@ -773,7 +782,7 @@ dependencies = [
 [[package]]
 name = "collab-sync"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "bytes",
  "collab",
@@ -795,7 +804,7 @@ dependencies = [
 [[package]]
 name = "collab-user"
 version = "0.1.0"
-source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=c3c22d#c3c22d9addda6cf9943e28c4294b4180d3454299"
+source = "git+https://github.com/AppFlowy-IO/AppFlowy-Collab?rev=cff1b9#cff1b99f4ed51f65dab73492eac4da8e7907f079"
 dependencies = [
  "anyhow",
  "collab",
@@ -1776,6 +1785,7 @@ version = "0.1.0"
 dependencies = [
  "anyhow",
  "chrono",
+ "collab-define",
  "flowy-error",
  "lib-infra",
  "serde",

+ 9 - 7
frontend/rust-lib/Cargo.toml

@@ -39,13 +39,14 @@ opt-level = 3
 incremental = false
 
 [patch.crates-io]
-collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
-collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c3c22d" }
+collab = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-folder = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-document = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-database = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+appflowy-integrate = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-plugins = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
+collab-define = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "cff1b9" }
 
 #collab = { path = "../AppFlowy-Collab/collab" }
 #collab-folder = { path = "../AppFlowy-Collab/collab-folder" }
@@ -54,4 +55,5 @@ collab-user = { git = "https://github.com/AppFlowy-IO/AppFlowy-Collab", rev = "c
 #collab-plugins = { path = "../AppFlowy-Collab/collab-plugins" }
 #appflowy-integrate = { path = "../AppFlowy-Collab/appflowy-integrate" }
 #collab-user = { path = "../AppFlowy-Collab/collab-user" }
+#collab-define = { path = "../AppFlowy-Collab/collab-define" }
 

+ 9 - 0
frontend/rust-lib/flowy-server/src/local_server/impls/user.rs

@@ -1,6 +1,7 @@
 use std::sync::Arc;
 
 use anyhow::Error;
+use collab_plugins::cloud_storage::CollabObject;
 use lazy_static::lazy_static;
 use parking_lot::Mutex;
 
@@ -116,6 +117,14 @@ impl UserService for LocalServerUserAuthServiceImpl {
   fn get_user_awareness_updates(&self, _uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
     FutureResult::new(async { Ok(vec![]) })
   }
+
+  fn create_collab_object(
+    &self,
+    _collab_object: &CollabObject,
+    _data: Vec<u8>,
+  ) -> FutureResult<(), Error> {
+    FutureResult::new(async { Ok(()) })
+  }
 }
 
 fn make_user_workspace() -> UserWorkspace {

+ 10 - 0
frontend/rust-lib/flowy-server/src/self_host/impls/user.rs

@@ -1,4 +1,5 @@
 use anyhow::Error;
+use collab_plugins::cloud_storage::CollabObject;
 
 use flowy_error::{ErrorCode, FlowyError};
 use flowy_user_deps::cloud::UserService;
@@ -127,6 +128,15 @@ impl UserService for SelfHostedUserAuthServiceImpl {
     // TODO(nathan): implement the RESTful API for this
     FutureResult::new(async { Ok(vec![]) })
   }
+
+  fn create_collab_object(
+    &self,
+    _collab_object: &CollabObject,
+    _data: Vec<u8>,
+  ) -> FutureResult<(), Error> {
+    // TODO(nathan): implement the RESTful API for this
+    FutureResult::new(async { Ok(()) })
+  }
 }
 
 pub async fn user_sign_up_request(

+ 5 - 5
frontend/rust-lib/flowy-server/src/supabase/api/collab_storage.rs

@@ -116,14 +116,14 @@ where
           })?;
 
           let current_edit_count = value.get("current_edit_count").and_then(|id| id.as_i64())?;
-          let last_snapshot_edit_count = value
-            .get("last_snapshot_edit_count")
+          let snapshot_edit_count = value
+            .get("snapshot_edit_count")
             .and_then(|id| id.as_i64())?;
 
           Some(RemoteCollabState {
             current_edit_count,
-            last_snapshot_edit_count,
-            last_snapshot_created_at: created_at.timestamp(),
+            snapshot_edit_count,
+            snapshot_created_at: created_at.timestamp(),
           })
         }),
     )
@@ -215,7 +215,7 @@ where
   }
 }
 
-async fn send_update(
+pub(crate) async fn send_update(
   workspace_id: String,
   object: &CollabObject,
   update: Vec<u8>,

+ 38 - 1
frontend/rust-lib/flowy-server/src/supabase/api/user.rs

@@ -2,6 +2,7 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use anyhow::Error;
+use collab_plugins::cloud_storage::CollabObject;
 use tokio::sync::oneshot::channel;
 use uuid::Uuid;
 
@@ -13,7 +14,7 @@ use lib_infra::future::FutureResult;
 
 use crate::supabase::api::request::FetchObjectUpdateAction;
 use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
-use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
+use crate::supabase::api::{send_update, PostgresWrapper, SupabaseServerService};
 use crate::supabase::define::*;
 use crate::supabase::entities::GetUserProfileParams;
 use crate::supabase::entities::UidResponse;
@@ -224,6 +225,42 @@ where
     });
     FutureResult::new(async { rx.await? })
   }
+
+  fn create_collab_object(
+    &self,
+    collab_object: &CollabObject,
+    data: Vec<u8>,
+  ) -> FutureResult<(), Error> {
+    let try_get_postgrest = self.server.try_get_weak_postgrest();
+    let cloned_collab_object = collab_object.clone();
+    let (tx, rx) = channel();
+    tokio::spawn(async move {
+      tx.send(
+        async move {
+          let workspace_id = cloned_collab_object
+            .get_workspace_id()
+            .ok_or(anyhow::anyhow!("Invalid workspace id"))?;
+
+          let postgrest = try_get_postgrest?
+            .upgrade()
+            .ok_or(anyhow::anyhow!("postgrest is not available"))?;
+
+          let encryption_secret = postgrest.secret();
+          send_update(
+            workspace_id,
+            &cloned_collab_object,
+            data,
+            &postgrest,
+            &encryption_secret,
+          )
+          .await?;
+          Ok(())
+        }
+        .await,
+      )
+    });
+    FutureResult::new(async { rx.await? })
+  }
 }
 
 async fn get_user_profile(

+ 1 - 9
frontend/rust-lib/flowy-server/src/supabase/api/util.rs

@@ -200,18 +200,10 @@ impl SupabaseBinaryColumnDecoder {
 
 /// A decoder specifically tailored for realtime event binary columns in Supabase.
 ///
-/// Decodes the realtime event binary column data using the standard Supabase binary column decoder.
 pub struct SupabaseRealtimeEventBinaryColumnDecoder;
 
 impl SupabaseRealtimeEventBinaryColumnDecoder {
-  /// Decodes a realtime event binary column string from Supabase into binary data.
-  ///
-  /// # Parameters
-  /// - `value`: The string representation from a Supabase realtime event binary column.
-  ///
-  /// # Returns
-  /// Returns an `Option` containing the decoded binary data if decoding is successful.
-  /// Otherwise, returns `None`.
+  /// The realtime event binary column string is encoded twice. So it needs to be decoded twice.
   pub fn decode<T: AsRef<str>>(value: T) -> Option<Vec<u8>> {
     let s = value.as_ref().strip_prefix("\\x")?;
     let bytes = hex::decode(s).ok()?;

+ 25 - 7
frontend/rust-lib/flowy-server/src/supabase/server.rs

@@ -7,6 +7,7 @@ use serde_json::Value;
 
 use flowy_database_deps::cloud::DatabaseCloudService;
 use flowy_document_deps::cloud::DocumentCloudService;
+use flowy_encrypt::decrypt_bytes;
 use flowy_folder_deps::cloud::FolderCloudService;
 use flowy_server_config::supabase_config::SupabaseConfiguration;
 use flowy_user_deps::cloud::UserService;
@@ -57,7 +58,8 @@ impl PgPoolMode {
 pub struct SupabaseServer {
   #[allow(dead_code)]
   config: SupabaseConfiguration,
-  device_id: Mutex<String>,
+  /// did represents as the device id is used to identify the device that is currently using the app.
+  did: Mutex<String>,
   update_tx: RwLock<HashMap<String, RemoteUpdateSender>>,
   restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
   encryption: Weak<dyn AppFlowyEncryption>,
@@ -80,7 +82,7 @@ impl SupabaseServer {
     };
     Self {
       config,
-      device_id: Default::default(),
+      did: Default::default(),
       update_tx,
       restful_postgres: Arc::new(RwLock::new(restful_postgres)),
       encryption,
@@ -107,7 +109,7 @@ impl AppFlowyServer for SupabaseServer {
   }
 
   fn set_sync_device_id(&self, device_id: &str) {
-    *self.device_id.lock() = device_id.to_string();
+    *self.did.lock() = device_id.to_string();
   }
 
   fn user_service(&self) -> Arc<dyn UserService> {
@@ -150,10 +152,26 @@ impl AppFlowyServer for SupabaseServer {
   fn handle_realtime_event(&self, json: Value) {
     match serde_json::from_value::<RealtimeCollabUpdateEvent>(json) {
       Ok(event) => {
-        if let Some(tx) = self.update_tx.read().get(event.payload.oid.as_str()) {
-          if self.device_id.lock().as_str() != event.payload.did.as_str() {
-            if let Err(e) = tx.send(event.payload.value) {
-              tracing::trace!("send realtime update error: {}", e);
+        if let (Some(tx), Some(secret)) = (
+          self.update_tx.read().get(event.payload.oid.as_str()),
+          self
+            .encryption
+            .upgrade()
+            .and_then(|encryption| encryption.get_secret()),
+        ) {
+          if self.did.lock().as_str() != event.payload.did.as_str() {
+            tracing::trace!("Did receive realtime event: {}", event);
+            let value = if event.payload.encrypt == 1 {
+              decrypt_bytes(event.payload.value, &secret).unwrap_or_default()
+            } else {
+              event.payload.value
+            };
+
+            if !value.is_empty() {
+              tracing::trace!("Parse payload with len: {} success", value.len());
+              if let Err(e) = tx.send(value) {
+                tracing::trace!("send realtime update error: {}", e);
+              }
             }
           }
         }

+ 3 - 0
frontend/rust-lib/flowy-server/tests/supabase_test/util.rs

@@ -65,6 +65,7 @@ pub fn encryption_folder_service(
   (service, encryption_impl)
 }
 
+#[allow(dead_code)]
 pub fn encryption_collab_service(
   secret: Option<String>,
 ) -> (Arc<dyn RemoteCollabStorage>, Arc<dyn AppFlowyEncryption>) {
@@ -77,6 +78,7 @@ pub fn encryption_collab_service(
   (service, encryption_impl)
 }
 
+#[allow(dead_code)]
 pub async fn print_encryption_folder(folder_id: &str, encryption_secret: Option<String>) {
   let (cloud_service, _encryption) = encryption_folder_service(encryption_secret);
   let folder_data = cloud_service.get_folder_data(folder_id).await.unwrap();
@@ -84,6 +86,7 @@ pub async fn print_encryption_folder(folder_id: &str, encryption_secret: Option<
   println!("{}", serde_json::to_string_pretty(&json).unwrap());
 }
 
+#[allow(dead_code)]
 pub async fn print_encryption_folder_snapshot(folder_id: &str, encryption_secret: Option<String>) {
   let (cloud_service, _encryption) = encryption_collab_service(encryption_secret);
   let snapshot = cloud_service

+ 1 - 0
frontend/rust-lib/flowy-user-deps/Cargo.toml

@@ -10,6 +10,7 @@ lib-infra = { path = "../../../shared-lib/lib-infra" }
 flowy-error = { path = "../flowy-error" }
 uuid = { version = "1.3.3", features = ["v4"] }
 serde = { version = "1.0", features = ["derive"] }
+collab-define = { version = "0.1.0" }
 serde_json = {version = "1.0"}
 serde_repr = "0.1"
 chrono = { version = "0.4.22", default-features = false, features = ["clock"] }

+ 7 - 0
frontend/rust-lib/flowy-user-deps/src/cloud.rs

@@ -2,6 +2,7 @@ use std::collections::HashMap;
 use std::str::FromStr;
 
 use anyhow::Error;
+use collab_define::CollabObject;
 use serde::{Deserialize, Serialize};
 use uuid::Uuid;
 
@@ -79,6 +80,12 @@ pub trait UserService: Send + Sync {
   ) -> FutureResult<(), Error>;
 
   fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error>;
+
+  fn create_collab_object(
+    &self,
+    collab_object: &CollabObject,
+    data: Vec<u8>,
+  ) -> FutureResult<(), Error>;
 }
 
 pub fn third_party_params_from_box_any(any: BoxAny) -> Result<ThirdPartyParams, Error> {