lib.rs 12 KB


  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  5. use flowy_database::entities::DatabaseLayoutPB;
  6. use flowy_database::manager::DatabaseManager;
  7. use flowy_document::entities::DocumentVersionPB;
  8. use flowy_document::{DocumentConfig, DocumentManager};
  9. use flowy_document2::manager::DocumentManager as DocumentManager2;
  10. use flowy_error::FlowyResult;
  11. use flowy_folder::errors::FlowyError;
  12. use flowy_folder2::manager::Folder2Manager;
  13. pub use flowy_net::get_client_server_configuration;
  14. use flowy_net::local_server::LocalServer;
  15. use flowy_net::ClientServerConfiguration;
  16. use flowy_task::{TaskDispatcher, TaskRunner};
  17. use flowy_user::event_map::UserStatusCallback;
  18. use flowy_user::services::{UserSession, UserSessionConfig};
  19. use lib_dispatch::prelude::*;
  20. use lib_dispatch::runtime::tokio_default_runtime;
  21. use lib_infra::future::{to_fut, Fut};
  22. use module::make_plugins;
  23. pub use module::*;
  24. use std::time::Duration;
  25. use std::{
  26. fmt,
  27. sync::{
  28. atomic::{AtomicBool, Ordering},
  29. Arc,
  30. },
  31. };
  32. use tokio::sync::{broadcast, RwLock};
  33. use user_model::UserProfile;
  34. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  35. /// This name will be used as to identify the current [AppFlowyCore] instance.
  36. /// Don't change this.
  37. pub const DEFAULT_NAME: &str = "appflowy";
  38. #[derive(Clone)]
  39. pub struct AppFlowyCoreConfig {
  40. /// Different `AppFlowyCoreConfig` instance should have different name
  41. name: String,
  42. /// Panics if the `root` path is not existing
  43. storage_path: String,
  44. log_filter: String,
  45. server_config: ClientServerConfiguration,
  46. pub document: DocumentConfig,
  47. }
  48. impl fmt::Debug for AppFlowyCoreConfig {
  49. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  50. f.debug_struct("AppFlowyCoreConfig")
  51. .field("storage_path", &self.storage_path)
  52. .field("server-config", &self.server_config)
  53. .field("document-config", &self.document)
  54. .finish()
  55. }
  56. }
  57. impl AppFlowyCoreConfig {
  58. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  59. AppFlowyCoreConfig {
  60. name,
  61. storage_path: root.to_owned(),
  62. log_filter: create_log_filter("info".to_owned(), vec![]),
  63. server_config,
  64. document: DocumentConfig::default(),
  65. }
  66. }
  67. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  68. self.document.version = version;
  69. self
  70. }
  71. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  72. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  73. self
  74. }
  75. }
  76. fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
  77. let level = std::env::var("RUST_LOG").unwrap_or(level);
  78. let mut filters = with_crates
  79. .into_iter()
  80. .map(|crate_name| format!("{}={}", crate_name, level))
  81. .collect::<Vec<String>>();
  82. filters.push(format!("flowy_core={}", level));
  83. filters.push(format!("flowy_folder={}", level));
  84. filters.push(format!("flowy_folder2={}", level));
  85. filters.push(format!("collab_folder={}", level));
  86. filters.push(format!("collab_persistence={}", level));
  87. filters.push(format!("collab={}", level));
  88. filters.push(format!("flowy_user={}", level));
  89. filters.push(format!("flowy_document={}", level));
  90. filters.push(format!("flowy_document2={}", level));
  91. filters.push(format!("flowy_database={}", level));
  92. filters.push(format!("flowy_sync={}", "info"));
  93. filters.push(format!("flowy_client_sync={}", "info"));
  94. filters.push(format!("flowy_notification={}", "info"));
  95. filters.push(format!("lib_ot={}", level));
  96. filters.push(format!("lib_ws={}", level));
  97. filters.push(format!("lib_infra={}", level));
  98. filters.push(format!("flowy_sync={}", level));
  99. filters.push(format!("flowy_revision={}", level));
  100. filters.push(format!("flowy_revision_persistence={}", level));
  101. filters.push(format!("flowy_task={}", level));
  102. // filters.push(format!("lib_dispatch={}", level));
  103. filters.push(format!("dart_ffi={}", "info"));
  104. filters.push(format!("flowy_sqlite={}", "info"));
  105. filters.push(format!("flowy_net={}", level));
  106. #[cfg(feature = "profiling")]
  107. filters.push(format!("tokio={}", level));
  108. #[cfg(feature = "profiling")]
  109. filters.push(format!("runtime={}", level));
  110. filters.join(",")
  111. }
  112. #[derive(Clone)]
  113. pub struct AppFlowyCore {
  114. #[allow(dead_code)]
  115. pub config: AppFlowyCoreConfig,
  116. pub user_session: Arc<UserSession>,
  117. pub document_manager: Arc<DocumentManager>,
  118. pub document_manager2: Arc<DocumentManager2>,
  119. pub folder_manager: Arc<Folder2Manager>,
  120. pub database_manager: Arc<DatabaseManager>,
  121. pub event_dispatcher: Arc<AFPluginDispatcher>,
  122. pub ws_conn: Arc<FlowyWebSocketConnect>,
  123. pub local_server: Option<Arc<LocalServer>>,
  124. pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  125. }
  126. impl AppFlowyCore {
  127. pub fn new(config: AppFlowyCoreConfig) -> Self {
  128. #[cfg(feature = "profiling")]
  129. console_subscriber::init();
  130. init_log(&config);
  131. init_kv(&config.storage_path);
  132. tracing::debug!("🔥 {:?}", config);
  133. let runtime = tokio_default_runtime().unwrap();
  134. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  135. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  136. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  137. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  138. let (
  139. user_session,
  140. document_manager,
  141. folder_manager,
  142. local_server,
  143. database_manager,
  144. document_manager2,
  145. ) = runtime.block_on(async {
  146. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  147. let document_manager = DocumentDepsResolver::resolve(
  148. local_server.clone(),
  149. ws_conn.clone(),
  150. user_session.clone(),
  151. &config.server_config,
  152. &config.document,
  153. );
  154. let database_manager = DatabaseDepsResolver::resolve(
  155. ws_conn.clone(),
  156. user_session.clone(),
  157. task_dispatcher.clone(),
  158. )
  159. .await;
  160. let folder_manager =
  161. Folder2DepsResolver::resolve(user_session.clone(), &document_manager, &database_manager)
  162. .await;
  163. let document_manager2 =
  164. Document2DepsResolver::resolve(user_session.clone(), &database_manager);
  165. if let Some(local_server) = local_server.as_ref() {
  166. local_server.run();
  167. }
  168. ws_conn.init().await;
  169. (
  170. user_session,
  171. document_manager,
  172. folder_manager,
  173. local_server,
  174. database_manager,
  175. document_manager2,
  176. )
  177. });
  178. let user_status_listener = UserStatusListener {
  179. document_manager: document_manager.clone(),
  180. folder_manager: folder_manager.clone(),
  181. database_manager: database_manager.clone(),
  182. ws_conn: ws_conn.clone(),
  183. config: config.clone(),
  184. };
  185. let user_status_callback = UserStatusCallbackImpl {
  186. listener: Arc::new(user_status_listener),
  187. };
  188. let cloned_user_session = user_session.clone();
  189. runtime.block_on(async move {
  190. cloned_user_session.clone().init(user_status_callback).await;
  191. });
  192. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  193. make_plugins(
  194. &ws_conn,
  195. &folder_manager,
  196. &database_manager,
  197. &user_session,
  198. &document_manager,
  199. &document_manager2,
  200. )
  201. }));
  202. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  203. Self {
  204. config,
  205. user_session,
  206. document_manager,
  207. document_manager2,
  208. folder_manager,
  209. database_manager,
  210. event_dispatcher,
  211. ws_conn,
  212. local_server,
  213. task_dispatcher,
  214. }
  215. }
  216. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  217. self.event_dispatcher.clone()
  218. }
  219. }
  220. fn _start_listening(
  221. event_dispatcher: &AFPluginDispatcher,
  222. ws_conn: &Arc<FlowyWebSocketConnect>,
  223. folder_manager: &Arc<Folder2Manager>,
  224. ) {
  225. let subscribe_network_type = ws_conn.subscribe_network_ty();
  226. let folder_manager = folder_manager.clone();
  227. let _cloned_folder_manager = folder_manager;
  228. let ws_conn = ws_conn.clone();
  229. event_dispatcher.spawn(async move {
  230. listen_on_websocket(ws_conn.clone());
  231. });
  232. event_dispatcher.spawn(async move {
  233. _listen_network_status(subscribe_network_type).await;
  234. });
  235. }
  236. fn mk_local_server(
  237. server_config: &ClientServerConfiguration,
  238. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  239. let ws_addr = server_config.ws_addr();
  240. if cfg!(feature = "http_sync") {
  241. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  242. (None, ws_conn)
  243. } else {
  244. let context = flowy_net::local_server::build_server(server_config);
  245. let local_ws = Arc::new(context.local_ws);
  246. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  247. (Some(Arc::new(context.local_server)), ws_conn)
  248. }
  249. }
  250. async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>) {
  251. while let Ok(_new_type) = subscribe.recv().await {
  252. // core.network_state_changed(new_type);
  253. }
  254. }
  255. fn init_kv(root: &str) {
  256. match flowy_sqlite::kv::KV::init(root) {
  257. Ok(_) => {},
  258. Err(e) => tracing::error!("Init kv store failed: {}", e),
  259. }
  260. }
  261. fn init_log(config: &AppFlowyCoreConfig) {
  262. if !INIT_LOG.load(Ordering::SeqCst) {
  263. INIT_LOG.store(true, Ordering::SeqCst);
  264. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  265. .env_filter(&config.log_filter)
  266. .build();
  267. }
  268. }
  269. fn mk_user_session(
  270. config: &AppFlowyCoreConfig,
  271. local_server: &Option<Arc<LocalServer>>,
  272. server_config: &ClientServerConfiguration,
  273. ) -> Arc<UserSession> {
  274. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  275. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  276. Arc::new(UserSession::new(user_config, cloud_service))
  277. }
  278. struct UserStatusListener {
  279. document_manager: Arc<DocumentManager>,
  280. folder_manager: Arc<Folder2Manager>,
  281. database_manager: Arc<DatabaseManager>,
  282. ws_conn: Arc<FlowyWebSocketConnect>,
  283. #[allow(dead_code)]
  284. config: AppFlowyCoreConfig,
  285. }
  286. impl UserStatusListener {
  287. async fn did_sign_in(&self, token: &str, user_id: i64) -> FlowyResult<()> {
  288. self.folder_manager.initialize(user_id).await?;
  289. self.document_manager.initialize(user_id).await?;
  290. let cloned_folder_manager = self.folder_manager.clone();
  291. let get_views_fn = to_fut(async move {
  292. cloned_folder_manager
  293. .get_current_workspace_views()
  294. .await
  295. .unwrap_or_default()
  296. .into_iter()
  297. .filter(|view| view.layout.is_database())
  298. .map(|view| {
  299. (
  300. view.id,
  301. view.name,
  302. layout_type_from_view_layout(view.layout),
  303. )
  304. })
  305. .collect::<Vec<(String, String, DatabaseLayoutPB)>>()
  306. });
  307. self
  308. .database_manager
  309. .initialize(user_id, token, get_views_fn)
  310. .await?;
  311. self
  312. .ws_conn
  313. .start(token.to_owned(), user_id.to_owned())
  314. .await?;
  315. Ok(())
  316. }
  317. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  318. self
  319. .folder_manager
  320. .initialize_with_new_user(user_profile.id, &user_profile.token)
  321. .await?;
  322. self
  323. .document_manager
  324. .initialize_with_new_user(user_profile.id, &user_profile.token)
  325. .await?;
  326. self
  327. .database_manager
  328. .initialize_with_new_user(user_profile.id, &user_profile.token)
  329. .await?;
  330. self
  331. .ws_conn
  332. .start(user_profile.token.clone(), user_profile.id)
  333. .await?;
  334. Ok(())
  335. }
  336. async fn did_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
  337. self.folder_manager.clear(user_id).await;
  338. self.ws_conn.stop().await;
  339. Ok(())
  340. }
  341. }
  342. struct UserStatusCallbackImpl {
  343. listener: Arc<UserStatusListener>,
  344. }
  345. impl UserStatusCallback for UserStatusCallbackImpl {
  346. fn did_sign_in(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  347. let listener = self.listener.clone();
  348. let token = token.to_owned();
  349. let user_id = user_id.to_owned();
  350. to_fut(async move { listener.did_sign_in(&token, user_id).await })
  351. }
  352. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  353. let listener = self.listener.clone();
  354. let user_profile = user_profile.clone();
  355. to_fut(async move { listener.did_sign_up(&user_profile).await })
  356. }
  357. fn did_expired(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  358. let listener = self.listener.clone();
  359. let token = token.to_owned();
  360. let user_id = user_id.to_owned();
  361. to_fut(async move { listener.did_expired(&token, user_id).await })
  362. }
  363. fn will_migrated(&self, _token: &str, _old_user_id: &str, _user_id: i64) -> Fut<FlowyResult<()>> {
  364. // Read the folder data
  365. todo!()
  366. }
  367. }