lib.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. use std::time::Duration;
  2. use std::{
  3. fmt,
  4. sync::{
  5. atomic::{AtomicBool, Ordering},
  6. Arc,
  7. },
  8. };
  9. use tokio::sync::{broadcast, RwLock};
  10. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  11. use flowy_database2::DatabaseManager2;
  12. use flowy_document::entities::DocumentVersionPB;
  13. use flowy_document::{DocumentConfig, DocumentManager};
  14. use flowy_document2::manager::DocumentManager as DocumentManager2;
  15. use flowy_error::FlowyResult;
  16. use flowy_folder2::manager::Folder2Manager;
  17. pub use flowy_net::get_client_server_configuration;
  18. use flowy_net::local_server::LocalServer;
  19. use flowy_net::ClientServerConfiguration;
  20. use flowy_task::{TaskDispatcher, TaskRunner};
  21. use flowy_user::event_map::UserStatusCallback;
  22. use flowy_user::services::{UserSession, UserSessionConfig};
  23. use lib_dispatch::prelude::*;
  24. use lib_dispatch::runtime::tokio_default_runtime;
  25. use lib_infra::future::{to_fut, Fut};
  26. use module::make_plugins;
  27. pub use module::*;
  28. use user_model::UserProfile;
  29. use crate::deps_resolve::*;
  30. mod deps_resolve;
  31. pub mod module;
/// Process-wide flag recording whether the logger has already been set up.
/// `init_log` checks and sets it so the logger is only built on the first call.
static INIT_LOG: AtomicBool = AtomicBool::new(false);
/// This name is used to identify the current [AppFlowyCore] instance.
/// Don't change this.
pub const DEFAULT_NAME: &str = "appflowy";
/// Settings consumed by `AppFlowyCore::new` when wiring up the core services.
#[derive(Clone)]
pub struct AppFlowyCoreConfig {
  /// Each `AppFlowyCoreConfig` instance should use a different name.
  /// Passed to `UserSessionConfig::new` together with `storage_path`.
  name: String,
  /// Root directory handed to the KV store, the logger and the user session.
  /// The original note on this field states that a non-existent `root` path
  /// causes a panic. NOTE(review): the panic site is not visible in this file —
  /// confirm in the callees.
  storage_path: String,
  /// Comma-separated `target=level` directives built by `create_log_filter`.
  log_filter: String,
  /// Server configuration used when building the local server, the websocket
  /// connection, the user session and the document manager.
  server_config: ClientServerConfiguration,
  /// Document settings; `with_document_version` overrides its `version` field.
  pub document: DocumentConfig,
}
  46. impl fmt::Debug for AppFlowyCoreConfig {
  47. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  48. f.debug_struct("AppFlowyCoreConfig")
  49. .field("storage_path", &self.storage_path)
  50. .field("server-config", &self.server_config)
  51. .field("document-config", &self.document)
  52. .finish()
  53. }
  54. }
  55. impl AppFlowyCoreConfig {
  56. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  57. AppFlowyCoreConfig {
  58. name,
  59. storage_path: root.to_owned(),
  60. log_filter: create_log_filter("info".to_owned(), vec![]),
  61. server_config,
  62. document: DocumentConfig::default(),
  63. }
  64. }
  65. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  66. self.document.version = version;
  67. self
  68. }
  69. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  70. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  71. self
  72. }
  73. }
  74. fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
  75. let level = std::env::var("RUST_LOG").unwrap_or(level);
  76. let mut filters = with_crates
  77. .into_iter()
  78. .map(|crate_name| format!("{}={}", crate_name, level))
  79. .collect::<Vec<String>>();
  80. filters.push(format!("flowy_core={}", level));
  81. filters.push(format!("flowy_folder={}", level));
  82. filters.push(format!("flowy_folder2={}", level));
  83. filters.push(format!("collab_folder={}", level));
  84. // filters.push(format!("collab_persistence={}", level));
  85. filters.push(format!("collab_database={}", level));
  86. filters.push(format!("collab={}", level));
  87. filters.push(format!("flowy_user={}", level));
  88. filters.push(format!("flowy_document={}", level));
  89. filters.push(format!("flowy_document2={}", level));
  90. filters.push(format!("flowy_database={}", level));
  91. filters.push(format!("flowy_database2={}", level));
  92. filters.push(format!("flowy_sync={}", "info"));
  93. filters.push(format!("flowy_client_sync={}", "info"));
  94. filters.push(format!("flowy_notification={}", "info"));
  95. filters.push(format!("lib_ot={}", level));
  96. filters.push(format!("lib_ws={}", level));
  97. filters.push(format!("lib_infra={}", level));
  98. filters.push(format!("flowy_sync={}", level));
  99. filters.push(format!("flowy_revision={}", level));
  100. filters.push(format!("flowy_revision_persistence={}", level));
  101. filters.push(format!("flowy_task={}", level));
  102. // filters.push(format!("lib_dispatch={}", level));
  103. filters.push(format!("dart_ffi={}", "info"));
  104. filters.push(format!("flowy_sqlite={}", "info"));
  105. filters.push(format!("flowy_net={}", level));
  106. #[cfg(feature = "profiling")]
  107. filters.push(format!("tokio={}", level));
  108. #[cfg(feature = "profiling")]
  109. filters.push(format!("runtime={}", level));
  110. filters.join(",")
  111. }
