lib.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  5. use flowy_database::manager::DatabaseManager;
  6. use flowy_document::entities::DocumentVersionPB;
  7. use flowy_document::{DocumentConfig, DocumentManager};
  8. use flowy_error::FlowyResult;
  9. use flowy_folder::entities::ViewDataFormatPB;
  10. use flowy_folder::{errors::FlowyError, manager::FolderManager};
  11. pub use flowy_net::get_client_server_configuration;
  12. use flowy_net::local_server::LocalServer;
  13. use flowy_net::ClientServerConfiguration;
  14. use flowy_task::{TaskDispatcher, TaskRunner};
  15. use flowy_user::event_map::UserStatusCallback;
  16. use flowy_user::services::{UserSession, UserSessionConfig};
  17. use lib_dispatch::prelude::*;
  18. use lib_dispatch::runtime::tokio_default_runtime;
  19. use lib_infra::future::{to_fut, Fut};
  20. use module::make_plugins;
  21. pub use module::*;
  22. use std::time::Duration;
  23. use std::{
  24. fmt,
  25. sync::{
  26. atomic::{AtomicBool, Ordering},
  27. Arc,
  28. },
  29. };
  30. use tokio::sync::{broadcast, RwLock};
  31. use user_model::UserProfile;
/// Process-wide flag read and set by `init_log` so the logger is only built once.
static INIT_LOG: AtomicBool = AtomicBool::new(false);
/// Settings consumed by `AppFlowyCore::new`.
#[derive(Clone)]
pub struct AppFlowyCoreConfig {
    /// Each `AppFlowyCoreConfig` instance should have a distinct name.
    name: String,
    /// Root storage path. Panics if the `root` path does not exist.
    storage_path: String,
    /// Comma-separated logging directives produced by `create_log_filter`.
    log_filter: String,
    /// Server configuration handed to `mk_local_server` and the dependency resolvers.
    server_config: ClientServerConfiguration,
    /// Document settings; `version` selects the view data format used on sign-up.
    pub document: DocumentConfig,
}
  43. impl fmt::Debug for AppFlowyCoreConfig {
  44. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  45. f.debug_struct("AppFlowyCoreConfig")
  46. .field("storage_path", &self.storage_path)
  47. .field("server-config", &self.server_config)
  48. .field("document-config", &self.document)
  49. .finish()
  50. }
  51. }
  52. impl AppFlowyCoreConfig {
  53. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  54. AppFlowyCoreConfig {
  55. name,
  56. storage_path: root.to_owned(),
  57. log_filter: create_log_filter("info".to_owned(), vec![]),
  58. server_config,
  59. document: DocumentConfig::default(),
  60. }
  61. }
  62. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  63. self.document.version = version;
  64. self
  65. }
  66. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  67. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  68. self
  69. }
  70. }
  71. fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
  72. let level = std::env::var("RUST_LOG").unwrap_or(level);
  73. let mut filters = with_crates
  74. .into_iter()
  75. .map(|crate_name| format!("{}={}", crate_name, level))
  76. .collect::<Vec<String>>();
  77. filters.push(format!("flowy_core={}", level));
  78. filters.push(format!("flowy_folder={}", level));
  79. filters.push(format!("flowy_user={}", level));
  80. filters.push(format!("flowy_document={}", level));
  81. filters.push(format!("flowy_database={}", level));
  82. filters.push(format!("flowy_sync={}", "info"));
  83. filters.push(format!("flowy_client_sync={}", "info"));
  84. filters.push(format!("flowy_notification={}", "info"));
  85. filters.push(format!("lib_ot={}", level));
  86. filters.push(format!("lib_ws={}", level));
  87. filters.push(format!("lib_infra={}", level));
  88. filters.push(format!("flowy_sync={}", level));
  89. filters.push(format!("flowy_revision={}", level));
  90. filters.push(format!("flowy_revision_persistence={}", level));
  91. filters.push(format!("flowy_task={}", level));
  92. // filters.push(format!("lib_dispatch={}", level));
  93. filters.push(format!("dart_ffi={}", "info"));
  94. filters.push(format!("flowy_sqlite={}", "info"));
  95. filters.push(format!("flowy_net={}", "info"));
  96. filters.push(format!("tokio=trace,runtime=trace"));
  97. filters.join(",")
  98. }
