event_handler.rs 19 KB


  1. use std::sync::Weak;
  2. use std::{convert::TryInto, sync::Arc};
  3. use serde_json::Value;
  4. use flowy_error::{ErrorCode, FlowyError, FlowyResult};
  5. use flowy_sqlite::kv::StorePreferences;
  6. use flowy_user_deps::cloud::UserCloudConfig;
  7. use flowy_user_deps::entities::*;
  8. use lib_dispatch::prelude::*;
  9. use lib_infra::box_any::BoxAny;
  10. use crate::entities::*;
  11. use crate::manager::UserManager;
  12. use crate::notification::{send_notification, UserNotification};
  13. use crate::services::cloud_config::{
  14. get_cloud_config, get_or_create_cloud_config, save_cloud_config,
  15. };
  16. fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
  17. let manager = manager
  18. .upgrade()
  19. .ok_or(FlowyError::internal().with_context("The user session is already drop"))?;
  20. Ok(manager)
  21. }
  22. fn upgrade_store_preferences(
  23. store: AFPluginState<Weak<StorePreferences>>,
  24. ) -> FlowyResult<Arc<StorePreferences>> {
  25. let store = store
  26. .upgrade()
  27. .ok_or(FlowyError::internal().with_context("The store preferences is already drop"))?;
  28. Ok(store)
  29. }
  30. #[tracing::instrument(level = "debug", name = "sign_in", skip(data, manager), fields(email = %data.email), err)]
  31. pub async fn sign_in(
  32. data: AFPluginData<SignInPayloadPB>,
  33. manager: AFPluginState<Weak<UserManager>>,
  34. ) -> DataResult<UserProfilePB, FlowyError> {
  35. let manager = upgrade_manager(manager)?;
  36. let params: SignInParams = data.into_inner().try_into()?;
  37. let auth_type = params.auth_type.clone();
  38. let user_profile: UserProfilePB = manager
  39. .sign_in(BoxAny::new(params), auth_type)
  40. .await?
  41. .into();
  42. data_result_ok(user_profile)
  43. }
  44. #[tracing::instrument(
  45. level = "debug",
  46. name = "sign_up",
  47. skip(data, manager),
  48. fields(
  49. email = %data.email,
  50. name = %data.name,
  51. ),
  52. err
  53. )]
  54. pub async fn sign_up(
  55. data: AFPluginData<SignUpPayloadPB>,
  56. manager: AFPluginState<Weak<UserManager>>,
  57. ) -> DataResult<UserProfilePB, FlowyError> {
  58. let manager = upgrade_manager(manager)?;
  59. let params: SignUpParams = data.into_inner().try_into()?;
  60. let auth_type = params.auth_type.clone();
  61. let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?;
  62. data_result_ok(user_profile.into())
  63. }
  64. #[tracing::instrument(level = "debug", skip(manager))]
  65. pub async fn init_user_handler(
  66. manager: AFPluginState<Weak<UserManager>>,
  67. ) -> Result<(), FlowyError> {
  68. let manager = upgrade_manager(manager)?;
  69. manager.init_user().await?;
  70. Ok(())
  71. }
  72. #[tracing::instrument(level = "debug", skip(manager))]
  73. pub async fn check_user_handler(
  74. manager: AFPluginState<Weak<UserManager>>,
  75. ) -> Result<(), FlowyError> {
  76. let manager = upgrade_manager(manager)?;
  77. manager.check_user().await?;
  78. Ok(())
  79. }
  80. #[tracing::instrument(level = "debug", skip(manager))]
  81. pub async fn get_user_profile_handler(
  82. manager: AFPluginState<Weak<UserManager>>,
  83. ) -> DataResult<UserProfilePB, FlowyError> {
  84. let manager = upgrade_manager(manager)?;
  85. let uid = manager.get_session()?.user_id;
  86. let user_profile = manager.get_user_profile(uid).await?;
  87. let weak_manager = Arc::downgrade(&manager);
  88. let cloned_user_profile = user_profile.clone();
  89. tokio::spawn(async move {
  90. if let Some(manager) = weak_manager.upgrade() {
  91. let _ = manager.refresh_user_profile(&cloned_user_profile).await;
  92. }
  93. });
  94. data_result_ok(user_profile.into())
  95. }
  96. #[tracing::instrument(level = "debug", skip(manager))]
  97. pub async fn sign_out(manager: AFPluginState<Weak<UserManager>>) -> Result<(), FlowyError> {
  98. let manager = upgrade_manager(manager)?;
  99. manager.sign_out().await?;
  100. Ok(())
  101. }
  102. #[tracing::instrument(level = "debug", skip(data, manager))]
  103. pub async fn update_user_profile_handler(
  104. data: AFPluginData<UpdateUserProfilePayloadPB>,
  105. manager: AFPluginState<Weak<UserManager>>,
  106. ) -> Result<(), FlowyError> {
  107. let manager = upgrade_manager(manager)?;
  108. let params: UpdateUserProfileParams = data.into_inner().try_into()?;
  109. manager.update_user_profile(params).await?;
  110. Ok(())
  111. }
  112. const APPEARANCE_SETTING_CACHE_KEY: &str = "appearance_settings";
  113. #[tracing::instrument(level = "debug", skip_all, err)]
  114. pub async fn set_appearance_setting(
  115. store_preferences: AFPluginState<Weak<StorePreferences>>,
  116. data: AFPluginData<AppearanceSettingsPB>,
  117. ) -> Result<(), FlowyError> {
  118. let store_preferences = upgrade_store_preferences(store_preferences)?;
  119. let mut setting = data.into_inner();
  120. if setting.theme.is_empty() {
  121. setting.theme = APPEARANCE_DEFAULT_THEME.to_string();
  122. }
  123. store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?;
  124. Ok(())
  125. }
  126. #[tracing::instrument(level = "debug", skip_all, err)]
  127. pub async fn get_appearance_setting(
  128. store_preferences: AFPluginState<Weak<StorePreferences>>,
  129. ) -> DataResult<AppearanceSettingsPB, FlowyError> {
  130. let store_preferences = upgrade_store_preferences(store_preferences)?;
  131. match store_preferences.get_str(APPEARANCE_SETTING_CACHE_KEY) {
  132. None => data_result_ok(AppearanceSettingsPB::default()),
  133. Some(s) => {
  134. let setting = match serde_json::from_str(&s) {
  135. Ok(setting) => setting,
  136. Err(e) => {
  137. tracing::error!(
  138. "Deserialize AppearanceSettings failed: {:?}, fallback to default",
  139. e
  140. );
  141. AppearanceSettingsPB::default()
  142. },
  143. };
  144. data_result_ok(setting)
  145. },
  146. }
  147. }
  148. const DATE_TIME_SETTINGS_CACHE_KEY: &str = "date_time_settings";
  149. #[tracing::instrument(level = "debug", skip_all, err)]
  150. pub async fn set_date_time_settings(
  151. store_preferences: AFPluginState<Weak<StorePreferences>>,
  152. data: AFPluginData<DateTimeSettingsPB>,
  153. ) -> Result<(), FlowyError> {
  154. let store_preferences = upgrade_store_preferences(store_preferences)?;
  155. let mut setting = data.into_inner();
  156. if setting.timezone_id.is_empty() {
  157. setting.timezone_id = "".to_string();
  158. }
  159. store_preferences.set_object(DATE_TIME_SETTINGS_CACHE_KEY, setting)?;
  160. Ok(())
  161. }
  162. #[tracing::instrument(level = "debug", skip_all, err)]
  163. pub async fn get_date_time_settings(
  164. store_preferences: AFPluginState<Weak<StorePreferences>>,
  165. ) -> DataResult<DateTimeSettingsPB, FlowyError> {
  166. let store_preferences = upgrade_store_preferences(store_preferences)?;
  167. match store_preferences.get_str(DATE_TIME_SETTINGS_CACHE_KEY) {
  168. None => data_result_ok(DateTimeSettingsPB::default()),
  169. Some(s) => {
  170. let setting = match serde_json::from_str(&s) {
  171. Ok(setting) => setting,
  172. Err(e) => {
  173. tracing::error!(
  174. "Deserialize DateTimeSettings failed: {:?}, fallback to default",
  175. e
  176. );
  177. DateTimeSettingsPB::default()
  178. },
  179. };
  180. data_result_ok(setting)
  181. },
  182. }
  183. }
  184. const NOTIFICATION_SETTINGS_CACHE_KEY: &str = "notification_settings";
  185. #[tracing::instrument(level = "debug", skip_all, err)]
  186. pub async fn set_notification_settings(
  187. store_preferences: AFPluginState<Weak<StorePreferences>>,
  188. data: AFPluginData<NotificationSettingsPB>,
  189. ) -> Result<(), FlowyError> {
  190. let store_preferences = upgrade_store_preferences(store_preferences)?;
  191. let setting = data.into_inner();
  192. store_preferences.set_object(NOTIFICATION_SETTINGS_CACHE_KEY, setting)?;
  193. Ok(())
  194. }
  195. #[tracing::instrument(level = "debug", skip_all, err)]
  196. pub async fn get_notification_settings(
  197. store_preferences: AFPluginState<Weak<StorePreferences>>,
  198. ) -> DataResult<NotificationSettingsPB, FlowyError> {
  199. let store_preferences = upgrade_store_preferences(store_preferences)?;
  200. match store_preferences.get_str(NOTIFICATION_SETTINGS_CACHE_KEY) {
  201. None => data_result_ok(NotificationSettingsPB::default()),
  202. Some(s) => {
  203. let setting = match serde_json::from_str(&s) {
  204. Ok(setting) => setting,
  205. Err(e) => {
  206. tracing::error!(
  207. "Deserialize NotificationSettings failed: {:?}, fallback to default",
  208. e
  209. );
  210. NotificationSettingsPB::default()
  211. },
  212. };
  213. data_result_ok(setting)
  214. },
  215. }
  216. }
  217. #[tracing::instrument(level = "debug", skip_all, err)]
  218. pub async fn get_user_setting(
  219. manager: AFPluginState<Weak<UserManager>>,
  220. ) -> DataResult<UserSettingPB, FlowyError> {
  221. let manager = upgrade_manager(manager)?;
  222. let user_setting = manager.user_setting()?;
  223. data_result_ok(user_setting)
  224. }
  225. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  226. pub async fn oauth_handler(
  227. data: AFPluginData<OauthSignInPB>,
  228. manager: AFPluginState<Weak<UserManager>>,
  229. ) -> DataResult<UserProfilePB, FlowyError> {
  230. let manager = upgrade_manager(manager)?;
  231. let params = data.into_inner();
  232. let auth_type: AuthType = params.auth_type.into();
  233. let user_profile = manager.sign_up(auth_type, BoxAny::new(params.map)).await?;
  234. data_result_ok(user_profile.into())
  235. }
  236. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  237. pub async fn get_sign_in_url_handler(
  238. data: AFPluginData<SignInUrlPayloadPB>,
  239. manager: AFPluginState<Weak<UserManager>>,
  240. ) -> DataResult<SignInUrlPB, FlowyError> {
  241. let manager = upgrade_manager(manager)?;
  242. let params = data.into_inner();
  243. let auth_type: AuthType = params.auth_type.into();
  244. let sign_in_url = manager
  245. .generate_sign_in_url_with_email(&auth_type, &params.email)
  246. .await?;
  247. let resp = SignInUrlPB { sign_in_url };
  248. data_result_ok(resp)
  249. }
  250. #[tracing::instrument(level = "debug", skip_all, err)]
  251. pub async fn sign_in_with_provider_handler(
  252. data: AFPluginData<OauthProviderPB>,
  253. manager: AFPluginState<Weak<UserManager>>,
  254. ) -> DataResult<OauthProviderDataPB, FlowyError> {
  255. let manager = upgrade_manager(manager)?;
  256. tracing::debug!("Sign in with provider: {:?}", data.provider.as_str());
  257. let sign_in_url = manager.generate_oauth_url(data.provider.as_str()).await?;
  258. data_result_ok(OauthProviderDataPB {
  259. oauth_url: sign_in_url,
  260. })
  261. }
  262. #[tracing::instrument(level = "debug", skip_all, err)]
  263. pub async fn set_encrypt_secret_handler(
  264. manager: AFPluginState<Weak<UserManager>>,
  265. data: AFPluginData<UserSecretPB>,
  266. store_preferences: AFPluginState<Weak<StorePreferences>>,
  267. ) -> Result<(), FlowyError> {
  268. let manager = upgrade_manager(manager)?;
  269. let store_preferences = upgrade_store_preferences(store_preferences)?;
  270. let data = data.into_inner();
  271. match data.encryption_type {
  272. EncryptionTypePB::NoEncryption => {
  273. tracing::error!("Encryption type is NoEncryption, but set encrypt secret");
  274. },
  275. EncryptionTypePB::Symmetric => {
  276. manager.check_encryption_sign_with_secret(
  277. data.user_id,
  278. &data.encryption_sign,
  279. &data.encryption_secret,
  280. )?;
  281. let config = UserCloudConfig::new(data.encryption_secret).with_enable_encrypt(true);
  282. manager
  283. .set_encrypt_secret(
  284. data.user_id,
  285. config.encrypt_secret.clone(),
  286. EncryptionType::SelfEncryption(data.encryption_sign),
  287. )
  288. .await?;
  289. save_cloud_config(data.user_id, &store_preferences, config)?;
  290. },
  291. }
  292. manager.resume_sign_up().await?;
  293. Ok(())
  294. }
  295. #[tracing::instrument(level = "debug", skip_all, err)]
  296. pub async fn check_encrypt_secret_handler(
  297. manager: AFPluginState<Weak<UserManager>>,
  298. ) -> DataResult<UserEncryptionSecretCheckPB, FlowyError> {
  299. let manager = upgrade_manager(manager)?;
  300. let uid = manager.get_session()?.user_id;
  301. let profile = manager.get_user_profile(uid).await?;
  302. let is_need_secret = match profile.encryption_type {
  303. EncryptionType::NoEncryption => false,
  304. EncryptionType::SelfEncryption(sign) => {
  305. if sign.is_empty() {
  306. false
  307. } else {
  308. manager.check_encryption_sign(uid, &sign).is_err()
  309. }
  310. },
  311. };
  312. data_result_ok(UserEncryptionSecretCheckPB { is_need_secret })
  313. }
  314. #[tracing::instrument(level = "debug", skip_all, err)]
  315. pub async fn set_cloud_config_handler(
  316. manager: AFPluginState<Weak<UserManager>>,
  317. data: AFPluginData<UpdateCloudConfigPB>,
  318. store_preferences: AFPluginState<Weak<StorePreferences>>,
  319. ) -> Result<(), FlowyError> {
  320. let manager = upgrade_manager(manager)?;
  321. let session = manager.get_session()?;
  322. let update = data.into_inner();
  323. let store_preferences = upgrade_store_preferences(store_preferences)?;
  324. let mut config = get_cloud_config(session.user_id, &store_preferences)
  325. .ok_or(FlowyError::internal().with_context("Can't find any cloud config"))?;
  326. if let Some(enable_sync) = update.enable_sync {
  327. manager
  328. .cloud_services
  329. .set_enable_sync(session.user_id, enable_sync);
  330. config.enable_sync = enable_sync;
  331. }
  332. if let Some(enable_encrypt) = update.enable_encrypt {
  333. debug_assert!(enable_encrypt, "Disable encryption is not supported");
  334. if enable_encrypt {
  335. tracing::info!("Enable encryption for user: {}", session.user_id);
  336. config = config.with_enable_encrypt(enable_encrypt);
  337. let encrypt_secret = config.encrypt_secret.clone();
  338. // The encryption secret is generated when the user first enables encryption and will be
  339. // used to validate the encryption secret is correct when the user logs in.
  340. let encryption_sign = manager.generate_encryption_sign(session.user_id, &encrypt_secret)?;
  341. let encryption_type = EncryptionType::SelfEncryption(encryption_sign);
  342. manager
  343. .set_encrypt_secret(session.user_id, encrypt_secret, encryption_type.clone())
  344. .await?;
  345. save_cloud_config(session.user_id, &store_preferences, config.clone())?;
  346. let params =
  347. UpdateUserProfileParams::new(session.user_id).with_encryption_type(encryption_type);
  348. manager.update_user_profile(params).await?;
  349. }
  350. }
  351. let config_pb = UserCloudConfigPB::from(config);
  352. send_notification(
  353. &session.user_id.to_string(),
  354. UserNotification::DidUpdateCloudConfig,
  355. )
  356. .payload(config_pb)
  357. .send();
  358. Ok(())
  359. }
  360. #[tracing::instrument(level = "debug", skip_all, err)]
  361. pub async fn get_cloud_config_handler(
  362. manager: AFPluginState<Weak<UserManager>>,
  363. store_preferences: AFPluginState<Weak<StorePreferences>>,
  364. ) -> DataResult<UserCloudConfigPB, FlowyError> {
  365. let manager = upgrade_manager(manager)?;
  366. let session = manager.get_session()?;
  367. let store_preferences = upgrade_store_preferences(store_preferences)?;
  368. // Generate the default config if the config is not exist
  369. let config = get_or_create_cloud_config(session.user_id, &store_preferences);
  370. data_result_ok(config.into())
  371. }
  372. #[tracing::instrument(level = "debug", skip(manager), err)]
  373. pub async fn get_all_user_workspace_handler(
  374. manager: AFPluginState<Weak<UserManager>>,
  375. ) -> DataResult<RepeatedUserWorkspacePB, FlowyError> {
  376. let manager = upgrade_manager(manager)?;
  377. let uid = manager.get_session()?.user_id;
  378. let user_workspaces = manager.get_all_user_workspaces(uid)?;
  379. data_result_ok(user_workspaces.into())
  380. }
  381. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  382. pub async fn open_workspace_handler(
  383. data: AFPluginData<UserWorkspacePB>,
  384. manager: AFPluginState<Weak<UserManager>>,
  385. ) -> Result<(), FlowyError> {
  386. let manager = upgrade_manager(manager)?;
  387. let params = data.into_inner();
  388. manager.open_workspace(&params.id).await?;
  389. Ok(())
  390. }
  391. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  392. pub async fn add_user_to_workspace_handler(
  393. data: AFPluginData<AddWorkspaceUserPB>,
  394. manager: AFPluginState<Weak<UserManager>>,
  395. ) -> Result<(), FlowyError> {
  396. let manager = upgrade_manager(manager)?;
  397. let params = data.into_inner();
  398. manager
  399. .add_user_to_workspace(params.email, params.workspace_id)
  400. .await?;
  401. Ok(())
  402. }
  403. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  404. pub async fn remove_user_from_workspace_handler(
  405. data: AFPluginData<RemoveWorkspaceUserPB>,
  406. manager: AFPluginState<Weak<UserManager>>,
  407. ) -> Result<(), FlowyError> {
  408. let manager = upgrade_manager(manager)?;
  409. let params = data.into_inner();
  410. manager
  411. .remove_user_to_workspace(params.email, params.workspace_id)
  412. .await?;
  413. Ok(())
  414. }
  415. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  416. pub async fn update_network_state_handler(
  417. data: AFPluginData<NetworkStatePB>,
  418. manager: AFPluginState<Weak<UserManager>>,
  419. ) -> Result<(), FlowyError> {
  420. let manager = upgrade_manager(manager)?;
  421. let reachable = data.into_inner().ty.is_reachable();
  422. manager
  423. .user_status_callback
  424. .read()
  425. .await
  426. .did_update_network(reachable);
  427. Ok(())
  428. }
  429. #[tracing::instrument(level = "debug", skip_all, err)]
  430. pub async fn get_historical_users_handler(
  431. manager: AFPluginState<Weak<UserManager>>,
  432. ) -> DataResult<RepeatedHistoricalUserPB, FlowyError> {
  433. let manager = upgrade_manager(manager)?;
  434. let users = RepeatedHistoricalUserPB::from(manager.get_historical_users());
  435. data_result_ok(users)
  436. }
  437. #[tracing::instrument(level = "debug", skip_all, err)]
  438. pub async fn open_historical_users_handler(
  439. user: AFPluginData<HistoricalUserPB>,
  440. manager: AFPluginState<Weak<UserManager>>,
  441. ) -> Result<(), FlowyError> {
  442. let user = user.into_inner();
  443. let manager = upgrade_manager(manager)?;
  444. let auth_type = AuthType::from(user.auth_type);
  445. manager
  446. .open_historical_user(user.user_id, user.device_id, auth_type)
  447. .await?;
  448. Ok(())
  449. }
  450. pub async fn push_realtime_event_handler(
  451. payload: AFPluginData<RealtimePayloadPB>,
  452. manager: AFPluginState<Weak<UserManager>>,
  453. ) -> Result<(), FlowyError> {
  454. match serde_json::from_str::<Value>(&payload.into_inner().json_str) {
  455. Ok(json) => {
  456. let manager = upgrade_manager(manager)?;
  457. manager.receive_realtime_event(json).await;
  458. },
  459. Err(e) => {
  460. tracing::error!("Deserialize RealtimePayload failed: {:?}", e);
  461. },
  462. }
  463. Ok(())
  464. }
  465. #[tracing::instrument(level = "debug", skip_all, err)]
  466. pub async fn create_reminder_event_handler(
  467. data: AFPluginData<ReminderPB>,
  468. manager: AFPluginState<Weak<UserManager>>,
  469. ) -> Result<(), FlowyError> {
  470. let manager = upgrade_manager(manager)?;
  471. let params = data.into_inner();
  472. manager.add_reminder(params).await?;
  473. Ok(())
  474. }
  475. #[tracing::instrument(level = "debug", skip_all, err)]
  476. pub async fn get_all_reminder_event_handler(
  477. manager: AFPluginState<Weak<UserManager>>,
  478. ) -> DataResult<RepeatedReminderPB, FlowyError> {
  479. let manager = upgrade_manager(manager)?;
  480. let reminders = manager
  481. .get_all_reminders()
  482. .await
  483. .into_iter()
  484. .map(ReminderPB::from)
  485. .collect::<Vec<_>>();
  486. data_result_ok(reminders.into())
  487. }
  488. #[tracing::instrument(level = "debug", skip_all, err)]
  489. pub async fn reset_workspace_handler(
  490. data: AFPluginData<ResetWorkspacePB>,
  491. manager: AFPluginState<Weak<UserManager>>,
  492. ) -> Result<(), FlowyError> {
  493. let manager = upgrade_manager(manager)?;
  494. let reset_pb = data.into_inner();
  495. if reset_pb.workspace_id.is_empty() {
  496. return Err(FlowyError::new(
  497. ErrorCode::WorkspaceIdInvalid,
  498. "The workspace id is empty",
  499. ));
  500. }
  501. let session = manager.get_session()?;
  502. manager.reset_workspace(reset_pb, session.device_id).await?;
  503. Ok(())
  504. }
  505. #[tracing::instrument(level = "debug", skip_all, err)]
  506. pub async fn remove_reminder_event_handler(
  507. data: AFPluginData<ReminderIdentifierPB>,
  508. manager: AFPluginState<Weak<UserManager>>,
  509. ) -> Result<(), FlowyError> {
  510. let manager = upgrade_manager(manager)?;
  511. let params = data.into_inner();
  512. let _ = manager.remove_reminder(params.id.as_str()).await;
  513. Ok(())
  514. }
  515. #[tracing::instrument(level = "debug", skip_all, err)]
  516. pub async fn update_reminder_event_handler(
  517. data: AFPluginData<ReminderPB>,
  518. manager: AFPluginState<Weak<UserManager>>,
  519. ) -> Result<(), FlowyError> {
  520. let manager = upgrade_manager(manager)?;
  521. let params = data.into_inner();
  522. manager.update_reminder(params).await?;
  523. Ok(())
  524. }