pg_row.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. use std::sync::Arc;
  2. use assert_json_diff::assert_json_eq;
  3. use collab::core::collab::MutexCollab;
  4. use collab::core::origin::CollabOrigin;
  5. use collab::preclude::updates::decoder::Decode;
  6. use collab::preclude::Update;
  7. use parking_lot::RwLock;
  8. use serde_json::json;
  9. use flowy_database2::deps::DatabaseCloudService;
  10. use flowy_document2::deps::DocumentCloudService;
  11. use flowy_folder2::deps::{FolderCloudService, FolderSnapshot};
  12. use flowy_server::supabase::impls::{
  13. SupabaseDatabaseCloudServiceImpl, SupabaseDocumentCloudServiceImpl,
  14. SupabaseFolderCloudServiceImpl,
  15. };
  16. use flowy_server::supabase::{PostgresServer, SupabaseServerServiceImpl};
  17. use crate::util::get_supabase_config;
  18. pub struct PostgresConnect {
  19. inner: Arc<PostgresServer>,
  20. }
  21. impl PostgresConnect {
  22. pub fn new() -> Option<Self> {
  23. let config = get_supabase_config()?;
  24. let inner = PostgresServer::new(config.postgres_config);
  25. Some(Self {
  26. inner: Arc::new(inner),
  27. })
  28. }
  29. fn server_provider_impl(&self) -> SupabaseServerServiceImpl {
  30. SupabaseServerServiceImpl(Arc::new(RwLock::new(Some(self.inner.clone()))))
  31. }
  32. async fn get_folder(&self, workspace_id: &str) -> MutexCollab {
  33. let folder_service = SupabaseFolderCloudServiceImpl::new(self.server_provider_impl());
  34. let updates = folder_service
  35. .get_folder_updates(workspace_id, 0)
  36. .await
  37. .unwrap();
  38. let collab = MutexCollab::new(CollabOrigin::Server, workspace_id, vec![]);
  39. collab.lock().with_transact_mut(|txn| {
  40. for update in updates {
  41. txn.apply_update(Update::decode_v1(&update).unwrap());
  42. }
  43. });
  44. collab
  45. }
  46. async fn get_folder_snapshot(&self, workspace_id: &str) -> MutexCollab {
  47. let folder_service = SupabaseFolderCloudServiceImpl::new(self.server_provider_impl());
  48. let snapshot: FolderSnapshot = folder_service
  49. .get_folder_latest_snapshot(workspace_id)
  50. .await
  51. .unwrap()
  52. .unwrap();
  53. let collab = MutexCollab::new(CollabOrigin::Server, workspace_id, vec![]);
  54. collab.lock().with_transact_mut(|txn| {
  55. txn.apply_update(Update::decode_v1(&snapshot.data).unwrap());
  56. });
  57. collab
  58. }
  59. async fn get_database_collab_object(&self, object_id: &str) -> MutexCollab {
  60. let database_service = SupabaseDatabaseCloudServiceImpl::new(self.server_provider_impl());
  61. let updates = database_service.get_collab_update(object_id).await.unwrap();
  62. let collab = MutexCollab::new(CollabOrigin::Server, object_id, vec![]);
  63. collab.lock().with_transact_mut(|txn| {
  64. for update in updates {
  65. txn.apply_update(Update::decode_v1(&update).unwrap());
  66. }
  67. });
  68. collab
  69. }
  70. async fn get_database_rows_object(&self, row_ids: Vec<String>) -> Vec<MutexCollab> {
  71. let database_service = SupabaseDatabaseCloudServiceImpl::new(self.server_provider_impl());
  72. let updates_by_oid = database_service
  73. .batch_get_collab_updates(row_ids)
  74. .await
  75. .unwrap();
  76. let mut collabs = vec![];
  77. for (oid, updates) in updates_by_oid {
  78. let collab = MutexCollab::new(CollabOrigin::Server, &oid, vec![]);
  79. collab.lock().with_transact_mut(|txn| {
  80. for update in updates {
  81. txn.apply_update(Update::decode_v1(&update).unwrap());
  82. }
  83. });
  84. collabs.push(collab);
  85. }
  86. collabs
  87. }
  88. async fn get_document(&self, document_id: &str) -> MutexCollab {
  89. let document_service = SupabaseDocumentCloudServiceImpl::new(self.server_provider_impl());
  90. let updates = document_service
  91. .get_document_updates(document_id)
  92. .await
  93. .unwrap();
  94. let collab = MutexCollab::new(CollabOrigin::Server, document_id, vec![]);
  95. collab.lock().with_transact_mut(|txn| {
  96. for update in updates {
  97. txn.apply_update(Update::decode_v1(&update).unwrap());
  98. }
  99. });
  100. collab
  101. }
  102. }
  103. #[tokio::test]
  104. async fn get_folder_test() {
  105. if let Some(conn) = PostgresConnect::new() {
  106. let collab = conn
  107. .get_folder("2ddf790f-18bb-4e9c-aacb-f29ca755f72a")
  108. .await;
  109. let value = collab.to_json_value();
  110. assert_json_eq!(value, json!(""));
  111. }
  112. }
  113. #[tokio::test]
  114. async fn get_folder_snapshot() {
  115. if let Some(conn) = PostgresConnect::new() {
  116. let collab = conn
  117. .get_folder_snapshot("17f5e820-dcc8-4ca9-ab93-b45f17ca0948")
  118. .await;
  119. let value = collab.to_json_value();
  120. assert_json_eq!(value, json!(""));
  121. }
  122. }
  123. #[tokio::test]
  124. async fn get_document_test() {
  125. if let Some(conn) = PostgresConnect::new() {
  126. let collab = conn
  127. .get_document("158c8275-ff6d-49e1-a2ed-82c71dea1126")
  128. .await;
  129. let value = collab.to_json_value();
  130. assert_json_eq!(value, json!(""));
  131. }
  132. }
  133. #[tokio::test]
  134. async fn get_workspace_database_test() {
  135. if let Some(conn) = PostgresConnect::new() {
  136. let collab = conn
  137. .get_database_collab_object("MTp1c2VyOmRhdGFiYXNl")
  138. .await;
  139. let value = collab.to_json_value();
  140. assert_json_eq!(value, json!(""));
  141. }
  142. }
  143. #[tokio::test]
  144. async fn batch_get_database_rows_test() {
  145. if let Some(conn) = PostgresConnect::new() {
  146. let row_ids = vec![
  147. "93cebb2d-4831-496c-adde-1a82bd745099".to_string(),
  148. "7989a12f-23b2-48ff-8d5f-9bdf651ad7aa".to_string(),
  149. ];
  150. let collabs = conn.get_database_rows_object(row_ids).await;
  151. for collab in collabs {
  152. let value = collab.to_json_value();
  153. println!("{}", value);
  154. }
  155. }
  156. }