123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- mod deps_resolve;
- pub mod module;
- use crate::deps_resolve::*;
- use backend_service::configuration::ClientServerConfiguration;
- use flowy_core::{context::CoreContext, errors::FlowyError, module::init_core};
- use flowy_document::context::DocumentContext;
- use flowy_net::{
- entities::NetworkType,
- ws::{
- connection::{listen_on_websocket, FlowyRawWebSocket, FlowyWebSocketConnect},
- local::LocalWebSocket,
- },
- };
- use flowy_user::services::{notifier::UserStatus, UserSession, UserSessionConfig};
- use lib_dispatch::prelude::*;
- use lib_ws::WSController;
- use module::mk_modules;
- pub use module::*;
- use std::{
- fmt,
- sync::{
- atomic::{AtomicBool, Ordering},
- Arc,
- },
- };
- use tokio::sync::broadcast;
- static INIT_LOG: AtomicBool = AtomicBool::new(false); // Starts false; init_log flips it to true the first time it runs so later calls skip logger setup.
- #[derive(Clone)]
- pub struct FlowySDKConfig {
- name: String,
- root: String,
- log_filter: String,
- server_config: ClientServerConfiguration,
- }
- impl fmt::Debug for FlowySDKConfig {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("FlowySDKConfig")
- .field("name", &self.name)
- .field("root", &self.root)
- .field("server_config", &self.server_config)
- .finish()
- }
- }
- impl FlowySDKConfig {
- pub fn new(root: &str, server_config: ClientServerConfiguration, name: &str) -> Self {
- FlowySDKConfig {
- name: name.to_owned(),
- root: root.to_owned(),
- log_filter: crate_log_filter("info".to_owned()),
- server_config,
- }
- }
- pub fn log_filter(mut self, filter: &str) -> Self {
- self.log_filter = crate_log_filter(filter.to_owned());
- self
- }
- }
/// Builds the comma-separated `target=level` log filter for the SDK's crates.
///
/// `level` is only a fallback: when the `RUST_LOG` environment variable can be
/// read, its value is used instead. `dart_ffi` and `dart_database` are always
/// pinned to `info`, whatever level is in effect.
fn crate_log_filter(level: String) -> String {
    let level = std::env::var("RUST_LOG").unwrap_or(level);
    let level = level.as_str();

    // (target, level) pairs, in the order they appear in the resulting filter.
    let directives = [
        ("flowy_sdk", level),
        ("flowy_core", level),
        ("flowy_user", level),
        ("flowy_document", level),
        ("flowy_collaboration", level),
        ("flowy_net", level),
        ("dart_ffi", "info"),
        ("dart_database", "info"),
        ("dart_notify", level),
        ("lib_ot", level),
        ("lib_ws", level),
        ("lib_infra", level),
    ];

    directives
        .iter()
        .map(|(target, level)| format!("{}={}", target, level))
        .collect::<Vec<_>>()
        .join(",")
}
- #[derive(Clone)]
- pub struct FlowySDK {
- #[allow(dead_code)]
- config: FlowySDKConfig,
- pub user_session: Arc<UserSession>,
- pub document_ctx: Arc<DocumentContext>,
- pub core: Arc<CoreContext>,
- pub dispatcher: Arc<EventDispatcher>,
- pub ws_conn: Arc<FlowyWebSocketConnect>,
- }
- impl FlowySDK {
- pub fn new(config: FlowySDKConfig) -> Self {
- init_log(&config);
- init_kv(&config.root);
- tracing::debug!("🔥 {:?}", config);
- let ws_conn = Arc::new(FlowyWebSocketConnect::new(
- config.server_config.ws_addr(),
- default_web_socket(),
- ));
- let user_session = mk_user_session(&config, &config.server_config);
- let flowy_document = mk_document(&ws_conn, &user_session, &config.server_config);
- let core_ctx = mk_core_context(&user_session, &flowy_document, &config.server_config);
- //
- let modules = mk_modules(&ws_conn, &core_ctx, &user_session);
- let dispatcher = Arc::new(EventDispatcher::construct(|| modules));
- _init(&dispatcher, &ws_conn, &user_session, &core_ctx);
- Self {
- config,
- user_session,
- document_ctx: flowy_document,
- core: core_ctx,
- dispatcher,
- ws_conn,
- }
- }
- pub fn dispatcher(&self) -> Arc<EventDispatcher> { self.dispatcher.clone() }
- }
- fn _init(
- dispatch: &EventDispatcher,
- ws_conn: &Arc<FlowyWebSocketConnect>,
- user_session: &Arc<UserSession>,
- core: &Arc<CoreContext>,
- ) {
- let subscribe_user_status = user_session.notifier.subscribe_user_status();
- let subscribe_network_type = ws_conn.subscribe_network_ty();
- let core = core.clone();
- let cloned_core = core.clone();
- let user_session = user_session.clone();
- let ws_conn = ws_conn.clone();
- dispatch.spawn(async move {
- user_session.init();
- ws_conn.init().await;
- listen_on_websocket(ws_conn.clone());
- _listen_user_status(ws_conn.clone(), subscribe_user_status, core.clone()).await;
- });
- dispatch.spawn(async move {
- _listen_network_status(subscribe_network_type, cloned_core).await;
- });
- }
- async fn _listen_user_status(
- ws_conn: Arc<FlowyWebSocketConnect>,
- mut subscribe: broadcast::Receiver<UserStatus>,
- core: Arc<CoreContext>,
- ) {
- while let Ok(status) = subscribe.recv().await {
- let result = || async {
- match status {
- UserStatus::Login { token, user_id } => {
- let _ = core.user_did_sign_in(&token).await?;
- let _ = ws_conn.start(token, user_id).await?;
- },
- UserStatus::Logout { .. } => {
- core.user_did_logout().await;
- let _ = ws_conn.stop().await;
- },
- UserStatus::Expired { .. } => {
- core.user_session_expired().await;
- let _ = ws_conn.stop().await;
- },
- UserStatus::SignUp { profile, ret } => {
- let _ = core.user_did_sign_up(&profile.token).await?;
- let _ = ws_conn.start(profile.token.clone(), profile.id.clone()).await?;
- let _ = ret.send(());
- },
- }
- Ok::<(), FlowyError>(())
- };
- match result().await {
- Ok(_) => {},
- Err(e) => log::error!("{}", e),
- }
- }
- }
- async fn _listen_network_status(mut subscribe: broadcast::Receiver<NetworkType>, _core: Arc<CoreContext>) {
- while let Ok(_new_type) = subscribe.recv().await {
- // core.network_state_changed(new_type);
- }
- }
- fn init_kv(root: &str) {
- match flowy_database::kv::KV::init(root) {
- Ok(_) => {},
- Err(e) => tracing::error!("Init kv store failedL: {}", e),
- }
- }
- fn init_log(config: &FlowySDKConfig) {
- if !INIT_LOG.load(Ordering::SeqCst) {
- INIT_LOG.store(true, Ordering::SeqCst);
- let _ = lib_log::Builder::new("flowy-client", &config.root)
- .env_filter(&config.log_filter)
- .build();
- }
- }
- fn mk_user_session(config: &FlowySDKConfig, server_config: &ClientServerConfiguration) -> Arc<UserSession> {
- let session_cache_key = format!("{}_session_cache", &config.name);
- let user_config = UserSessionConfig::new(&config.root, &session_cache_key);
- let cloud_service = UserDepsResolver::resolve(server_config);
- Arc::new(UserSession::new(user_config, cloud_service))
- }
- fn mk_core_context(
- user_session: &Arc<UserSession>,
- flowy_document: &Arc<DocumentContext>,
- server_config: &ClientServerConfiguration,
- ) -> Arc<CoreContext> {
- let (user, database, cloud_service) = CoreDepsResolver::resolve(user_session.clone(), server_config);
- init_core(user, database, flowy_document.clone(), cloud_service)
- }
- fn default_web_socket() -> Arc<dyn FlowyRawWebSocket> {
- if cfg!(feature = "http_server") {
- Arc::new(Arc::new(WSController::new()))
- } else {
- Arc::new(LocalWebSocket::default())
- }
- }
- pub fn mk_document(
- ws_conn: &Arc<FlowyWebSocketConnect>,
- user_session: &Arc<UserSession>,
- server_config: &ClientServerConfiguration,
- ) -> Arc<DocumentContext> {
- let dependencies = DocumentDepsResolver::resolve(ws_conn.clone(), user_session.clone(), server_config);
- Arc::new(DocumentContext::new(
- dependencies.user,
- dependencies.ws_receivers,
- dependencies.ws_sender,
- dependencies.cloud_service,
- ))
- }
|