123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423 |
- use std::future::Future;
- use std::iter::Take;
- use std::pin::Pin;
- use std::str::FromStr;
- use std::sync::{Arc, Weak};
- use std::time::Duration;
- use anyhow::Error;
- use chrono::{DateTime, Utc};
- use collab_plugins::cloud_storage::{CollabObject, CollabType, RemoteCollabSnapshot};
- use serde_json::Value;
- use tokio_retry::strategy::FixedInterval;
- use tokio_retry::{Action, Condition, RetryIf};
- use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
- use lib_infra::util::md5;
- use crate::supabase::api::util::{
- BinaryColumnDecoder, ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
- SupabaseBinaryColumnEncoder,
- };
- use crate::supabase::api::PostgresWrapper;
- use crate::supabase::define::*;
- pub struct FetchObjectUpdateAction {
- object_id: String,
- object_ty: CollabType,
- postgrest: Weak<PostgresWrapper>,
- }
- impl FetchObjectUpdateAction {
- pub fn new(object_id: String, object_ty: CollabType, postgrest: Weak<PostgresWrapper>) -> Self {
- Self {
- postgrest,
- object_id,
- object_ty,
- }
- }
- pub fn run(self) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
- let postgrest = self.postgrest.clone();
- let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
- RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
- }
- pub fn run_with_fix_interval(
- self,
- secs: u64,
- times: usize,
- ) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
- let postgrest = self.postgrest.clone();
- let retry_strategy = FixedInterval::new(Duration::from_secs(secs)).take(times);
- RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
- }
- }
- impl Action for FetchObjectUpdateAction {
- type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
- type Item = CollabObjectUpdate;
- type Error = anyhow::Error;
- fn run(&mut self) -> Self::Future {
- let weak_postgres = self.postgrest.clone();
- let object_id = self.object_id.clone();
- let object_ty = self.object_ty.clone();
- Box::pin(async move {
- match weak_postgres.upgrade() {
- None => Ok(vec![]),
- Some(postgrest) => {
- match get_updates_from_server(&object_id, &object_ty, &postgrest).await {
- Ok(items) => Ok(items.into_iter().map(|item| item.value).collect()),
- Err(err) => {
- tracing::error!("Get {} updates failed with error: {:?}", object_id, err);
- Err(err)
- },
- }
- },
- }
- })
- }
- }
- pub struct BatchFetchObjectUpdateAction {
- object_ids: Vec<String>,
- object_ty: CollabType,
- postgrest: Weak<PostgresWrapper>,
- }
- impl BatchFetchObjectUpdateAction {
- pub fn new(
- object_ids: Vec<String>,
- object_ty: CollabType,
- postgrest: Weak<PostgresWrapper>,
- ) -> Self {
- Self {
- postgrest,
- object_ty,
- object_ids,
- }
- }
- pub fn run(self) -> RetryIf<Take<FixedInterval>, BatchFetchObjectUpdateAction, RetryCondition> {
- let postgrest = self.postgrest.clone();
- let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
- RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
- }
- }
- impl Action for BatchFetchObjectUpdateAction {
- type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
- type Item = CollabObjectUpdateByOid;
- type Error = anyhow::Error;
- fn run(&mut self) -> Self::Future {
- let weak_postgrest = self.postgrest.clone();
- let object_ids = self.object_ids.clone();
- let object_ty = self.object_ty.clone();
- Box::pin(async move {
- match weak_postgrest.upgrade() {
- None => Ok(CollabObjectUpdateByOid::default()),
- Some(server) => {
- match batch_get_updates_from_server(object_ids.clone(), &object_ty, server).await {
- Ok(updates_by_oid) => Ok(updates_by_oid),
- Err(err) => {
- tracing::error!(
- "Batch get object with given ids:{:?} failed with error: {:?}",
- object_ids,
- err
- );
- Err(err)
- },
- }
- },
- }
- })
- }
- }
- pub async fn create_snapshot(
- postgrest: &Arc<PostgresWrapper>,
- object: &CollabObject,
- snapshot: Vec<u8>,
- ) -> Result<i64, Error> {
- let value_size = snapshot.len() as i32;
- let (snapshot, encrypt) = SupabaseBinaryColumnEncoder::encode(&snapshot, &postgrest.secret())?;
- let ret: Value = postgrest
- .from(AF_COLLAB_SNAPSHOT_TABLE)
- .insert(
- InsertParamsBuilder::new()
- .insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.object_id.clone())
- .insert("name", object.ty.to_string())
- .insert(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN, encrypt)
- .insert(AF_COLLAB_SNAPSHOT_BLOB_COLUMN, snapshot)
- .insert(AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN, value_size)
- .build(),
- )
- .execute()
- .await?
- .get_json()
- .await?;
- let snapshot_id = ret
- .as_array()
- .and_then(|array| array.first())
- .and_then(|value| value.get("sid"))
- .and_then(|value| value.as_i64())
- .unwrap_or(0);
- Ok(snapshot_id)
- }
- pub async fn get_snapshots_from_server(
- object_id: &str,
- postgrest: Arc<PostgresWrapper>,
- limit: usize,
- ) -> Result<Vec<RemoteCollabSnapshot>, Error> {
- let json: Value = postgrest
- .from(AF_COLLAB_SNAPSHOT_TABLE)
- .select(format!(
- "{},{},{},{}",
- AF_COLLAB_SNAPSHOT_ID_COLUMN,
- AF_COLLAB_SNAPSHOT_BLOB_COLUMN,
- AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN,
- AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN
- ))
- .order(format!("{}.desc", AF_COLLAB_SNAPSHOT_ID_COLUMN))
- .limit(limit)
- .eq(AF_COLLAB_SNAPSHOT_OID_COLUMN, object_id)
- .execute()
- .await?
- .get_json()
- .await?;
- let mut snapshots = vec![];
- let secret = postgrest.secret();
- match json.as_array() {
- None => {
- if let Some(snapshot) = parser_snapshot(object_id, &json, &secret) {
- snapshots.push(snapshot);
- }
- },
- Some(snapshot_values) => {
- for snapshot_value in snapshot_values {
- if let Some(snapshot) = parser_snapshot(object_id, snapshot_value, &secret) {
- snapshots.push(snapshot);
- }
- }
- },
- }
- Ok(snapshots)
- }
- fn parser_snapshot(
- object_id: &str,
- snapshot: &Value,
- secret: &Option<String>,
- ) -> Option<RemoteCollabSnapshot> {
- let blob = match (
- snapshot
- .get(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN)
- .and_then(|encrypt| encrypt.as_i64()),
- snapshot
- .get(AF_COLLAB_SNAPSHOT_BLOB_COLUMN)
- .and_then(|value| value.as_str()),
- ) {
- (Some(encrypt), Some(value)) => {
- SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(value, encrypt as i32, secret)
- .ok()
- },
- _ => None,
- }?;
- let sid = snapshot.get("sid").and_then(|id| id.as_i64())?;
- let created_at = snapshot.get("created_at").and_then(|created_at| {
- created_at
- .as_str()
- .map(|id| DateTime::<Utc>::from_str(id).ok())
- .and_then(|date| date)
- })?;
- Some(RemoteCollabSnapshot {
- sid,
- oid: object_id.to_string(),
- blob,
- created_at: created_at.timestamp(),
- })
- }
- pub async fn batch_get_updates_from_server(
- object_ids: Vec<String>,
- object_ty: &CollabType,
- postgrest: Arc<PostgresWrapper>,
- ) -> Result<CollabObjectUpdateByOid, Error> {
- let json = postgrest
- .from(table_name(object_ty))
- .select("oid, key, value, encrypt, md5")
- .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
- .in_("oid", object_ids)
- .execute()
- .await?
- .get_json()
- .await?;
- let mut updates_by_oid = CollabObjectUpdateByOid::new();
- if let Some(records) = json.as_array() {
- for record in records {
- tracing::debug!("get updates from server: {:?}", record);
- if let Some(oid) = record.get("oid").and_then(|value| value.as_str()) {
- match parser_updates_form_json(record.clone(), &postgrest.secret()) {
- Ok(updates) => {
- let object_updates = updates_by_oid
- .entry(oid.to_string())
- .or_insert_with(Vec::new);
- for update in updates {
- object_updates.push(update.value);
- }
- },
- Err(e) => {
- tracing::error!("parser_updates_form_json error: {:?}", e);
- },
- }
- }
- }
- }
- Ok(updates_by_oid)
- }
- pub async fn get_updates_from_server(
- object_id: &str,
- object_ty: &CollabType,
- postgrest: &Arc<PostgresWrapper>,
- ) -> Result<Vec<UpdateItem>, Error> {
- let json = postgrest
- .from(table_name(object_ty))
- .select("key, value, encrypt, md5")
- .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
- .eq("oid", object_id)
- .execute()
- .await?
- .get_json()
- .await?;
- parser_updates_form_json(json, &postgrest.secret())
- }
- /// json format:
- /// ```json
- /// [
- /// {
- /// "value": "\\x...",
- /// "encrypt": 1,
- /// "md5": "..."
- /// },
- /// {
- /// "value": "\\x...",
- /// "encrypt": 1,
- /// "md5": "..."
- /// },
- /// ...
- /// ]
- /// ```
- fn parser_updates_form_json(
- json: Value,
- encryption_secret: &Option<String>,
- ) -> Result<Vec<UpdateItem>, Error> {
- let mut updates = vec![];
- match json.as_array() {
- None => {
- updates.push(parser_update_from_json(&json, encryption_secret)?);
- },
- Some(values) => {
- let expected_update_len = values.len();
- for value in values {
- updates.push(parser_update_from_json(value, encryption_secret)?);
- }
- if updates.len() != expected_update_len {
- return Err(anyhow::anyhow!(
- "The length of the updates does not match the length of the expected updates, indicating that some updates failed to parse."
- ));
- }
- },
- }
- Ok(updates)
- }
- /// Parses update from a JSON representation.
- ///
- /// This function attempts to decode an encrypted value from a JSON object
- /// and verify its integrity against a provided MD5 hash.
- ///
- /// # Parameters
- /// - `json`: The JSON value representing the update information.
- /// - `encryption_secret`: An optional encryption secret used for decrypting the value.
- ///
- /// json format:
- /// ```json
- /// {
- /// "value": "\\x...",
- /// "encrypt": 1,
- /// "md5": "..."
- /// },
- /// ```
- fn parser_update_from_json(
- json: &Value,
- encryption_secret: &Option<String>,
- ) -> Result<UpdateItem, Error> {
- let some_record = match (
- json.get("encrypt").and_then(|encrypt| encrypt.as_i64()),
- json.get("value").and_then(|value| value.as_str()),
- ) {
- (Some(encrypt), Some(value)) => {
- match SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(
- value,
- encrypt as i32,
- encryption_secret,
- ) {
- Ok(value) => Some(value),
- Err(err) => {
- tracing::error!("Decode value column failed: {:?}", err);
- None
- },
- }
- },
- _ => None,
- };
- let some_key = json.get("key").and_then(|value| value.as_i64());
- if let (Some(value), Some(key)) = (some_record, some_key) {
- // Check the md5 of the value that we received from the server is equal to the md5 of the value
- // that we calculated locally.
- if let Some(expected_md5) = json.get("md5").and_then(|v| v.as_str()) {
- let value_md5 = md5(&value);
- if value_md5 != expected_md5 {
- let msg = format!(
- "md5 not match: key:{} {} != {}",
- key, value_md5, expected_md5
- );
- tracing::error!("{}", msg);
- return Err(anyhow::anyhow!(msg));
- }
- }
- Ok(UpdateItem { key, value })
- } else {
- let keys = json
- .as_object()
- .map(|map| map.iter().map(|(key, _)| key).collect::<Vec<&String>>());
- Err(anyhow::anyhow!(
- "missing key or value column. Current keys:: {:?}",
- keys
- ))
- }
- }
- pub struct UpdateItem {
- pub key: i64,
- pub value: Vec<u8>,
- }
- pub struct RetryCondition(Weak<PostgresWrapper>);
- impl Condition<anyhow::Error> for RetryCondition {
- fn should_retry(&mut self, _error: &anyhow::Error) -> bool {
- self.0.upgrade().is_some()
- }
- }
|