user.rs 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. use std::str::FromStr;
  2. use std::sync::Arc;
  3. use anyhow::Error;
  4. use tokio::sync::oneshot::channel;
  5. use uuid::Uuid;
  6. use flowy_user_deps::cloud::*;
  7. use flowy_user_deps::entities::*;
  8. use flowy_user_deps::DEFAULT_USER_NAME;
  9. use lib_infra::box_any::BoxAny;
  10. use lib_infra::future::FutureResult;
  11. use crate::supabase::api::request::FetchObjectUpdateAction;
  12. use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
  13. use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
  14. use crate::supabase::define::*;
  15. use crate::supabase::entities::GetUserProfileParams;
  16. use crate::supabase::entities::UidResponse;
  17. use crate::supabase::entities::UserProfileResponse;
  18. pub struct SupabaseUserServiceImpl<T> {
  19. server: T,
  20. }
  21. impl<T> SupabaseUserServiceImpl<T> {
  22. pub fn new(server: T) -> Self {
  23. Self { server }
  24. }
  25. }
  26. impl<T> UserService for SupabaseUserServiceImpl<T>
  27. where
  28. T: SupabaseServerService,
  29. {
  30. fn sign_up(&self, params: BoxAny) -> FutureResult<SignUpResponse, Error> {
  31. let try_get_postgrest = self.server.try_get_postgrest();
  32. FutureResult::new(async move {
  33. let postgrest = try_get_postgrest?;
  34. let params = third_party_params_from_box_any(params)?;
  35. let is_new_user = postgrest
  36. .from(USER_TABLE)
  37. .select("uid")
  38. .eq("uuid", params.uuid.to_string())
  39. .execute()
  40. .await?
  41. .get_value::<Vec<UidResponse>>()
  42. .await?
  43. .is_empty();
  44. // Insert the user if it's a new user. After the user is inserted, we can query the user profile
  45. // and workspaces. The profile and workspaces are created by the database trigger.
  46. if is_new_user {
  47. let insert_params = InsertParamsBuilder::new()
  48. .insert(USER_UUID, params.uuid.to_string())
  49. .insert(USER_EMAIL, params.email)
  50. .build();
  51. let resp = postgrest
  52. .from(USER_TABLE)
  53. .insert(insert_params)
  54. .execute()
  55. .await?
  56. .success_with_body()
  57. .await?;
  58. tracing::debug!("Create user response: {:?}", resp);
  59. }
  60. // Query the user profile and workspaces
  61. tracing::debug!("user uuid: {}", params.uuid);
  62. let user_profile =
  63. get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(params.uuid))
  64. .await?
  65. .unwrap();
  66. let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
  67. let latest_workspace = user_workspaces
  68. .iter()
  69. .find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
  70. .cloned();
  71. let user_name = if user_profile.name.is_empty() {
  72. DEFAULT_USER_NAME()
  73. } else {
  74. user_profile.name
  75. };
  76. Ok(SignUpResponse {
  77. user_id: user_profile.uid,
  78. name: user_name,
  79. latest_workspace: latest_workspace.unwrap(),
  80. user_workspaces,
  81. is_new: is_new_user,
  82. email: Some(user_profile.email),
  83. token: None,
  84. device_id: params.device_id,
  85. })
  86. })
  87. }
  88. fn sign_in(&self, params: BoxAny) -> FutureResult<SignInResponse, Error> {
  89. let try_get_postgrest = self.server.try_get_postgrest();
  90. FutureResult::new(async move {
  91. let postgrest = try_get_postgrest?;
  92. let params = third_party_params_from_box_any(params)?;
  93. let uuid = params.uuid;
  94. let user_profile = get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(uuid))
  95. .await?
  96. .unwrap();
  97. let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
  98. let latest_workspace = user_workspaces
  99. .iter()
  100. .find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
  101. .cloned();
  102. Ok(SignInResponse {
  103. user_id: user_profile.uid,
  104. name: DEFAULT_USER_NAME(),
  105. latest_workspace: latest_workspace.unwrap(),
  106. user_workspaces,
  107. email: None,
  108. token: None,
  109. device_id: params.device_id,
  110. })
  111. })
  112. }
  113. fn sign_out(&self, _token: Option<String>) -> FutureResult<(), Error> {
  114. FutureResult::new(async { Ok(()) })
  115. }
  116. fn update_user(
  117. &self,
  118. _credential: UserCredentials,
  119. params: UpdateUserProfileParams,
  120. ) -> FutureResult<(), Error> {
  121. let try_get_postgrest = self.server.try_get_postgrest();
  122. FutureResult::new(async move {
  123. let postgrest = try_get_postgrest?;
  124. update_user_profile(postgrest, params).await?;
  125. Ok(())
  126. })
  127. }
  128. fn get_user_profile(
  129. &self,
  130. credential: UserCredentials,
  131. ) -> FutureResult<Option<UserProfile>, Error> {
  132. let try_get_postgrest = self.server.try_get_postgrest();
  133. let uid = credential
  134. .uid
  135. .ok_or(anyhow::anyhow!("uid is required"))
  136. .unwrap();
  137. FutureResult::new(async move {
  138. let postgrest = try_get_postgrest?;
  139. let user_profile_resp = get_user_profile(postgrest, GetUserProfileParams::Uid(uid)).await?;
  140. match user_profile_resp {
  141. None => Ok(None),
  142. Some(user_profile_resp) => Ok(Some(UserProfile {
  143. id: user_profile_resp.uid,
  144. email: user_profile_resp.email,
  145. name: user_profile_resp.name,
  146. token: "".to_string(),
  147. icon_url: "".to_string(),
  148. openai_key: "".to_string(),
  149. workspace_id: user_profile_resp.latest_workspace_id,
  150. auth_type: AuthType::Supabase,
  151. })),
  152. }
  153. })
  154. }
  155. fn get_user_workspaces(&self, uid: i64) -> FutureResult<Vec<UserWorkspace>, Error> {
  156. let try_get_postgrest = self.server.try_get_postgrest();
  157. FutureResult::new(async move {
  158. let postgrest = try_get_postgrest?;
  159. let user_workspaces = get_user_workspaces(postgrest, uid).await?;
  160. Ok(user_workspaces)
  161. })
  162. }
  163. fn check_user(&self, credential: UserCredentials) -> FutureResult<(), Error> {
  164. let try_get_postgrest = self.server.try_get_postgrest();
  165. let uuid = credential.uuid.and_then(|uuid| Uuid::from_str(&uuid).ok());
  166. let uid = credential.uid;
  167. FutureResult::new(async move {
  168. let postgrest = try_get_postgrest?;
  169. check_user(postgrest, uid, uuid).await?;
  170. Ok(())
  171. })
  172. }
  173. fn add_workspace_member(
  174. &self,
  175. _user_email: String,
  176. _workspace_id: String,
  177. ) -> FutureResult<(), Error> {
  178. todo!()
  179. }
  180. fn remove_workspace_member(
  181. &self,
  182. _user_email: String,
  183. _workspace_id: String,
  184. ) -> FutureResult<(), Error> {
  185. todo!()
  186. }
  187. fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
  188. let try_get_postgrest = self.server.try_get_weak_postgrest();
  189. let awareness_id = uid.to_string();
  190. let (tx, rx) = channel();
  191. tokio::spawn(async move {
  192. tx.send(
  193. async move {
  194. let postgrest = try_get_postgrest?;
  195. let action =
  196. FetchObjectUpdateAction::new(awareness_id, CollabType::UserAwareness, postgrest);
  197. action.run_with_fix_interval(5, 10).await
  198. }
  199. .await,
  200. )
  201. });
  202. FutureResult::new(async { rx.await? })
  203. }
  204. }
  205. async fn get_user_profile(
  206. postgrest: Arc<PostgresWrapper>,
  207. params: GetUserProfileParams,
  208. ) -> Result<Option<UserProfileResponse>, Error> {
  209. let mut builder = postgrest
  210. .from(USER_PROFILE_VIEW)
  211. .select("uid, email, name, latest_workspace_id");
  212. match params {
  213. GetUserProfileParams::Uid(uid) => builder = builder.eq("uid", uid.to_string()),
  214. GetUserProfileParams::Uuid(uuid) => builder = builder.eq("uuid", uuid.to_string()),
  215. }
  216. let mut profiles = builder
  217. .execute()
  218. .await?
  219. .error_for_status()?
  220. .get_value::<Vec<UserProfileResponse>>()
  221. .await?;
  222. match profiles.len() {
  223. 0 => Ok(None),
  224. 1 => Ok(Some(profiles.swap_remove(0))),
  225. _ => unreachable!(),
  226. }
  227. }
  228. async fn get_user_workspaces(
  229. postgrest: Arc<PostgresWrapper>,
  230. uid: i64,
  231. ) -> Result<Vec<UserWorkspace>, Error> {
  232. postgrest
  233. .from(WORKSPACE_TABLE)
  234. .select("id:workspace_id, name:workspace_name, created_at, database_storage_id")
  235. .eq("owner_uid", uid.to_string())
  236. .execute()
  237. .await?
  238. .error_for_status()?
  239. .get_value::<Vec<UserWorkspace>>()
  240. .await
  241. }
  242. async fn update_user_profile(
  243. postgrest: Arc<PostgresWrapper>,
  244. params: UpdateUserProfileParams,
  245. ) -> Result<(), Error> {
  246. if params.is_empty() {
  247. anyhow::bail!("no params to update");
  248. }
  249. // check if user exists
  250. let exists = !postgrest
  251. .from(USER_TABLE)
  252. .select("uid")
  253. .eq("uid", params.id.to_string())
  254. .execute()
  255. .await?
  256. .error_for_status()?
  257. .get_value::<Vec<UidResponse>>()
  258. .await?
  259. .is_empty();
  260. if !exists {
  261. anyhow::bail!("user uid {} does not exist", params.id);
  262. }
  263. let mut update_params = serde_json::Map::new();
  264. if let Some(name) = params.name {
  265. update_params.insert("name".to_string(), serde_json::json!(name));
  266. }
  267. if let Some(email) = params.email {
  268. update_params.insert("email".to_string(), serde_json::json!(email));
  269. }
  270. let update_payload = serde_json::to_string(&update_params).unwrap();
  271. let resp = postgrest
  272. .from(USER_TABLE)
  273. .update(update_payload)
  274. .eq("uid", params.id.to_string())
  275. .execute()
  276. .await?
  277. .success_with_body()
  278. .await?;
  279. tracing::debug!("update user profile resp: {:?}", resp);
  280. Ok(())
  281. }
  282. async fn check_user(
  283. postgrest: Arc<PostgresWrapper>,
  284. uid: Option<i64>,
  285. uuid: Option<Uuid>,
  286. ) -> Result<(), Error> {
  287. let mut builder = postgrest.from(USER_TABLE);
  288. if let Some(uid) = uid {
  289. builder = builder.eq("uid", uid.to_string());
  290. } else if let Some(uuid) = uuid {
  291. builder = builder.eq("uuid", uuid.to_string());
  292. } else {
  293. anyhow::bail!("uid or uuid is required");
  294. }
  295. let exists = !builder
  296. .execute()
  297. .await?
  298. .error_for_status()?
  299. .get_value::<Vec<UidResponse>>()
  300. .await?
  301. .is_empty();
  302. if !exists {
  303. anyhow::bail!("user does not exist, uid: {:?}, uuid: {:?}", uid, uuid);
  304. }
  305. Ok(())
  306. }