event_handler.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549
  1. use std::sync::Weak;
  2. use std::{convert::TryInto, sync::Arc};
  3. use serde_json::Value;
  4. use flowy_error::{ErrorCode, FlowyError, FlowyResult};
  5. use flowy_sqlite::kv::StorePreferences;
  6. use flowy_user_deps::cloud::UserCloudConfig;
  7. use flowy_user_deps::entities::*;
  8. use lib_dispatch::prelude::*;
  9. use lib_infra::box_any::BoxAny;
  10. use crate::entities::*;
  11. use crate::manager::UserManager;
  12. use crate::notification::{send_notification, UserNotification};
  13. use crate::services::cloud_config::{
  14. get_cloud_config, get_or_create_cloud_config, save_cloud_config,
  15. };
  16. fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
  17. let manager = manager
  18. .upgrade()
  19. .ok_or(FlowyError::internal().with_context("The user session is already drop"))?;
  20. Ok(manager)
  21. }
  22. fn upgrade_store_preferences(
  23. store: AFPluginState<Weak<StorePreferences>>,
  24. ) -> FlowyResult<Arc<StorePreferences>> {
  25. let store = store
  26. .upgrade()
  27. .ok_or(FlowyError::internal().with_context("The store preferences is already drop"))?;
  28. Ok(store)
  29. }
  30. #[tracing::instrument(level = "debug", name = "sign_in", skip(data, manager), fields(email = %data.email), err)]
  31. pub async fn sign_in(
  32. data: AFPluginData<SignInPayloadPB>,
  33. manager: AFPluginState<Weak<UserManager>>,
  34. ) -> DataResult<UserProfilePB, FlowyError> {
  35. let manager = upgrade_manager(manager)?;
  36. let params: SignInParams = data.into_inner().try_into()?;
  37. let auth_type = params.auth_type.clone();
  38. let user_profile: UserProfilePB = manager
  39. .sign_in(BoxAny::new(params), auth_type)
  40. .await?
  41. .into();
  42. data_result_ok(user_profile)
  43. }
  44. #[tracing::instrument(
  45. level = "debug",
  46. name = "sign_up",
  47. skip(data, manager),
  48. fields(
  49. email = %data.email,
  50. name = %data.name,
  51. ),
  52. err
  53. )]
  54. pub async fn sign_up(
  55. data: AFPluginData<SignUpPayloadPB>,
  56. manager: AFPluginState<Weak<UserManager>>,
  57. ) -> DataResult<UserProfilePB, FlowyError> {
  58. let manager = upgrade_manager(manager)?;
  59. let params: SignUpParams = data.into_inner().try_into()?;
  60. let auth_type = params.auth_type.clone();
  61. let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?;
  62. data_result_ok(user_profile.into())
  63. }
  64. #[tracing::instrument(level = "debug", skip(manager))]
  65. pub async fn init_user_handler(
  66. manager: AFPluginState<Weak<UserManager>>,
  67. ) -> Result<(), FlowyError> {
  68. let manager = upgrade_manager(manager)?;
  69. manager.init_user().await?;
  70. Ok(())
  71. }
  72. #[tracing::instrument(level = "debug", skip(manager))]
  73. pub async fn check_user_handler(
  74. manager: AFPluginState<Weak<UserManager>>,
  75. ) -> Result<(), FlowyError> {
  76. let manager = upgrade_manager(manager)?;
  77. manager.check_user().await?;
  78. Ok(())
  79. }
  80. #[tracing::instrument(level = "debug", skip(manager))]
  81. pub async fn get_user_profile_handler(
  82. manager: AFPluginState<Weak<UserManager>>,
  83. ) -> DataResult<UserProfilePB, FlowyError> {
  84. let manager = upgrade_manager(manager)?;
  85. let uid = manager.get_session()?.user_id;
  86. let user_profile = manager.get_user_profile(uid).await?;
  87. let weak_manager = Arc::downgrade(&manager);
  88. let cloned_user_profile = user_profile.clone();
  89. tokio::spawn(async move {
  90. if let Some(manager) = weak_manager.upgrade() {
  91. let _ = manager.refresh_user_profile(&cloned_user_profile).await;
  92. }
  93. });
  94. data_result_ok(user_profile.into())
  95. }
  96. #[tracing::instrument(level = "debug", skip(manager))]
  97. pub async fn sign_out(manager: AFPluginState<Weak<UserManager>>) -> Result<(), FlowyError> {
  98. let manager = upgrade_manager(manager)?;
  99. manager.sign_out().await?;
  100. Ok(())
  101. }
  102. #[tracing::instrument(level = "debug", skip(data, manager))]
  103. pub async fn update_user_profile_handler(
  104. data: AFPluginData<UpdateUserProfilePayloadPB>,
  105. manager: AFPluginState<Weak<UserManager>>,
  106. ) -> Result<(), FlowyError> {
  107. let manager = upgrade_manager(manager)?;
  108. let params: UpdateUserProfileParams = data.into_inner().try_into()?;
  109. manager.update_user_profile(params).await?;
  110. Ok(())
  111. }
  112. const APPEARANCE_SETTING_CACHE_KEY: &str = "appearance_settings";
  113. #[tracing::instrument(level = "debug", skip_all, err)]
  114. pub async fn set_appearance_setting(
  115. store_preferences: AFPluginState<Weak<StorePreferences>>,
  116. data: AFPluginData<AppearanceSettingsPB>,
  117. ) -> Result<(), FlowyError> {
  118. let store_preferences = upgrade_store_preferences(store_preferences)?;
  119. let mut setting = data.into_inner();
  120. if setting.theme.is_empty() {
  121. setting.theme = APPEARANCE_DEFAULT_THEME.to_string();
  122. }
  123. store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?;
  124. Ok(())
  125. }
  126. #[tracing::instrument(level = "debug", skip_all, err)]
  127. pub async fn get_appearance_setting(
  128. store_preferences: AFPluginState<Weak<StorePreferences>>,
  129. ) -> DataResult<AppearanceSettingsPB, FlowyError> {
  130. let store_preferences = upgrade_store_preferences(store_preferences)?;
  131. match store_preferences.get_str(APPEARANCE_SETTING_CACHE_KEY) {
  132. None => data_result_ok(AppearanceSettingsPB::default()),
  133. Some(s) => {
  134. let setting = match serde_json::from_str(&s) {
  135. Ok(setting) => setting,
  136. Err(e) => {
  137. tracing::error!(
  138. "Deserialize AppearanceSettings failed: {:?}, fallback to default",
  139. e
  140. );
  141. AppearanceSettingsPB::default()
  142. },
  143. };
  144. data_result_ok(setting)
  145. },
  146. }
  147. }
  148. const DATE_TIME_SETTINGS_CACHE_KEY: &str = "date_time_settings";
  149. #[tracing::instrument(level = "debug", skip_all, err)]
  150. pub async fn set_date_time_settings(
  151. store_preferences: AFPluginState<Weak<StorePreferences>>,
  152. data: AFPluginData<DateTimeSettingsPB>,
  153. ) -> Result<(), FlowyError> {
  154. let store_preferences = upgrade_store_preferences(store_preferences)?;
  155. let mut setting = data.into_inner();
  156. if setting.timezone_id.is_empty() {
  157. setting.timezone_id = "".to_string();
  158. }
  159. store_preferences.set_object(DATE_TIME_SETTINGS_CACHE_KEY, setting)?;
  160. Ok(())
  161. }
  162. #[tracing::instrument(level = "debug", skip_all, err)]
  163. pub async fn get_date_time_settings(
  164. store_preferences: AFPluginState<Weak<StorePreferences>>,
  165. ) -> DataResult<DateTimeSettingsPB, FlowyError> {
  166. let store_preferences = upgrade_store_preferences(store_preferences)?;
  167. match store_preferences.get_str(DATE_TIME_SETTINGS_CACHE_KEY) {
  168. None => data_result_ok(DateTimeSettingsPB::default()),
  169. Some(s) => {
  170. let setting = match serde_json::from_str(&s) {
  171. Ok(setting) => setting,
  172. Err(e) => {
  173. tracing::error!(
  174. "Deserialize AppearanceSettings failed: {:?}, fallback to default",
  175. e
  176. );
  177. DateTimeSettingsPB::default()
  178. },
  179. };
  180. data_result_ok(setting)
  181. },
  182. }
  183. }
  184. #[tracing::instrument(level = "debug", skip_all, err)]
  185. pub async fn get_user_setting(
  186. manager: AFPluginState<Weak<UserManager>>,
  187. ) -> DataResult<UserSettingPB, FlowyError> {
  188. let manager = upgrade_manager(manager)?;
  189. let user_setting = manager.user_setting()?;
  190. data_result_ok(user_setting)
  191. }
  192. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  193. pub async fn oauth_handler(
  194. data: AFPluginData<OauthSignInPB>,
  195. manager: AFPluginState<Weak<UserManager>>,
  196. ) -> DataResult<UserProfilePB, FlowyError> {
  197. let manager = upgrade_manager(manager)?;
  198. let params = data.into_inner();
  199. let auth_type: AuthType = params.auth_type.into();
  200. let user_profile = manager.sign_up(auth_type, BoxAny::new(params.map)).await?;
  201. data_result_ok(user_profile.into())
  202. }
  203. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  204. pub async fn get_sign_in_url_handler(
  205. data: AFPluginData<SignInUrlPayloadPB>,
  206. manager: AFPluginState<Weak<UserManager>>,
  207. ) -> DataResult<SignInUrlPB, FlowyError> {
  208. let manager = upgrade_manager(manager)?;
  209. let params = data.into_inner();
  210. let auth_type: AuthType = params.auth_type.into();
  211. let sign_in_url = manager
  212. .generate_sign_in_url_with_email(&auth_type, &params.email)
  213. .await?;
  214. let resp = SignInUrlPB { sign_in_url };
  215. data_result_ok(resp)
  216. }
  217. #[tracing::instrument(level = "debug", skip_all, err)]
  218. pub async fn sign_in_with_provider_handler(
  219. data: AFPluginData<OauthProviderPB>,
  220. manager: AFPluginState<Weak<UserManager>>,
  221. ) -> DataResult<OauthProviderDataPB, FlowyError> {
  222. let manager = upgrade_manager(manager)?;
  223. tracing::debug!("Sign in with provider: {:?}", data.provider.as_str());
  224. let sign_in_url = manager.generate_oauth_url(data.provider.as_str()).await?;
  225. data_result_ok(OauthProviderDataPB {
  226. oauth_url: sign_in_url,
  227. })
  228. }
  229. #[tracing::instrument(level = "debug", skip_all, err)]
  230. pub async fn set_encrypt_secret_handler(
  231. manager: AFPluginState<Weak<UserManager>>,
  232. data: AFPluginData<UserSecretPB>,
  233. store_preferences: AFPluginState<Weak<StorePreferences>>,
  234. ) -> Result<(), FlowyError> {
  235. let manager = upgrade_manager(manager)?;
  236. let store_preferences = upgrade_store_preferences(store_preferences)?;
  237. let data = data.into_inner();
  238. match data.encryption_type {
  239. EncryptionTypePB::NoEncryption => {
  240. tracing::error!("Encryption type is NoEncryption, but set encrypt secret");
  241. },
  242. EncryptionTypePB::Symmetric => {
  243. manager.check_encryption_sign_with_secret(
  244. data.user_id,
  245. &data.encryption_sign,
  246. &data.encryption_secret,
  247. )?;
  248. let config = UserCloudConfig::new(data.encryption_secret).with_enable_encrypt(true);
  249. manager
  250. .set_encrypt_secret(
  251. data.user_id,
  252. config.encrypt_secret.clone(),
  253. EncryptionType::SelfEncryption(data.encryption_sign),
  254. )
  255. .await?;
  256. save_cloud_config(data.user_id, &store_preferences, config)?;
  257. },
  258. }
  259. manager.resume_sign_up().await?;
  260. Ok(())
  261. }
  262. #[tracing::instrument(level = "debug", skip_all, err)]
  263. pub async fn check_encrypt_secret_handler(
  264. manager: AFPluginState<Weak<UserManager>>,
  265. ) -> DataResult<UserEncryptionSecretCheckPB, FlowyError> {
  266. let manager = upgrade_manager(manager)?;
  267. let uid = manager.get_session()?.user_id;
  268. let profile = manager.get_user_profile(uid).await?;
  269. let is_need_secret = match profile.encryption_type {
  270. EncryptionType::NoEncryption => false,
  271. EncryptionType::SelfEncryption(sign) => {
  272. if sign.is_empty() {
  273. false
  274. } else {
  275. manager.check_encryption_sign(uid, &sign).is_err()
  276. }
  277. },
  278. };
  279. data_result_ok(UserEncryptionSecretCheckPB { is_need_secret })
  280. }
  281. #[tracing::instrument(level = "debug", skip_all, err)]
  282. pub async fn set_cloud_config_handler(
  283. manager: AFPluginState<Weak<UserManager>>,
  284. data: AFPluginData<UpdateCloudConfigPB>,
  285. store_preferences: AFPluginState<Weak<StorePreferences>>,
  286. ) -> Result<(), FlowyError> {
  287. let manager = upgrade_manager(manager)?;
  288. let session = manager.get_session()?;
  289. let update = data.into_inner();
  290. let store_preferences = upgrade_store_preferences(store_preferences)?;
  291. let mut config = get_cloud_config(session.user_id, &store_preferences)
  292. .ok_or(FlowyError::internal().with_context("Can't find any cloud config"))?;
  293. if let Some(enable_sync) = update.enable_sync {
  294. manager
  295. .cloud_services
  296. .set_enable_sync(session.user_id, enable_sync);
  297. config.enable_sync = enable_sync;
  298. }
  299. if let Some(enable_encrypt) = update.enable_encrypt {
  300. debug_assert!(enable_encrypt, "Disable encryption is not supported");
  301. if enable_encrypt {
  302. tracing::info!("Enable encryption for user: {}", session.user_id);
  303. config = config.with_enable_encrypt(enable_encrypt);
  304. let encrypt_secret = config.encrypt_secret.clone();
  305. // The encryption secret is generated when the user first enables encryption and will be
  306. // used to validate the encryption secret is correct when the user logs in.
  307. let encryption_sign = manager.generate_encryption_sign(session.user_id, &encrypt_secret)?;
  308. let encryption_type = EncryptionType::SelfEncryption(encryption_sign);
  309. manager
  310. .set_encrypt_secret(session.user_id, encrypt_secret, encryption_type.clone())
  311. .await?;
  312. save_cloud_config(session.user_id, &store_preferences, config.clone())?;
  313. let params =
  314. UpdateUserProfileParams::new(session.user_id).with_encryption_type(encryption_type);
  315. manager.update_user_profile(params).await?;
  316. }
  317. }
  318. let config_pb = UserCloudConfigPB::from(config);
  319. send_notification(
  320. &session.user_id.to_string(),
  321. UserNotification::DidUpdateCloudConfig,
  322. )
  323. .payload(config_pb)
  324. .send();
  325. Ok(())
  326. }
  327. #[tracing::instrument(level = "debug", skip_all, err)]
  328. pub async fn get_cloud_config_handler(
  329. manager: AFPluginState<Weak<UserManager>>,
  330. store_preferences: AFPluginState<Weak<StorePreferences>>,
  331. ) -> DataResult<UserCloudConfigPB, FlowyError> {
  332. let manager = upgrade_manager(manager)?;
  333. let session = manager.get_session()?;
  334. let store_preferences = upgrade_store_preferences(store_preferences)?;
  335. // Generate the default config if the config is not exist
  336. let config = get_or_create_cloud_config(session.user_id, &store_preferences);
  337. data_result_ok(config.into())
  338. }
  339. #[tracing::instrument(level = "debug", skip(manager), err)]
  340. pub async fn get_all_user_workspace_handler(
  341. manager: AFPluginState<Weak<UserManager>>,
  342. ) -> DataResult<RepeatedUserWorkspacePB, FlowyError> {
  343. let manager = upgrade_manager(manager)?;
  344. let uid = manager.get_session()?.user_id;
  345. let user_workspaces = manager.get_all_user_workspaces(uid)?;
  346. data_result_ok(user_workspaces.into())
  347. }
  348. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  349. pub async fn open_workspace_handler(
  350. data: AFPluginData<UserWorkspacePB>,
  351. manager: AFPluginState<Weak<UserManager>>,
  352. ) -> Result<(), FlowyError> {
  353. let manager = upgrade_manager(manager)?;
  354. let params = data.into_inner();
  355. manager.open_workspace(&params.id).await?;
  356. Ok(())
  357. }
  358. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  359. pub async fn add_user_to_workspace_handler(
  360. data: AFPluginData<AddWorkspaceUserPB>,
  361. manager: AFPluginState<Weak<UserManager>>,
  362. ) -> Result<(), FlowyError> {
  363. let manager = upgrade_manager(manager)?;
  364. let params = data.into_inner();
  365. manager
  366. .add_user_to_workspace(params.email, params.workspace_id)
  367. .await?;
  368. Ok(())
  369. }
  370. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  371. pub async fn remove_user_from_workspace_handler(
  372. data: AFPluginData<RemoveWorkspaceUserPB>,
  373. manager: AFPluginState<Weak<UserManager>>,
  374. ) -> Result<(), FlowyError> {
  375. let manager = upgrade_manager(manager)?;
  376. let params = data.into_inner();
  377. manager
  378. .remove_user_to_workspace(params.email, params.workspace_id)
  379. .await?;
  380. Ok(())
  381. }
  382. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  383. pub async fn update_network_state_handler(
  384. data: AFPluginData<NetworkStatePB>,
  385. manager: AFPluginState<Weak<UserManager>>,
  386. ) -> Result<(), FlowyError> {
  387. let manager = upgrade_manager(manager)?;
  388. let reachable = data.into_inner().ty.is_reachable();
  389. manager
  390. .user_status_callback
  391. .read()
  392. .await
  393. .did_update_network(reachable);
  394. Ok(())
  395. }
  396. #[tracing::instrument(level = "debug", skip_all, err)]
  397. pub async fn get_historical_users_handler(
  398. manager: AFPluginState<Weak<UserManager>>,
  399. ) -> DataResult<RepeatedHistoricalUserPB, FlowyError> {
  400. let manager = upgrade_manager(manager)?;
  401. let users = RepeatedHistoricalUserPB::from(manager.get_historical_users());
  402. data_result_ok(users)
  403. }
  404. #[tracing::instrument(level = "debug", skip_all, err)]
  405. pub async fn open_historical_users_handler(
  406. user: AFPluginData<HistoricalUserPB>,
  407. manager: AFPluginState<Weak<UserManager>>,
  408. ) -> Result<(), FlowyError> {
  409. let user = user.into_inner();
  410. let manager = upgrade_manager(manager)?;
  411. let auth_type = AuthType::from(user.auth_type);
  412. manager
  413. .open_historical_user(user.user_id, user.device_id, auth_type)
  414. .await?;
  415. Ok(())
  416. }
  417. pub async fn push_realtime_event_handler(
  418. payload: AFPluginData<RealtimePayloadPB>,
  419. manager: AFPluginState<Weak<UserManager>>,
  420. ) -> Result<(), FlowyError> {
  421. match serde_json::from_str::<Value>(&payload.into_inner().json_str) {
  422. Ok(json) => {
  423. let manager = upgrade_manager(manager)?;
  424. manager.receive_realtime_event(json).await;
  425. },
  426. Err(e) => {
  427. tracing::error!("Deserialize RealtimePayload failed: {:?}", e);
  428. },
  429. }
  430. Ok(())
  431. }
  432. #[tracing::instrument(level = "debug", skip_all, err)]
  433. pub async fn create_reminder_event_handler(
  434. data: AFPluginData<ReminderPB>,
  435. manager: AFPluginState<Weak<UserManager>>,
  436. ) -> Result<(), FlowyError> {
  437. let manager = upgrade_manager(manager)?;
  438. let params = data.into_inner();
  439. manager.add_reminder(params).await?;
  440. Ok(())
  441. }
  442. #[tracing::instrument(level = "debug", skip_all, err)]
  443. pub async fn get_all_reminder_event_handler(
  444. manager: AFPluginState<Weak<UserManager>>,
  445. ) -> DataResult<RepeatedReminderPB, FlowyError> {
  446. let manager = upgrade_manager(manager)?;
  447. let reminders = manager
  448. .get_all_reminders()
  449. .await
  450. .into_iter()
  451. .map(ReminderPB::from)
  452. .collect::<Vec<_>>();
  453. data_result_ok(reminders.into())
  454. }
  455. #[tracing::instrument(level = "debug", skip_all, err)]
  456. pub async fn reset_workspace_handler(
  457. data: AFPluginData<ResetWorkspacePB>,
  458. manager: AFPluginState<Weak<UserManager>>,
  459. ) -> Result<(), FlowyError> {
  460. let manager = upgrade_manager(manager)?;
  461. let reset_pb = data.into_inner();
  462. if reset_pb.workspace_id.is_empty() {
  463. return Err(FlowyError::new(
  464. ErrorCode::WorkspaceIdInvalid,
  465. "The workspace id is empty",
  466. ));
  467. }
  468. let session = manager.get_session()?;
  469. manager.reset_workspace(reset_pb, session.device_id).await?;
  470. Ok(())
  471. }
  472. #[tracing::instrument(level = "debug", skip_all, err)]
  473. pub async fn remove_reminder_event_handler(
  474. data: AFPluginData<ReminderIdentifierPB>,
  475. manager: AFPluginState<Weak<UserManager>>,
  476. ) -> Result<(), FlowyError> {
  477. let manager = upgrade_manager(manager)?;
  478. let params = data.into_inner();
  479. let _ = manager.remove_reminder(params.id.as_str()).await;
  480. Ok(())
  481. }
  482. #[tracing::instrument(level = "debug", skip_all, err)]
  483. pub async fn update_reminder_event_handler(
  484. data: AFPluginData<ReminderPB>,
  485. manager: AFPluginState<Weak<UserManager>>,
  486. ) -> Result<(), FlowyError> {
  487. let manager = upgrade_manager(manager)?;
  488. let params = data.into_inner();
  489. manager.update_reminder(params).await?;
  490. Ok(())
  491. }