request.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. use std::future::Future;
  2. use std::iter::Take;
  3. use std::pin::Pin;
  4. use std::str::FromStr;
  5. use std::sync::{Arc, Weak};
  6. use std::time::Duration;
  7. use anyhow::Error;
  8. use chrono::{DateTime, Utc};
  9. use collab_plugins::cloud_storage::{CollabObject, CollabType, RemoteCollabSnapshot};
  10. use serde_json::Value;
  11. use tokio_retry::strategy::FixedInterval;
  12. use tokio_retry::{Action, Condition, RetryIf};
  13. use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
  14. use lib_infra::util::md5;
  15. use crate::supabase::api::util::{
  16. BinaryColumnDecoder, ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
  17. SupabaseBinaryColumnEncoder,
  18. };
  19. use crate::supabase::api::PostgresWrapper;
  20. use crate::supabase::define::*;
  21. pub struct FetchObjectUpdateAction {
  22. object_id: String,
  23. object_ty: CollabType,
  24. postgrest: Weak<PostgresWrapper>,
  25. }
  26. impl FetchObjectUpdateAction {
  27. pub fn new(object_id: String, object_ty: CollabType, postgrest: Weak<PostgresWrapper>) -> Self {
  28. Self {
  29. postgrest,
  30. object_id,
  31. object_ty,
  32. }
  33. }
  34. pub fn run(self) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
  35. let postgrest = self.postgrest.clone();
  36. let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
  37. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  38. }
  39. pub fn run_with_fix_interval(
  40. self,
  41. secs: u64,
  42. times: usize,
  43. ) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
  44. let postgrest = self.postgrest.clone();
  45. let retry_strategy = FixedInterval::new(Duration::from_secs(secs)).take(times);
  46. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  47. }
  48. }
  49. impl Action for FetchObjectUpdateAction {
  50. type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
  51. type Item = CollabObjectUpdate;
  52. type Error = anyhow::Error;
  53. fn run(&mut self) -> Self::Future {
  54. let weak_postgres = self.postgrest.clone();
  55. let object_id = self.object_id.clone();
  56. let object_ty = self.object_ty.clone();
  57. Box::pin(async move {
  58. match weak_postgres.upgrade() {
  59. None => Ok(vec![]),
  60. Some(postgrest) => {
  61. match get_updates_from_server(&object_id, &object_ty, &postgrest).await {
  62. Ok(items) => Ok(items.into_iter().map(|item| item.value).collect()),
  63. Err(err) => {
  64. tracing::error!("Get {} updates failed with error: {:?}", object_id, err);
  65. Err(err)
  66. },
  67. }
  68. },
  69. }
  70. })
  71. }
  72. }
  73. pub struct BatchFetchObjectUpdateAction {
  74. object_ids: Vec<String>,
  75. object_ty: CollabType,
  76. postgrest: Weak<PostgresWrapper>,
  77. }
  78. impl BatchFetchObjectUpdateAction {
  79. pub fn new(
  80. object_ids: Vec<String>,
  81. object_ty: CollabType,
  82. postgrest: Weak<PostgresWrapper>,
  83. ) -> Self {
  84. Self {
  85. postgrest,
  86. object_ty,
  87. object_ids,
  88. }
  89. }
  90. pub fn run(self) -> RetryIf<Take<FixedInterval>, BatchFetchObjectUpdateAction, RetryCondition> {
  91. let postgrest = self.postgrest.clone();
  92. let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
  93. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  94. }
  95. }
  96. impl Action for BatchFetchObjectUpdateAction {
  97. type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
  98. type Item = CollabObjectUpdateByOid;
  99. type Error = anyhow::Error;
  100. fn run(&mut self) -> Self::Future {
  101. let weak_postgrest = self.postgrest.clone();
  102. let object_ids = self.object_ids.clone();
  103. let object_ty = self.object_ty.clone();
  104. Box::pin(async move {
  105. match weak_postgrest.upgrade() {
  106. None => Ok(CollabObjectUpdateByOid::default()),
  107. Some(server) => {
  108. match batch_get_updates_from_server(object_ids.clone(), &object_ty, server).await {
  109. Ok(updates_by_oid) => Ok(updates_by_oid),
  110. Err(err) => {
  111. tracing::error!(
  112. "Batch get object with given ids:{:?} failed with error: {:?}",
  113. object_ids,
  114. err
  115. );
  116. Err(err)
  117. },
  118. }
  119. },
  120. }
  121. })
  122. }
  123. }
  124. pub async fn create_snapshot(
  125. postgrest: &Arc<PostgresWrapper>,
  126. object: &CollabObject,
  127. snapshot: Vec<u8>,
  128. ) -> Result<i64, Error> {
  129. let value_size = snapshot.len() as i32;
  130. let (snapshot, encrypt) = SupabaseBinaryColumnEncoder::encode(&snapshot, &postgrest.secret())?;
  131. let ret: Value = postgrest
  132. .from(AF_COLLAB_SNAPSHOT_TABLE)
  133. .insert(
  134. InsertParamsBuilder::new()
  135. .insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.object_id.clone())
  136. .insert("name", object.ty.to_string())
  137. .insert(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN, encrypt)
  138. .insert(AF_COLLAB_SNAPSHOT_BLOB_COLUMN, snapshot)
  139. .insert(AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN, value_size)
  140. .build(),
  141. )
  142. .execute()
  143. .await?
  144. .get_json()
  145. .await?;
  146. let snapshot_id = ret
  147. .as_array()
  148. .and_then(|array| array.first())
  149. .and_then(|value| value.get("sid"))
  150. .and_then(|value| value.as_i64())
  151. .unwrap_or(0);
  152. Ok(snapshot_id)
  153. }
  154. pub async fn get_snapshots_from_server(
  155. object_id: &str,
  156. postgrest: Arc<PostgresWrapper>,
  157. limit: usize,
  158. ) -> Result<Vec<RemoteCollabSnapshot>, Error> {
  159. let json: Value = postgrest
  160. .from(AF_COLLAB_SNAPSHOT_TABLE)
  161. .select(format!(
  162. "{},{},{},{}",
  163. AF_COLLAB_SNAPSHOT_ID_COLUMN,
  164. AF_COLLAB_SNAPSHOT_BLOB_COLUMN,
  165. AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN,
  166. AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN
  167. ))
  168. .order(format!("{}.desc", AF_COLLAB_SNAPSHOT_ID_COLUMN))
  169. .limit(limit)
  170. .eq(AF_COLLAB_SNAPSHOT_OID_COLUMN, object_id)
  171. .execute()
  172. .await?
  173. .get_json()
  174. .await?;
  175. let mut snapshots = vec![];
  176. let secret = postgrest.secret();
  177. match json.as_array() {
  178. None => {
  179. if let Some(snapshot) = parser_snapshot(object_id, &json, &secret) {
  180. snapshots.push(snapshot);
  181. }
  182. },
  183. Some(snapshot_values) => {
  184. for snapshot_value in snapshot_values {
  185. if let Some(snapshot) = parser_snapshot(object_id, snapshot_value, &secret) {
  186. snapshots.push(snapshot);
  187. }
  188. }
  189. },
  190. }
  191. Ok(snapshots)
  192. }
  193. fn parser_snapshot(
  194. object_id: &str,
  195. snapshot: &Value,
  196. secret: &Option<String>,
  197. ) -> Option<RemoteCollabSnapshot> {
  198. let blob = match (
  199. snapshot
  200. .get(AF_COLLAB_SNAPSHOT_ENCRYPT_COLUMN)
  201. .and_then(|encrypt| encrypt.as_i64()),
  202. snapshot
  203. .get(AF_COLLAB_SNAPSHOT_BLOB_COLUMN)
  204. .and_then(|value| value.as_str()),
  205. ) {
  206. (Some(encrypt), Some(value)) => {
  207. SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(value, encrypt as i32, secret)
  208. .ok()
  209. },
  210. _ => None,
  211. }?;
  212. let sid = snapshot.get("sid").and_then(|id| id.as_i64())?;
  213. let created_at = snapshot.get("created_at").and_then(|created_at| {
  214. created_at
  215. .as_str()
  216. .map(|id| DateTime::<Utc>::from_str(id).ok())
  217. .and_then(|date| date)
  218. })?;
  219. Some(RemoteCollabSnapshot {
  220. sid,
  221. oid: object_id.to_string(),
  222. blob,
  223. created_at: created_at.timestamp(),
  224. })
  225. }
  226. pub async fn batch_get_updates_from_server(
  227. object_ids: Vec<String>,
  228. object_ty: &CollabType,
  229. postgrest: Arc<PostgresWrapper>,
  230. ) -> Result<CollabObjectUpdateByOid, Error> {
  231. let json = postgrest
  232. .from(table_name(object_ty))
  233. .select("oid, key, value, encrypt, md5")
  234. .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
  235. .in_("oid", object_ids)
  236. .execute()
  237. .await?
  238. .get_json()
  239. .await?;
  240. let mut updates_by_oid = CollabObjectUpdateByOid::new();
  241. if let Some(records) = json.as_array() {
  242. for record in records {
  243. tracing::debug!("get updates from server: {:?}", record);
  244. if let Some(oid) = record.get("oid").and_then(|value| value.as_str()) {
  245. match parser_updates_form_json(record.clone(), &postgrest.secret()) {
  246. Ok(updates) => {
  247. let object_updates = updates_by_oid
  248. .entry(oid.to_string())
  249. .or_insert_with(Vec::new);
  250. for update in updates {
  251. object_updates.push(update.value);
  252. }
  253. },
  254. Err(e) => {
  255. tracing::error!("parser_updates_form_json error: {:?}", e);
  256. },
  257. }
  258. }
  259. }
  260. }
  261. Ok(updates_by_oid)
  262. }
  263. pub async fn get_updates_from_server(
  264. object_id: &str,
  265. object_ty: &CollabType,
  266. postgrest: &Arc<PostgresWrapper>,
  267. ) -> Result<Vec<UpdateItem>, Error> {
  268. let json = postgrest
  269. .from(table_name(object_ty))
  270. .select("key, value, encrypt, md5")
  271. .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
  272. .eq("oid", object_id)
  273. .execute()
  274. .await?
  275. .get_json()
  276. .await?;
  277. parser_updates_form_json(json, &postgrest.secret())
  278. }
  279. /// json format:
  280. /// ```json
  281. /// [
  282. /// {
  283. /// "value": "\\x...",
  284. /// "encrypt": 1,
  285. /// "md5": "..."
  286. /// },
  287. /// {
  288. /// "value": "\\x...",
  289. /// "encrypt": 1,
  290. /// "md5": "..."
  291. /// },
  292. /// ...
  293. /// ]
  294. /// ```
  295. fn parser_updates_form_json(
  296. json: Value,
  297. encryption_secret: &Option<String>,
  298. ) -> Result<Vec<UpdateItem>, Error> {
  299. let mut updates = vec![];
  300. match json.as_array() {
  301. None => {
  302. updates.push(parser_update_from_json(&json, encryption_secret)?);
  303. },
  304. Some(values) => {
  305. let expected_update_len = values.len();
  306. for value in values {
  307. updates.push(parser_update_from_json(value, encryption_secret)?);
  308. }
  309. if updates.len() != expected_update_len {
  310. return Err(anyhow::anyhow!(
  311. "The length of the updates does not match the length of the expected updates, indicating that some updates failed to parse."
  312. ));
  313. }
  314. },
  315. }
  316. Ok(updates)
  317. }
  318. /// Parses update from a JSON representation.
  319. ///
  320. /// This function attempts to decode an encrypted value from a JSON object
  321. /// and verify its integrity against a provided MD5 hash.
  322. ///
  323. /// # Parameters
  324. /// - `json`: The JSON value representing the update information.
  325. /// - `encryption_secret`: An optional encryption secret used for decrypting the value.
  326. ///
  327. /// json format:
  328. /// ```json
  329. /// {
  330. /// "value": "\\x...",
  331. /// "encrypt": 1,
  332. /// "md5": "..."
  333. /// },
  334. /// ```
  335. fn parser_update_from_json(
  336. json: &Value,
  337. encryption_secret: &Option<String>,
  338. ) -> Result<UpdateItem, Error> {
  339. let some_record = match (
  340. json.get("encrypt").and_then(|encrypt| encrypt.as_i64()),
  341. json.get("value").and_then(|value| value.as_str()),
  342. ) {
  343. (Some(encrypt), Some(value)) => {
  344. match SupabaseBinaryColumnDecoder::decode::<_, BinaryColumnDecoder>(
  345. value,
  346. encrypt as i32,
  347. encryption_secret,
  348. ) {
  349. Ok(value) => Some(value),
  350. Err(err) => {
  351. tracing::error!("Decode value column failed: {:?}", err);
  352. None
  353. },
  354. }
  355. },
  356. _ => None,
  357. };
  358. let some_key = json.get("key").and_then(|value| value.as_i64());
  359. if let (Some(value), Some(key)) = (some_record, some_key) {
  360. // Check the md5 of the value that we received from the server is equal to the md5 of the value
  361. // that we calculated locally.
  362. if let Some(expected_md5) = json.get("md5").and_then(|v| v.as_str()) {
  363. let value_md5 = md5(&value);
  364. if value_md5 != expected_md5 {
  365. let msg = format!(
  366. "md5 not match: key:{} {} != {}",
  367. key, value_md5, expected_md5
  368. );
  369. tracing::error!("{}", msg);
  370. return Err(anyhow::anyhow!(msg));
  371. }
  372. }
  373. Ok(UpdateItem { key, value })
  374. } else {
  375. let keys = json
  376. .as_object()
  377. .map(|map| map.iter().map(|(key, _)| key).collect::<Vec<&String>>());
  378. Err(anyhow::anyhow!(
  379. "missing key or value column. Current keys:: {:?}",
  380. keys
  381. ))
  382. }
  383. }
  /// A single decoded update row.
  pub struct UpdateItem {
    /// Integer read from the row's `key` column.
    pub key: i64,
    /// Decoded bytes of the row's `value` column.
    pub value: Vec<u8>,
  }
  388. pub struct RetryCondition(Weak<PostgresWrapper>);
  389. impl Condition<anyhow::Error> for RetryCondition {
  390. fn should_retry(&mut self, _error: &anyhow::Error) -> bool {
  391. self.0.upgrade().is_some()
  392. }
  393. }