server.rs 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. use std::collections::HashMap;
  2. use std::sync::{Arc, Weak};
  3. use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage, RemoteUpdateSender};
  4. use parking_lot::{Mutex, RwLock};
  5. use serde_json::Value;
  6. use flowy_database_deps::cloud::DatabaseCloudService;
  7. use flowy_document_deps::cloud::DocumentCloudService;
  8. use flowy_encrypt::decrypt_bytes;
  9. use flowy_folder_deps::cloud::FolderCloudService;
  10. use flowy_server_config::supabase_config::SupabaseConfiguration;
  11. use flowy_user_deps::cloud::UserService;
  12. use crate::supabase::api::{
  13. RESTfulPostgresServer, SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl,
  14. SupabaseDocumentServiceImpl, SupabaseFolderServiceImpl, SupabaseServerServiceImpl,
  15. SupabaseUserServiceImpl,
  16. };
  17. use crate::supabase::entities::RealtimeCollabUpdateEvent;
  18. use crate::{AppFlowyEncryption, AppFlowyServer};
  19. /// https://www.pgbouncer.org/features.html
  20. /// Only support session mode.
  21. ///
  22. /// Session mode:
  23. /// When a new client connects, a connection is assigned to the client until it disconnects. Afterward,
  24. /// the connection is returned back to the pool. All PostgreSQL features can be used with this option.
  25. /// For the moment, the default pool size of pgbouncer in supabase is 15 in session mode. Which means
  26. /// that we can have 15 concurrent connections to the database.
  27. ///
  28. /// Transaction mode:
  29. /// This is the suggested option for serverless functions. With this, the connection is only assigned
  30. /// to the client for the duration of a transaction. Once done, the connection is returned to the pool.
  31. /// Two consecutive transactions from the same client could be done over two, different connections.
  32. /// Some session-based PostgreSQL features such as prepared statements are not available with this option.
  33. /// A more comprehensive list of incompatible features can be found here.
  34. ///
  35. /// Most of the case, Session mode is faster than Transaction mode(no statement cache(https://github.com/supabase/supavisor/issues/69) and queue transaction).
  36. /// But Transaction mode is more suitable for serverless functions. It can reduce the number of concurrent
  37. /// connections to the database.
  38. /// TODO(nathan): fix prepared statement error when using transaction mode. https://github.com/prisma/prisma/issues/11643
  39. ///
  40. #[derive(Clone, Debug, Default)]
  41. pub enum PgPoolMode {
  42. #[default]
  43. Session,
  44. Transaction,
  45. }
  46. impl PgPoolMode {
  47. pub fn support_prepare_cached(&self) -> bool {
  48. matches!(self, PgPoolMode::Session)
  49. }
  50. }
  51. /// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
  52. /// It contains the configuration of the supabase server and the postgres server.
  53. pub struct SupabaseServer {
  54. #[allow(dead_code)]
  55. config: SupabaseConfiguration,
  56. /// did represents as the device id is used to identify the device that is currently using the app.
  57. did: Mutex<String>,
  58. update_tx: RwLock<HashMap<String, RemoteUpdateSender>>,
  59. restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
  60. encryption: Weak<dyn AppFlowyEncryption>,
  61. }
  62. impl SupabaseServer {
  63. pub fn new(
  64. config: SupabaseConfiguration,
  65. enable_sync: bool,
  66. encryption: Weak<dyn AppFlowyEncryption>,
  67. ) -> Self {
  68. let update_tx = RwLock::new(HashMap::new());
  69. let restful_postgres = if enable_sync {
  70. Some(Arc::new(RESTfulPostgresServer::new(
  71. config.clone(),
  72. encryption.clone(),
  73. )))
  74. } else {
  75. None
  76. };
  77. Self {
  78. config,
  79. did: Default::default(),
  80. update_tx,
  81. restful_postgres: Arc::new(RwLock::new(restful_postgres)),
  82. encryption,
  83. }
  84. }
  85. pub fn set_enable_sync(&self, enable: bool) {
  86. if enable {
  87. if self.restful_postgres.read().is_some() {
  88. return;
  89. }
  90. let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
  91. *self.restful_postgres.write() = Some(Arc::new(postgres));
  92. } else {
  93. *self.restful_postgres.write() = None;
  94. }
  95. }
  96. }
  97. impl AppFlowyServer for SupabaseServer {
  98. fn set_enable_sync(&self, enable: bool) {
  99. tracing::info!("supabase sync: {}", enable);
  100. self.set_enable_sync(enable);
  101. }
  102. fn set_sync_device_id(&self, device_id: &str) {
  103. *self.did.lock() = device_id.to_string();
  104. }
  105. fn user_service(&self) -> Arc<dyn UserService> {
  106. Arc::new(SupabaseUserServiceImpl::new(SupabaseServerServiceImpl(
  107. self.restful_postgres.clone(),
  108. )))
  109. }
  110. fn folder_service(&self) -> Arc<dyn FolderCloudService> {
  111. Arc::new(SupabaseFolderServiceImpl::new(SupabaseServerServiceImpl(
  112. self.restful_postgres.clone(),
  113. )))
  114. }
  115. fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
  116. Arc::new(SupabaseDatabaseServiceImpl::new(SupabaseServerServiceImpl(
  117. self.restful_postgres.clone(),
  118. )))
  119. }
  120. fn document_service(&self) -> Arc<dyn DocumentCloudService> {
  121. Arc::new(SupabaseDocumentServiceImpl::new(SupabaseServerServiceImpl(
  122. self.restful_postgres.clone(),
  123. )))
  124. }
  125. fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
  126. let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
  127. self
  128. .update_tx
  129. .write()
  130. .insert(collab_object.object_id.clone(), tx);
  131. Some(Arc::new(SupabaseCollabStorageImpl::new(
  132. SupabaseServerServiceImpl(self.restful_postgres.clone()),
  133. Some(rx),
  134. self.encryption.clone(),
  135. )))
  136. }
  137. fn handle_realtime_event(&self, json: Value) {
  138. match serde_json::from_value::<RealtimeCollabUpdateEvent>(json) {
  139. Ok(event) => {
  140. if let Some(tx) = self.update_tx.read().get(event.payload.oid.as_str()) {
  141. tracing::trace!(
  142. "current device: {}, event device: {}",
  143. self.did.lock().as_str(),
  144. event.payload.did.as_str()
  145. );
  146. if self.did.lock().as_str() != event.payload.did.as_str() {
  147. tracing::trace!("Did receive realtime event: {}", event);
  148. let value = if event.payload.encrypt == 1 {
  149. match self
  150. .encryption
  151. .upgrade()
  152. .and_then(|encryption| encryption.get_secret())
  153. {
  154. None => vec![],
  155. Some(secret) => decrypt_bytes(event.payload.value, &secret).unwrap_or_default(),
  156. }
  157. } else {
  158. event.payload.value
  159. };
  160. if !value.is_empty() {
  161. tracing::trace!("Parse payload with len: {} success", value.len());
  162. if let Err(e) = tx.send(value) {
  163. tracing::trace!("send realtime update error: {}", e);
  164. }
  165. }
  166. }
  167. }
  168. },
  169. Err(e) => {
  170. tracing::error!("parser realtime event error: {}", e);
  171. },
  172. }
  173. }
  174. }