lib.rs 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::{DocumentDepsResolver, WorkspaceDepsResolver};
  4. use backend_service::configuration::ClientServerConfiguration;
  5. use flowy_core::{errors::FlowyError, module::init_core, prelude::CoreContext};
  6. use flowy_document::context::DocumentContext;
  7. use flowy_net::{
  8. entities::NetworkType,
  9. services::ws::{listen_on_websocket, FlowyWSConnect, FlowyWebSocket},
  10. };
  11. use flowy_user::{
  12. prelude::UserStatus,
  13. services::user::{UserSession, UserSessionConfig},
  14. };
  15. use flowy_virtual_net::local_web_socket;
  16. use lib_dispatch::prelude::*;
  17. use lib_ws::WSController;
  18. use module::mk_modules;
  19. pub use module::*;
  20. use std::sync::{
  21. atomic::{AtomicBool, Ordering},
  22. Arc,
  23. };
  24. use tokio::sync::broadcast;
  25. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  26. #[derive(Debug, Clone)]
  27. pub struct FlowySDKConfig {
  28. name: String,
  29. root: String,
  30. log_filter: String,
  31. server_config: ClientServerConfiguration,
  32. }
  33. impl FlowySDKConfig {
  34. pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self {
  35. FlowySDKConfig {
  36. name: name.to_owned(),
  37. root: root.to_owned(),
  38. log_filter: crate_log_filter(None),
  39. server_config,
  40. }
  41. }
  42. pub fn log_filter(mut self, filter: &str) -> Self {
  43. self.log_filter = crate_log_filter(Some(filter.to_owned()));
  44. self
  45. }
  46. }
  /// Builds the comma-separated `crate=level` filter string used for logging.
  ///
  /// When `level` is `None`, the level is read from the `RUST_LOG` environment
  /// variable, falling back to `"info"` if that variable cannot be read.
  /// The same level is applied to every crate in the fixed list below.
  fn crate_log_filter(level: Option<String>) -> String {
      // Order matters: the output lists the crates exactly in this sequence.
      const CRATES: [&str; 10] = [
          "flowy_sdk",
          "flowy_workspace",
          "flowy_user",
          "flowy_document",
          "flowy_document_infra",
          "flowy_net",
          "dart_notify",
          "lib_ot",
          "lib_ws",
          "lib_infra",
      ];

      let level = match level {
          Some(explicit) => explicit,
          None => std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_owned()),
      };

      CRATES
          .iter()
          .map(|name| format!("{}={}", name, level))
          .collect::<Vec<_>>()
          .join(",")
  }
  62. #[derive(Clone)]
  63. pub struct FlowySDK {
  64. #[allow(dead_code)]
  65. config: FlowySDKConfig,
  66. pub user_session: Arc<UserSession>,
  67. pub flowy_document: Arc<DocumentContext>,
  68. pub core: Arc<CoreContext>,
  69. pub dispatcher: Arc<EventDispatcher>,
  70. pub ws_manager: Arc<FlowyWSConnect>,
  71. }
  72. impl FlowySDK {
  73. pub fn new(config: FlowySDKConfig) -> Self {
  74. init_log(&config);
  75. init_kv(&config.root);
  76. tracing::debug!("🔥 {:?}", config);
  77. let ws: Arc<dyn FlowyWebSocket> = if cfg!(feature = "http_server") {
  78. Arc::new(Arc::new(WSController::new()))
  79. } else {
  80. local_web_socket()
  81. };
  82. let ws_manager = Arc::new(FlowyWSConnect::new(config.server_config.ws_addr(), ws));
  83. let user_session = mk_user_session(&config);
  84. let flowy_document = mk_document(ws_manager.clone(), user_session.clone(), &config.server_config);
  85. let core_ctx = mk_core_context(user_session.clone(), flowy_document.clone(), &config.server_config);
  86. //
  87. let modules = mk_modules(ws_manager.clone(), core_ctx.clone(), user_session.clone());
  88. let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
  89. _init(&dispatcher, ws_manager.clone(), user_session.clone(), core_ctx.clone());
  90. Self {
  91. config,
  92. user_session,
  93. flowy_document,
  94. core: core_ctx,
  95. dispatcher,
  96. ws_manager,
  97. }
  98. }
  99. pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
  100. }
  101. fn _init(
  102. dispatch: &EventDispatcher,
  103. ws_manager: Arc<FlowyWSConnect>,
  104. user_session: Arc<UserSession>,
  105. core: Arc<CoreContext>,
  106. ) {
  107. let subscribe_user_status = user_session.notifier.subscribe_user_status();
  108. let subscribe_network_type = ws_manager.subscribe_network_ty();
  109. let cloned_core = core.clone();
  110. dispatch.spawn(async move {
  111. user_session.init();
  112. listen_on_websocket(ws_manager.clone());
  113. _listen_user_status(ws_manager.clone(), subscribe_user_status, core.clone()).await;
  114. });
  115. dispatch.spawn(async move {
  116. _listen_network_status(subscribe_network_type, cloned_core).await;
  117. });
  118. }
  119. async fn _listen_user_status(
  120. ws_manager: Arc<FlowyWSConnect>,
  121. mut subscribe: broadcast::Receiver<UserStatus>,
  122. core: Arc<CoreContext>,
  123. ) {
  124. while let Ok(status) = subscribe.recv().await {
  125. let result = || async {
  126. match status {
  127. UserStatus::Login { token } => {
  128. let _ = core.user_did_sign_in(&token).await?;
  129. let _ = ws_manager.start(token).await?;
  130. },
  131. UserStatus::Logout { .. } => {
  132. core.user_did_logout().await;
  133. let _ = ws_manager.stop().await;
  134. },
  135. UserStatus::Expired { .. } => {
  136. core.user_session_expired().await;
  137. let _ = ws_manager.stop().await;
  138. },
  139. UserStatus::SignUp { profile, ret } => {
  140. let _ = core.user_did_sign_up(&profile.token).await?;
  141. let _ = ws_manager.start(profile.token.clone()).await?;
  142. let _ = ret.send(());
  143. },
  144. }
  145. Ok::<(), FlowyError>(())
  146. };
  147. match result().await {
  148. Ok(_) => {},
  149. Err(e) => log::error!("{}", e),
  150. }
  151. }
  152. }
  153. async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, core: Arc<CoreContext>) {
  154. while let Ok(new_type) = subscribe.recv().await {
  155. core.network_state_changed(new_type);
  156. }
  157. }
  158. fn init_kv(root: &str) {
  159. match flowy_database::kv::KV::init(root) {
  160. Ok(_) => {},
  161. Err(e) => tracing::error!("Init kv store failedL: {}", e),
  162. }
  163. }
  164. fn init_log(config: &FlowySDKConfig) {
  165. if !INIT_LOG.load(Ordering::SeqCst) {
  166. INIT_LOG.store(true, Ordering::SeqCst);
  167. let _ = lib_log::Builder::new("flowy-client", &config.root)
  168. .env_filter(&config.log_filter)
  169. .build();
  170. }
  171. }
  172. fn mk_user_session(config: &FlowySDKConfig) -> Arc<UserSession> {
  173. let session_cache_key = format!("{}_session_cache", &config.name);
  174. let user_config = UserSessionConfig::new(&config.root, &config.server_config, &session_cache_key);
  175. Arc::new(UserSession::new(user_config))
  176. }
  177. fn mk_core_context(
  178. user_session: Arc<UserSession>,
  179. flowy_document: Arc<DocumentContext>,
  180. server_config: &ClientServerConfiguration,
  181. ) -> Arc<CoreContext> {
  182. let workspace_deps = WorkspaceDepsResolver::new(user_session);
  183. let (user, database) = workspace_deps.split_into();
  184. init_core(user, database, flowy_document, server_config)
  185. }
  186. pub fn mk_document(
  187. ws_manager: Arc<FlowyWSConnect>,
  188. user_session: Arc<UserSession>,
  189. server_config: &ClientServerConfiguration,
  190. ) -> Arc<DocumentContext> {
  191. let (user, ws_receivers, ws_sender) = DocumentDepsResolver::resolve(ws_manager, user_session);
  192. Arc::new(DocumentContext::new(user, ws_receivers, ws_sender, server_config))
  193. }