event_handler.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430
  1. use std::sync::Weak;
  2. use std::{convert::TryInto, sync::Arc};
  3. use serde_json::Value;
  4. use flowy_error::{FlowyError, FlowyResult};
  5. use flowy_sqlite::kv::StorePreferences;
  6. use flowy_user_deps::cloud::UserCloudConfig;
  7. use flowy_user_deps::entities::*;
  8. use lib_dispatch::prelude::*;
  9. use lib_infra::box_any::BoxAny;
  10. use crate::entities::*;
  11. use crate::manager::UserManager;
  12. use crate::notification::{send_notification, UserNotification};
  13. use crate::services::cloud_config::{
  14. get_cloud_config, get_or_create_cloud_config, save_cloud_config,
  15. };
  16. fn upgrade_manager(manager: AFPluginState<Weak<UserManager>>) -> FlowyResult<Arc<UserManager>> {
  17. let manager = manager
  18. .upgrade()
  19. .ok_or(FlowyError::internal().context("The user session is already drop"))?;
  20. Ok(manager)
  21. }
  22. fn upgrade_store_preferences(
  23. store: AFPluginState<Weak<StorePreferences>>,
  24. ) -> FlowyResult<Arc<StorePreferences>> {
  25. let store = store
  26. .upgrade()
  27. .ok_or(FlowyError::internal().context("The store preferences is already drop"))?;
  28. Ok(store)
  29. }
  30. #[tracing::instrument(level = "debug", name = "sign_in", skip(data, manager), fields(email = %data.email), err)]
  31. pub async fn sign_in(
  32. data: AFPluginData<SignInPayloadPB>,
  33. manager: AFPluginState<Weak<UserManager>>,
  34. ) -> DataResult<UserProfilePB, FlowyError> {
  35. let manager = upgrade_manager(manager)?;
  36. let params: SignInParams = data.into_inner().try_into()?;
  37. let auth_type = params.auth_type.clone();
  38. let user_profile: UserProfilePB = manager
  39. .sign_in(BoxAny::new(params), auth_type)
  40. .await?
  41. .into();
  42. data_result_ok(user_profile)
  43. }
  44. #[tracing::instrument(
  45. level = "debug",
  46. name = "sign_up",
  47. skip(data, manager),
  48. fields(
  49. email = %data.email,
  50. name = %data.name,
  51. ),
  52. err
  53. )]
  54. pub async fn sign_up(
  55. data: AFPluginData<SignUpPayloadPB>,
  56. manager: AFPluginState<Weak<UserManager>>,
  57. ) -> DataResult<UserProfilePB, FlowyError> {
  58. let manager = upgrade_manager(manager)?;
  59. let params: SignUpParams = data.into_inner().try_into()?;
  60. let auth_type = params.auth_type.clone();
  61. let user_profile = manager.sign_up(auth_type, BoxAny::new(params)).await?;
  62. data_result_ok(user_profile.into())
  63. }
  64. #[tracing::instrument(level = "debug", skip(manager))]
  65. pub async fn init_user_handler(
  66. manager: AFPluginState<Weak<UserManager>>,
  67. ) -> Result<(), FlowyError> {
  68. let manager = upgrade_manager(manager)?;
  69. manager.init_user().await?;
  70. Ok(())
  71. }
  72. #[tracing::instrument(level = "debug", skip(manager))]
  73. pub async fn check_user_handler(
  74. manager: AFPluginState<Weak<UserManager>>,
  75. ) -> Result<(), FlowyError> {
  76. let manager = upgrade_manager(manager)?;
  77. manager.check_user().await?;
  78. Ok(())
  79. }
  80. #[tracing::instrument(level = "debug", skip(manager))]
  81. pub async fn get_user_profile_handler(
  82. manager: AFPluginState<Weak<UserManager>>,
  83. ) -> DataResult<UserProfilePB, FlowyError> {
  84. let manager = upgrade_manager(manager)?;
  85. let uid = manager.get_session()?.user_id;
  86. let user_profile: UserProfilePB = manager.get_user_profile(uid, true).await?.into();
  87. data_result_ok(user_profile)
  88. }
  89. #[tracing::instrument(level = "debug", skip(manager))]
  90. pub async fn sign_out(manager: AFPluginState<Weak<UserManager>>) -> Result<(), FlowyError> {
  91. let manager = upgrade_manager(manager)?;
  92. manager.sign_out().await?;
  93. Ok(())
  94. }
  95. #[tracing::instrument(level = "debug", skip(data, manager))]
  96. pub async fn update_user_profile_handler(
  97. data: AFPluginData<UpdateUserProfilePayloadPB>,
  98. manager: AFPluginState<Weak<UserManager>>,
  99. ) -> Result<(), FlowyError> {
  100. let manager = upgrade_manager(manager)?;
  101. let params: UpdateUserProfileParams = data.into_inner().try_into()?;
  102. manager.update_user_profile(params).await?;
  103. Ok(())
  104. }
  105. const APPEARANCE_SETTING_CACHE_KEY: &str = "appearance_settings";
  106. #[tracing::instrument(level = "debug", skip_all, err)]
  107. pub async fn set_appearance_setting(
  108. store_preferences: AFPluginState<Weak<StorePreferences>>,
  109. data: AFPluginData<AppearanceSettingsPB>,
  110. ) -> Result<(), FlowyError> {
  111. let store_preferences = upgrade_store_preferences(store_preferences)?;
  112. let mut setting = data.into_inner();
  113. if setting.theme.is_empty() {
  114. setting.theme = APPEARANCE_DEFAULT_THEME.to_string();
  115. }
  116. store_preferences.set_object(APPEARANCE_SETTING_CACHE_KEY, setting)?;
  117. Ok(())
  118. }
  119. #[tracing::instrument(level = "debug", skip_all, err)]
  120. pub async fn get_appearance_setting(
  121. store_preferences: AFPluginState<Weak<StorePreferences>>,
  122. ) -> DataResult<AppearanceSettingsPB, FlowyError> {
  123. let store_preferences = upgrade_store_preferences(store_preferences)?;
  124. match store_preferences.get_str(APPEARANCE_SETTING_CACHE_KEY) {
  125. None => data_result_ok(AppearanceSettingsPB::default()),
  126. Some(s) => {
  127. let setting = match serde_json::from_str(&s) {
  128. Ok(setting) => setting,
  129. Err(e) => {
  130. tracing::error!(
  131. "Deserialize AppearanceSettings failed: {:?}, fallback to default",
  132. e
  133. );
  134. AppearanceSettingsPB::default()
  135. },
  136. };
  137. data_result_ok(setting)
  138. },
  139. }
  140. }
  141. #[tracing::instrument(level = "debug", skip_all, err)]
  142. pub async fn get_user_setting(
  143. manager: AFPluginState<Weak<UserManager>>,
  144. ) -> DataResult<UserSettingPB, FlowyError> {
  145. let manager = upgrade_manager(manager)?;
  146. let user_setting = manager.user_setting()?;
  147. data_result_ok(user_setting)
  148. }
  149. /// Only used for third party auth.
  150. /// Use [UserEvent::SignIn] or [UserEvent::SignUp] If the [AuthType] is Local or SelfHosted
  151. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  152. pub async fn third_party_auth_handler(
  153. data: AFPluginData<ThirdPartyAuthPB>,
  154. manager: AFPluginState<Weak<UserManager>>,
  155. ) -> DataResult<UserProfilePB, FlowyError> {
  156. let manager = upgrade_manager(manager)?;
  157. let params = data.into_inner();
  158. let auth_type: AuthType = params.auth_type.into();
  159. let user_profile = manager.sign_up(auth_type, BoxAny::new(params.map)).await?;
  160. data_result_ok(user_profile.into())
  161. }
  162. #[tracing::instrument(level = "debug", skip_all, err)]
  163. pub async fn set_encrypt_secret_handler(
  164. manager: AFPluginState<Weak<UserManager>>,
  165. data: AFPluginData<UserSecretPB>,
  166. store_preferences: AFPluginState<Weak<StorePreferences>>,
  167. ) -> Result<(), FlowyError> {
  168. let manager = upgrade_manager(manager)?;
  169. let store_preferences = upgrade_store_preferences(store_preferences)?;
  170. let data = data.into_inner();
  171. match data.encryption_type {
  172. EncryptionTypePB::NoEncryption => {
  173. tracing::error!("Encryption type is NoEncryption, but set encrypt secret");
  174. },
  175. EncryptionTypePB::Symmetric => {
  176. manager.check_encryption_sign_with_secret(
  177. data.user_id,
  178. &data.encryption_sign,
  179. &data.encryption_secret,
  180. )?;
  181. let config = UserCloudConfig::new(data.encryption_secret).with_enable_encrypt(true);
  182. manager
  183. .set_encrypt_secret(
  184. data.user_id,
  185. config.encrypt_secret.clone(),
  186. EncryptionType::SelfEncryption(data.encryption_sign),
  187. )
  188. .await?;
  189. save_cloud_config(data.user_id, &store_preferences, config)?;
  190. },
  191. }
  192. manager.resume_sign_up().await?;
  193. Ok(())
  194. }
  195. #[tracing::instrument(level = "debug", skip_all, err)]
  196. pub async fn check_encrypt_secret_handler(
  197. manager: AFPluginState<Weak<UserManager>>,
  198. ) -> DataResult<UserEncryptionSecretCheckPB, FlowyError> {
  199. let manager = upgrade_manager(manager)?;
  200. let uid = manager.get_session()?.user_id;
  201. let profile = manager.get_user_profile(uid, false).await?;
  202. let is_need_secret = match profile.encryption_type {
  203. EncryptionType::NoEncryption => false,
  204. EncryptionType::SelfEncryption(sign) => {
  205. if sign.is_empty() {
  206. false
  207. } else {
  208. manager.check_encryption_sign(uid, &sign).is_err()
  209. }
  210. },
  211. };
  212. data_result_ok(UserEncryptionSecretCheckPB { is_need_secret })
  213. }
  214. #[tracing::instrument(level = "debug", skip_all, err)]
  215. pub async fn set_cloud_config_handler(
  216. manager: AFPluginState<Weak<UserManager>>,
  217. data: AFPluginData<UpdateCloudConfigPB>,
  218. store_preferences: AFPluginState<Weak<StorePreferences>>,
  219. ) -> Result<(), FlowyError> {
  220. let manager = upgrade_manager(manager)?;
  221. let session = manager.get_session()?;
  222. let update = data.into_inner();
  223. let store_preferences = upgrade_store_preferences(store_preferences)?;
  224. let mut config = get_cloud_config(session.user_id, &store_preferences)
  225. .ok_or(FlowyError::internal().context("Can't find any cloud config"))?;
  226. if let Some(enable_sync) = update.enable_sync {
  227. manager.cloud_services.set_enable_sync(enable_sync);
  228. config.enable_sync = enable_sync;
  229. }
  230. if let Some(enable_encrypt) = update.enable_encrypt {
  231. debug_assert!(enable_encrypt, "Disable encryption is not supported");
  232. if enable_encrypt {
  233. tracing::info!("Enable encryption for user: {}", session.user_id);
  234. config = config.with_enable_encrypt(enable_encrypt);
  235. let encrypt_secret = config.encrypt_secret.clone();
  236. // The encryption secret is generated when the user first enables encryption and will be
  237. // used to validate the encryption secret is correct when the user logs in.
  238. let encryption_sign = manager.generate_encryption_sign(session.user_id, &encrypt_secret)?;
  239. let encryption_type = EncryptionType::SelfEncryption(encryption_sign);
  240. manager
  241. .set_encrypt_secret(session.user_id, encrypt_secret, encryption_type.clone())
  242. .await?;
  243. save_cloud_config(session.user_id, &store_preferences, config.clone())?;
  244. let params =
  245. UpdateUserProfileParams::new(session.user_id).with_encryption_type(encryption_type);
  246. manager.update_user_profile(params).await?;
  247. }
  248. }
  249. let config_pb = UserCloudConfigPB::from(config);
  250. send_notification(
  251. &session.user_id.to_string(),
  252. UserNotification::DidUpdateCloudConfig,
  253. )
  254. .payload(config_pb)
  255. .send();
  256. Ok(())
  257. }
  258. #[tracing::instrument(level = "debug", skip_all, err)]
  259. pub async fn get_cloud_config_handler(
  260. manager: AFPluginState<Weak<UserManager>>,
  261. store_preferences: AFPluginState<Weak<StorePreferences>>,
  262. ) -> DataResult<UserCloudConfigPB, FlowyError> {
  263. let manager = upgrade_manager(manager)?;
  264. let session = manager.get_session()?;
  265. let store_preferences = upgrade_store_preferences(store_preferences)?;
  266. // Generate the default config if the config is not exist
  267. let config = get_or_create_cloud_config(session.user_id, &store_preferences);
  268. data_result_ok(config.into())
  269. }
  270. #[tracing::instrument(level = "debug", skip(manager), err)]
  271. pub async fn get_all_user_workspace_handler(
  272. manager: AFPluginState<Weak<UserManager>>,
  273. ) -> DataResult<RepeatedUserWorkspacePB, FlowyError> {
  274. let manager = upgrade_manager(manager)?;
  275. let uid = manager.get_session()?.user_id;
  276. let user_workspaces = manager.get_all_user_workspaces(uid)?;
  277. data_result_ok(user_workspaces.into())
  278. }
  279. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  280. pub async fn open_workspace_handler(
  281. data: AFPluginData<UserWorkspacePB>,
  282. manager: AFPluginState<Weak<UserManager>>,
  283. ) -> Result<(), FlowyError> {
  284. let manager = upgrade_manager(manager)?;
  285. let params = data.into_inner();
  286. manager.open_workspace(&params.id).await?;
  287. Ok(())
  288. }
  289. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  290. pub async fn add_user_to_workspace_handler(
  291. data: AFPluginData<AddWorkspaceUserPB>,
  292. manager: AFPluginState<Weak<UserManager>>,
  293. ) -> Result<(), FlowyError> {
  294. let manager = upgrade_manager(manager)?;
  295. let params = data.into_inner();
  296. manager
  297. .add_user_to_workspace(params.email, params.workspace_id)
  298. .await?;
  299. Ok(())
  300. }
  301. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  302. pub async fn remove_user_from_workspace_handler(
  303. data: AFPluginData<RemoveWorkspaceUserPB>,
  304. manager: AFPluginState<Weak<UserManager>>,
  305. ) -> Result<(), FlowyError> {
  306. let manager = upgrade_manager(manager)?;
  307. let params = data.into_inner();
  308. manager
  309. .remove_user_to_workspace(params.email, params.workspace_id)
  310. .await?;
  311. Ok(())
  312. }
  313. #[tracing::instrument(level = "debug", skip(data, manager), err)]
  314. pub async fn update_network_state_handler(
  315. data: AFPluginData<NetworkStatePB>,
  316. manager: AFPluginState<Weak<UserManager>>,
  317. ) -> Result<(), FlowyError> {
  318. let manager = upgrade_manager(manager)?;
  319. let reachable = data.into_inner().ty.is_reachable();
  320. manager
  321. .user_status_callback
  322. .read()
  323. .await
  324. .did_update_network(reachable);
  325. Ok(())
  326. }
  327. #[tracing::instrument(level = "debug", skip_all, err)]
  328. pub async fn get_historical_users_handler(
  329. manager: AFPluginState<Weak<UserManager>>,
  330. ) -> DataResult<RepeatedHistoricalUserPB, FlowyError> {
  331. let manager = upgrade_manager(manager)?;
  332. let users = RepeatedHistoricalUserPB::from(manager.get_historical_users());
  333. data_result_ok(users)
  334. }
  335. #[tracing::instrument(level = "debug", skip_all, err)]
  336. pub async fn open_historical_users_handler(
  337. user: AFPluginData<HistoricalUserPB>,
  338. manager: AFPluginState<Weak<UserManager>>,
  339. ) -> Result<(), FlowyError> {
  340. let user = user.into_inner();
  341. let manager = upgrade_manager(manager)?;
  342. let auth_type = AuthType::from(user.auth_type);
  343. manager
  344. .open_historical_user(user.user_id, user.device_id, auth_type)
  345. .await?;
  346. Ok(())
  347. }
  348. pub async fn push_realtime_event_handler(
  349. payload: AFPluginData<RealtimePayloadPB>,
  350. manager: AFPluginState<Weak<UserManager>>,
  351. ) -> Result<(), FlowyError> {
  352. match serde_json::from_str::<Value>(&payload.into_inner().json_str) {
  353. Ok(json) => {
  354. let manager = upgrade_manager(manager)?;
  355. manager.receive_realtime_event(json).await;
  356. },
  357. Err(e) => {
  358. tracing::error!("Deserialize RealtimePayload failed: {:?}", e);
  359. },
  360. }
  361. Ok(())
  362. }
  363. #[tracing::instrument(level = "debug", skip_all, err)]
  364. pub async fn create_reminder_event_handler(
  365. data: AFPluginData<ReminderPB>,
  366. manager: AFPluginState<Weak<UserManager>>,
  367. ) -> Result<(), FlowyError> {
  368. let manager = upgrade_manager(manager)?;
  369. let params = data.into_inner();
  370. manager.add_reminder(params).await?;
  371. Ok(())
  372. }
  373. #[tracing::instrument(level = "debug", skip_all, err)]
  374. pub async fn get_all_reminder_event_handler(
  375. manager: AFPluginState<Weak<UserManager>>,
  376. ) -> DataResult<RepeatedReminderPB, FlowyError> {
  377. let manager = upgrade_manager(manager)?;
  378. let reminders = manager
  379. .get_all_reminders()
  380. .await
  381. .into_iter()
  382. .map(ReminderPB::from)
  383. .collect::<Vec<_>>();
  384. data_result_ok(reminders.into())
  385. }