/// Bundle of the services that `AppFlowyCore::new` creates and wires together.
/// Every service is held behind an `Arc`, so cloning the struct shares them.
#[derive(Clone)]
pub struct AppFlowyCore {
  /// The configuration this instance was built from.
  #[allow(dead_code)]
  pub config: AppFlowyCoreConfig,
  /// User session created by `mk_user_session`.
  pub user_session: Arc<UserSession>,
  /// Document manager resolved by `DocumentDepsResolver`.
  pub document_manager: Arc<DocumentManager>,
  /// Document manager resolved by `Document2DepsResolver`.
  pub document_manager2: Arc<DocumentManager2>,
  /// Folder manager resolved by `Folder2DepsResolver`.
  pub folder_manager: Arc<Folder2Manager>,
  // pub database_manager: Arc<DatabaseManager>,
  /// Database manager resolved by `Database2DepsResolver`.
  pub database_manager: Arc<DatabaseManager2>,
  /// Plugin dispatcher that owns the tokio runtime created in `new`.
  pub event_dispatcher: Arc<AFPluginDispatcher>,
  /// Websocket connection produced by `mk_local_server`.
  pub ws_conn: Arc<FlowyWebSocketConnect>,
  /// Local server; `None` when the `http_sync` feature is enabled.
  pub local_server: Option<Arc<LocalServer>>,
  /// Task dispatcher shared with the `TaskRunner` spawned in `new`.
  pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
}
  127. impl AppFlowyCore {
  128. pub fn new(config: AppFlowyCoreConfig) -> Self {
  129. #[cfg(feature = "profiling")]
  130. console_subscriber::init();
  131. init_log(&config);
  132. init_kv(&config.storage_path);
  133. tracing::debug!("🔥 {:?}", config);
  134. let runtime = tokio_default_runtime().unwrap();
  135. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  136. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  137. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  138. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  139. let (
  140. user_session,
  141. document_manager,
  142. folder_manager,
  143. local_server,
  144. database_manager,
  145. document_manager2,
  146. ) = runtime.block_on(async {
  147. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  148. let document_manager = DocumentDepsResolver::resolve(
  149. local_server.clone(),
  150. ws_conn.clone(),
  151. user_session.clone(),
  152. &config.server_config,
  153. &config.document,
  154. );
  155. let database_manager2 = Database2DepsResolver::resolve(
  156. ws_conn.clone(),
  157. user_session.clone(),
  158. task_dispatcher.clone(),
  159. )
  160. .await;
  161. let folder_manager =
  162. Folder2DepsResolver::resolve(user_session.clone(), &document_manager, &database_manager2)
  163. .await;
  164. let document_manager2 =
  165. Document2DepsResolver::resolve(user_session.clone(), &database_manager2);
  166. if let Some(local_server) = local_server.as_ref() {
  167. local_server.run();
  168. }
  169. ws_conn.init().await;
  170. (
  171. user_session,
  172. document_manager,
  173. folder_manager,
  174. local_server,
  175. database_manager2,
  176. document_manager2,
  177. )
  178. });
  179. let user_status_listener = UserStatusListener {
  180. document_manager: document_manager.clone(),
  181. folder_manager: folder_manager.clone(),
  182. database_manager: database_manager.clone(),
  183. ws_conn: ws_conn.clone(),
  184. config: config.clone(),
  185. };
  186. let user_status_callback = UserStatusCallbackImpl {
  187. listener: Arc::new(user_status_listener),
  188. };
  189. let cloned_user_session = user_session.clone();
  190. runtime.block_on(async move {
  191. cloned_user_session.clone().init(user_status_callback).await;
  192. });
  193. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  194. make_plugins(
  195. &ws_conn,
  196. &folder_manager,
  197. &database_manager,
  198. &user_session,
  199. &document_manager,
  200. &document_manager2,
  201. )
  202. }));
  203. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  204. Self {
  205. config,
  206. user_session,
  207. document_manager,
  208. document_manager2,
  209. folder_manager,
  210. database_manager,
  211. event_dispatcher,
  212. ws_conn,
  213. local_server,
  214. task_dispatcher,
  215. }
  216. }
  217. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  218. self.event_dispatcher.clone()
  219. }
  220. }
  221. fn _start_listening(
  222. event_dispatcher: &AFPluginDispatcher,
  223. ws_conn: &Arc<FlowyWebSocketConnect>,
  224. folder_manager: &Arc<Folder2Manager>,
  225. ) {
  226. let subscribe_network_type = ws_conn.subscribe_network_ty();
  227. let folder_manager = folder_manager.clone();
  228. let _cloned_folder_manager = folder_manager;
  229. let ws_conn = ws_conn.clone();
  230. event_dispatcher.spawn(async move {
  231. listen_on_websocket(ws_conn.clone());
  232. });
  233. event_dispatcher.spawn(async move {
  234. _listen_network_status(subscribe_network_type).await;
  235. });
  236. }
  237. fn mk_local_server(
  238. server_config: &ClientServerConfiguration,
  239. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  240. let ws_addr = server_config.ws_addr();
  241. if cfg!(feature = "http_sync") {
  242. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  243. (None, ws_conn)
  244. } else {
  245. let context = flowy_net::local_server::build_server(server_config);
  246. let local_ws = Arc::new(context.local_ws);
  247. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  248. (Some(Arc::new(context.local_server)), ws_conn)
  249. }
  250. }
  251. async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>) {
  252. while let Ok(_new_type) = subscribe.recv().await {
  253. // core.network_state_changed(new_type);
  254. }
  255. }
  256. fn init_kv(root: &str) {
  257. match flowy_sqlite::kv::KV::init(root) {
  258. Ok(_) => {},
  259. Err(e) => tracing::error!("Init kv store failed: {}", e),
  260. }
  261. }
  262. fn init_log(config: &AppFlowyCoreConfig) {
  263. if !INIT_LOG.load(Ordering::SeqCst) {
  264. INIT_LOG.store(true, Ordering::SeqCst);
  265. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  266. .env_filter(&config.log_filter)
  267. .build();
  268. }
  269. }
  270. fn mk_user_session(
  271. config: &AppFlowyCoreConfig,
  272. local_server: &Option<Arc<LocalServer>>,
  273. server_config: &ClientServerConfiguration,
  274. ) -> Arc<UserSession> {
  275. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  276. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  277. Arc::new(UserSession::new(user_config, cloud_service))
  278. }
