lib.rs 14 KB


  1. #![allow(unused_doc_comments)]
  2. use std::sync::Weak;
  3. use std::time::Duration;
  4. use std::{
  5. fmt,
  6. sync::{
  7. atomic::{AtomicBool, Ordering},
  8. Arc,
  9. },
  10. };
  11. use appflowy_integrate::collab_builder::{AppFlowyCollabBuilder, CollabStorageType};
  12. use tokio::sync::RwLock;
  13. use flowy_database2::DatabaseManager;
  14. use flowy_document2::manager::DocumentManager;
  15. use flowy_error::FlowyResult;
  16. use flowy_folder2::manager::{FolderInitializeDataSource, FolderManager};
  17. use flowy_sqlite::kv::StorePreferences;
  18. use flowy_storage::FileStorageService;
  19. use flowy_task::{TaskDispatcher, TaskRunner};
  20. use flowy_user::event_map::{UserCloudServiceProvider, UserStatusCallback};
  21. use flowy_user::manager::{UserManager, UserSessionConfig};
  22. use flowy_user_deps::cloud::UserCloudConfig;
  23. use flowy_user_deps::entities::{AuthType, UserProfile, UserWorkspace};
  24. use lib_dispatch::prelude::*;
  25. use lib_dispatch::runtime::tokio_default_runtime;
  26. use lib_infra::future::{to_fut, Fut};
  27. use module::make_plugins;
  28. pub use module::*;
  29. use crate::deps_resolve::*;
  30. use crate::integrate::server::{
  31. current_server_provider, AppFlowyServerProvider, ServerProviderType,
  32. };
  33. mod deps_resolve;
  34. mod integrate;
  35. pub mod module;
  36. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  37. /// This name is used to identify the current [AppFlowyCore] instance.
  38. /// Don't change this.
  39. pub const DEFAULT_NAME: &str = "appflowy";
  40. #[derive(Clone)]
  41. pub struct AppFlowyCoreConfig {
  42. /// Different `AppFlowyCoreConfig` instance should have different name
  43. name: String,
  44. /// Panics if the `root` path is not existing
  45. pub storage_path: String,
  46. log_filter: String,
  47. }
  48. impl fmt::Debug for AppFlowyCoreConfig {
  49. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  50. f.debug_struct("AppFlowyCoreConfig")
  51. .field("storage_path", &self.storage_path)
  52. .finish()
  53. }
  54. }
  55. impl AppFlowyCoreConfig {
  56. pub fn new(root: &str, name: String) -> Self {
  57. AppFlowyCoreConfig {
  58. name,
  59. storage_path: root.to_owned(),
  60. log_filter: create_log_filter("info".to_owned(), vec![]),
  61. }
  62. }
  63. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  64. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  65. self
  66. }
  67. }
  /// Builds the comma-separated `target=level` filter string handed to the logger.
  ///
  /// * `level` - verbosity applied to most targets; replaced by the value of the
  ///   `RUST_LOG` environment variable whenever that variable can be read.
  /// * `with_crates` - extra targets emitted first, all at the effective level.
  ///
  /// The built-in targets follow in a fixed order. A few of them are pinned to
  /// `info` regardless of the effective level. With the `profiling` feature the
  /// `tokio` and `runtime` targets are appended as well.
  fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
    // (target, pinned level). `None` means "use the effective level".
    // The order of this table is the order of the emitted directives.
    const BUILTIN_TARGETS: &[(&str, Option<&str>)] = &[
      ("flowy_core", None),
      ("flowy_folder2", None),
      ("collab_sync", None),
      ("collab_folder", None),
      ("collab_persistence", None),
      ("collab_database", None),
      ("collab_plugins", None),
      ("appflowy_integrate", None),
      ("collab", None),
      ("flowy_user", None),
      ("flowy_document2", None),
      ("flowy_database2", None),
      ("flowy_server", None),
      ("flowy_notification", Some("info")),
      ("lib_infra", None),
      ("flowy_task", None),
      ("dart_ffi", Some("info")),
      ("flowy_sqlite", Some("info")),
      ("flowy_net", None),
    ];

    let effective = std::env::var("RUST_LOG").unwrap_or(level);

    let mut directives: Vec<String> = Vec::new();
    for crate_name in with_crates {
      directives.push(format!("{}={}", crate_name, effective));
    }
    for &(target, pinned) in BUILTIN_TARGETS {
      let target_level = pinned.unwrap_or(effective.as_str());
      directives.push(format!("{}={}", target, target_level));
    }
    if cfg!(feature = "profiling") {
      directives.push(format!("tokio={}", effective));
      directives.push(format!("runtime={}", effective));
    }

    directives.join(",")
  }
  99. #[derive(Clone)]
  100. pub struct AppFlowyCore {
  101. #[allow(dead_code)]
  102. pub config: AppFlowyCoreConfig,
  103. pub user_manager: Arc<UserManager>,
  104. pub document_manager: Arc<DocumentManager>,
  105. pub folder_manager: Arc<FolderManager>,
  106. pub database_manager: Arc<DatabaseManager>,
  107. pub event_dispatcher: Arc<AFPluginDispatcher>,
  108. pub server_provider: Arc<AppFlowyServerProvider>,
  109. pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  110. pub store_preference: Arc<StorePreferences>,
  111. }
  112. impl AppFlowyCore {
  113. pub fn new(config: AppFlowyCoreConfig) -> Self {
  114. /// The profiling can be used to tracing the performance of the application.
  115. /// Check out the [Link](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/backend/profiling)
  116. /// for more information.
  117. #[cfg(feature = "profiling")]
  118. console_subscriber::init();
  119. // Init the logger before anything else
  120. init_log(&config);
  121. // Init the key value database
  122. let store_preference = Arc::new(StorePreferences::new(&config.storage_path).unwrap());
  123. tracing::info!("🔥 {:?}", &config);
  124. let runtime = tokio_default_runtime().unwrap();
  125. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  126. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  127. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  128. let provider_type = current_server_provider(&store_preference);
  129. let server_provider = Arc::new(AppFlowyServerProvider::new(
  130. config.clone(),
  131. provider_type,
  132. Arc::downgrade(&store_preference),
  133. ));
  134. let (
  135. user_manager,
  136. folder_manager,
  137. server_provider,
  138. database_manager,
  139. document_manager,
  140. collab_builder,
  141. ) = runtime.block_on(async {
  142. /// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded
  143. /// on demand based on the [CollabPluginConfig].
  144. let collab_builder = Arc::new(AppFlowyCollabBuilder::new(server_provider.clone()));
  145. let user_manager = mk_user_session(
  146. &config,
  147. &store_preference,
  148. server_provider.clone(),
  149. Arc::downgrade(&collab_builder),
  150. );
  151. collab_builder
  152. .set_snapshot_persistence(Arc::new(SnapshotDBImpl(Arc::downgrade(&user_manager))));
  153. let database_manager = DatabaseDepsResolver::resolve(
  154. Arc::downgrade(&user_manager),
  155. task_dispatcher.clone(),
  156. collab_builder.clone(),
  157. server_provider.clone(),
  158. )
  159. .await;
  160. let document_manager = DocumentDepsResolver::resolve(
  161. Arc::downgrade(&user_manager),
  162. &database_manager,
  163. collab_builder.clone(),
  164. server_provider.clone(),
  165. Arc::downgrade(&(server_provider.clone() as Arc<dyn FileStorageService>)),
  166. );
  167. let folder_manager = FolderDepsResolver::resolve(
  168. Arc::downgrade(&user_manager),
  169. &document_manager,
  170. &database_manager,
  171. collab_builder.clone(),
  172. server_provider.clone(),
  173. )
  174. .await;
  175. (
  176. user_manager,
  177. folder_manager,
  178. server_provider,
  179. database_manager,
  180. document_manager,
  181. collab_builder,
  182. )
  183. });
  184. let user_status_listener = UserStatusCallbackImpl {
  185. collab_builder,
  186. folder_manager: folder_manager.clone(),
  187. database_manager: database_manager.clone(),
  188. document_manager: document_manager.clone(),
  189. server_provider: server_provider.clone(),
  190. config: config.clone(),
  191. };
  192. let cloned_user_session = Arc::downgrade(&user_manager);
  193. runtime.block_on(async move {
  194. if let Some(user_session) = cloned_user_session.upgrade() {
  195. user_session.init(user_status_listener).await;
  196. }
  197. });
  198. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  199. make_plugins(
  200. Arc::downgrade(&folder_manager),
  201. Arc::downgrade(&database_manager),
  202. Arc::downgrade(&user_manager),
  203. Arc::downgrade(&document_manager),
  204. )
  205. }));
  206. Self {
  207. config,
  208. user_manager,
  209. document_manager,
  210. folder_manager,
  211. database_manager,
  212. event_dispatcher,
  213. server_provider,
  214. task_dispatcher,
  215. store_preference,
  216. }
  217. }
  218. /// Only expose the dispatcher in test
  219. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  220. self.event_dispatcher.clone()
  221. }
  222. }
  223. fn init_log(config: &AppFlowyCoreConfig) {
  224. if !INIT_LOG.load(Ordering::SeqCst) {
  225. INIT_LOG.store(true, Ordering::SeqCst);
  226. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  227. .env_filter(&config.log_filter)
  228. .build();
  229. }
  230. }
  231. fn mk_user_session(
  232. config: &AppFlowyCoreConfig,
  233. storage_preference: &Arc<StorePreferences>,
  234. user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>,
  235. collab_builder: Weak<AppFlowyCollabBuilder>,
  236. ) -> Arc<UserManager> {
  237. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  238. UserManager::new(
  239. user_config,
  240. user_cloud_service_provider,
  241. storage_preference.clone(),
  242. collab_builder,
  243. )
  244. }
  245. struct UserStatusCallbackImpl {
  246. collab_builder: Arc<AppFlowyCollabBuilder>,
  247. folder_manager: Arc<FolderManager>,
  248. database_manager: Arc<DatabaseManager>,
  249. document_manager: Arc<DocumentManager>,
  250. server_provider: Arc<AppFlowyServerProvider>,
  251. #[allow(dead_code)]
  252. config: AppFlowyCoreConfig,
  253. }
  254. impl UserStatusCallback for UserStatusCallbackImpl {
  255. fn auth_type_did_changed(&self, _auth_type: AuthType) {}
  256. fn did_init(
  257. &self,
  258. user_id: i64,
  259. cloud_config: &Option<UserCloudConfig>,
  260. user_workspace: &UserWorkspace,
  261. _device_id: &str,
  262. ) -> Fut<FlowyResult<()>> {
  263. let user_workspace = user_workspace.clone();
  264. let collab_builder = self.collab_builder.clone();
  265. let folder_manager = self.folder_manager.clone();
  266. let database_manager = self.database_manager.clone();
  267. let document_manager = self.document_manager.clone();
  268. if let Some(cloud_config) = cloud_config {
  269. self
  270. .server_provider
  271. .set_enable_sync(user_id, cloud_config.enable_sync);
  272. if cloud_config.enable_encrypt() {
  273. self
  274. .server_provider
  275. .set_encrypt_secret(cloud_config.encrypt_secret.clone());
  276. }
  277. }
  278. to_fut(async move {
  279. collab_builder.initialize(user_workspace.id.clone());
  280. folder_manager
  281. .initialize(
  282. user_id,
  283. &user_workspace.id,
  284. FolderInitializeDataSource::LocalDisk {
  285. create_if_not_exist: false,
  286. },
  287. )
  288. .await?;
  289. database_manager
  290. .initialize(
  291. user_id,
  292. user_workspace.id.clone(),
  293. user_workspace.database_views_aggregate_id,
  294. )
  295. .await?;
  296. document_manager
  297. .initialize(user_id, user_workspace.id)
  298. .await?;
  299. Ok(())
  300. })
  301. }
  302. fn did_sign_in(
  303. &self,
  304. user_id: i64,
  305. user_workspace: &UserWorkspace,
  306. _device_id: &str,
  307. ) -> Fut<FlowyResult<()>> {
  308. let user_id = user_id.to_owned();
  309. let user_workspace = user_workspace.clone();
  310. let folder_manager = self.folder_manager.clone();
  311. let database_manager = self.database_manager.clone();
  312. let document_manager = self.document_manager.clone();
  313. to_fut(async move {
  314. folder_manager
  315. .initialize_with_workspace_id(user_id, &user_workspace.id)
  316. .await?;
  317. database_manager
  318. .initialize(
  319. user_id,
  320. user_workspace.id.clone(),
  321. user_workspace.database_views_aggregate_id,
  322. )
  323. .await?;
  324. document_manager
  325. .initialize(user_id, user_workspace.id)
  326. .await?;
  327. Ok(())
  328. })
  329. }
  330. fn did_sign_up(
  331. &self,
  332. is_new_user: bool,
  333. user_profile: &UserProfile,
  334. user_workspace: &UserWorkspace,
  335. _device_id: &str,
  336. ) -> Fut<FlowyResult<()>> {
  337. let user_profile = user_profile.clone();
  338. let folder_manager = self.folder_manager.clone();
  339. let database_manager = self.database_manager.clone();
  340. let user_workspace = user_workspace.clone();
  341. let document_manager = self.document_manager.clone();
  342. to_fut(async move {
  343. folder_manager
  344. .initialize_with_new_user(
  345. user_profile.uid,
  346. &user_profile.token,
  347. is_new_user,
  348. FolderInitializeDataSource::LocalDisk {
  349. create_if_not_exist: true,
  350. },
  351. &user_workspace.id,
  352. )
  353. .await?;
  354. database_manager
  355. .initialize_with_new_user(
  356. user_profile.uid,
  357. user_workspace.id.clone(),
  358. user_workspace.database_views_aggregate_id,
  359. )
  360. .await?;
  361. document_manager
  362. .initialize_with_new_user(user_profile.uid, user_workspace.id)
  363. .await?;
  364. Ok(())
  365. })
  366. }
  367. fn did_expired(&self, _token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  368. let folder_manager = self.folder_manager.clone();
  369. to_fut(async move {
  370. folder_manager.clear(user_id).await;
  371. Ok(())
  372. })
  373. }
  374. fn open_workspace(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
  375. let user_workspace = user_workspace.clone();
  376. let collab_builder = self.collab_builder.clone();
  377. let folder_manager = self.folder_manager.clone();
  378. let database_manager = self.database_manager.clone();
  379. let document_manager = self.document_manager.clone();
  380. to_fut(async move {
  381. collab_builder.initialize(user_workspace.id.clone());
  382. folder_manager
  383. .initialize_with_workspace_id(user_id, &user_workspace.id)
  384. .await?;
  385. database_manager
  386. .initialize(
  387. user_id,
  388. user_workspace.id.clone(),
  389. user_workspace.database_views_aggregate_id,
  390. )
  391. .await?;
  392. document_manager
  393. .initialize(user_id, user_workspace.id)
  394. .await?;
  395. Ok(())
  396. })
  397. }
  398. fn did_update_network(&self, reachable: bool) {
  399. self.collab_builder.update_network(reachable);
  400. }
  401. }
  402. impl From<ServerProviderType> for CollabStorageType {
  403. fn from(server_provider: ServerProviderType) -> Self {
  404. match server_provider {
  405. ServerProviderType::Local => CollabStorageType::Local,
  406. ServerProviderType::AppFlowyCloud => CollabStorageType::Local,
  407. ServerProviderType::Supabase => CollabStorageType::Supabase,
  408. }
  409. }
  410. }