lib.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378
  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  5. use flowy_database::manager::DatabaseManager;
  6. use flowy_document::entities::DocumentVersionPB;
  7. use flowy_document::{DocumentConfig, DocumentManager};
  8. use flowy_error::FlowyResult;
  9. use flowy_folder::entities::ViewDataFormatPB;
  10. use flowy_folder::{errors::FlowyError, manager::FolderManager};
  11. pub use flowy_net::get_client_server_configuration;
  12. use flowy_net::local_server::LocalServer;
  13. use flowy_net::ClientServerConfiguration;
  14. use flowy_task::{TaskDispatcher, TaskRunner};
  15. use flowy_user::event_map::UserStatusCallback;
  16. use flowy_user::services::{UserSession, UserSessionConfig};
  17. use lib_dispatch::prelude::*;
  18. use lib_dispatch::runtime::tokio_default_runtime;
  19. use lib_infra::future::{to_fut, Fut};
  20. use module::make_plugins;
  21. pub use module::*;
  22. use std::time::Duration;
  23. use std::{
  24. fmt,
  25. sync::{
  26. atomic::{AtomicBool, Ordering},
  27. Arc,
  28. },
  29. };
  30. use tokio::sync::{broadcast, RwLock};
  31. use user_model::UserProfile;
  // Flag read and set by `init_log`: once it is `true`, later calls skip building the logger.
  32. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  33. /// This name is used to identify the current [AppFlowyCore] instance.
  34. /// Don't change this.
  35. pub const DEFAULT_NAME: &str = "appflowy";
  36. #[derive(Clone)]
  37. pub struct AppFlowyCoreConfig {
  38. /// Different `AppFlowyCoreConfig` instance should have different name
  39. name: String,
  40. /// Panics if the `root` path is not existing
  41. storage_path: String,
  42. log_filter: String,
  43. server_config: ClientServerConfiguration,
  44. pub document: DocumentConfig,
  45. }
  46. impl fmt::Debug for AppFlowyCoreConfig {
  47. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  48. f.debug_struct("AppFlowyCoreConfig")
  49. .field("storage_path", &self.storage_path)
  50. .field("server-config", &self.server_config)
  51. .field("document-config", &self.document)
  52. .finish()
  53. }
  54. }
  55. impl AppFlowyCoreConfig {
  56. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  57. AppFlowyCoreConfig {
  58. name,
  59. storage_path: root.to_owned(),
  60. log_filter: create_log_filter("info".to_owned(), vec![]),
  61. server_config,
  62. document: DocumentConfig::default(),
  63. }
  64. }
  65. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  66. self.document.version = version;
  67. self
  68. }
  69. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  70. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  71. self
  72. }
  73. }
  /// Builds the comma-separated `target=level` filter string used for logging.
  ///
  /// * `level` - level applied to the level-following targets. If the `RUST_LOG`
  ///   environment variable is set, its value replaces `level`.
  /// * `with_crates` - extra crate names that are emitted first, each at the
  ///   effective level.
  ///
  /// Some targets are always pinned to `info` regardless of the effective level.
  ///
  /// `flowy_sync` used to be emitted twice (`flowy_sync=info` and later
  /// `flowy_sync={level}`). Only the later one is kept.
  /// NOTE(review): this assumes the string is consumed by tracing's `EnvFilter`,
  /// where a later directive for the same target replaces the earlier one, so
  /// the effective filter is unchanged — confirm against the logger builder.
  fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
    // Application crates that follow the effective level.
    const APP_TARGETS: [&str; 5] = [
      "flowy_core",
      "flowy_folder",
      "flowy_user",
      "flowy_document",
      "flowy_database",
    ];
    // Directives pinned to `info`, emitted right after the application crates.
    const PINNED_SYNC: [&str; 2] = ["flowy_client_sync=info", "flowy_notification=info"];
    // Infrastructure crates that follow the effective level.
    // `lib_dispatch` is currently not part of the filter.
    const INFRA_TARGETS: [&str; 7] = [
      "lib_ot",
      "lib_ws",
      "lib_infra",
      "flowy_sync",
      "flowy_revision",
      "flowy_revision_persistence",
      "flowy_task",
    ];
    // Directives pinned to `info`, emitted last.
    const PINNED_TAIL: [&str; 3] = ["dart_ffi=info", "flowy_sqlite=info", "flowy_net=info"];

    let level = std::env::var("RUST_LOG").unwrap_or(level);

    let mut filters: Vec<String> = with_crates
      .into_iter()
      .map(|crate_name| format!("{}={}", crate_name, level))
      .collect();
    filters.extend(APP_TARGETS.iter().map(|target| format!("{}={}", target, level)));
    filters.extend(PINNED_SYNC.iter().map(|directive| directive.to_string()));
    filters.extend(INFRA_TARGETS.iter().map(|target| format!("{}={}", target, level)));
    filters.extend(PINNED_TAIL.iter().map(|directive| directive.to_string()));

    #[cfg(feature = "profiling")]
    filters.push(format!("tokio={}", level));
    #[cfg(feature = "profiling")]
    filters.push(format!("runtime={}", level));

    filters.join(",")
  }
  105. #[derive(Clone)]
  106. pub struct AppFlowyCore {
  107. #[allow(dead_code)]
  108. pub config: AppFlowyCoreConfig,
  109. pub user_session: Arc<UserSession>,
  110. pub document_manager: Arc<DocumentManager>,
  111. pub folder_manager: Arc<FolderManager>,
  112. pub database_manager: Arc<DatabaseManager>,
  113. pub event_dispatcher: Arc<AFPluginDispatcher>,
  114. pub ws_conn: Arc<FlowyWebSocketConnect>,
  115. pub local_server: Option<Arc<LocalServer>>,
  116. pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  117. }
  118. impl AppFlowyCore {
  119. pub fn new(config: AppFlowyCoreConfig) -> Self {
  120. #[cfg(feature = "profiling")]
  121. console_subscriber::init();
  122. init_log(&config);
  123. init_kv(&config.storage_path);
  124. tracing::debug!("🔥 {:?}", config);
  125. let runtime = tokio_default_runtime().unwrap();
  126. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  127. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  128. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  129. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  130. let (user_session, document_manager, folder_manager, local_server, database_manager) = runtime
  131. .block_on(async {
  132. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  133. let document_manager = DocumentDepsResolver::resolve(
  134. local_server.clone(),
  135. ws_conn.clone(),
  136. user_session.clone(),
  137. &config.server_config,
  138. &config.document,
  139. );
  140. let database_manager = DatabaseDepsResolver::resolve(
  141. ws_conn.clone(),
  142. user_session.clone(),
  143. task_dispatcher.clone(),
  144. )
  145. .await;
  146. let folder_manager = FolderDepsResolver::resolve(
  147. local_server.clone(),
  148. user_session.clone(),
  149. &config.server_config,
  150. &ws_conn,
  151. &document_manager,
  152. &database_manager,
  153. )
  154. .await;
  155. if let Some(local_server) = local_server.as_ref() {
  156. local_server.run();
  157. }
  158. ws_conn.init().await;
  159. (
  160. user_session,
  161. document_manager,
  162. folder_manager,
  163. local_server,
  164. database_manager,
  165. )
  166. });
  167. let user_status_listener = UserStatusListener {
  168. document_manager: document_manager.clone(),
  169. folder_manager: folder_manager.clone(),
  170. database_manager: database_manager.clone(),
  171. ws_conn: ws_conn.clone(),
  172. config: config.clone(),
  173. };
  174. let user_status_callback = UserStatusCallbackImpl {
  175. listener: Arc::new(user_status_listener),
  176. };
  177. let cloned_user_session = user_session.clone();
  178. runtime.block_on(async move {
  179. cloned_user_session.clone().init(user_status_callback).await;
  180. });
  181. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  182. make_plugins(
  183. &ws_conn,
  184. &folder_manager,
  185. &database_manager,
  186. &user_session,
  187. &document_manager,
  188. )
  189. }));
  190. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  191. Self {
  192. config,
  193. user_session,
  194. document_manager,
  195. folder_manager,
  196. database_manager,
  197. event_dispatcher,
  198. ws_conn,
  199. local_server,
  200. task_dispatcher,
  201. }
  202. }
  203. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  204. self.event_dispatcher.clone()
  205. }
  206. }
  207. fn _start_listening(
  208. event_dispatcher: &AFPluginDispatcher,
  209. ws_conn: &Arc<FlowyWebSocketConnect>,
  210. folder_manager: &Arc<FolderManager>,
  211. ) {
  212. let subscribe_network_type = ws_conn.subscribe_network_ty();
  213. let folder_manager = folder_manager.clone();
  214. let cloned_folder_manager = folder_manager;
  215. let ws_conn = ws_conn.clone();
  216. event_dispatcher.spawn(async move {
  217. listen_on_websocket(ws_conn.clone());
  218. });
  219. event_dispatcher.spawn(async move {
  220. _listen_network_status(subscribe_network_type, cloned_folder_manager).await;
  221. });
  222. }
  223. fn mk_local_server(
  224. server_config: &ClientServerConfiguration,
  225. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  226. let ws_addr = server_config.ws_addr();
  227. if cfg!(feature = "http_sync") {
  228. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  229. (None, ws_conn)
  230. } else {
  231. let context = flowy_net::local_server::build_server(server_config);
  232. let local_ws = Arc::new(context.local_ws);
  233. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  234. (Some(Arc::new(context.local_server)), ws_conn)
  235. }
  236. }
  237. async fn _listen_network_status(
  238. mut subscribe: broadcast::Receiver<NetworkType>,
  239. _core: Arc<FolderManager>,
  240. ) {
  241. while let Ok(_new_type) = subscribe.recv().await {
  242. // core.network_state_changed(new_type);
  243. }
  244. }
  245. fn init_kv(root: &str) {
  246. match flowy_sqlite::kv::KV::init(root) {
  247. Ok(_) => {},
  248. Err(e) => tracing::error!("Init kv store failed: {}", e),
  249. }
  250. }
  251. fn init_log(config: &AppFlowyCoreConfig) {
  252. if !INIT_LOG.load(Ordering::SeqCst) {
  253. INIT_LOG.store(true, Ordering::SeqCst);
  254. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  255. .env_filter(&config.log_filter)
  256. .build();
  257. }
  258. }
  259. fn mk_user_session(
  260. config: &AppFlowyCoreConfig,
  261. local_server: &Option<Arc<LocalServer>>,
  262. server_config: &ClientServerConfiguration,
  263. ) -> Arc<UserSession> {
  264. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  265. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  266. Arc::new(UserSession::new(user_config, cloud_service))
  267. }
  268. struct UserStatusListener {
  269. document_manager: Arc<DocumentManager>,
  270. folder_manager: Arc<FolderManager>,
  271. database_manager: Arc<DatabaseManager>,
  272. ws_conn: Arc<FlowyWebSocketConnect>,
  273. config: AppFlowyCoreConfig,
  274. }
  275. impl UserStatusListener {
  276. async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
  277. self.folder_manager.initialize(user_id, token).await?;
  278. self.document_manager.initialize(user_id).await?;
  279. self.database_manager.initialize(user_id, token).await?;
  280. self
  281. .ws_conn
  282. .start(token.to_owned(), user_id.to_owned())
  283. .await?;
  284. Ok(())
  285. }
  286. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  287. let view_data_type = match self.config.document.version {
  288. DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
  289. DocumentVersionPB::V1 => ViewDataFormatPB::NodeFormat,
  290. };
  291. self
  292. .folder_manager
  293. .initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type)
  294. .await?;
  295. self
  296. .document_manager
  297. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  298. .await?;
  299. self
  300. .database_manager
  301. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  302. .await?;
  303. self
  304. .ws_conn
  305. .start(user_profile.token.clone(), user_profile.id.clone())
  306. .await?;
  307. Ok(())
  308. }
  309. async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> {
  310. self.folder_manager.clear(user_id).await;
  311. self.ws_conn.stop().await;
  312. Ok(())
  313. }
  314. }
  315. struct UserStatusCallbackImpl {
  316. listener: Arc<UserStatusListener>,
  317. }
  318. impl UserStatusCallback for UserStatusCallbackImpl {
  319. fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  320. let listener = self.listener.clone();
  321. let token = token.to_owned();
  322. let user_id = user_id.to_owned();
  323. to_fut(async move { listener.did_sign_in(&token, &user_id).await })
  324. }
  325. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  326. let listener = self.listener.clone();
  327. let user_profile = user_profile.clone();
  328. to_fut(async move { listener.did_sign_up(&user_profile).await })
  329. }
  330. fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  331. let listener = self.listener.clone();
  332. let token = token.to_owned();
  333. let user_id = user_id.to_owned();
  334. to_fut(async move { listener.did_expired(&token, &user_id).await })
  335. }
  336. }