collab_builder.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284
  1. use std::fmt::Debug;
  2. use std::sync::{Arc, Weak};
  3. use anyhow::Error;
  4. use async_trait::async_trait;
  5. use collab::core::collab::{CollabRawData, MutexCollab};
  6. use collab::preclude::{CollabBuilder, CollabPlugin};
  7. use collab_entity::{CollabObject, CollabType};
  8. use collab_persistence::kv::rocks_kv::RocksCollabDB;
  9. use collab_plugins::cloud_storage::network_state::{CollabNetworkReachability, CollabNetworkState};
  10. use collab_plugins::local_storage::rocksdb::RocksdbDiskPlugin;
  11. use collab_plugins::local_storage::CollabPersistenceConfig;
  12. use collab_plugins::snapshot::{CollabSnapshotPlugin, SnapshotPersistence};
  13. use parking_lot::{Mutex, RwLock};
  14. use tracing::trace;
  15. use lib_infra::future::{to_fut, Fut};
  /// Identifies which storage backend a [`CollabStorageProvider`] represents.
  ///
  /// `PartialEq`/`Eq` are derived so a source can be compared directly
  /// (`source == CollabSource::Local`) rather than only pattern-matched.
  #[derive(Clone, Debug, PartialEq, Eq)]
  pub enum CollabSource {
      /// No cloud backend; `build_with_config` attaches no cloud plugins for it.
      Local,
      /// AppFlowy Cloud; plugins are requested with
      /// `CollabPluginContext::AppFlowyCloud` when the
      /// `appflowy_cloud_integrate` feature is enabled.
      AFCloud,
      /// Supabase; plugins are requested with `CollabPluginContext::Supabase`
      /// when the `supabase_integrate` feature is enabled.
      Supabase,
  }
  22. pub enum CollabPluginContext {
  23. Local,
  24. AppFlowyCloud {
  25. uid: i64,
  26. collab_object: CollabObject,
  27. local_collab: Weak<MutexCollab>,
  28. },
  29. Supabase {
  30. uid: i64,
  31. collab_object: CollabObject,
  32. local_collab: Weak<MutexCollab>,
  33. local_collab_db: Weak<RocksCollabDB>,
  34. },
  35. }
  36. pub trait CollabStorageProvider: Send + Sync + 'static {
  37. fn storage_source(&self) -> CollabSource;
  38. fn get_plugins(
  39. &self,
  40. context: CollabPluginContext,
  41. ) -> Fut<Vec<Arc<dyn collab::core::collab_plugin::CollabPlugin>>>;
  42. fn is_sync_enabled(&self) -> bool;
  43. }
  44. impl<T> CollabStorageProvider for Arc<T>
  45. where
  46. T: CollabStorageProvider,
  47. {
  48. fn storage_source(&self) -> CollabSource {
  49. (**self).storage_source()
  50. }
  51. fn get_plugins(&self, context: CollabPluginContext) -> Fut<Vec<Arc<dyn CollabPlugin>>> {
  52. (**self).get_plugins(context)
  53. }
  54. fn is_sync_enabled(&self) -> bool {
  55. (**self).is_sync_enabled()
  56. }
  57. }
  58. pub struct AppFlowyCollabBuilder {
  59. network_reachability: CollabNetworkReachability,
  60. workspace_id: RwLock<Option<String>>,
  61. cloud_storage: tokio::sync::RwLock<Arc<dyn CollabStorageProvider>>,
  62. snapshot_persistence: Mutex<Option<Arc<dyn SnapshotPersistence>>>,
  63. device_id: Mutex<String>,
  64. }
  65. impl AppFlowyCollabBuilder {
  66. pub fn new<T: CollabStorageProvider>(storage_provider: T) -> Self {
  67. Self {
  68. network_reachability: CollabNetworkReachability::new(),
  69. workspace_id: Default::default(),
  70. cloud_storage: tokio::sync::RwLock::new(Arc::new(storage_provider)),
  71. snapshot_persistence: Default::default(),
  72. device_id: Default::default(),
  73. }
  74. }
  75. pub fn set_snapshot_persistence(&self, snapshot_persistence: Arc<dyn SnapshotPersistence>) {
  76. *self.snapshot_persistence.lock() = Some(snapshot_persistence);
  77. }
  78. pub fn initialize(&self, workspace_id: String) {
  79. *self.workspace_id.write() = Some(workspace_id);
  80. }
  81. pub fn set_sync_device(&self, device_id: String) {
  82. *self.device_id.lock() = device_id;
  83. }
  84. pub fn update_network(&self, reachable: bool) {
  85. if reachable {
  86. self
  87. .network_reachability
  88. .set_state(CollabNetworkState::Connected)
  89. } else {
  90. self
  91. .network_reachability
  92. .set_state(CollabNetworkState::Disconnected)
  93. }
  94. }
  95. fn collab_object(
  96. &self,
  97. uid: i64,
  98. object_id: &str,
  99. collab_type: CollabType,
  100. ) -> Result<CollabObject, Error> {
  101. let workspace_id = self.workspace_id.read().clone().ok_or_else(|| {
  102. anyhow::anyhow!("When using supabase plugin, the workspace_id should not be empty")
  103. })?;
  104. Ok(CollabObject::new(
  105. uid,
  106. object_id.to_string(),
  107. collab_type,
  108. workspace_id,
  109. self.device_id.lock().clone(),
  110. ))
  111. }
  112. /// Creates a new collaboration builder with the default configuration.
  113. ///
  114. /// This function will initiate the creation of a [MutexCollab] object if it does not already exist.
  115. /// To check for the existence of the object prior to creation, you should utilize a transaction
  116. /// returned by the [read_txn] method of the [RocksCollabDB]. Then, invoke the [is_exist] method
  117. /// to confirm the object's presence.
  118. ///
  119. /// # Parameters
  120. /// - `uid`: The user ID associated with the collaboration.
  121. /// - `object_id`: A string reference representing the ID of the object.
  122. /// - `object_type`: The type of the collaboration, defined by the [CollabType] enum.
  123. /// - `raw_data`: The raw data of the collaboration object, defined by the [CollabRawData] type.
  124. /// - `collab_db`: A weak reference to the [RocksCollabDB].
  125. ///
  126. pub async fn build(
  127. &self,
  128. uid: i64,
  129. object_id: &str,
  130. object_type: CollabType,
  131. raw_data: CollabRawData,
  132. collab_db: Weak<RocksCollabDB>,
  133. ) -> Result<Arc<MutexCollab>, Error> {
  134. self
  135. .build_with_config(
  136. uid,
  137. object_id,
  138. object_type,
  139. collab_db,
  140. raw_data,
  141. &CollabPersistenceConfig::default(),
  142. )
  143. .await
  144. }
  145. /// Creates a new collaboration builder with the custom configuration.
  146. ///
  147. /// This function will initiate the creation of a [MutexCollab] object if it does not already exist.
  148. /// To check for the existence of the object prior to creation, you should utilize a transaction
  149. /// returned by the [read_txn] method of the [RocksCollabDB]. Then, invoke the [is_exist] method
  150. /// to confirm the object's presence.
  151. ///
  152. /// # Parameters
  153. /// - `uid`: The user ID associated with the collaboration.
  154. /// - `object_id`: A string reference representing the ID of the object.
  155. /// - `object_type`: The type of the collaboration, defined by the [CollabType] enum.
  156. /// - `raw_data`: The raw data of the collaboration object, defined by the [CollabRawData] type.
  157. /// - `collab_db`: A weak reference to the [RocksCollabDB].
  158. ///
  159. pub async fn build_with_config(
  160. &self,
  161. uid: i64,
  162. object_id: &str,
  163. object_type: CollabType,
  164. collab_db: Weak<RocksCollabDB>,
  165. collab_raw_data: CollabRawData,
  166. config: &CollabPersistenceConfig,
  167. ) -> Result<Arc<MutexCollab>, Error> {
  168. let collab = Arc::new(
  169. CollabBuilder::new(uid, object_id)
  170. .with_raw_data(collab_raw_data)
  171. .with_plugin(RocksdbDiskPlugin::new_with_config(
  172. uid,
  173. collab_db.clone(),
  174. config.clone(),
  175. ))
  176. .with_device_id(self.device_id.lock().clone())
  177. .build()?,
  178. );
  179. {
  180. let cloud_storage_type = self.cloud_storage.read().await.storage_source();
  181. let collab_object = self.collab_object(uid, object_id, object_type)?;
  182. match cloud_storage_type {
  183. CollabSource::AFCloud => {
  184. #[cfg(feature = "appflowy_cloud_integrate")]
  185. {
  186. trace!("init appflowy cloud collab plugins");
  187. let local_collab = Arc::downgrade(&collab);
  188. let plugins = self
  189. .cloud_storage
  190. .read()
  191. .await
  192. .get_plugins(CollabPluginContext::AppFlowyCloud {
  193. uid,
  194. collab_object: collab_object.clone(),
  195. local_collab,
  196. })
  197. .await;
  198. trace!("add appflowy cloud collab plugins: {}", plugins.len());
  199. for plugin in plugins {
  200. collab.lock().add_plugin(plugin);
  201. }
  202. }
  203. },
  204. CollabSource::Supabase => {
  205. #[cfg(feature = "supabase_integrate")]
  206. {
  207. trace!("init supabase collab plugins");
  208. let local_collab = Arc::downgrade(&collab);
  209. let local_collab_db = collab_db.clone();
  210. let plugins = self
  211. .cloud_storage
  212. .read()
  213. .await
  214. .get_plugins(CollabPluginContext::Supabase {
  215. uid,
  216. collab_object: collab_object.clone(),
  217. local_collab,
  218. local_collab_db,
  219. })
  220. .await;
  221. for plugin in plugins {
  222. collab.lock().add_plugin(plugin);
  223. }
  224. }
  225. },
  226. CollabSource::Local => {},
  227. }
  228. if let Some(snapshot_persistence) = self.snapshot_persistence.lock().as_ref() {
  229. if config.enable_snapshot {
  230. let snapshot_plugin = CollabSnapshotPlugin::new(
  231. uid,
  232. collab_object,
  233. snapshot_persistence.clone(),
  234. collab_db,
  235. config.snapshot_per_update,
  236. );
  237. // tracing::trace!("add snapshot plugin: {}", object_id);
  238. collab.lock().add_plugin(Arc::new(snapshot_plugin));
  239. }
  240. }
  241. }
  242. collab.lock().initialize();
  243. Ok(collab)
  244. }
  245. }
  /// Storage provider that reports a purely local source and supplies no plugins.
  ///
  /// Declared as an empty tuple struct, so it is constructed with
  /// `DefaultCollabStorageProvider()`.
  pub struct DefaultCollabStorageProvider();
  247. #[async_trait]
  248. impl CollabStorageProvider for DefaultCollabStorageProvider {
  249. fn storage_source(&self) -> CollabSource {
  250. CollabSource::Local
  251. }
  252. fn get_plugins(&self, _context: CollabPluginContext) -> Fut<Vec<Arc<dyn CollabPlugin>>> {
  253. to_fut(async move { vec![] })
  254. }
  255. fn is_sync_enabled(&self) -> bool {
  256. false
  257. }
  258. }