lib.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use flowy_client_ws::{listen_on_websocket, FlowyWebSocketConnect, NetworkType};
  5. use flowy_database::manager::DatabaseManager;
  6. use flowy_document::entities::DocumentVersionPB;
  7. use flowy_document::{DocumentConfig, DocumentManager};
  8. use flowy_error::FlowyResult;
  9. use flowy_folder::entities::ViewDataFormatPB;
  10. use flowy_folder::{errors::FlowyError, manager::FolderManager};
  11. pub use flowy_net::get_client_server_configuration;
  12. use flowy_net::local_server::LocalServer;
  13. use flowy_net::ClientServerConfiguration;
  14. use flowy_task::{TaskDispatcher, TaskRunner};
  15. use flowy_user::event_map::UserStatusCallback;
  16. use flowy_user::services::{UserSession, UserSessionConfig};
  17. use lib_dispatch::prelude::*;
  18. use lib_dispatch::runtime::tokio_default_runtime;
  19. use lib_infra::future::{to_fut, Fut};
  20. use module::make_plugins;
  21. pub use module::*;
  22. use std::time::Duration;
  23. use std::{
  24. fmt,
  25. sync::{
  26. atomic::{AtomicBool, Ordering},
  27. Arc,
  28. },
  29. };
  30. use tokio::sync::{broadcast, RwLock};
  31. use user_model::UserProfile;
  /// Process-wide flag that `init_log` sets to `true` the first time it runs, so the
  /// logger is only built once even if several cores are created.
  static INIT_LOG: AtomicBool = AtomicBool::new(false);
  /// Settings consumed by `AppFlowyCore::new`.
  #[derive(Clone)]
  pub struct AppFlowyCoreConfig {
    /// Each `AppFlowyCoreConfig` instance should use a different name.
    name: String,
    /// Root directory handed to the key-value store, the logger and the user session.
    /// NOTE(review): the original comment states that a non-existent `root` path causes a
    /// panic; that is not visible in this file, so confirm against the storage code.
    storage_path: String,
    /// Comma-separated `target=level` directives produced by `create_log_filter`.
    log_filter: String,
    server_config: ClientServerConfiguration,
    pub document: DocumentConfig,
  }
  43. impl fmt::Debug for AppFlowyCoreConfig {
  44. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  45. f.debug_struct("AppFlowyCoreConfig")
  46. .field("storage_path", &self.storage_path)
  47. .field("server-config", &self.server_config)
  48. .field("document-config", &self.document)
  49. .finish()
  50. }
  51. }
  52. impl AppFlowyCoreConfig {
  53. pub fn new(root: &str, name: String, server_config: ClientServerConfiguration) -> Self {
  54. AppFlowyCoreConfig {
  55. name,
  56. storage_path: root.to_owned(),
  57. log_filter: create_log_filter("info".to_owned(), vec![]),
  58. server_config,
  59. document: DocumentConfig::default(),
  60. }
  61. }
  62. pub fn with_document_version(mut self, version: DocumentVersionPB) -> Self {
  63. self.document.version = version;
  64. self
  65. }
  66. pub fn log_filter(mut self, level: &str, with_crates: Vec<String>) -> Self {
  67. self.log_filter = create_log_filter(level.to_owned(), with_crates);
  68. self
  69. }
  70. }
  /// Builds the comma-separated `target=level` filter string used to configure logging.
  ///
  /// * `level` - level applied to every caller-supplied crate and to most built-in targets.
  ///   If the `RUST_LOG` environment variable is set, its value replaces `level`.
  /// * `with_crates` - additional crate names to log at that level; they come first in the
  ///   returned string.
  ///
  /// A few built-in targets are always pinned to `info`, whatever level is requested.
  ///
  /// `flowy_sync` used to be emitted twice, first as `flowy_sync=info` and later as
  /// `flowy_sync=<level>`. Only the later entry is kept, so each target appears exactly once.
  /// NOTE(review): this preserves the effective behavior if the consuming filter lets a later
  /// directive replace an earlier one for the same target (as tracing-subscriber's `EnvFilter`
  /// does) — confirm that `lib_log` parses the string that way.
  fn create_log_filter(level: String, with_crates: Vec<String>) -> String {
    // `None` follows the requested level; `Some(..)` pins the target to a fixed level.
    const BUILTIN_TARGETS: &[(&str, Option<&str>)] = &[
      ("flowy_core", None),
      ("flowy_folder", None),
      ("flowy_user", None),
      ("flowy_document", None),
      ("flowy_database", None),
      ("flowy_client_sync", Some("info")),
      ("flowy_notification", Some("info")),
      ("lib_ot", None),
      ("lib_ws", None),
      ("lib_infra", None),
      ("flowy_sync", None),
      ("flowy_revision", None),
      ("flowy_revision_persistence", None),
      ("flowy_task", None),
      // `lib_dispatch` is currently not given a directive of its own.
      ("dart_ffi", Some("info")),
      ("flowy_sqlite", Some("info")),
      ("flowy_net", Some("info")),
    ];

    let level = std::env::var("RUST_LOG").unwrap_or(level);
    let mut filters = with_crates
      .into_iter()
      .map(|crate_name| format!("{}={}", crate_name, level))
      .collect::<Vec<String>>();
    filters.extend(
      BUILTIN_TARGETS
        .iter()
        .map(|&(target, fixed)| format!("{}={}", target, fixed.unwrap_or(level.as_str()))),
    );
    filters.join(",")
  }
  /// The assembled application core returned by `AppFlowyCore::new`.
  ///
  /// Apart from `config`, every field is an `Arc` (or an `Option<Arc<_>>`), so cloning the
  /// core shares the underlying managers rather than duplicating them.
  #[derive(Clone)]
  pub struct AppFlowyCore {
    /// Configuration the core was built from.
    #[allow(dead_code)]
    pub config: AppFlowyCoreConfig,
    pub user_session: Arc<UserSession>,
    pub document_manager: Arc<DocumentManager>,
    pub folder_manager: Arc<FolderManager>,
    pub database_manager: Arc<DatabaseManager>,
    /// Dispatcher built with the plugins returned by `make_plugins`.
    pub event_dispatcher: Arc<AFPluginDispatcher>,
    pub ws_conn: Arc<FlowyWebSocketConnect>,
    /// `None` when the `http_sync` feature is enabled, otherwise the in-process server.
    pub local_server: Option<Arc<LocalServer>>,
    /// Dispatcher driven by the `TaskRunner` spawned in `AppFlowyCore::new`.
    pub task_dispatcher: Arc<RwLock<TaskDispatcher>>,
  }
  111. impl AppFlowyCore {
  112. pub fn new(config: AppFlowyCoreConfig) -> Self {
  113. init_log(&config);
  114. init_kv(&config.storage_path);
  115. tracing::debug!("🔥 {:?}", config);
  116. let runtime = tokio_default_runtime().unwrap();
  117. let task_scheduler = TaskDispatcher::new(Duration::from_secs(2));
  118. let task_dispatcher = Arc::new(RwLock::new(task_scheduler));
  119. runtime.spawn(TaskRunner::run(task_dispatcher.clone()));
  120. let (local_server, ws_conn) = mk_local_server(&config.server_config);
  121. let (user_session, document_manager, folder_manager, local_server, database_manager) = runtime
  122. .block_on(async {
  123. let user_session = mk_user_session(&config, &local_server, &config.server_config);
  124. let document_manager = DocumentDepsResolver::resolve(
  125. local_server.clone(),
  126. ws_conn.clone(),
  127. user_session.clone(),
  128. &config.server_config,
  129. &config.document,
  130. );
  131. let database_manager = DatabaseDepsResolver::resolve(
  132. ws_conn.clone(),
  133. user_session.clone(),
  134. task_dispatcher.clone(),
  135. )
  136. .await;
  137. let folder_manager = FolderDepsResolver::resolve(
  138. local_server.clone(),
  139. user_session.clone(),
  140. &config.server_config,
  141. &ws_conn,
  142. &document_manager,
  143. &database_manager,
  144. )
  145. .await;
  146. if let Some(local_server) = local_server.as_ref() {
  147. local_server.run();
  148. }
  149. ws_conn.init().await;
  150. (
  151. user_session,
  152. document_manager,
  153. folder_manager,
  154. local_server,
  155. database_manager,
  156. )
  157. });
  158. let user_status_listener = UserStatusListener {
  159. document_manager: document_manager.clone(),
  160. folder_manager: folder_manager.clone(),
  161. database_manager: database_manager.clone(),
  162. ws_conn: ws_conn.clone(),
  163. config: config.clone(),
  164. };
  165. let user_status_callback = UserStatusCallbackImpl {
  166. listener: Arc::new(user_status_listener),
  167. };
  168. let cloned_user_session = user_session.clone();
  169. runtime.block_on(async move {
  170. cloned_user_session.clone().init(user_status_callback).await;
  171. });
  172. let event_dispatcher = Arc::new(AFPluginDispatcher::construct(runtime, || {
  173. make_plugins(
  174. &ws_conn,
  175. &folder_manager,
  176. &database_manager,
  177. &user_session,
  178. &document_manager,
  179. )
  180. }));
  181. _start_listening(&event_dispatcher, &ws_conn, &folder_manager);
  182. Self {
  183. config,
  184. user_session,
  185. document_manager,
  186. folder_manager,
  187. database_manager,
  188. event_dispatcher,
  189. ws_conn,
  190. local_server,
  191. task_dispatcher,
  192. }
  193. }
  194. pub fn dispatcher(&self) -> Arc<AFPluginDispatcher> {
  195. self.event_dispatcher.clone()
  196. }
  197. }
  198. fn _start_listening(
  199. event_dispatcher: &AFPluginDispatcher,
  200. ws_conn: &Arc<FlowyWebSocketConnect>,
  201. folder_manager: &Arc<FolderManager>,
  202. ) {
  203. let subscribe_network_type = ws_conn.subscribe_network_ty();
  204. let folder_manager = folder_manager.clone();
  205. let cloned_folder_manager = folder_manager;
  206. let ws_conn = ws_conn.clone();
  207. event_dispatcher.spawn(async move {
  208. listen_on_websocket(ws_conn.clone());
  209. });
  210. event_dispatcher.spawn(async move {
  211. _listen_network_status(subscribe_network_type, cloned_folder_manager).await;
  212. });
  213. }
  214. fn mk_local_server(
  215. server_config: &ClientServerConfiguration,
  216. ) -> (Option<Arc<LocalServer>>, Arc<FlowyWebSocketConnect>) {
  217. let ws_addr = server_config.ws_addr();
  218. if cfg!(feature = "http_sync") {
  219. let ws_conn = Arc::new(FlowyWebSocketConnect::new(ws_addr));
  220. (None, ws_conn)
  221. } else {
  222. let context = flowy_net::local_server::build_server(server_config);
  223. let local_ws = Arc::new(context.local_ws);
  224. let ws_conn = Arc::new(FlowyWebSocketConnect::from_local(ws_addr, local_ws));
  225. (Some(Arc::new(context.local_server)), ws_conn)
  226. }
  227. }
  228. async fn _listen_network_status(
  229. mut subscribe: broadcast::Receiver<NetworkType>,
  230. _core: Arc<FolderManager>,
  231. ) {
  232. while let Ok(_new_type) = subscribe.recv().await {
  233. // core.network_state_changed(new_type);
  234. }
  235. }
  236. fn init_kv(root: &str) {
  237. match flowy_sqlite::kv::KV::init(root) {
  238. Ok(_) => {},
  239. Err(e) => tracing::error!("Init kv store failed: {}", e),
  240. }
  241. }
  242. fn init_log(config: &AppFlowyCoreConfig) {
  243. if !INIT_LOG.load(Ordering::SeqCst) {
  244. INIT_LOG.store(true, Ordering::SeqCst);
  245. let _ = lib_log::Builder::new("AppFlowy-Client", &config.storage_path)
  246. .env_filter(&config.log_filter)
  247. .build();
  248. }
  249. }
  250. fn mk_user_session(
  251. config: &AppFlowyCoreConfig,
  252. local_server: &Option<Arc<LocalServer>>,
  253. server_config: &ClientServerConfiguration,
  254. ) -> Arc<UserSession> {
  255. let user_config = UserSessionConfig::new(&config.name, &config.storage_path);
  256. let cloud_service = UserDepsResolver::resolve(local_server, server_config);
  257. Arc::new(UserSession::new(user_config, cloud_service))
  258. }
  /// Holds the components that have to react when the user signs in, signs up or
  /// when the session expires.
  struct UserStatusListener {
    document_manager: Arc<DocumentManager>,
    folder_manager: Arc<FolderManager>,
    database_manager: Arc<DatabaseManager>,
    ws_conn: Arc<FlowyWebSocketConnect>,
    /// Only `config.document.version` is read, to pick the view data format on sign-up.
    config: AppFlowyCoreConfig,
  }
  266. impl UserStatusListener {
  267. async fn did_sign_in(&self, token: &str, user_id: &str) -> FlowyResult<()> {
  268. self.folder_manager.initialize(user_id, token).await?;
  269. self.document_manager.initialize(user_id).await?;
  270. self.database_manager.initialize(user_id, token).await?;
  271. self
  272. .ws_conn
  273. .start(token.to_owned(), user_id.to_owned())
  274. .await?;
  275. Ok(())
  276. }
  277. async fn did_sign_up(&self, user_profile: &UserProfile) -> FlowyResult<()> {
  278. let view_data_type = match self.config.document.version {
  279. DocumentVersionPB::V0 => ViewDataFormatPB::DeltaFormat,
  280. DocumentVersionPB::V1 => ViewDataFormatPB::NodeFormat,
  281. };
  282. self
  283. .folder_manager
  284. .initialize_with_new_user(&user_profile.id, &user_profile.token, view_data_type)
  285. .await?;
  286. self
  287. .document_manager
  288. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  289. .await?;
  290. self
  291. .database_manager
  292. .initialize_with_new_user(&user_profile.id, &user_profile.token)
  293. .await?;
  294. self
  295. .ws_conn
  296. .start(user_profile.token.clone(), user_profile.id.clone())
  297. .await?;
  298. Ok(())
  299. }
  300. async fn did_expired(&self, _token: &str, user_id: &str) -> FlowyResult<()> {
  301. self.folder_manager.clear(user_id).await;
  302. self.ws_conn.stop().await;
  303. Ok(())
  304. }
  305. }
  /// `UserStatusCallback` implementation that forwards every event to a shared
  /// `UserStatusListener`.
  struct UserStatusCallbackImpl {
    listener: Arc<UserStatusListener>,
  }
  309. impl UserStatusCallback for UserStatusCallbackImpl {
  310. fn did_sign_in(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  311. let listener = self.listener.clone();
  312. let token = token.to_owned();
  313. let user_id = user_id.to_owned();
  314. to_fut(async move { listener.did_sign_in(&token, &user_id).await })
  315. }
  316. fn did_sign_up(&self, user_profile: &UserProfile) -> Fut<FlowyResult<()>> {
  317. let listener = self.listener.clone();
  318. let user_profile = user_profile.clone();
  319. to_fut(async move { listener.did_sign_up(&user_profile).await })
  320. }
  321. fn did_expired(&self, token: &str, user_id: &str) -> Fut<FlowyResult<()>> {
  322. let listener = self.listener.clone();
  323. let token = token.to_owned();
  324. let user_id = user_id.to_owned();
  325. to_fut(async move { listener.did_expired(&token, &user_id).await })
  326. }
  327. }