request.rs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. use std::future::Future;
  2. use std::iter::Take;
  3. use std::pin::Pin;
  4. use std::str::FromStr;
  5. use std::sync::{Arc, Weak};
  6. use std::time::Duration;
  7. use anyhow::Error;
  8. use chrono::{DateTime, Utc};
  9. use collab_plugins::cloud_storage::{CollabObject, CollabType, RemoteCollabSnapshot};
  10. use serde_json::Value;
  11. use tokio_retry::strategy::FixedInterval;
  12. use tokio_retry::{Action, Condition, RetryIf};
  13. use flowy_database_deps::cloud::{CollabObjectUpdate, CollabObjectUpdateByOid};
  14. use lib_infra::util::md5;
  15. use crate::supabase::api::util::{
  16. ExtendedResponse, InsertParamsBuilder, SupabaseBinaryColumnDecoder,
  17. };
  18. use crate::supabase::api::PostgresWrapper;
  19. use crate::supabase::define::*;
  20. pub struct FetchObjectUpdateAction {
  21. object_id: String,
  22. object_ty: CollabType,
  23. postgrest: Weak<PostgresWrapper>,
  24. }
  25. impl FetchObjectUpdateAction {
  26. pub fn new(object_id: String, object_ty: CollabType, postgrest: Weak<PostgresWrapper>) -> Self {
  27. Self {
  28. postgrest,
  29. object_id,
  30. object_ty,
  31. }
  32. }
  33. pub fn run(self) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
  34. let postgrest = self.postgrest.clone();
  35. let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
  36. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  37. }
  38. pub fn run_with_fix_interval(
  39. self,
  40. secs: u64,
  41. times: usize,
  42. ) -> RetryIf<Take<FixedInterval>, FetchObjectUpdateAction, RetryCondition> {
  43. let postgrest = self.postgrest.clone();
  44. let retry_strategy = FixedInterval::new(Duration::from_secs(secs)).take(times);
  45. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  46. }
  47. }
  48. impl Action for FetchObjectUpdateAction {
  49. type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
  50. type Item = CollabObjectUpdate;
  51. type Error = anyhow::Error;
  52. fn run(&mut self) -> Self::Future {
  53. let weak_postgres = self.postgrest.clone();
  54. let object_id = self.object_id.clone();
  55. let object_ty = self.object_ty.clone();
  56. Box::pin(async move {
  57. match weak_postgres.upgrade() {
  58. None => Ok(vec![]),
  59. Some(postgrest) => {
  60. let items = get_updates_from_server(&object_id, &object_ty, postgrest).await?;
  61. Ok(items.into_iter().map(|item| item.value).collect())
  62. },
  63. }
  64. })
  65. }
  66. }
  67. pub struct BatchFetchObjectUpdateAction {
  68. object_ids: Vec<String>,
  69. object_ty: CollabType,
  70. postgrest: Weak<PostgresWrapper>,
  71. }
  72. impl BatchFetchObjectUpdateAction {
  73. pub fn new(
  74. object_ids: Vec<String>,
  75. object_ty: CollabType,
  76. postgrest: Weak<PostgresWrapper>,
  77. ) -> Self {
  78. Self {
  79. postgrest,
  80. object_ty,
  81. object_ids,
  82. }
  83. }
  84. pub fn run(self) -> RetryIf<Take<FixedInterval>, BatchFetchObjectUpdateAction, RetryCondition> {
  85. let postgrest = self.postgrest.clone();
  86. let retry_strategy = FixedInterval::new(Duration::from_secs(5)).take(3);
  87. RetryIf::spawn(retry_strategy, self, RetryCondition(postgrest))
  88. }
  89. }
  90. impl Action for BatchFetchObjectUpdateAction {
  91. type Future = Pin<Box<dyn Future<Output = Result<Self::Item, Self::Error>> + Send>>;
  92. type Item = CollabObjectUpdateByOid;
  93. type Error = anyhow::Error;
  94. fn run(&mut self) -> Self::Future {
  95. let weak_postgrest = self.postgrest.clone();
  96. let object_ids = self.object_ids.clone();
  97. let object_ty = self.object_ty.clone();
  98. Box::pin(async move {
  99. match weak_postgrest.upgrade() {
  100. None => Ok(CollabObjectUpdateByOid::default()),
  101. Some(server) => batch_get_updates_from_server(object_ids, &object_ty, server).await,
  102. }
  103. })
  104. }
  105. }
  106. pub async fn create_snapshot(
  107. postgrest: &Arc<PostgresWrapper>,
  108. object: &CollabObject,
  109. snapshot: Vec<u8>,
  110. ) -> Result<i64, Error> {
  111. let value_size = snapshot.len() as i32;
  112. let snapshot = format!("\\x{}", hex::encode(snapshot));
  113. postgrest
  114. .from(AF_COLLAB_SNAPSHOT_TABLE)
  115. .insert(
  116. InsertParamsBuilder::new()
  117. .insert(AF_COLLAB_SNAPSHOT_OID_COLUMN, object.object_id.clone())
  118. .insert("name", object.ty.to_string())
  119. .insert(AF_COLLAB_SNAPSHOT_BLOB_COLUMN, snapshot)
  120. .insert(AF_COLLAB_SNAPSHOT_BLOB_SIZE_COLUMN, value_size)
  121. .build(),
  122. )
  123. .execute()
  124. .await?
  125. .success()
  126. .await?;
  127. Ok(1)
  128. }
  129. pub async fn get_latest_snapshot_from_server(
  130. object_id: &str,
  131. postgrest: Arc<PostgresWrapper>,
  132. ) -> Result<Option<RemoteCollabSnapshot>, Error> {
  133. let json = postgrest
  134. .from(AF_COLLAB_SNAPSHOT_TABLE)
  135. .select(format!(
  136. "{},{},{}",
  137. AF_COLLAB_SNAPSHOT_ID_COLUMN,
  138. AF_COLLAB_SNAPSHOT_BLOB_COLUMN,
  139. AF_COLLAB_SNAPSHOT_CREATED_AT_COLUMN
  140. ))
  141. .order(format!("{}.desc", AF_COLLAB_SNAPSHOT_ID_COLUMN))
  142. .limit(1)
  143. .eq(AF_COLLAB_SNAPSHOT_OID_COLUMN, object_id)
  144. .execute()
  145. .await?
  146. .get_json()
  147. .await?;
  148. let snapshot = json
  149. .as_array()
  150. .and_then(|array| array.first())
  151. .and_then(|value| {
  152. let blob = value
  153. .get("blob")
  154. .and_then(|blob| blob.as_str())
  155. .and_then(SupabaseBinaryColumnDecoder::decode)?;
  156. let sid = value.get("sid").and_then(|id| id.as_i64())?;
  157. let created_at = value.get("created_at").and_then(|created_at| {
  158. created_at
  159. .as_str()
  160. .map(|id| DateTime::<Utc>::from_str(id).ok())
  161. .and_then(|date| date)
  162. })?;
  163. Some(RemoteCollabSnapshot {
  164. sid,
  165. oid: object_id.to_string(),
  166. blob,
  167. created_at: created_at.timestamp(),
  168. })
  169. });
  170. Ok(snapshot)
  171. }
  172. pub async fn batch_get_updates_from_server(
  173. object_ids: Vec<String>,
  174. object_ty: &CollabType,
  175. postgrest: Arc<PostgresWrapper>,
  176. ) -> Result<CollabObjectUpdateByOid, Error> {
  177. let json = postgrest
  178. .from(table_name(object_ty))
  179. .select("oid, key, value, md5")
  180. .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
  181. .in_("oid", object_ids)
  182. .execute()
  183. .await?
  184. .get_json()
  185. .await?;
  186. let mut updates_by_oid = CollabObjectUpdateByOid::new();
  187. if let Some(records) = json.as_array() {
  188. for record in records {
  189. if let Some(oid) = record.get("oid").and_then(|value| value.as_str()) {
  190. if let Ok(updates) = parser_updates_form_json(record.clone()) {
  191. let object_updates = updates_by_oid
  192. .entry(oid.to_string())
  193. .or_insert_with(Vec::new);
  194. tracing::debug!("get updates from server: {:?}", record);
  195. for update in updates {
  196. object_updates.push(update.value);
  197. }
  198. }
  199. }
  200. }
  201. }
  202. Ok(updates_by_oid)
  203. }
  204. pub async fn get_updates_from_server(
  205. object_id: &str,
  206. object_ty: &CollabType,
  207. postgrest: Arc<PostgresWrapper>,
  208. ) -> Result<Vec<UpdateItem>, Error> {
  209. let json = postgrest
  210. .from(table_name(object_ty))
  211. .select("key, value, md5")
  212. .order(format!("{}.asc", AF_COLLAB_KEY_COLUMN))
  213. .eq("oid", object_id)
  214. .execute()
  215. .await?
  216. .get_json()
  217. .await?;
  218. parser_updates_form_json(json)
  219. }
  220. /// json format:
  221. /// ```json
  222. /// [
  223. /// {
  224. /// "value": "\\x...",
  225. /// "md5": "..."
  226. /// },
  227. /// {
  228. /// "value": "\\x...",
  229. /// "md5": "..."
  230. /// },
  231. /// ...
  232. /// ]
  233. /// ```
  234. fn parser_updates_form_json(json: Value) -> Result<Vec<UpdateItem>, Error> {
  235. let mut updates = vec![];
  236. match json.as_array() {
  237. None => {
  238. updates.push(parser_update_from_json(&json)?);
  239. },
  240. Some(values) => {
  241. for value in values {
  242. updates.push(parser_update_from_json(value)?);
  243. }
  244. },
  245. }
  246. Ok(updates)
  247. }
  248. fn parser_update_from_json(json: &Value) -> Result<UpdateItem, Error> {
  249. let some_record = json
  250. .get("value")
  251. .and_then(|value| value.as_str())
  252. .and_then(SupabaseBinaryColumnDecoder::decode);
  253. let some_key = json.get("key").and_then(|value| value.as_i64());
  254. if let (Some(value), Some(key)) = (some_record, some_key) {
  255. // Check the md5 of the value that we received from the server is equal to the md5 of the value
  256. // that we calculated locally.
  257. if let Some(expected_md5) = json.get("md5").and_then(|v| v.as_str()) {
  258. let value_md5 = md5(&value);
  259. debug_assert!(
  260. value_md5 == expected_md5,
  261. "md5 not match: {} != {}",
  262. value_md5,
  263. expected_md5
  264. );
  265. }
  266. Ok(UpdateItem { key, value })
  267. } else {
  268. Err(anyhow::anyhow!(
  269. "missing key or value column in json: {:?}",
  270. json
  271. ))
  272. }
  273. }
  274. pub struct UpdateItem {
  275. pub key: i64,
  276. pub value: Vec<u8>,
  277. }
  278. pub struct RetryCondition(Weak<PostgresWrapper>);
  279. impl Condition<anyhow::Error> for RetryCondition {
  280. fn should_retry(&mut self, _error: &anyhow::Error) -> bool {
  281. self.0.upgrade().is_some()
  282. }
  283. }