/// Bundle of the shared services assembled by `AppFlowyCore::new`.
///
/// Cloning copies the configuration and the `Arc` handles it holds.
#[derive(Clone)]
pub struct AppFlowyCore {
    /// Configuration this core was built from.
    #[allow(dead_code)]
    pub config: AppFlowyCoreConfig,
    /// User session created by `mk_user_session`.
    pub user_session: Arc<UserSession>,
    /// Document manager resolved through `DocumentDepsResolver`.
    pub document_manager: Arc<DocumentManager>,
    /// Folder manager resolved through `FolderDepsResolver`.
    pub folder_manager: Arc<FolderManager>,
    /// Database manager resolved through `DatabaseDepsResolver`.
    pub database_manager: Arc<DatabaseManager>,
    /// Plugin dispatcher that owns the runtime created in `new`.
    pub event_dispatcher: Arc<AFPluginDispatcher>,
    /// WebSocket connection produced by `mk_local_server`.
    pub ws_conn: Arc<FlowyWebSocketConnect>,
    /// Local server; `None` when built with the `http_sync` feature.
    pub local_server: Option<Arc<LocalServer>>,
    /// Task dispatcher shared with the `TaskRunner` spawned in `new`.
    pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
}
  112. impl AppFlowyCore {
  113. pub fn new(config: AppFlowyCoreConfig) -> Self {
  114. console_subscriber::init();
  115. init_log(&config);
  116. init_kv(&config.storage_path);
  117. tracing::debug!("🔥 {:?}", config);
  118. let runtime = tokio_default_runtime().unwrap();
  119. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  120. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  121. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  122. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  123. let (user_session, document_manager, folder_manager, local_server, database_manager) = runtime
  124. .block_on(async {
  125. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  126. let document_manager = DocumentDepsResolver::resolve(
  127. local_server.clone(),
  128. ws_conn.clone(),
  129. user_session.clone(),
  130. &config.server_config,
  131. &config.document,
  132. );
  133. let database_manager = DatabaseDepsResolver::resolve(
  134. ws_conn.clone(),
  135. user_session.clone(),
  136. task_dispatcher.clone(),
  137. )
  138. .await;
  139. let folder_manager = FolderDepsResolver::resolve(
  140. local_server.clone(),
  141. user_session.clone(),
  142. &config.server_config,
  143. &ws_conn,
  144. &document_manager,
  145. &database_manager,
  146. )
  147. .await;
  148. if let Some(local_server) = local_server.as_ref() {
  149. local_server.run();
  150. }
  151. ws_conn.init().await;
  152. (
  153. user_session,
  154. document_manager,
  155. folder_manager,
  156. local_server,
  157. database_manager,
  158. )
  159. });
  160. let user_status_listener = UserStatusListener {
  161. document_manager: document_manager.clone(),
  162. folder_manager: folder_manager.clone(),
  163. database_manager: database_manager.clone(),
  164. ws_conn: ws_conn.clone(),
  165. config: config.clone(),
  166. };
  167. let user_status_callback = UserStatusCallbackImpl {
  168. listener: Arc::new(user_status_listener),
  169. };
  170. let cloned_user_session = user_session.clone();
  171. runtime.block_on(async move {
  172. cloned_user_session.clone().init(user_status_callback).await;
  173. });
  174. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  175. make_plugins(
  176. &ws_conn,
  177. &folder_manager,
  178. &database_manager,
  179. &user_session,
  180. &document_manager,
  181. )
  182. }));
  183. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  184. Self {
  185. config,
  186. user_session,
  187. document_manager,
  188. folder_manager,
  189. database_manager,
  190. event_dispatcher,
  191. ws_conn,
  192. local_server,
  193. task_dispatcher,
  194. }
  195. }
  196. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  197. self.event_dispatcher.clone()
  198. }
  199. }
  200. fn _start_listening(
  201. event_dispatcher: &AFPluginDispatcher,
  202. ws_conn: &Arc<FlowyWebSocketConnect>,
  203. folder_manager: &Arc<FolderManager>,
  204. ) {
  205. let subscribe_network_type = ws_conn.subscribe_network_ty();
  206. let folder_manager = folder_manager.clone();
  207. let cloned_folder_manager = folder_manager;
  208. let ws_conn = ws_conn.clone();
  209. event_dispatcher.spawn(async move {
  210. listen_on_websocket(ws_conn.clone());
  211. });
  212. event_dispatcher.spawn(async move {
  213. _listen_network_status(subscribe_network_type, cloned_folder_manager).await;
  214. });
  215. }
  216. fn mk_local_server(
  217. server_config: &ClientServerConfiguration,
  218. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  219. let ws_addr = server_config.ws_addr();
  220. if cfg!(feature = "http_sync") {
  221. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  222. (None, ws_conn)
  223. } else {
  224. let context = flowy_net::local_server::build_server(server_config);
  225. let local_ws = Arc::new(context.local_ws);
  226. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  227. (Some(Arc::new(context.local_server)), ws_conn)
  228. }
  229. }
  230. async fn _listen_network_status(
  231. mut subscribe: broadcast::Receiver<NetworkType>,
  232. _core: Arc<FolderManager>,
  233. ) {
  234. while let Ok(_new_type) = subscribe.recv().await {
  235. // core.network_state_changed(new_type);
  236. }
  237. }
  238. fn init_kv(root: &str) {
  239. match flowy_sqlite::kv::KV::init(root) {
  240. Ok(_) => {},
  241. Err(e) => tracing::error!("Init kv store failed: {}", e),
  242. }
  243. }
  244. fn init_log(config: &AppFlowyCoreConfig) {
  245. if !INIT_LOG.load(Ordering::SeqCst) {
  246. INIT_LOG.store(true, Ordering::SeqCst);
  247. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  248. .env_filter(&config.log_filter)
  249. .build();
  250. }
  251. }
  252. fn mk_user_session(
  253. config: &AppFlowyCoreConfig,
  254. local_server: &Option<Arc<LocalServer>>,
  255. server_config: &ClientServerConfiguration,
  256. ) -> Arc<UserSession> {
  257. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  258. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  259. Arc::new(UserSession::new(user_config, cloud_service))
  260. }
