lib.rs 13 KB


  1. #![allow(unused_doc_comments)]
  2. use std::time::Duration;
  3. use std::{
  4. fmt,
  5. sync::{
  6. atomic::{AtomicBool, Ordering},
  7. Arc,
  8. },
  9. };
  10. use appflowy_integrate::collab_builder::{AppFlowyCollabBuilder, CollabStorageType};
  11. use tokio::sync::RwLock;
  12. use flowy_database2::DatabaseManager;
  13. use flowy_document2::manager::DocumentManager;
  14. use flowy_error::FlowyResult;
  15. use flowy_folder2::manager::{FolderInitializeData, FolderManager};
  16. use flowy_sqlite::kv::StorePreferences;
  17. use flowy_task::{TaskDispatcher, TaskRunner};
  18. use flowy_user::event_map::{SignUpContext, UserCloudServiceProvider, UserStatusCallback};
  19. use flowy_user::services::{get_supabase_config, UserSession, UserSessionConfig};
  20. use flowy_user_deps::entities::{AuthType, UserProfile, UserWorkspace};
  21. use lib_dispatch::prelude::*;
  22. use lib_dispatch::runtime::tokio_default_runtime;
  23. use lib_infra::future::{to_fut, Fut};
  24. use module::make_plugins;
  25. pub use module::*;
  26. use crate::deps_resolve::*;
  27. use crate::integrate::server::{
  28. current_server_provider, AppFlowyServerProvider, ServerProviderType,
  29. };
  30. mod deps_resolve;
  31. mod integrate;
  32. pub mod module;
  33. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  34. /// This name will be used as to identify the current [AppFlowyCore] instance.
  35. /// Don't change this.
  36. pub const DEFAULT_NAME: &str = "appflowy";
  37. #[derive(Clone)]
  38. pub struct AppFlowyCoreConfig {
  39. /// Different `AppFlowyCoreConfig` instance should have different name
  40. name: String,
  41. /// Panics if the `root` path is not existing
  42. pub storage_path: String,
  43. log_filter: String,
  44. }
  45. impl fmt::Debug for AppFlowyCoreConfig {
  46. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  47. f.debug_struct("AppFlowyCoreConfig")
  48. .field("storage_path", &self.storage_path)
  49. .finish()
  50. }
  51. }
  52. impl AppFlowyCoreConfig {
  53. pub fn new(root: &str, name: String) -> Self {
  54. AppFlowyCoreConfig {
  55. name,
  56. storage_path: root.to_owned(),
  57. log_filter: create_log_filter("info".to_owned(), vec![]),
  58. }
  59. }
  60. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  61. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  62. self
  63. }
  64. }
  65. fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
  66. let level = std::env::var("RUST_LOG").unwrap_or(level);
  67. let mut filters = with_crates
  68. .into_iter()
  69. .map(|crate_name| format!("{}={}", crate_name, level))
  70. .collect::<Vec<String>>();
  71. filters.push(format!("flowy_core={}", level));
  72. filters.push(format!("flowy_folder2={}", level));
  73. filters.push(format!("collab_sync={}", level));
  74. filters.push(format!("collab_folder={}", level));
  75. filters.push(format!("collab_persistence={}", level));
  76. filters.push(format!("collab_database={}", level));
  77. filters.push(format!("collab_plugins={}", level));
  78. filters.push(format!("appflowy_integrate={}", level));
  79. filters.push(format!("collab={}", level));
  80. filters.push(format!("flowy_user={}", level));
  81. filters.push(format!("flowy_document2={}", level));
  82. filters.push(format!("flowy_database2={}", level));
  83. filters.push(format!("flowy_server={}", level));
  84. filters.push(format!("flowy_notification={}", "info"));
  85. filters.push(format!("lib_infra={}", level));
  86. filters.push(format!("flowy_task={}", level));
  87. filters.push(format!("dart_ffi={}", "info"));
  88. filters.push(format!("flowy_sqlite={}", "info"));
  89. filters.push(format!("flowy_net={}", level));
  90. #[cfg(feature = "profiling")]
  91. filters.push(format!("tokio={}", level));
  92. #[cfg(feature = "profiling")]
  93. filters.push(format!("runtime={}", level));
  94. filters.join(",")
  95. }
  96. #[derive(Clone)]
  97. pub struct AppFlowyCore {
  98. #[allow(dead_code)]
  99. pub config: AppFlowyCoreConfig,
  100. pub user_session: Arc<UserSession>,
  101. pub document_manager: Arc<DocumentManager>,
  102. pub folder_manager: Arc<FolderManager>,
  103. pub database_manager: Arc<DatabaseManager>,
  104. pub event_dispatcher: Arc<AFPluginDispatcher>,
  105. pub server_provider: Arc<AppFlowyServerProvider>,
  106. pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  107. pub storage_preference: Arc<StorePreferences>,
  108. }
  109. impl AppFlowyCore {
  110. pub fn new(config: AppFlowyCoreConfig) -> Self {
  111. /// The profiling can be used to tracing the performance of the application.
  112. /// Check out the [Link](https://appflowy.gitbook.io/docs/essential-documentation/contribute-to-appflowy/architecture/backend/profiling)
  113. /// for more information.
  114. #[cfg(feature = "profiling")]
  115. console_subscriber::init();
  116. // Init the logger before anything else
  117. init_log(&config);
  118. // Init the key value database
  119. let store_preference = Arc::new(StorePreferences::new(&config.storage_path).unwrap());
  120. tracing::info!("🔥 {:?}", &config);
  121. let runtime = tokio_default_runtime().unwrap();
  122. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  123. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  124. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  125. let provider_type = current_server_provider(&store_preference);
  126. let server_provider = Arc::new(AppFlowyServerProvider::new(
  127. config.clone(),
  128. provider_type,
  129. get_supabase_config(&store_preference),
  130. Arc::downgrade(&store_preference),
  131. ));
  132. let (
  133. user_session,
  134. folder_manager,
  135. server_provider,
  136. database_manager,
  137. document_manager,
  138. collab_builder,
  139. ) = runtime.block_on(async {
  140. let user_session = mk_user_session(&config, &store_preference, server_provider.clone());
  141. /// The shared collab builder is used to build the [Collab] instance. The plugins will be loaded
  142. /// on demand based on the [CollabPluginConfig].
  143. let collab_builder = Arc::new(AppFlowyCollabBuilder::new(
  144. server_provider.clone(),
  145. Some(Arc::new(SnapshotDBImpl(Arc::downgrade(&user_session)))),
  146. ));
  147. let database_manager = DatabaseDepsResolver::resolve(
  148. Arc::downgrade(&user_session),
  149. task_dispatcher.clone(),
  150. collab_builder.clone(),
  151. server_provider.clone(),
  152. )
  153. .await;
  154. let document_manager = DocumentDepsResolver::resolve(
  155. Arc::downgrade(&user_session),
  156. &database_manager,
  157. collab_builder.clone(),
  158. server_provider.clone(),
  159. );
  160. let folder_manager = FolderDepsResolver::resolve(
  161. Arc::downgrade(&user_session),
  162. &document_manager,
  163. &database_manager,
  164. collab_builder.clone(),
  165. server_provider.clone(),
  166. )
  167. .await;
  168. (
  169. user_session,
  170. folder_manager,
  171. server_provider,
  172. database_manager,
  173. document_manager,
  174. collab_builder,
  175. )
  176. });
  177. let user_status_listener = UserStatusCallbackImpl {
  178. collab_builder,
  179. folder_manager: folder_manager.clone(),
  180. database_manager: database_manager.clone(),
  181. document_manager: document_manager.clone(),
  182. config: config.clone(),
  183. };
  184. let cloned_user_session = Arc::downgrade(&user_session);
  185. runtime.block_on(async move {
  186. if let Some(user_session) = cloned_user_session.upgrade() {
  187. user_session.init(user_status_listener).await;
  188. }
  189. });
  190. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  191. make_plugins(
  192. Arc::downgrade(&folder_manager),
  193. Arc::downgrade(&database_manager),
  194. Arc::downgrade(&user_session),
  195. Arc::downgrade(&document_manager),
  196. )
  197. }));
  198. Self {
  199. config,
  200. user_session,
  201. document_manager,
  202. folder_manager,
  203. database_manager,
  204. event_dispatcher,
  205. server_provider,
  206. task_dispatcher,
  207. storage_preference: store_preference,
  208. }
  209. }
  210. /// Only expose the dispatcher in test
  211. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  212. self.event_dispatcher.clone()
  213. }
  214. }
  215. fn init_log(config: &AppFlowyCoreConfig) {
  216. if !INIT_LOG.load(Ordering::SeqCst) {
  217. INIT_LOG.store(true, Ordering::SeqCst);
  218. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  219. .env_filter(&config.log_filter)
  220. .build();
  221. }
  222. }
  223. fn mk_user_session(
  224. config: &AppFlowyCoreConfig,
  225. storage_preference: &Arc<StorePreferences>,
  226. user_cloud_service_provider: Arc<dyn UserCloudServiceProvider>,
  227. ) -> Arc<UserSession> {
  228. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  229. Arc::new(UserSession::new(
  230. user_config,
  231. user_cloud_service_provider,
  232. storage_preference.clone(),
  233. ))
  234. }
  235. struct UserStatusCallbackImpl {
  236. collab_builder: Arc<AppFlowyCollabBuilder>,
  237. folder_manager: Arc<FolderManager>,
  238. database_manager: Arc<DatabaseManager>,
  239. document_manager: Arc<DocumentManager>,
  240. #[allow(dead_code)]
  241. config: AppFlowyCoreConfig,
  242. }
  243. impl UserStatusCallback for UserStatusCallbackImpl {
  244. fn auth_type_did_changed(&self, _auth_type: AuthType) {}
  245. fn did_init(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
  246. let user_id = user_id.to_owned();
  247. let user_workspace = user_workspace.clone();
  248. let collab_builder = self.collab_builder.clone();
  249. let folder_manager = self.folder_manager.clone();
  250. let database_manager = self.database_manager.clone();
  251. let document_manager = self.document_manager.clone();
  252. to_fut(async move {
  253. collab_builder.initialize(user_workspace.id.clone());
  254. folder_manager
  255. .initialize(user_id, &user_workspace.id, FolderInitializeData::Empty)
  256. .await?;
  257. database_manager
  258. .initialize(
  259. user_id,
  260. user_workspace.id.clone(),
  261. user_workspace.database_storage_id,
  262. )
  263. .await?;
  264. document_manager
  265. .initialize(user_id, user_workspace.id)
  266. .await?;
  267. Ok(())
  268. })
  269. }
  270. fn did_sign_in(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
  271. let user_id = user_id.to_owned();
  272. let user_workspace = user_workspace.clone();
  273. let collab_builder = self.collab_builder.clone();
  274. let folder_manager = self.folder_manager.clone();
  275. let database_manager = self.database_manager.clone();
  276. let document_manager = self.document_manager.clone();
  277. to_fut(async move {
  278. collab_builder.initialize(user_workspace.id.clone());
  279. folder_manager
  280. .initialize_with_workspace_id(user_id, &user_workspace.id)
  281. .await?;
  282. database_manager
  283. .initialize(
  284. user_id,
  285. user_workspace.id.clone(),
  286. user_workspace.database_storage_id,
  287. )
  288. .await?;
  289. document_manager
  290. .initialize(user_id, user_workspace.id)
  291. .await?;
  292. Ok(())
  293. })
  294. }
  295. fn did_sign_up(
  296. &self,
  297. context: SignUpContext,
  298. user_profile: &UserProfile,
  299. user_workspace: &UserWorkspace,
  300. ) -> Fut<FlowyResult<()>> {
  301. let user_profile = user_profile.clone();
  302. let collab_builder = self.collab_builder.clone();
  303. let folder_manager = self.folder_manager.clone();
  304. let database_manager = self.database_manager.clone();
  305. let user_workspace = user_workspace.clone();
  306. let document_manager = self.document_manager.clone();
  307. to_fut(async move {
  308. collab_builder.initialize(user_workspace.id.clone());
  309. folder_manager
  310. .initialize_with_new_user(
  311. user_profile.id,
  312. &user_profile.token,
  313. context.is_new,
  314. context.local_folder,
  315. &user_workspace.id,
  316. )
  317. .await?;
  318. database_manager
  319. .initialize_with_new_user(
  320. user_profile.id,
  321. user_workspace.id.clone(),
  322. user_workspace.database_storage_id,
  323. )
  324. .await?;
  325. document_manager
  326. .initialize_with_new_user(user_profile.id, user_workspace.id)
  327. .await?;
  328. Ok(())
  329. })
  330. }
  331. fn did_expired(&self, _token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  332. let folder_manager = self.folder_manager.clone();
  333. to_fut(async move {
  334. folder_manager.clear(user_id).await;
  335. Ok(())
  336. })
  337. }
  338. fn open_workspace(&self, user_id: i64, user_workspace: &UserWorkspace) -> Fut<FlowyResult<()>> {
  339. let user_workspace = user_workspace.clone();
  340. let collab_builder = self.collab_builder.clone();
  341. let folder_manager = self.folder_manager.clone();
  342. let database_manager = self.database_manager.clone();
  343. let document_manager = self.document_manager.clone();
  344. to_fut(async move {
  345. collab_builder.initialize(user_workspace.id.clone());
  346. folder_manager
  347. .initialize_with_workspace_id(user_id, &user_workspace.id)
  348. .await?;
  349. database_manager
  350. .initialize(
  351. user_id,
  352. user_workspace.id.clone(),
  353. user_workspace.database_storage_id,
  354. )
  355. .await?;
  356. document_manager
  357. .initialize(user_id, user_workspace.id)
  358. .await?;
  359. Ok(())
  360. })
  361. }
  362. fn did_update_network(&self, reachable: bool) {
  363. self.collab_builder.update_network(reachable);
  364. }
  365. }
  366. impl From<ServerProviderType> for CollabStorageType {
  367. fn from(server_provider: ServerProviderType) -> Self {
  368. match server_provider {
  369. ServerProviderType::Local => CollabStorageType::Local,
  370. ServerProviderType::AppFlowyCloud => CollabStorageType::Local,
  371. ServerProviderType::Supabase => CollabStorageType::Supabase,
  372. }
  373. }
  374. }