trait_impls.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. use std::sync::Arc;
  2. use anyhow::Error;
  3. use bytes::Bytes;
  4. use client_api::collab_sync::{SinkConfig, SyncObject, SyncPlugin};
  5. use collab::core::origin::{CollabClient, CollabOrigin};
  6. use collab::preclude::CollabPlugin;
  7. use collab_define::CollabType;
  8. use collab_integrate::collab_builder::{CollabPluginContext, CollabSource, CollabStorageProvider};
  9. use collab_integrate::postgres::SupabaseDBPlugin;
  10. use flowy_database_deps::cloud::{
  11. CollabObjectUpdate, CollabObjectUpdateByOid, DatabaseCloudService, DatabaseSnapshot,
  12. };
  13. use flowy_document2::deps::DocumentData;
  14. use flowy_document_deps::cloud::{DocumentCloudService, DocumentSnapshot};
  15. use flowy_error::FlowyError;
  16. use flowy_folder_deps::cloud::{FolderCloudService, FolderData, FolderSnapshot, Workspace};
  17. use flowy_storage::{FileStorageService, StorageObject};
  18. use flowy_user::event_map::UserCloudServiceProvider;
  19. use flowy_user_deps::cloud::UserCloudService;
  20. use flowy_user_deps::entities::AuthType;
  21. use lib_infra::future::{to_fut, Fut, FutureResult};
  22. use crate::integrate::server::{ServerProvider, ServerType, SERVER_PROVIDER_TYPE_KEY};
  23. impl FileStorageService for ServerProvider {
  24. fn create_object(&self, object: StorageObject) -> FutureResult<String, FlowyError> {
  25. let server = self.get_server(&self.get_server_type());
  26. FutureResult::new(async move {
  27. let storage = server?.file_storage().ok_or(FlowyError::internal())?;
  28. storage.create_object(object).await
  29. })
  30. }
  31. fn delete_object_by_url(&self, object_url: String) -> FutureResult<(), FlowyError> {
  32. let server = self.get_server(&self.get_server_type());
  33. FutureResult::new(async move {
  34. let storage = server?.file_storage().ok_or(FlowyError::internal())?;
  35. storage.delete_object_by_url(object_url).await
  36. })
  37. }
  38. fn get_object_by_url(&self, object_url: String) -> FutureResult<Bytes, FlowyError> {
  39. let server = self.get_server(&self.get_server_type());
  40. FutureResult::new(async move {
  41. let storage = server?.file_storage().ok_or(FlowyError::internal())?;
  42. storage.get_object_by_url(object_url).await
  43. })
  44. }
  45. }
  46. impl UserCloudServiceProvider for ServerProvider {
  47. fn set_enable_sync(&self, uid: i64, enable_sync: bool) {
  48. match self.get_server(&self.get_server_type()) {
  49. Ok(server) => {
  50. server.set_enable_sync(uid, enable_sync);
  51. *self.enable_sync.write() = enable_sync;
  52. *self.uid.write() = Some(uid);
  53. },
  54. Err(e) => tracing::error!("🔴Failed to enable sync: {:?}", e),
  55. }
  56. }
  57. fn set_encrypt_secret(&self, secret: String) {
  58. tracing::info!("🔑Set encrypt secret");
  59. self.encryption.write().set_secret(secret);
  60. }
  61. /// When user login, the provider type is set by the [AuthType] and save to disk for next use.
  62. ///
  63. /// Each [AuthType] has a corresponding [ServerType]. The [ServerType] is used
  64. /// to create a new [AppFlowyServer] if it doesn't exist. Once the [ServerType] is set,
  65. /// it will be used when user open the app again.
  66. ///
  67. fn set_auth_type(&self, auth_type: AuthType) {
  68. let server_type: ServerType = auth_type.into();
  69. self.set_server_type(server_type.clone());
  70. match self.store_preferences.upgrade() {
  71. None => tracing::error!("🔴Failed to update server provider type: store preferences is drop"),
  72. Some(store_preferences) => {
  73. match store_preferences.set_object(SERVER_PROVIDER_TYPE_KEY, server_type.clone()) {
  74. Ok(_) => tracing::trace!("Update server provider type to: {:?}", server_type),
  75. Err(e) => {
  76. tracing::error!("🔴Failed to update server provider type: {:?}", e);
  77. },
  78. }
  79. },
  80. }
  81. }
  82. fn set_device_id(&self, device_id: &str) {
  83. if device_id.is_empty() {
  84. tracing::error!("🔴Device id is empty");
  85. return;
  86. }
  87. *self.device_id.write() = device_id.to_string();
  88. }
  89. /// Returns the [UserCloudService] base on the current [ServerType].
  90. /// Creates a new [AppFlowyServer] if it doesn't exist.
  91. fn get_user_service(&self) -> Result<Arc<dyn UserCloudService>, FlowyError> {
  92. if let Some(user_service) = self.cache_user_service.read().get(&self.get_server_type()) {
  93. return Ok(user_service.clone());
  94. }
  95. let server_type = self.get_server_type();
  96. let user_service = self.get_server(&server_type)?.user_service();
  97. self
  98. .cache_user_service
  99. .write()
  100. .insert(server_type, user_service.clone());
  101. Ok(user_service)
  102. }
  103. fn service_name(&self) -> String {
  104. self.get_server_type().to_string()
  105. }
  106. }
  107. impl FolderCloudService for ServerProvider {
  108. fn create_workspace(&self, uid: i64, name: &str) -> FutureResult<Workspace, Error> {
  109. let server = self.get_server(&self.get_server_type());
  110. let name = name.to_string();
  111. FutureResult::new(async move { server?.folder_service().create_workspace(uid, &name).await })
  112. }
  113. fn get_folder_data(&self, workspace_id: &str) -> FutureResult<Option<FolderData>, Error> {
  114. let server = self.get_server(&self.get_server_type());
  115. let workspace_id = workspace_id.to_string();
  116. FutureResult::new(async move {
  117. server?
  118. .folder_service()
  119. .get_folder_data(&workspace_id)
  120. .await
  121. })
  122. }
  123. fn get_folder_snapshots(
  124. &self,
  125. workspace_id: &str,
  126. limit: usize,
  127. ) -> FutureResult<Vec<FolderSnapshot>, Error> {
  128. let workspace_id = workspace_id.to_string();
  129. let server = self.get_server(&self.get_server_type());
  130. FutureResult::new(async move {
  131. server?
  132. .folder_service()
  133. .get_folder_snapshots(&workspace_id, limit)
  134. .await
  135. })
  136. }
  137. fn get_folder_updates(&self, workspace_id: &str, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
  138. let workspace_id = workspace_id.to_string();
  139. let server = self.get_server(&self.get_server_type());
  140. FutureResult::new(async move {
  141. server?
  142. .folder_service()
  143. .get_folder_updates(&workspace_id, uid)
  144. .await
  145. })
  146. }
  147. fn service_name(&self) -> String {
  148. self
  149. .get_server(&self.get_server_type())
  150. .map(|provider| provider.folder_service().service_name())
  151. .unwrap_or_default()
  152. }
  153. }
  154. impl DatabaseCloudService for ServerProvider {
  155. fn get_collab_update(
  156. &self,
  157. object_id: &str,
  158. collab_type: CollabType,
  159. ) -> FutureResult<CollabObjectUpdate, Error> {
  160. let server = self.get_server(&self.get_server_type());
  161. let database_id = object_id.to_string();
  162. FutureResult::new(async move {
  163. server?
  164. .database_service()
  165. .get_collab_update(&database_id, collab_type)
  166. .await
  167. })
  168. }
  169. fn batch_get_collab_updates(
  170. &self,
  171. object_ids: Vec<String>,
  172. object_ty: CollabType,
  173. ) -> FutureResult<CollabObjectUpdateByOid, Error> {
  174. let server = self.get_server(&self.get_server_type());
  175. FutureResult::new(async move {
  176. server?
  177. .database_service()
  178. .batch_get_collab_updates(object_ids, object_ty)
  179. .await
  180. })
  181. }
  182. fn get_collab_snapshots(
  183. &self,
  184. object_id: &str,
  185. limit: usize,
  186. ) -> FutureResult<Vec<DatabaseSnapshot>, Error> {
  187. let server = self.get_server(&self.get_server_type());
  188. let database_id = object_id.to_string();
  189. FutureResult::new(async move {
  190. server?
  191. .database_service()
  192. .get_collab_snapshots(&database_id, limit)
  193. .await
  194. })
  195. }
  196. }
  197. impl DocumentCloudService for ServerProvider {
  198. fn get_document_updates(&self, document_id: &str) -> FutureResult<Vec<Vec<u8>>, Error> {
  199. let server = self.get_server(&self.get_server_type());
  200. let document_id = document_id.to_string();
  201. FutureResult::new(async move {
  202. server?
  203. .document_service()
  204. .get_document_updates(&document_id)
  205. .await
  206. })
  207. }
  208. fn get_document_snapshots(
  209. &self,
  210. document_id: &str,
  211. limit: usize,
  212. ) -> FutureResult<Vec<DocumentSnapshot>, Error> {
  213. let server = self.get_server(&self.get_server_type());
  214. let document_id = document_id.to_string();
  215. FutureResult::new(async move {
  216. server?
  217. .document_service()
  218. .get_document_snapshots(&document_id, limit)
  219. .await
  220. })
  221. }
  222. fn get_document_data(&self, document_id: &str) -> FutureResult<Option<DocumentData>, Error> {
  223. let server = self.get_server(&self.get_server_type());
  224. let document_id = document_id.to_string();
  225. FutureResult::new(async move {
  226. server?
  227. .document_service()
  228. .get_document_data(&document_id)
  229. .await
  230. })
  231. }
  232. }
  233. impl CollabStorageProvider for ServerProvider {
  234. fn storage_source(&self) -> CollabSource {
  235. self.get_server_type().into()
  236. }
  237. fn get_plugins(&self, context: CollabPluginContext) -> Fut<Vec<Arc<dyn CollabPlugin>>> {
  238. match context {
  239. CollabPluginContext::Local => to_fut(async move { vec![] }),
  240. CollabPluginContext::AppFlowyCloud {
  241. uid: _,
  242. collab_object,
  243. local_collab,
  244. } => {
  245. if let Ok(server) = self.get_server(&ServerType::AFCloud) {
  246. to_fut(async move {
  247. let mut plugins: Vec<Arc<dyn CollabPlugin>> = vec![];
  248. match server.collab_ws_channel(&collab_object.object_id).await {
  249. Ok(Some((channel, ws_connect_state))) => {
  250. let origin = CollabOrigin::Client(CollabClient::new(
  251. collab_object.uid,
  252. collab_object.device_id.clone(),
  253. ));
  254. let sync_object = SyncObject::from(collab_object);
  255. let (sink, stream) = (channel.sink(), channel.stream());
  256. let sink_config = SinkConfig::new().with_timeout(6);
  257. let sync_plugin = SyncPlugin::new(
  258. origin,
  259. sync_object,
  260. local_collab,
  261. sink,
  262. sink_config,
  263. stream,
  264. Some(channel),
  265. ws_connect_state,
  266. );
  267. plugins.push(Arc::new(sync_plugin));
  268. },
  269. Ok(None) => {
  270. tracing::error!("🔴Failed to get collab ws channel: channel is none");
  271. },
  272. Err(err) => tracing::error!("🔴Failed to get collab ws channel: {:?}", err),
  273. }
  274. plugins
  275. })
  276. } else {
  277. to_fut(async move { vec![] })
  278. }
  279. },
  280. CollabPluginContext::Supabase {
  281. uid,
  282. collab_object,
  283. local_collab,
  284. local_collab_db,
  285. } => {
  286. let mut plugins: Vec<Arc<dyn CollabPlugin>> = vec![];
  287. if let Some(remote_collab_storage) = self
  288. .get_server(&ServerType::Supabase)
  289. .ok()
  290. .and_then(|provider| provider.collab_storage(&collab_object))
  291. {
  292. plugins.push(Arc::new(SupabaseDBPlugin::new(
  293. uid,
  294. collab_object,
  295. local_collab,
  296. 1,
  297. remote_collab_storage,
  298. local_collab_db,
  299. )));
  300. }
  301. to_fut(async move { plugins })
  302. },
  303. }
  304. }
  305. fn is_sync_enabled(&self) -> bool {
  306. *self.enable_sync.read()
  307. }
  308. }