server.rs 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. use std::collections::HashMap;
  2. use std::sync::{Arc, Weak};
  3. use collab_plugins::cloud_storage::{CollabObject, RemoteCollabStorage, RemoteUpdateSender};
  4. use parking_lot::RwLock;
  5. use flowy_database_deps::cloud::DatabaseCloudService;
  6. use flowy_document_deps::cloud::DocumentCloudService;
  7. use flowy_folder_deps::cloud::FolderCloudService;
  8. use flowy_server_config::supabase_config::SupabaseConfiguration;
  9. use flowy_storage::core::FileStorageService;
  10. use flowy_user_deps::cloud::UserCloudService;
  11. use crate::supabase::api::{
  12. RESTfulPostgresServer, RealtimeCollabUpdateHandler, RealtimeEventHandler, RealtimeUserHandler,
  13. SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl,
  14. SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
  15. };
  16. use crate::supabase::file_storage::core::SupabaseFileStorage;
  17. use crate::{AppFlowyEncryption, AppFlowyServer};
  18. /// https://www.pgbouncer.org/features.html
  19. /// Only support session mode.
  20. ///
  21. /// Session mode:
  22. /// When a new client connects, a connection is assigned to the client until it disconnects. Afterward,
  23. /// the connection is returned back to the pool. All PostgreSQL features can be used with this option.
  24. /// For the moment, the default pool size of pgbouncer in supabase is 15 in session mode. Which means
  25. /// that we can have 15 concurrent connections to the database.
  26. ///
  27. /// Transaction mode:
  28. /// This is the suggested option for serverless functions. With this, the connection is only assigned
  29. /// to the client for the duration of a transaction. Once done, the connection is returned to the pool.
  30. /// Two consecutive transactions from the same client could be done over two, different connections.
  31. /// Some session-based PostgreSQL features such as prepared statements are not available with this option.
  32. /// A more comprehensive list of incompatible features can be found here.
  33. ///
  34. /// Most of the case, Session mode is faster than Transaction mode(no statement cache(https://github.com/supabase/supavisor/issues/69) and queue transaction).
  35. /// But Transaction mode is more suitable for serverless functions. It can reduce the number of concurrent
  36. /// connections to the database.
  37. /// TODO(nathan): fix prepared statement error when using transaction mode. https://github.com/prisma/prisma/issues/11643
  38. ///
  39. #[derive(Clone, Debug, Default)]
  40. pub enum PgPoolMode {
  41. #[default]
  42. Session,
  43. Transaction,
  44. }
  45. impl PgPoolMode {
  46. pub fn support_prepare_cached(&self) -> bool {
  47. matches!(self, PgPoolMode::Session)
  48. }
  49. }
  50. pub type CollabUpdateSenderByOid = RwLock<HashMap<String, RemoteUpdateSender>>;
  51. /// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
  52. /// It contains the configuration of the supabase server and the postgres server.
  53. pub struct SupabaseServer {
  54. #[allow(dead_code)]
  55. config: SupabaseConfiguration,
  56. /// did represents as the device id is used to identify the device that is currently using the app.
  57. device_id: Arc<RwLock<String>>,
  58. collab_update_sender: Arc<CollabUpdateSenderByOid>,
  59. restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
  60. file_storage: Arc<RwLock<Option<Arc<SupabaseFileStorage>>>>,
  61. encryption: Weak<dyn AppFlowyEncryption>,
  62. }
  63. impl SupabaseServer {
  64. pub fn new(
  65. config: SupabaseConfiguration,
  66. enable_sync: bool,
  67. device_id: Arc<RwLock<String>>,
  68. encryption: Weak<dyn AppFlowyEncryption>,
  69. ) -> Self {
  70. let collab_update_sender = Default::default();
  71. let restful_postgres = if enable_sync {
  72. Some(Arc::new(RESTfulPostgresServer::new(
  73. config.clone(),
  74. encryption.clone(),
  75. )))
  76. } else {
  77. None
  78. };
  79. let file_storage = if enable_sync {
  80. Some(Arc::new(SupabaseFileStorage::new(&config).unwrap()))
  81. } else {
  82. None
  83. };
  84. Self {
  85. config,
  86. device_id,
  87. collab_update_sender,
  88. restful_postgres: Arc::new(RwLock::new(restful_postgres)),
  89. file_storage: Arc::new(RwLock::new(file_storage)),
  90. encryption,
  91. }
  92. }
  93. pub fn set_enable_sync(&self, enable: bool) {
  94. if enable {
  95. if self.restful_postgres.read().is_some() {
  96. return;
  97. }
  98. let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
  99. *self.restful_postgres.write() = Some(Arc::new(postgres));
  100. } else {
  101. *self.restful_postgres.write() = None;
  102. }
  103. }
  104. }
  105. impl AppFlowyServer for SupabaseServer {
  106. fn set_enable_sync(&self, enable: bool) {
  107. tracing::info!("supabase sync: {}", enable);
  108. self.set_enable_sync(enable);
  109. }
  110. fn user_service(&self) -> Arc<dyn UserCloudService> {
  111. // handle the realtime collab update event.
  112. let (user_update_tx, _) = tokio::sync::broadcast::channel(100);
  113. let collab_update_handler = Box::new(RealtimeCollabUpdateHandler::new(
  114. Arc::downgrade(&self.collab_update_sender),
  115. self.device_id.clone(),
  116. self.encryption.clone(),
  117. ));
  118. // handle the realtime user event.
  119. let user_handler = Box::new(RealtimeUserHandler(user_update_tx.clone()));
  120. let handlers: Vec<Box<dyn RealtimeEventHandler>> = vec![collab_update_handler, user_handler];
  121. Arc::new(SupabaseUserServiceImpl::new(
  122. SupabaseServerServiceImpl(self.restful_postgres.clone()),
  123. handlers,
  124. Some(user_update_tx),
  125. ))
  126. }
  127. fn folder_service(&self) -> Arc<dyn FolderCloudService> {
  128. Arc::new(SupabaseFolderServiceImpl::new(SupabaseServerServiceImpl(
  129. self.restful_postgres.clone(),
  130. )))
  131. }
  132. fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
  133. Arc::new(SupabaseDatabaseServiceImpl::new(SupabaseServerServiceImpl(
  134. self.restful_postgres.clone(),
  135. )))
  136. }
  137. fn document_service(&self) -> Arc<dyn DocumentCloudService> {
  138. Arc::new(SupabaseDocumentServiceImpl::new(SupabaseServerServiceImpl(
  139. self.restful_postgres.clone(),
  140. )))
  141. }
  142. fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
  143. let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
  144. self
  145. .collab_update_sender
  146. .write()
  147. .insert(collab_object.object_id.clone(), tx);
  148. Some(Arc::new(SupabaseCollabStorageImpl::new(
  149. SupabaseServerServiceImpl(self.restful_postgres.clone()),
  150. Some(rx),
  151. self.encryption.clone(),
  152. )))
  153. }
  154. fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
  155. self
  156. .file_storage
  157. .read()
  158. .clone()
  159. .map(|s| s as Arc<dyn FileStorageService>)
  160. }
  161. }