lib.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408
  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  5. use flowy_database::entities::DatabaseLayoutPB;
  6. use flowy_database::manager::DatabaseManager;
  7. use flowy_document::entities::DocumentVersionPB;
  8. use flowy_document::{DocumentConfig, DocumentManager};
  9. use flowy_document2::manager::DocumentManager as DocumentManager2;
  10. use flowy_error::FlowyResult;
  11. use flowy_folder::errors::FlowyError;
  12. use flowy_folder2::manager::Folder2Manager;
  13. pub use flowy_net::get_client_server_configuration;
  14. use flowy_net::local_server::LocalServer;
  15. use flowy_net::ClientServerConfiguration;
  16. use flowy_task::{TaskDispatcher, TaskRunner};
  17. use flowy_user::event_map::UserStatusCallback;
  18. use flowy_user::services::{UserSession, UserSessionConfig};
  19. use lib_dispatch::prelude::*;
  20. use lib_dispatch::runtime::tokio_default_runtime;
  21. use lib_infra::future::{to_fut, Fut};
  22. use module::make_plugins;
  23. pub use module::*;
  24. use std::time::Duration;
  25. use std::{
  26. fmt,
  27. sync::{
  28. atomic::{AtomicBool, Ordering},
  29. Arc,
  30. },
  31. };
  32. use tokio::sync::{broadcast, RwLock};
  33. use user_model::UserProfile;
  34. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  35. /// This name will be used as to identify the current [AppFlowyCore] instance.
  36. /// Don't change this.
  37. pub const DEFAULT_NAME: &str = "appflowy";
  38. #[derive(Clone)]
  39. pub struct AppFlowyCoreConfig {
  40. /// Different `AppFlowyCoreConfig` instance should have different name
  41. name: String,
  42. /// Panics if the `root` path is not existing
  43. storage_path: String,
  44. log_filter: String,
  45. server_config: ClientServerConfiguration,
  46. pub document: DocumentConfig,
  47. }
  48. impl fmt::Debug for AppFlowyCoreConfig {
  49. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  50. f.debug_struct("AppFlowyCoreConfig")
  51. .field("storage_path", &self.storage_path)
  52. .field("server-config", &self.server_config)
  53. .field("document-config", &self.document)
  54. .finish()
  55. }
  56. }
  57. impl AppFlowyCoreConfig {
  58. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  59. AppFlowyCoreConfig {
  60. name,
  61. storage_path: root.to_owned(),
  62. log_filter: create_log_filter("info".to_owned(), vec![]),
  63. server_config,
  64. document: DocumentConfig::default(),
  65. }
  66. }
  67. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  68. self.document.version = version;
  69. self
  70. }
  71. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  72. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  73. self
  74. }
  75. }
  76. fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
  77. let level = std::env::var("RUST_LOG").unwrap_or(level);
  78. let mut filters = with_crates
  79. .into_iter()
  80. .map(|crate_name| format!("{}={}", crate_name, level))
  81. .collect::<Vec<String>>();
  82. filters.push(format!("flowy_core={}", level));
  83. filters.push(format!("flowy_folder={}", level));
  84. filters.push(format!("flowy_folder2={}", level));
  85. filters.push(format!("collab_folder={}", level));
  86. filters.push(format!("flowy_user={}", level));
  87. filters.push(format!("flowy_document={}", level));
  88. filters.push(format!("flowy_database={}", level));
  89. filters.push(format!("flowy_sync={}", "info"));
  90. filters.push(format!("flowy_client_sync={}", "info"));
  91. filters.push(format!("flowy_notification={}", "info"));
  92. filters.push(format!("lib_ot={}", level));
  93. filters.push(format!("lib_ws={}", level));
  94. filters.push(format!("lib_infra={}", level));
  95. filters.push(format!("flowy_sync={}", level));
  96. filters.push(format!("flowy_revision={}", level));
  97. filters.push(format!("flowy_revision_persistence={}", level));
  98. filters.push(format!("flowy_task={}", level));
  99. // filters.push(format!("lib_dispatch={}", level));
  100. filters.push(format!("dart_ffi={}", "info"));
  101. filters.push(format!("flowy_sqlite={}", "info"));
  102. filters.push(format!("flowy_net={}", level));
  103. #[cfg(feature = "profiling")]
  104. filters.push(format!("tokio={}", level));
  105. #[cfg(feature = "profiling")]
  106. filters.push(format!("runtime={}", level));
  107. filters.join(",")
  108. }
  109. #[derive(Clone)]
  110. pub struct AppFlowyCore {
  111. #[allow(dead_code)]
  112. pub config: AppFlowyCoreConfig,
  113. pub user_session: Arc<UserSession>,
  114. pub document_manager: Arc<DocumentManager>,
  115. pub document_manager2: Arc<DocumentManager2>,
  116. pub folder_manager: Arc<Folder2Manager>,
  117. pub database_manager: Arc<DatabaseManager>,
  118. pub event_dispatcher: Arc<AFPluginDispatcher>,
  119. pub ws_conn: Arc<FlowyWebSocketConnect>,
  120. pub local_server: Option<Arc<LocalServer>>,
  121. pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  122. }
  123. impl AppFlowyCore {
  124. pub fn new(config: AppFlowyCoreConfig) -> Self {
  125. #[cfg(feature = "profiling")]
  126. console_subscriber::init();
  127. init_log(&config);
  128. init_kv(&config.storage_path);
  129. tracing::debug!("🔥 {:?}", config);
  130. let runtime = tokio_default_runtime().unwrap();
  131. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  132. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  133. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  134. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  135. let (
  136. user_session,
  137. document_manager,
  138. folder_manager,
  139. local_server,
  140. database_manager,
  141. document_manager2,
  142. ) = runtime.block_on(async {
  143. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  144. let document_manager = DocumentDepsResolver::resolve(
  145. local_server.clone(),
  146. ws_conn.clone(),
  147. user_session.clone(),
  148. &config.server_config,
  149. &config.document,
  150. );
  151. let database_manager = DatabaseDepsResolver::resolve(
  152. ws_conn.clone(),
  153. user_session.clone(),
  154. task_dispatcher.clone(),
  155. )
  156. .await;
  157. let folder_manager =
  158. Folder2DepsResolver::resolve(user_session.clone(), &document_manager, &database_manager)
  159. .await;
  160. let document_manager2 =
  161. Document2DepsResolver::resolve(user_session.clone(), &database_manager);
  162. if let Some(local_server) = local_server.as_ref() {
  163. local_server.run();
  164. }
  165. ws_conn.init().await;
  166. (
  167. user_session,
  168. document_manager,
  169. folder_manager,
  170. local_server,
  171. database_manager,
  172. document_manager2,
  173. )
  174. });
  175. let user_status_listener = UserStatusListener {
  176. document_manager: document_manager.clone(),
  177. folder_manager: folder_manager.clone(),
  178. database_manager: database_manager.clone(),
  179. ws_conn: ws_conn.clone(),
  180. config: config.clone(),
  181. };
  182. let user_status_callback = UserStatusCallbackImpl {
  183. listener: Arc::new(user_status_listener),
  184. };
  185. let cloned_user_session = user_session.clone();
  186. runtime.block_on(async move {
  187. cloned_user_session.clone().init(user_status_callback).await;
  188. });
  189. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  190. make_plugins(
  191. &ws_conn,
  192. &folder_manager,
  193. &database_manager,
  194. &user_session,
  195. &document_manager,
  196. &document_manager2,
  197. )
  198. }));
  199. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  200. Self {
  201. config,
  202. user_session,
  203. document_manager,
  204. document_manager2,
  205. folder_manager,
  206. database_manager,
  207. event_dispatcher,
  208. ws_conn,
  209. local_server,
  210. task_dispatcher,
  211. }
  212. }
  213. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  214. self.event_dispatcher.clone()
  215. }
  216. }
  217. fn _start_listening(
  218. event_dispatcher: &AFPluginDispatcher,
  219. ws_conn: &Arc<FlowyWebSocketConnect>,
  220. folder_manager: &Arc<Folder2Manager>,
  221. ) {
  222. let subscribe_network_type = ws_conn.subscribe_network_ty();
  223. let folder_manager = folder_manager.clone();
  224. let _cloned_folder_manager = folder_manager;
  225. let ws_conn = ws_conn.clone();
  226. event_dispatcher.spawn(async move {
  227. listen_on_websocket(ws_conn.clone());
  228. });
  229. event_dispatcher.spawn(async move {
  230. _listen_network_status(subscribe_network_type).await;
  231. });
  232. }
  233. fn mk_local_server(
  234. server_config: &ClientServerConfiguration,
  235. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  236. let ws_addr = server_config.ws_addr();
  237. if cfg!(feature = "http_sync") {
  238. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  239. (None, ws_conn)
  240. } else {
  241. let context = flowy_net::local_server::build_server(server_config);
  242. let local_ws = Arc::new(context.local_ws);
  243. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  244. (Some(Arc::new(context.local_server)), ws_conn)
  245. }
  246. }
  247. async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>) {
  248. while let Ok(_new_type) = subscribe.recv().await {
  249. // core.network_state_changed(new_type);
  250. }
  251. }
  252. fn init_kv(root: &str) {
  253. match flowy_sqlite::kv::KV::init(root) {
  254. Ok(_) => {},
  255. Err(e) => tracing::error!("Init kv store failed: {}", e),
  256. }
  257. }
  258. fn init_log(config: &AppFlowyCoreConfig) {
  259. if !INIT_LOG.load(Ordering::SeqCst) {
  260. INIT_LOG.store(true, Ordering::SeqCst);
  261. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  262. .env_filter(&config.log_filter)
  263. .build();
  264. }
  265. }
  266. fn mk_user_session(
  267. config: &AppFlowyCoreConfig,
  268. local_server: &Option<Arc<LocalServer>>,
  269. server_config: &ClientServerConfiguration,
  270. ) -> Arc<UserSession> {
  271. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  272. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  273. Arc::new(UserSession::new(user_config, cloud_service))
  274. }
  275. struct UserStatusListener {
  276. document_manager: Arc<DocumentManager>,
  277. folder_manager: Arc<Folder2Manager>,
  278. database_manager: Arc<DatabaseManager>,
  279. ws_conn: Arc<FlowyWebSocketConnect>,
  280. #[allow(dead_code)]
  281. config: AppFlowyCoreConfig,
  282. }
  283. impl UserStatusListener {
  284. async fn did_sign_in(&self, token: &str, user_id: i64) -> FlowyResult<()> {
  285. self.folder_manager.initialize(user_id).await?;
  286. self.document_manager.initialize(user_id).await?;
  287. let cloned_folder_manager = self.folder_manager.clone();
  288. let get_views_fn = to_fut(async move {
  289. cloned_folder_manager
  290. .get_current_workspace_views()
  291. .await
  292. .unwrap_or_default()
  293. .into_iter()
  294. .filter(|view| view.layout.is_database())
  295. .map(|view| {
  296. (
  297. view.id,
  298. view.name,
  299. layout_type_from_view_layout(view.layout),
  300. )
  301. })
  302. .collect::<Vec<(String, String, DatabaseLayoutPB)>>()
  303. });
  304. self
  305. .database_manager
  306. .initialize(user_id, token, get_views_fn)
  307. .await?;
  308. self
  309. .ws_conn
  310. .start(token.to_owned(), user_id.to_owned())
  311. .await?;
  312. Ok(())
  313. }
  314. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  315. self
  316. .folder_manager
  317. .initialize_with_new_user(user_profile.id, &user_profile.token)
  318. .await?;
  319. self
  320. .document_manager
  321. .initialize_with_new_user(user_profile.id, &user_profile.token)
  322. .await?;
  323. self
  324. .database_manager
  325. .initialize_with_new_user(user_profile.id, &user_profile.token)
  326. .await?;
  327. self
  328. .ws_conn
  329. .start(user_profile.token.clone(), user_profile.id)
  330. .await?;
  331. Ok(())
  332. }
  333. async fn did_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
  334. self.folder_manager.clear(user_id).await;
  335. self.ws_conn.stop().await;
  336. Ok(())
  337. }
  338. }
  339. struct UserStatusCallbackImpl {
  340. listener: Arc<UserStatusListener>,
  341. }
  342. impl UserStatusCallback for UserStatusCallbackImpl {
  343. fn did_sign_in(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  344. let listener = self.listener.clone();
  345. let token = token.to_owned();
  346. let user_id = user_id.to_owned();
  347. to_fut(async move { listener.did_sign_in(&token, user_id).await })
  348. }
  349. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  350. let listener = self.listener.clone();
  351. let user_profile = user_profile.clone();
  352. to_fut(async move { listener.did_sign_up(&user_profile).await })
  353. }
  354. fn did_expired(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  355. let listener = self.listener.clone();
  356. let token = token.to_owned();
  357. let user_id = user_id.to_owned();
  358. to_fut(async move { listener.did_expired(&token, user_id).await })
  359. }
  360. fn will_migrated(&self, _token: &str, _old_user_id: &str, _user_id: i64) -> Fut<FlowyResult<()>> {
  361. // Read the folder data
  362. todo!()
  363. }
  364. }