server.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431
  1. use std::collections::HashMap;
  2. use std::fmt::{Display, Formatter};
  3. use std::sync::{Arc, Weak};
  4. use appflowy_integrate::collab_builder::{CollabStorageProvider, CollabStorageType};
  5. use appflowy_integrate::{CollabObject, CollabType, RemoteCollabStorage, YrsDocAction};
  6. use parking_lot::RwLock;
  7. use serde_repr::*;
  8. use flowy_database_deps::cloud::*;
  9. use flowy_document2::deps::DocumentData;
  10. use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
  11. use flowy_error::{ErrorCode, FlowyError, FlowyResult};
  12. use flowy_folder_deps::cloud::*;
  13. use flowy_server::local_server::{LocalServer, LocalServerDB};
  14. use flowy_server::self_host::configuration::self_host_server_configuration;
  15. use flowy_server::self_host::SelfHostServer;
  16. use flowy_server::supabase::SupabaseServer;
  17. use flowy_server::{AppFlowyEncryption, AppFlowyServer, EncryptionImpl};
  18. use flowy_server_config::supabase_config::SupabaseConfiguration;
  19. use flowy_sqlite::kv::StorePreferences;
  20. use flowy_user::event_map::UserCloudServiceProvider;
  21. use flowy_user::services::database::{
  22. get_user_profile, get_user_workspace, open_collab_db, open_user_db,
  23. };
  24. use flowy_user_deps::cloud::UserCloudService;
  25. use flowy_user_deps::entities::*;
  26. use lib_infra::future::FutureResult;
  27. use crate::AppFlowyCoreConfig;
  28. const SERVER_PROVIDER_TYPE_KEY: &str = "server_provider_type";
  29. #[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize_repr, Deserialize_repr)]
  30. #[repr(u8)]
  31. pub enum ServerProviderType {
  32. /// Local server provider.
  33. /// Offline mode, no user authentication and the data is stored locally.
  34. Local = 0,
  35. /// Self-hosted server provider.
  36. /// The [AppFlowy-Server](https://github.com/AppFlowy-IO/AppFlowy-Cloud) is still a work in
  37. /// progress.
  38. AppFlowyCloud = 1,
  39. /// Supabase server provider.
  40. /// It uses supabase's postgresql database to store data and user authentication.
  41. Supabase = 2,
  42. }
  43. impl Display for ServerProviderType {
  44. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  45. match self {
  46. ServerProviderType::Local => write!(f, "Local"),
  47. ServerProviderType::AppFlowyCloud => write!(f, "AppFlowyCloud"),
  48. ServerProviderType::Supabase => write!(f, "Supabase"),
  49. }
  50. }
  51. }
  52. /// The [AppFlowyServerProvider] provides list of [AppFlowyServer] base on the [AuthType]. Using
  53. /// the auth type, the [AppFlowyServerProvider] will create a new [AppFlowyServer] if it doesn't
  54. /// exist.
  55. /// Each server implements the [AppFlowyServer] trait, which provides the [UserCloudService], etc.
  56. pub struct AppFlowyServerProvider {
  57. config: AppFlowyCoreConfig,
  58. provider_type: RwLock<ServerProviderType>,
  59. device_id: Arc<RwLock<String>>,
  60. providers: RwLock<HashMap<ServerProviderType, Arc<dyn AppFlowyServer>>>,
  61. enable_sync: RwLock<bool>,
  62. encryption: RwLock<Arc<dyn AppFlowyEncryption>>,
  63. store_preferences: Weak<StorePreferences>,
  64. cache_user_service: RwLock<HashMap<ServerProviderType, Arc<dyn UserCloudService>>>,
  65. }
  66. impl AppFlowyServerProvider {
  67. pub fn new(
  68. config: AppFlowyCoreConfig,
  69. provider_type: ServerProviderType,
  70. store_preferences: Weak<StorePreferences>,
  71. ) -> Self {
  72. let encryption = EncryptionImpl::new(None);
  73. Self {
  74. config,
  75. provider_type: RwLock::new(provider_type),
  76. device_id: Default::default(),
  77. providers: RwLock::new(HashMap::new()),
  78. enable_sync: RwLock::new(true),
  79. encryption: RwLock::new(Arc::new(encryption)),
  80. store_preferences,
  81. cache_user_service: Default::default(),
  82. }
  83. }
  84. pub fn set_sync_device(&self, device_id: &str) {
  85. *self.device_id.write() = device_id.to_string();
  86. }
  87. pub fn provider_type(&self) -> ServerProviderType {
  88. self.provider_type.read().clone()
  89. }
  90. /// Returns a [AppFlowyServer] trait implementation base on the provider_type.
  91. fn get_provider(
  92. &self,
  93. provider_type: &ServerProviderType,
  94. ) -> FlowyResult<Arc<dyn AppFlowyServer>> {
  95. if let Some(provider) = self.providers.read().get(provider_type) {
  96. return Ok(provider.clone());
  97. }
  98. let server = match provider_type {
  99. ServerProviderType::Local => {
  100. let local_db = Arc::new(LocalServerDBImpl {
  101. storage_path: self.config.storage_path.clone(),
  102. });
  103. let server = Arc::new(LocalServer::new(local_db));
  104. Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
  105. },
  106. ServerProviderType::AppFlowyCloud => {
  107. let config = self_host_server_configuration().map_err(|e| {
  108. FlowyError::new(
  109. ErrorCode::InvalidAuthConfig,
  110. format!(
  111. "Missing self host config: {:?}. Error: {:?}",
  112. provider_type, e
  113. ),
  114. )
  115. })?;
  116. let server = Arc::new(SelfHostServer::new(config));
  117. Ok::<Arc<dyn AppFlowyServer>, FlowyError>(server)
  118. },
  119. ServerProviderType::Supabase => {
  120. let config = SupabaseConfiguration::from_env()?;
  121. let encryption = Arc::downgrade(&*self.encryption.read());
  122. Ok::<Arc<dyn AppFlowyServer>, FlowyError>(Arc::new(SupabaseServer::new(
  123. config,
  124. *self.enable_sync.read(),
  125. self.device_id.clone(),
  126. encryption,
  127. )))
  128. },
  129. }?;
  130. self
  131. .providers
  132. .write()
  133. .insert(provider_type.clone(), server.clone());
  134. Ok(server)
  135. }
  136. }
  137. impl UserCloudServiceProvider for AppFlowyServerProvider {
  138. fn set_enable_sync(&self, enable_sync: bool) {
  139. match self.get_provider(&self.provider_type.read()) {
  140. Ok(server) => {
  141. server.set_enable_sync(enable_sync);
  142. *self.enable_sync.write() = enable_sync;
  143. },
  144. Err(e) => tracing::error!("🔴Failed to enable sync: {:?}", e),
  145. }
  146. }
  147. fn set_encrypt_secret(&self, secret: String) {
  148. tracing::info!("🔑Set encrypt secret");
  149. self.encryption.write().set_secret(secret);
  150. }
  151. /// When user login, the provider type is set by the [AuthType] and save to disk for next use.
  152. ///
  153. /// Each [AuthType] has a corresponding [ServerProviderType]. The [ServerProviderType] is used
  154. /// to create a new [AppFlowyServer] if it doesn't exist. Once the [ServerProviderType] is set,
  155. /// it will be used when user open the app again.
  156. ///
  157. fn set_auth_type(&self, auth_type: AuthType) {
  158. let provider_type: ServerProviderType = auth_type.into();
  159. *self.provider_type.write() = provider_type.clone();
  160. match self.store_preferences.upgrade() {
  161. None => tracing::error!("🔴Failed to update server provider type: store preferences is drop"),
  162. Some(store_preferences) => {
  163. match store_preferences.set_object(SERVER_PROVIDER_TYPE_KEY, provider_type.clone()) {
  164. Ok(_) => tracing::trace!("Update server provider type to: {:?}", provider_type),
  165. Err(e) => {
  166. tracing::error!("🔴Failed to update server provider type: {:?}", e);
  167. },
  168. }
  169. },
  170. }
  171. }
  172. fn set_device_id(&self, device_id: &str) {
  173. *self.device_id.write() = device_id.to_string();
  174. }
  175. /// Returns the [UserCloudService] base on the current [ServerProviderType].
  176. /// Creates a new [AppFlowyServer] if it doesn't exist.
  177. fn get_user_service(&self) -> Result<Arc<dyn UserCloudService>, FlowyError> {
  178. if let Some(user_service) = self
  179. .cache_user_service
  180. .read()
  181. .get(&self.provider_type.read())
  182. {
  183. return Ok(user_service.clone());
  184. }
  185. let provider_type = self.provider_type.read().clone();
  186. let user_service = self.get_provider(&provider_type)?.user_service();
  187. self
  188. .cache_user_service
  189. .write()
  190. .insert(provider_type, user_service.clone());
  191. Ok(user_service)
  192. }
  193. fn service_name(&self) -> String {
  194. self.provider_type.read().to_string()
  195. }
  196. }
  197. impl FolderCloudService for AppFlowyServerProvider {
  198. fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, Error> {
  199. let server = self.get_provider(&self.provider_type.read());
  200. let name = name.to_string();
  201. FutureResult::new(async move { server?.folder_service().create_workspace(uid, &name).await })
  202. }
  203. fn get_folder_data(&self, workspace_id: &str) -> FutureResult<Option<FolderData>, Error> {
  204. let server = self.get_provider(&self.provider_type.read());
  205. let workspace_id = workspace_id.to_string();
  206. FutureResult::new(async move {
  207. server?
  208. .folder_service()
  209. .get_folder_data(&workspace_id)
  210. .await
  211. })
  212. }
  213. fn get_folder_snapshots(
  214. &self,
  215. workspace_id: &str,
  216. limit: usize,
  217. ) -> FutureResult<Vec<FolderSnapshot>, Error> {
  218. let workspace_id = workspace_id.to_string();
  219. let server = self.get_provider(&self.provider_type.read());
  220. FutureResult::new(async move {
  221. server?
  222. .folder_service()
  223. .get_folder_snapshots(&workspace_id, limit)
  224. .await
  225. })
  226. }
  227. fn get_folder_updates(&self, workspace_id: &str, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
  228. let workspace_id = workspace_id.to_string();
  229. let server = self.get_provider(&self.provider_type.read());
  230. FutureResult::new(async move {
  231. server?
  232. .folder_service()
  233. .get_folder_updates(&workspace_id, uid)
  234. .await
  235. })
  236. }
  237. fn service_name(&self) -> String {
  238. self
  239. .get_provider(&self.provider_type.read())
  240. .map(|provider| provider.folder_service().service_name())
  241. .unwrap_or_default()
  242. }
  243. }
  244. impl DatabaseCloudService for AppFlowyServerProvider {
  245. fn get_collab_update(
  246. &self,
  247. object_id: &str,
  248. object_ty: CollabType,
  249. ) -> FutureResult<CollabObjectUpdate, Error> {
  250. let server = self.get_provider(&self.provider_type.read());
  251. let database_id = object_id.to_string();
  252. FutureResult::new(async move {
  253. server?
  254. .database_service()
  255. .get_collab_update(&database_id, object_ty)
  256. .await
  257. })
  258. }
  259. fn batch_get_collab_updates(
  260. &self,
  261. object_ids: Vec<String>,
  262. object_ty: CollabType,
  263. ) -> FutureResult<CollabObjectUpdateByOid, Error> {
  264. let server = self.get_provider(&self.provider_type.read());
  265. FutureResult::new(async move {
  266. server?
  267. .database_service()
  268. .batch_get_collab_updates(object_ids, object_ty)
  269. .await
  270. })
  271. }
  272. fn get_collab_snapshots(
  273. &self,
  274. object_id: &str,
  275. limit: usize,
  276. ) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
  277. let server = self.get_provider(&self.provider_type.read());
  278. let database_id = object_id.to_string();
  279. FutureResult::new(async move {
  280. server?
  281. .database_service()
  282. .get_collab_snapshots(&database_id, limit)
  283. .await
  284. })
  285. }
  286. }
  287. impl DocumentCloudService for AppFlowyServerProvider {
  288. fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
  289. let server = self.get_provider(&self.provider_type.read());
  290. let document_id = document_id.to_string();
  291. FutureResult::new(async move {
  292. server?
  293. .document_service()
  294. .get_document_updates(&document_id)
  295. .await
  296. })
  297. }
  298. fn get_document_snapshots(
  299. &self,
  300. document_id: &str,
  301. limit: usize,
  302. ) -> FutureResult<Vec<DocumentSnapshot>, Error> {
  303. let server = self.get_provider(&self.provider_type.read());
  304. let document_id = document_id.to_string();
  305. FutureResult::new(async move {
  306. server?
  307. .document_service()
  308. .get_document_snapshots(&document_id, limit)
  309. .await
  310. })
  311. }
  312. fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, Error> {
  313. let server = self.get_provider(&self.provider_type.read());
  314. let document_id = document_id.to_string();
  315. FutureResult::new(async move {
  316. server?
  317. .document_service()
  318. .get_document_data(&document_id)
  319. .await
  320. })
  321. }
  322. }
  323. impl CollabStorageProvider for AppFlowyServerProvider {
  324. fn storage_type(&self) -> CollabStorageType {
  325. self.provider_type().into()
  326. }
  327. fn get_storage(
  328. &self,
  329. collab_object: &CollabObject,
  330. storage_type: &CollabStorageType,
  331. ) -> Option<Arc<dyn RemoteCollabStorage>> {
  332. match storage_type {
  333. CollabStorageType::Local => None,
  334. CollabStorageType::AWS => None,
  335. CollabStorageType::Supabase => self
  336. .get_provider(&ServerProviderType::Supabase)
  337. .ok()
  338. .and_then(|provider| provider.collab_storage(collab_object)),
  339. }
  340. }
  341. fn is_sync_enabled(&self) -> bool {
  342. *self.enable_sync.read()
  343. }
  344. }
  345. impl From<AuthType> for ServerProviderType {
  346. fn from(auth_provider: AuthType) -> Self {
  347. match auth_provider {
  348. AuthType::Local => ServerProviderType::Local,
  349. AuthType::SelfHosted => ServerProviderType::AppFlowyCloud,
  350. AuthType::Supabase => ServerProviderType::Supabase,
  351. }
  352. }
  353. }
  354. impl From<&AuthType> for ServerProviderType {
  355. fn from(auth_provider: &AuthType) -> Self {
  356. Self::from(auth_provider.clone())
  357. }
  358. }
  359. pub fn current_server_provider(store_preferences: &Arc<StorePreferences>) -> ServerProviderType {
  360. match store_preferences.get_object::<ServerProviderType>(SERVER_PROVIDER_TYPE_KEY) {
  361. None => ServerProviderType::Local,
  362. Some(provider_type) => provider_type,
  363. }
  364. }
  365. struct LocalServerDBImpl {
  366. storage_path: String,
  367. }
  368. impl LocalServerDB for LocalServerDBImpl {
  369. fn get_user_profile(&self, uid: i64) -> Result<Option<UserProfile>, FlowyError> {
  370. let sqlite_db = open_user_db(&self.storage_path, uid)?;
  371. let user_profile = get_user_profile(&sqlite_db, uid).ok();
  372. Ok(user_profile)
  373. }
  374. fn get_user_workspace(&self, uid: i64) -> Result<Option<UserWorkspace>, FlowyError> {
  375. let sqlite_db = open_user_db(&self.storage_path, uid)?;
  376. let user_workspace = get_user_workspace(&sqlite_db, uid)?;
  377. Ok(user_workspace)
  378. }
  379. fn get_collab_updates(&self, uid: i64, object_id: &str) -> Result<Vec<Vec<u8>>, FlowyError> {
  380. let collab_db = open_collab_db(&self.storage_path, uid)?;
  381. let read_txn = collab_db.read_txn();
  382. let updates = read_txn.get_all_updates(uid, object_id).map_err(|e| {
  383. FlowyError::internal().with_context(format!("Failed to open collab db: {:?}", e))
  384. })?;
  385. Ok(updates)
  386. }
  387. }