server.rs 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. use std::collections::HashMap;
  2. use std::sync::{Arc, Weak};
  3. use collab_define::CollabObject;
  4. use collab_plugins::cloud_storage::{RemoteCollabStorage, RemoteUpdateSender};
  5. use parking_lot::RwLock;
  6. use flowy_database_deps::cloud::DatabaseCloudService;
  7. use flowy_document_deps::cloud::DocumentCloudService;
  8. use flowy_folder_deps::cloud::FolderCloudService;
  9. use flowy_server_config::supabase_config::SupabaseConfiguration;
  10. use flowy_storage::FileStorageService;
  11. use flowy_user_deps::cloud::UserCloudService;
  12. use crate::supabase::api::{
  13. RESTfulPostgresServer, RealtimeCollabUpdateHandler, RealtimeEventHandler, RealtimeUserHandler,
  14. SupabaseCollabStorageImpl, SupabaseDatabaseServiceImpl, SupabaseDocumentServiceImpl,
  15. SupabaseFolderServiceImpl, SupabaseServerServiceImpl, SupabaseUserServiceImpl,
  16. };
  17. use crate::supabase::file_storage::core::SupabaseFileStorage;
  18. use crate::supabase::file_storage::FileStoragePlanImpl;
  19. use crate::{AppFlowyEncryption, AppFlowyServer};
  /// Pooling mode of the pgbouncer that the Supabase database connection goes through.
  ///
  /// See <https://www.pgbouncer.org/features.html>. Only session mode is supported for now.
  ///
  /// Session mode:
  /// When a new client connects, a connection is assigned to the client until it disconnects.
  /// Afterward, the connection is returned to the pool. All PostgreSQL features can be used with
  /// this option. For the moment, the default pool size of pgbouncer in supabase is 15 in session
  /// mode, which means that we can have 15 concurrent connections to the database.
  ///
  /// Transaction mode:
  /// This is the suggested option for serverless functions. With this, the connection is only
  /// assigned to the client for the duration of a transaction. Once done, the connection is
  /// returned to the pool. Two consecutive transactions from the same client could be done over
  /// two different connections. Some session-based PostgreSQL features, such as prepared
  /// statements, are not available with this option. A more comprehensive list of incompatible
  /// features can be found on the pgbouncer features page linked above.
  ///
  /// In most cases, session mode is faster than transaction mode (transaction mode has no
  /// statement cache, see <https://github.com/supabase/supavisor/issues/69>, and queues
  /// transactions). Transaction mode is more suitable for serverless functions because it can
  /// reduce the number of concurrent connections to the database.
  ///
  /// TODO(nathan): fix prepared statement error when using transaction mode.
  /// <https://github.com/prisma/prisma/issues/11643>
  #[derive(Clone, Debug, Default, PartialEq, Eq)]
  pub enum PgPoolMode {
    /// One pooled connection per client for the whole session. This is the default.
    #[default]
    Session,
    /// One pooled connection per transaction.
    Transaction,
  }
  47. impl PgPoolMode {
  48. pub fn support_prepare_cached(&self) -> bool {
  49. matches!(self, PgPoolMode::Session)
  50. }
  51. }
  /// Lock-protected map from a collab object id to the [RemoteUpdateSender] registered for it.
  pub type CollabUpdateSenderByOid = RwLock<HashMap<String, RemoteUpdateSender>>;
  53. /// Supabase server is used to provide the implementation of the [AppFlowyServer] trait.
  54. /// It contains the configuration of the supabase server and the postgres server.
  55. pub struct SupabaseServer {
  56. #[allow(dead_code)]
  57. config: SupabaseConfiguration,
  58. /// did represents as the device id is used to identify the device that is currently using the app.
  59. device_id: Arc<RwLock<String>>,
  60. uid: Arc<RwLock<Option<i64>>>,
  61. collab_update_sender: Arc<CollabUpdateSenderByOid>,
  62. restful_postgres: Arc<RwLock<Option<Arc<RESTfulPostgresServer>>>>,
  63. file_storage: Arc<RwLock<Option<Arc<SupabaseFileStorage>>>>,
  64. encryption: Weak<dyn AppFlowyEncryption>,
  65. }
  66. impl SupabaseServer {
  67. pub fn new(
  68. uid: Arc<RwLock<Option<i64>>>,
  69. config: SupabaseConfiguration,
  70. enable_sync: bool,
  71. device_id: Arc<RwLock<String>>,
  72. encryption: Weak<dyn AppFlowyEncryption>,
  73. ) -> Self {
  74. let collab_update_sender = Default::default();
  75. let restful_postgres = if enable_sync {
  76. Some(Arc::new(RESTfulPostgresServer::new(
  77. config.clone(),
  78. encryption.clone(),
  79. )))
  80. } else {
  81. None
  82. };
  83. let file_storage = if enable_sync {
  84. let plan = FileStoragePlanImpl::new(
  85. Arc::downgrade(&uid),
  86. restful_postgres.as_ref().map(Arc::downgrade),
  87. );
  88. Some(Arc::new(
  89. SupabaseFileStorage::new(&config, encryption.clone(), Arc::new(plan)).unwrap(),
  90. ))
  91. } else {
  92. None
  93. };
  94. Self {
  95. config,
  96. device_id,
  97. collab_update_sender,
  98. restful_postgres: Arc::new(RwLock::new(restful_postgres)),
  99. file_storage: Arc::new(RwLock::new(file_storage)),
  100. encryption,
  101. uid,
  102. }
  103. }
  104. }
  105. impl AppFlowyServer for SupabaseServer {
  106. fn set_enable_sync(&self, uid: i64, enable: bool) {
  107. tracing::info!("{} supabase sync: {}", uid, enable);
  108. if enable {
  109. if self.restful_postgres.read().is_none() {
  110. let postgres = RESTfulPostgresServer::new(self.config.clone(), self.encryption.clone());
  111. *self.restful_postgres.write() = Some(Arc::new(postgres));
  112. }
  113. if self.file_storage.read().is_none() {
  114. let plan = FileStoragePlanImpl::new(
  115. Arc::downgrade(&self.uid),
  116. self.restful_postgres.read().as_ref().map(Arc::downgrade),
  117. );
  118. let file_storage =
  119. SupabaseFileStorage::new(&self.config, self.encryption.clone(), Arc::new(plan)).unwrap();
  120. *self.file_storage.write() = Some(Arc::new(file_storage));
  121. }
  122. } else {
  123. *self.restful_postgres.write() = None;
  124. *self.file_storage.write() = None;
  125. }
  126. }
  127. fn user_service(&self) -> Arc<dyn UserCloudService> {
  128. // handle the realtime collab update event.
  129. let (user_update_tx, _) = tokio::sync::broadcast::channel(100);
  130. let collab_update_handler = Box::new(RealtimeCollabUpdateHandler::new(
  131. Arc::downgrade(&self.collab_update_sender),
  132. self.device_id.clone(),
  133. self.encryption.clone(),
  134. ));
  135. // handle the realtime user event.
  136. let user_handler = Box::new(RealtimeUserHandler(user_update_tx.clone()));
  137. let handlers: Vec<Box<dyn RealtimeEventHandler>> = vec![collab_update_handler, user_handler];
  138. Arc::new(SupabaseUserServiceImpl::new(
  139. SupabaseServerServiceImpl(self.restful_postgres.clone()),
  140. handlers,
  141. Some(user_update_tx),
  142. ))
  143. }
  144. fn folder_service(&self) -> Arc<dyn FolderCloudService> {
  145. Arc::new(SupabaseFolderServiceImpl::new(SupabaseServerServiceImpl(
  146. self.restful_postgres.clone(),
  147. )))
  148. }
  149. fn database_service(&self) -> Arc<dyn DatabaseCloudService> {
  150. Arc::new(SupabaseDatabaseServiceImpl::new(SupabaseServerServiceImpl(
  151. self.restful_postgres.clone(),
  152. )))
  153. }
  154. fn document_service(&self) -> Arc<dyn DocumentCloudService> {
  155. Arc::new(SupabaseDocumentServiceImpl::new(SupabaseServerServiceImpl(
  156. self.restful_postgres.clone(),
  157. )))
  158. }
  159. fn collab_storage(&self, collab_object: &CollabObject) -> Option<Arc<dyn RemoteCollabStorage>> {
  160. let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
  161. self
  162. .collab_update_sender
  163. .write()
  164. .insert(collab_object.object_id.clone(), tx);
  165. Some(Arc::new(SupabaseCollabStorageImpl::new(
  166. SupabaseServerServiceImpl(self.restful_postgres.clone()),
  167. Some(rx),
  168. self.encryption.clone(),
  169. )))
  170. }
  171. fn file_storage(&self) -> Option<Arc<dyn FileStorageService>> {
  172. self
  173. .file_storage
  174. .read()
  175. .clone()
  176. .map(|s| s as Arc<dyn FileStorageService>)
  177. }
  178. }