collab_storage.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. use std::str::FromStr;
  2. use std::sync::Arc;
  3. use anyhow::Error;
  4. use chrono::{DateTime, Utc};
  5. use collab::preclude::merge_updates_v1;
  6. use collab_plugins::cloud_storage::{
  7. CollabObject, MsgId, RemoteCollabSnapshot, RemoteCollabState, RemoteCollabStorage,
  8. RemoteUpdateReceiver,
  9. };
  10. use tokio::task::spawn_blocking;
  11. use lib_infra::async_trait::async_trait;
  12. use lib_infra::util::md5;
  13. use crate::supabase::api::request::{
  14. create_snapshot, get_latest_snapshot_from_server, get_updates_from_server,
  15. FetchObjectUpdateAction, UpdateItem,
  16. };
  17. use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
  18. use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
  19. use crate::supabase::define::*;
  /// Newtype wrapper around a server handle `T`.
  ///
  /// The wrapped value is only reachable inside this module; when `T` implements
  /// `SupabaseServerService`, this type implements `RemoteCollabStorage`.
  pub struct SupabaseCollabStorageImpl<T>(T);

  impl<T> SupabaseCollabStorageImpl<T> {
    /// Wraps `server` in a new storage instance.
    pub fn new(server: T) -> Self {
      SupabaseCollabStorageImpl(server)
    }
  }
  26. #[async_trait]
  27. impl<T> RemoteCollabStorage for SupabaseCollabStorageImpl<T>
  28. where
  29. T: SupabaseServerService,
  30. {
  31. fn is_enable(&self) -> bool {
  32. true
  33. }
  34. async fn get_all_updates(&self, object: &CollabObject) -> Result<Vec<Vec<u8>>, Error> {
  35. let postgrest = self.0.try_get_weak_postgrest()?;
  36. let action = FetchObjectUpdateAction::new(object.id.clone(), object.ty.clone(), postgrest);
  37. let updates = action.run().await?;
  38. Ok(updates)
  39. }
  40. async fn get_latest_snapshot(&self, object_id: &str) -> Option<RemoteCollabSnapshot> {
  41. let postgrest = self.0.try_get_postgrest().ok()?;
  42. get_latest_snapshot_from_server(object_id, postgrest)
  43. .await
  44. .ok()?
  45. }
  46. async fn get_collab_state(&self, object_id: &str) -> Result<Option<RemoteCollabState>, Error> {
  47. let postgrest = self.0.try_get_postgrest()?;
  48. let json = postgrest
  49. .from("af_collab_state")
  50. .select("*")
  51. .eq("oid", object_id)
  52. .order("snapshot_created_at.desc".to_string())
  53. .limit(1)
  54. .execute()
  55. .await?
  56. .get_json()
  57. .await?;
  58. Ok(
  59. json
  60. .as_array()
  61. .and_then(|array| array.first())
  62. .and_then(|value| {
  63. let created_at = value.get("snapshot_created_at").and_then(|created_at| {
  64. created_at
  65. .as_str()
  66. .map(|id| DateTime::<Utc>::from_str(id).ok())
  67. .and_then(|date| date)
  68. })?;
  69. let current_edit_count = value.get("current_edit_count").and_then(|id| id.as_i64())?;
  70. let last_snapshot_edit_count = value
  71. .get("last_snapshot_edit_count")
  72. .and_then(|id| id.as_i64())?;
  73. Some(RemoteCollabState {
  74. current_edit_count,
  75. last_snapshot_edit_count,
  76. last_snapshot_created_at: created_at.timestamp(),
  77. })
  78. }),
  79. )
  80. }
  81. async fn create_snapshot(&self, object: &CollabObject, snapshot: Vec<u8>) -> Result<i64, Error> {
  82. let postgrest = self.0.try_get_postgrest()?;
  83. create_snapshot(&postgrest, object, snapshot).await
  84. }
  85. async fn send_update(
  86. &self,
  87. object: &CollabObject,
  88. _id: MsgId,
  89. update: Vec<u8>,
  90. ) -> Result<(), Error> {
  91. let postgrest = self.0.try_get_postgrest()?;
  92. let workspace_id = object
  93. .get_workspace_id()
  94. .ok_or(anyhow::anyhow!("Invalid workspace id"))?;
  95. send_update(workspace_id, object, update, &postgrest).await
  96. }
  97. async fn send_init_sync(
  98. &self,
  99. object: &CollabObject,
  100. _id: MsgId,
  101. init_update: Vec<u8>,
  102. ) -> Result<(), Error> {
  103. let postgrest = self.0.try_get_postgrest()?;
  104. let workspace_id = object
  105. .get_workspace_id()
  106. .ok_or(anyhow::anyhow!("Invalid workspace id"))?;
  107. let update_items = get_updates_from_server(&object.id, &object.ty, postgrest.clone()).await?;
  108. // If the update_items is empty, we can send the init_update directly
  109. if update_items.is_empty() {
  110. send_update(workspace_id, object, init_update, &postgrest).await?;
  111. } else {
  112. // 2.Merge the updates into one and then delete the merged updates
  113. let merge_result = spawn_blocking(move || merge_updates(update_items, init_update)).await??;
  114. tracing::trace!("Merged updates count: {}", merge_result.merged_keys.len());
  115. let override_key = merge_result.merged_keys.last().cloned().unwrap();
  116. let value_size = merge_result.new_update.len() as i32;
  117. let md5 = md5(&merge_result.new_update);
  118. let new_update = format!("\\x{}", hex::encode(merge_result.new_update));
  119. let params = InsertParamsBuilder::new()
  120. .insert("oid", object.id.clone())
  121. .insert("new_key", override_key)
  122. .insert("new_value", new_update)
  123. .insert("md5", md5)
  124. .insert("value_size", value_size)
  125. .insert("partition_key", partition_key(&object.ty))
  126. .insert("uid", object.uid)
  127. .insert("workspace_id", workspace_id)
  128. .insert("removed_keys", merge_result.merged_keys)
  129. .build();
  130. postgrest
  131. .rpc("flush_collab_updates", params)
  132. .execute()
  133. .await?
  134. .success()
  135. .await?;
  136. }
  137. Ok(())
  138. }
  139. async fn subscribe_remote_updates(&self, _object: &CollabObject) -> Option<RemoteUpdateReceiver> {
  140. todo!()
  141. }
  142. }
  143. async fn send_update(
  144. workspace_id: String,
  145. object: &CollabObject,
  146. update: Vec<u8>,
  147. postgrest: &Arc<PostgresWrapper>,
  148. ) -> Result<(), Error> {
  149. let value_size = update.len() as i32;
  150. let md5 = md5(&update);
  151. let update = format!("\\x{}", hex::encode(update));
  152. let builder = InsertParamsBuilder::new()
  153. .insert("oid", object.id.clone())
  154. .insert("partition_key", partition_key(&object.ty))
  155. .insert("value", update)
  156. .insert("uid", object.uid)
  157. .insert("md5", md5)
  158. .insert("workspace_id", workspace_id)
  159. .insert("value_size", value_size);
  160. let params = builder.build();
  161. postgrest
  162. .from(AF_COLLAB_UPDATE_TABLE)
  163. .insert(params)
  164. .execute()
  165. .await?
  166. .success()
  167. .await?;
  168. Ok(())
  169. }
  170. fn merge_updates(update_items: Vec<UpdateItem>, new_update: Vec<u8>) -> Result<MergeResult, Error> {
  171. let mut updates = vec![];
  172. let mut merged_keys = vec![];
  173. for item in update_items {
  174. merged_keys.push(item.key);
  175. updates.push(item.value);
  176. }
  177. if !new_update.is_empty() {
  178. updates.push(new_update);
  179. }
  180. let updates = updates
  181. .iter()
  182. .map(|update| update.as_ref())
  183. .collect::<Vec<&[u8]>>();
  184. let new_update = merge_updates_v1(&updates)?;
  185. Ok(MergeResult {
  186. merged_keys,
  187. new_update,
  188. })
  189. }
  190. struct MergeResult {
  191. merged_keys: Vec<i64>,
  192. new_update: Vec<u8>,
  193. }