lib.rs 7.8 KB


  1. mod deps_resolve;
  2. pub mod module;
  3. use crate::deps_resolve::*;
  4. use backend_service::configuration::ClientServerConfiguration;
  5. use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
  6. use flowy_document::context::DocumentContext;
  7. use flowy_net::{
  8. entities::NetworkType,
  9. ws::{
  10. connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
  11. local::LocalWebSocket,
  12. },
  13. };
  14. use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
  15. use lib_dispatch::prelude::*;
  16. use lib_ws::WSController;
  17. use module::mk_modules;
  18. pub use module::*;
  19. use std::{
  20. fmt,
  21. sync::{
  22. atomic::{AtomicBool, Ordering},
  23. Arc,
  24. },
  25. };
  26. use tokio::sync::broadcast;
  27. static INIT_LOG: AtomicBool = AtomicBool::new(false);
  28. #[derive(Clone)]
  29. pub struct FlowySDKConfig {
  30. name: String,
  31. root: String,
  32. log_filter: String,
  33. server_config: ClientServerConfiguration,
  34. }
  35. impl fmt::Debug for FlowySDKConfig {
  36. fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
  37. f.debug_struct("FlowySDKConfig")
  38. .field("name", &self.name)
  39. .field("root", &self.root)
  40. .field("server_config", &self.server_config)
  41. .finish()
  42. }
  43. }
  44. impl FlowySDKConfig {
  45. pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self {
  46. FlowySDKConfig {
  47. name: name.to_owned(),
  48. root: root.to_owned(),
  49. log_filter: crate_log_filter("info".to_owned()),
  50. server_config,
  51. }
  52. }
  53. pub fn log_filter(mut self, filter: &str) -> Self {
  54. self.log_filter = crate_log_filter(filter.to_owned());
  55. self
  56. }
  57. }
  /// Builds the comma-separated `target=level` filter string for the logger.
  ///
  /// `level` is only a fallback: when the `RUST_LOG` environment variable can
  /// be read, its value is used instead. `dart_ffi` and `dart_database` are
  /// always pinned to `info`, whatever level was selected.
  fn crate_log_filter(level: String) -> String {
      // (target, whether the target follows the selected level)
      const TARGETS: [(&str, bool); 12] = [
          ("flowy_sdk", true),
          ("flowy_core", true),
          ("flowy_user", true),
          ("flowy_document", true),
          ("flowy_collaboration", true),
          ("flowy_net", true),
          ("dart_ffi", false),
          ("dart_database", false),
          ("dart_notify", true),
          ("lib_ot", true),
          ("lib_ws", true),
          ("lib_infra", true),
      ];

      let selected = match std::env::var("RUST_LOG") {
          Ok(from_env) => from_env,
          Err(_) => level,
      };

      TARGETS
          .iter()
          .map(|&(target, follows_level)| {
              let lvl = if follows_level { selected.as_str() } else { "info" };
              format!("{}={}", target, lvl)
          })
          .collect::<Vec<_>>()
          .join(",")
  }
  75. #[derive(Clone)]
  76. pub struct FlowySDK {
  77. #[allow(dead_code)]
  78. config: FlowySDKConfig,
  79. pub user_session: Arc<UserSession>,
  80. pub document_ctx: Arc<DocumentContext>,
  81. pub core: Arc<CoreContext>,
  82. pub dispatcher: Arc<EventDispatcher>,
  83. pub ws_conn: Arc<FlowyWebSocketConnect>,
  84. }
  85. impl FlowySDK {
  86. pub fn new(config: FlowySDKConfig) -> Self {
  87. init_log(&config);
  88. init_kv(&config.root);
  89. tracing::debug!("🔥 {:?}", config);
  90. let ws_conn = Arc::new(FlowyWebSocketConnect::new(
  91. config.server_config.ws_addr(),
  92. default_web_socket(),
  93. ));
  94. let user_session = mk_user_session(&config, &config.server_config);
  95. let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
  96. let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config);
  97. //
  98. let modules = mk_modules(&ws_conn, &core_ctx, &user_session);
  99. let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
  100. _init(&dispatcher, &ws_conn, &user_session, &core_ctx);
  101. Self {
  102. config,
  103. user_session,
  104. document_ctx: flowy_document,
  105. core: core_ctx,
  106. dispatcher,
  107. ws_conn,
  108. }
  109. }
  110. pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
  111. }
  112. fn _init(
  113. dispatch: &EventDispatcher,
  114. ws_conn: &Arc<FlowyWebSocketConnect>,
  115. user_session: &Arc<UserSession>,
  116. core: &Arc<CoreContext>,
  117. ) {
  118. let subscribe_user_status = user_session.notifier.subscribe_user_status();
  119. let subscribe_network_type = ws_conn.subscribe_network_ty();
  120. let core = core.clone();
  121. let cloned_core = core.clone();
  122. let user_session = user_session.clone();
  123. let ws_conn = ws_conn.clone();
  124. dispatch.spawn(async move {
  125. user_session.init();
  126. ws_conn.init().await;
  127. listen_on_websocket(ws_conn.clone());
  128. _listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await;
  129. });
  130. dispatch.spawn(async move {
  131. _listen_network_status(subscribe_network_type, cloned_core).await;
  132. });
  133. }
  134. async fn _listen_user_status(
  135. ws_conn: Arc<FlowyWebSocketConnect>,
  136. mut subscribe: broadcast::Receiver<UserStatus>,
  137. core: Arc<CoreContext>,
  138. ) {
  139. while let Ok(status) = subscribe.recv().await {
  140. let result = || async {
  141. match status {
  142. UserStatus::Login { token, user_id } => {
  143. let _ = core.user_did_sign_in(&token).await?;
  144. let _ = ws_conn.start(token, user_id).await?;
  145. },
  146. UserStatus::Logout { .. } => {
  147. core.user_did_logout().await;
  148. let _ = ws_conn.stop().await;
  149. },
  150. UserStatus::Expired { .. } => {
  151. core.user_session_expired().await;
  152. let _ = ws_conn.stop().await;
  153. },
  154. UserStatus::SignUp { profile, ret } => {
  155. let _ = core.user_did_sign_up(&profile.token).await?;
  156. let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
  157. let _ = ret.send(());
  158. },
  159. }
  160. Ok::<(), FlowyError>(())
  161. };
  162. match result().await {
  163. Ok(_) => {},
  164. Err(e) => log::error!("{}", e),
  165. }
  166. }
  167. }
  168. async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<CoreContext>) {
  169. while let Ok(_new_type) = subscribe.recv().await {
  170. // core.network_state_changed(new_type);
  171. }
  172. }
  173. fn init_kv(root: &str) {
  174. match flowy_database::kv::KV::init(root) {
  175. Ok(_) => {},
  176. Err(e) => tracing::error!("Init kv store failedL: {}", e),
  177. }
  178. }
  179. fn init_log(config: &FlowySDKConfig) {
  180. if !INIT_LOG.load(Ordering::SeqCst) {
  181. INIT_LOG.store(true, Ordering::SeqCst);
  182. let _ = lib_log::Builder::new("flowy-client", &config.root)
  183. .env_filter(&config.log_filter)
  184. .build();
  185. }
  186. }
  187. fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc<UserSession> {
  188. let session_cache_key = format!("{}_session_cache", &config.name);
  189. let user_config = UserSessionConfig::new(&config.root, &session_cache_key);
  190. let cloud_service = UserDepsResolver::resolve(server_config);
  191. Arc::new(UserSession::new(user_config, cloud_service))
  192. }
  193. fn mk_core_context(
  194. user_session: &Arc<UserSession>,
  195. flowy_document: &Arc<DocumentContext>,
  196. server_config: &ClientServerConfiguration,
  197. ) -> Arc<CoreContext> {
  198. let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config);
  199. init_core(user, database, flowy_document.clone(), cloud_service)
  200. }
  201. fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
  202. if cfg!(feature = "http_server") {
  203. Arc::new(Arc::new(WSController::new()))
  204. } else {
  205. Arc::new(LocalWebSocket::default())
  206. }
  207. }
  208. pub fn mk_document(
  209. ws_conn: &Arc<FlowyWebSocketConnect>,
  210. user_session: &Arc<UserSession>,
  211. server_config: &ClientServerConfiguration,
  212. ) -> Arc<DocumentContext> {
  213. let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config);
  214. Arc::new(DocumentContext::new(
  215. dependencies.user,
  216. dependencies.ws_receivers,
  217. dependencies.ws_sender,
  218. dependencies.cloud_service,
  219. ))
  220. }