/// Holds the managers and the websocket connection that are driven when the
/// user signs in, signs up, or the session expires.
struct UserStatusListener {
  document_manager: Arc<DocumentManager>,
  folder_manager: Arc<Folder2Manager>,
  database_manager: Arc<DatabaseManager2>,
  ws_conn: Arc<FlowyWebSocketConnect>,
  // Not read by the methods visible in this file, hence the lint allowance.
  #[allow(dead_code)]
  config: AppFlowyCoreConfig,
}
  287. impl UserStatusListener {
  288. async fn did_sign_in(&self, token: &str, user_id: i64) -> FlowyResult<()> {
  289. self.folder_manager.initialize(user_id).await?;
  290. self.document_manager.initialize(user_id).await?;
  291. self.database_manager.initialize(user_id, token).await?;
  292. self
  293. .ws_conn
  294. .start(token.to_owned(), user_id.to_owned())
  295. .await?;
  296. Ok(())
  297. }
  298. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  299. self
  300. .folder_manager
  301. .initialize_with_new_user(user_profile.id, &user_profile.token)
  302. .await?;
  303. self
  304. .document_manager
  305. .initialize_with_new_user(user_profile.id, &user_profile.token)
  306. .await?;
  307. self
  308. .database_manager
  309. .initialize_with_new_user(user_profile.id, &user_profile.token)
  310. .await?;
  311. self
  312. .ws_conn
  313. .start(user_profile.token.clone(), user_profile.id)
  314. .await?;
  315. Ok(())
  316. }
  317. async fn did_expired(&self, _token: &str, user_id: i64) -> FlowyResult<()> {
  318. self.folder_manager.clear(user_id).await;
  319. self.ws_conn.stop().await;
  320. Ok(())
  321. }
  322. }
/// Adapter that implements `UserStatusCallback` by forwarding each event to the
/// shared `UserStatusListener`.
struct UserStatusCallbackImpl {
  listener: Arc<UserStatusListener>,
}
  326. impl UserStatusCallback for UserStatusCallbackImpl {
  327. fn did_sign_in(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  328. let listener = self.listener.clone();
  329. let token = token.to_owned();
  330. let user_id = user_id.to_owned();
  331. to_fut(async move { listener.did_sign_in(&token, user_id).await })
  332. }
  333. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  334. let listener = self.listener.clone();
  335. let user_profile = user_profile.clone();
  336. to_fut(async move { listener.did_sign_up(&user_profile).await })
  337. }
  338. fn did_expired(&self, token: &str, user_id: i64) -> Fut<FlowyResult<()>> {
  339. let listener = self.listener.clone();
  340. let token = token.to_owned();
  341. let user_id = user_id.to_owned();
  342. to_fut(async move { listener.did_expired(&token, user_id).await })
  343. }
  344. fn will_migrated(&self, _token: &str, _old_user_id: &str, _user_id: i64) -> Fut<FlowyResult<()>> {
  345. // Read the folder data
  346. todo!()
  347. }
  348. }