folder_test.rs 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. use assert_json_diff::assert_json_eq;
  2. use collab_plugins::cloud_storage::{CollabObject, CollabType};
  3. use futures::future::join_all;
  4. use serde_json::json;
  5. use tokio::task;
  6. use uuid::Uuid;
  7. use yrs::types::ToJson;
  8. use yrs::updates::decoder::Decode;
  9. use yrs::{merge_updates_v1, Array, Doc, Map, MapPrelim, ReadTxn, StateVector, Transact, Update};
  10. use flowy_user_deps::entities::SignUpResponse;
  11. use lib_infra::box_any::BoxAny;
  12. use crate::supabase_test::util::{
  13. collab_service, folder_service, get_supabase_config, sign_up_param, user_auth_service,
  14. };
  15. #[tokio::test]
  16. async fn supabase_create_workspace_test() {
  17. if get_supabase_config().is_none() {
  18. return;
  19. }
  20. let service = folder_service();
  21. // will replace the uid with the real uid
  22. let workspace = service.create_workspace(1, "test").await.unwrap();
  23. dbg!(workspace);
  24. }
  25. #[tokio::test]
  26. async fn supabase_get_folder_test() {
  27. if get_supabase_config().is_none() {
  28. return;
  29. }
  30. let folder_service = folder_service();
  31. let user_service = user_auth_service();
  32. let collab_service = collab_service();
  33. let uuid = Uuid::new_v4().to_string();
  34. let params = sign_up_param(uuid);
  35. let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
  36. let collab_object = CollabObject {
  37. object_id: user.latest_workspace.id.clone(),
  38. uid: user.user_id,
  39. ty: CollabType::Folder,
  40. meta: Default::default(),
  41. }
  42. .with_workspace_id(user.latest_workspace.id.clone());
  43. let doc = Doc::with_client_id(1);
  44. let map = { doc.get_or_insert_map("map") };
  45. {
  46. let mut txn = doc.transact_mut();
  47. map.insert(&mut txn, "1", "a");
  48. collab_service
  49. .send_update(&collab_object, 0, txn.encode_update_v1())
  50. .await
  51. .unwrap();
  52. };
  53. {
  54. let mut txn = doc.transact_mut();
  55. map.insert(&mut txn, "2", "b");
  56. collab_service
  57. .send_update(&collab_object, 1, txn.encode_update_v1())
  58. .await
  59. .unwrap();
  60. };
  61. // let updates = collab_service.get_all_updates(&collab_object).await.unwrap();
  62. let updates = folder_service
  63. .get_folder_updates(&user.latest_workspace.id, user.user_id)
  64. .await
  65. .unwrap();
  66. assert_eq!(updates.len(), 2);
  67. // The init sync will try to merge the updates into one. Spawn 5 tasks to simulate
  68. // multiple clients trying to init sync at the same time.
  69. let mut handles = Vec::new();
  70. for _ in 0..5 {
  71. let cloned_collab_service = collab_service.clone();
  72. let cloned_collab_object = collab_object.clone();
  73. let handle = task::spawn(async move {
  74. cloned_collab_service
  75. .send_init_sync(&cloned_collab_object, 3, vec![])
  76. .await
  77. .unwrap();
  78. });
  79. handles.push(handle);
  80. }
  81. let _results: Vec<_> = join_all(handles).await;
  82. // after the init sync, the updates should be merged into one.
  83. let updates: Vec<Vec<u8>> = folder_service
  84. .get_folder_updates(&user.latest_workspace.id, user.user_id)
  85. .await
  86. .unwrap();
  87. assert_eq!(updates.len(), 1);
  88. // Other the init sync, try to get the updates from the server.
  89. let remote_update = updates.first().unwrap().clone();
  90. let expected_update = doc
  91. .transact_mut()
  92. .encode_state_as_update_v1(&StateVector::default());
  93. // check the update is the same as local document update.
  94. assert_eq!(remote_update, expected_update);
  95. }
  96. /// This async test function checks the behavior of updates duplication in Supabase.
  97. /// It creates a new user and simulates two updates to the user's workspace with different values.
  98. /// Then, it merges these updates and sends an initial synchronization request to test duplication handling.
  99. /// Finally, it asserts that the duplicated updates don't affect the overall data consistency in Supabase.
  100. #[tokio::test]
  101. async fn supabase_duplicate_updates_test() {
  102. if get_supabase_config().is_none() {
  103. return;
  104. }
  105. let folder_service = folder_service();
  106. let user_service = user_auth_service();
  107. let collab_service = collab_service();
  108. let uuid = Uuid::new_v4().to_string();
  109. let params = sign_up_param(uuid);
  110. let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
  111. let collab_object = CollabObject {
  112. object_id: user.latest_workspace.id.clone(),
  113. uid: user.user_id,
  114. ty: CollabType::Folder,
  115. meta: Default::default(),
  116. }
  117. .with_workspace_id(user.latest_workspace.id.clone());
  118. let doc = Doc::with_client_id(1);
  119. let map = { doc.get_or_insert_map("map") };
  120. let mut duplicated_updates = vec![];
  121. {
  122. let mut txn = doc.transact_mut();
  123. map.insert(&mut txn, "1", "a");
  124. let update = txn.encode_update_v1();
  125. duplicated_updates.push(update.clone());
  126. collab_service
  127. .send_update(&collab_object, 0, update)
  128. .await
  129. .unwrap();
  130. };
  131. {
  132. let mut txn = doc.transact_mut();
  133. map.insert(&mut txn, "2", "b");
  134. let update = txn.encode_update_v1();
  135. duplicated_updates.push(update.clone());
  136. collab_service
  137. .send_update(&collab_object, 1, update)
  138. .await
  139. .unwrap();
  140. };
  141. // send init sync
  142. collab_service
  143. .send_init_sync(&collab_object, 3, vec![])
  144. .await
  145. .unwrap();
  146. let first_init_sync_update: Vec<u8> = folder_service
  147. .get_folder_updates(&user.latest_workspace.id, user.user_id)
  148. .await
  149. .unwrap()
  150. .first()
  151. .unwrap()
  152. .clone();
  153. // simulate the duplicated updates.
  154. let merged_update = merge_updates_v1(
  155. &duplicated_updates
  156. .iter()
  157. .map(|update| update.as_ref())
  158. .collect::<Vec<&[u8]>>(),
  159. )
  160. .unwrap();
  161. collab_service
  162. .send_init_sync(&collab_object, 4, merged_update)
  163. .await
  164. .unwrap();
  165. let second_init_sync_update: Vec<u8> = folder_service
  166. .get_folder_updates(&user.latest_workspace.id, user.user_id)
  167. .await
  168. .unwrap()
  169. .first()
  170. .unwrap()
  171. .clone();
  172. let doc_2 = Doc::new();
  173. assert_eq!(first_init_sync_update.len(), second_init_sync_update.len());
  174. let map = { doc_2.get_or_insert_map("map") };
  175. {
  176. let mut txn = doc_2.transact_mut();
  177. let update = Update::decode_v1(&second_init_sync_update).unwrap();
  178. txn.apply_update(update);
  179. }
  180. {
  181. let txn = doc_2.transact();
  182. let json = map.to_json(&txn);
  183. assert_json_eq!(
  184. json,
  185. json!({
  186. "1": "a",
  187. "2": "b"
  188. })
  189. );
  190. }
  191. }
  192. #[tokio::test]
  193. async fn supabase_diff_state_vec_test() {
  194. if get_supabase_config().is_none() {
  195. return;
  196. }
  197. let folder_service = folder_service();
  198. let user_service = user_auth_service();
  199. let collab_service = collab_service();
  200. let uuid = Uuid::new_v4().to_string();
  201. let params = sign_up_param(uuid);
  202. let user: SignUpResponse = user_service.sign_up(BoxAny::new(params)).await.unwrap();
  203. let collab_object = CollabObject {
  204. object_id: user.latest_workspace.id.clone(),
  205. uid: user.user_id,
  206. ty: CollabType::Folder,
  207. meta: Default::default(),
  208. }
  209. .with_workspace_id(user.latest_workspace.id.clone());
  210. let doc = Doc::with_client_id(1);
  211. let map = { doc.get_or_insert_map("map") };
  212. let array = { doc.get_or_insert_array("array") };
  213. {
  214. let mut txn = doc.transact_mut();
  215. map.insert(&mut txn, "1", "a");
  216. map.insert(&mut txn, "inner_map", MapPrelim::<String>::new());
  217. array.push_back(&mut txn, "element 1");
  218. let update = txn.encode_update_v1();
  219. collab_service
  220. .send_update(&collab_object, 0, update)
  221. .await
  222. .unwrap();
  223. };
  224. {
  225. let mut txn = doc.transact_mut();
  226. map.insert(&mut txn, "2", "b");
  227. array.push_back(&mut txn, "element 2");
  228. let update = txn.encode_update_v1();
  229. collab_service
  230. .send_update(&collab_object, 1, update)
  231. .await
  232. .unwrap();
  233. };
  234. // restore the doc with given updates.
  235. let old_version_doc = Doc::new();
  236. let map = { old_version_doc.get_or_insert_map("map") };
  237. let updates: Vec<Vec<u8>> = folder_service
  238. .get_folder_updates(&user.latest_workspace.id, user.user_id)
  239. .await
  240. .unwrap();
  241. {
  242. let mut txn = old_version_doc.transact_mut();
  243. for update in updates {
  244. let update = Update::decode_v1(&update).unwrap();
  245. txn.apply_update(update);
  246. }
  247. }
  248. let txn = old_version_doc.transact();
  249. let json = map.to_json(&txn);
  250. assert_json_eq!(
  251. json,
  252. json!({
  253. "1": "a",
  254. "2": "b",
  255. "inner_map": {}
  256. })
  257. );
  258. }