/// Holds the managers and the connection that are driven when the user signs
/// in, signs up, or their token expires.
struct UserStatusListener {
    /// Initialized on sign-in and sign-up.
    document_manager: Arc<DocumentManager>,
    /// Initialized on sign-in and sign-up; cleared on expiry.
    folder_manager: Arc<FolderManager>,
    /// Initialized on sign-in and sign-up.
    database_manager: Arc<DatabaseManager>,
    /// Started on sign-in and sign-up; stopped on expiry.
    ws_conn: Arc<FlowyWebSocketConnect>,
    /// Supplies the document version used to pick the view data format on sign-up.
    config: AppFlowyCoreConfig,
}
  268. impl UserStatusListener {
  269. async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
  270. self.folder_manager.initialize(user_id, token).await?;
  271. self.document_manager.initialize(user_id).await?;
  272. self.database_manager.initialize(user_id, token).await?;
  273. self
  274. .ws_conn
  275. .start(token.to_owned(), user_id.to_owned())
  276. .await?;
  277. Ok(())
  278. }
  279. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  280. let view_data_type = match self.config.document.version {
  281. DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
  282. DocumentVersionPB::V1 => ViewDataFormatPB::NodeFormat,
  283. };
  284. self
  285. .folder_manager
  286. .initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type)
  287. .await?;
  288. self
  289. .document_manager
  290. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  291. .await?;
  292. self
  293. .database_manager
  294. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  295. .await?;
  296. self
  297. .ws_conn
  298. .start(user_profile.token.clone(), user_profile.id.clone())
  299. .await?;
  300. Ok(())
  301. }
  302. async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> {
  303. self.folder_manager.clear(user_id).await;
  304. self.ws_conn.stop().await;
  305. Ok(())
  306. }
  307. }
/// `UserStatusCallback` implementation that forwards every event to a shared
/// `UserStatusListener`.
struct UserStatusCallbackImpl {
    /// Listener that performs the actual work for each event.
    listener: Arc<UserStatusListener>,
}
  311. impl UserStatusCallback for UserStatusCallbackImpl {
  312. fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  313. let listener = self.listener.clone();
  314. let token = token.to_owned();
  315. let user_id = user_id.to_owned();
  316. to_fut(async move { listener.did_sign_in(&token, &user_id).await })
  317. }
  318. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  319. let listener = self.listener.clone();
  320. let user_profile = user_profile.clone();
  321. to_fut(async move { listener.did_sign_up(&user_profile).await })
  322. }
  323. fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  324. let listener = self.listener.clone();
  325. let token = token.to_owned();
  326. let user_id = user_id.to_owned();
  327. to_fut(async move { listener.did_expired(&token, &user_id).await })
  328. }
  329. }