123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338 |
- use std::str::FromStr;
- use std::sync::Arc;
- use anyhow::Error;
- use tokio::sync::oneshot::channel;
- use uuid::Uuid;
- use flowy_user_deps::cloud::*;
- use flowy_user_deps::entities::*;
- use flowy_user_deps::DEFAULT_USER_NAME;
- use lib_infra::box_any::BoxAny;
- use lib_infra::future::FutureResult;
- use crate::supabase::api::request::FetchObjectUpdateAction;
- use crate::supabase::api::util::{ExtendedResponse, InsertParamsBuilder};
- use crate::supabase::api::{PostgresWrapper, SupabaseServerService};
- use crate::supabase::define::*;
- use crate::supabase::entities::GetUserProfileParams;
- use crate::supabase::entities::UidResponse;
- use crate::supabase::entities::UserProfileResponse;
- pub struct SupabaseUserServiceImpl<T> {
- server: T,
- }
- impl<T> SupabaseUserServiceImpl<T> {
- pub fn new(server: T) -> Self {
- Self { server }
- }
- }
- impl<T> UserService for SupabaseUserServiceImpl<T>
- where
- T: SupabaseServerService,
- {
- fn sign_up(&self, params: BoxAny) -> FutureResult<SignUpResponse, Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- let params = third_party_params_from_box_any(params)?;
- let is_new_user = postgrest
- .from(USER_TABLE)
- .select("uid")
- .eq("uuid", params.uuid.to_string())
- .execute()
- .await?
- .get_value::<Vec<UidResponse>>()
- .await?
- .is_empty();
- // Insert the user if it's a new user. After the user is inserted, we can query the user profile
- // and workspaces. The profile and workspaces are created by the database trigger.
- if is_new_user {
- let insert_params = InsertParamsBuilder::new()
- .insert(USER_UUID, params.uuid.to_string())
- .insert(USER_EMAIL, params.email)
- .build();
- let resp = postgrest
- .from(USER_TABLE)
- .insert(insert_params)
- .execute()
- .await?
- .success_with_body()
- .await?;
- tracing::debug!("Create user response: {:?}", resp);
- }
- // Query the user profile and workspaces
- tracing::debug!("user uuid: {}", params.uuid);
- let user_profile =
- get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(params.uuid))
- .await?
- .unwrap();
- let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
- let latest_workspace = user_workspaces
- .iter()
- .find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
- .cloned();
- let user_name = if user_profile.name.is_empty() {
- DEFAULT_USER_NAME()
- } else {
- user_profile.name
- };
- Ok(SignUpResponse {
- user_id: user_profile.uid,
- name: user_name,
- latest_workspace: latest_workspace.unwrap(),
- user_workspaces,
- is_new: is_new_user,
- email: Some(user_profile.email),
- token: None,
- device_id: params.device_id,
- })
- })
- }
- fn sign_in(&self, params: BoxAny) -> FutureResult<SignInResponse, Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- let params = third_party_params_from_box_any(params)?;
- let uuid = params.uuid;
- let user_profile = get_user_profile(postgrest.clone(), GetUserProfileParams::Uuid(uuid))
- .await?
- .unwrap();
- let user_workspaces = get_user_workspaces(postgrest.clone(), user_profile.uid).await?;
- let latest_workspace = user_workspaces
- .iter()
- .find(|user_workspace| user_workspace.id == user_profile.latest_workspace_id)
- .cloned();
- Ok(SignInResponse {
- user_id: user_profile.uid,
- name: DEFAULT_USER_NAME(),
- latest_workspace: latest_workspace.unwrap(),
- user_workspaces,
- email: None,
- token: None,
- device_id: params.device_id,
- })
- })
- }
- fn sign_out(&self, _token: Option<String>) -> FutureResult<(), Error> {
- FutureResult::new(async { Ok(()) })
- }
- fn update_user(
- &self,
- _credential: UserCredentials,
- params: UpdateUserProfileParams,
- ) -> FutureResult<(), Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- update_user_profile(postgrest, params).await?;
- Ok(())
- })
- }
- fn get_user_profile(
- &self,
- credential: UserCredentials,
- ) -> FutureResult<Option<UserProfile>, Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- let uid = credential
- .uid
- .ok_or(anyhow::anyhow!("uid is required"))
- .unwrap();
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- let user_profile_resp = get_user_profile(postgrest, GetUserProfileParams::Uid(uid)).await?;
- match user_profile_resp {
- None => Ok(None),
- Some(user_profile_resp) => Ok(Some(UserProfile {
- id: user_profile_resp.uid,
- email: user_profile_resp.email,
- name: user_profile_resp.name,
- token: "".to_string(),
- icon_url: "".to_string(),
- openai_key: "".to_string(),
- workspace_id: user_profile_resp.latest_workspace_id,
- auth_type: AuthType::Supabase,
- })),
- }
- })
- }
- fn get_user_workspaces(&self, uid: i64) -> FutureResult<Vec<UserWorkspace>, Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- let user_workspaces = get_user_workspaces(postgrest, uid).await?;
- Ok(user_workspaces)
- })
- }
- fn check_user(&self, credential: UserCredentials) -> FutureResult<(), Error> {
- let try_get_postgrest = self.server.try_get_postgrest();
- let uuid = credential.uuid.and_then(|uuid| Uuid::from_str(&uuid).ok());
- let uid = credential.uid;
- FutureResult::new(async move {
- let postgrest = try_get_postgrest?;
- check_user(postgrest, uid, uuid).await?;
- Ok(())
- })
- }
- fn add_workspace_member(
- &self,
- _user_email: String,
- _workspace_id: String,
- ) -> FutureResult<(), Error> {
- todo!()
- }
- fn remove_workspace_member(
- &self,
- _user_email: String,
- _workspace_id: String,
- ) -> FutureResult<(), Error> {
- todo!()
- }
- fn get_user_awareness_updates(&self, uid: i64) -> FutureResult<Vec<Vec<u8>>, Error> {
- let try_get_postgrest = self.server.try_get_weak_postgrest();
- let awareness_id = uid.to_string();
- let (tx, rx) = channel();
- tokio::spawn(async move {
- tx.send(
- async move {
- let postgrest = try_get_postgrest?;
- let action =
- FetchObjectUpdateAction::new(awareness_id, CollabType::UserAwareness, postgrest);
- action.run_with_fix_interval(5, 10).await
- }
- .await,
- )
- });
- FutureResult::new(async { rx.await? })
- }
- }
- async fn get_user_profile(
- postgrest: Arc<PostgresWrapper>,
- params: GetUserProfileParams,
- ) -> Result<Option<UserProfileResponse>, Error> {
- let mut builder = postgrest
- .from(USER_PROFILE_VIEW)
- .select("uid, email, name, latest_workspace_id");
- match params {
- GetUserProfileParams::Uid(uid) => builder = builder.eq("uid", uid.to_string()),
- GetUserProfileParams::Uuid(uuid) => builder = builder.eq("uuid", uuid.to_string()),
- }
- let mut profiles = builder
- .execute()
- .await?
- .error_for_status()?
- .get_value::<Vec<UserProfileResponse>>()
- .await?;
- match profiles.len() {
- 0 => Ok(None),
- 1 => Ok(Some(profiles.swap_remove(0))),
- _ => unreachable!(),
- }
- }
- async fn get_user_workspaces(
- postgrest: Arc<PostgresWrapper>,
- uid: i64,
- ) -> Result<Vec<UserWorkspace>, Error> {
- postgrest
- .from(WORKSPACE_TABLE)
- .select("id:workspace_id, name:workspace_name, created_at, database_storage_id")
- .eq("owner_uid", uid.to_string())
- .execute()
- .await?
- .error_for_status()?
- .get_value::<Vec<UserWorkspace>>()
- .await
- }
- async fn update_user_profile(
- postgrest: Arc<PostgresWrapper>,
- params: UpdateUserProfileParams,
- ) -> Result<(), Error> {
- if params.is_empty() {
- anyhow::bail!("no params to update");
- }
- // check if user exists
- let exists = !postgrest
- .from(USER_TABLE)
- .select("uid")
- .eq("uid", params.id.to_string())
- .execute()
- .await?
- .error_for_status()?
- .get_value::<Vec<UidResponse>>()
- .await?
- .is_empty();
- if !exists {
- anyhow::bail!("user uid {} does not exist", params.id);
- }
- let mut update_params = serde_json::Map::new();
- if let Some(name) = params.name {
- update_params.insert("name".to_string(), serde_json::json!(name));
- }
- if let Some(email) = params.email {
- update_params.insert("email".to_string(), serde_json::json!(email));
- }
- let update_payload = serde_json::to_string(&update_params).unwrap();
- let resp = postgrest
- .from(USER_TABLE)
- .update(update_payload)
- .eq("uid", params.id.to_string())
- .execute()
- .await?
- .success_with_body()
- .await?;
- tracing::debug!("update user profile resp: {:?}", resp);
- Ok(())
- }
- async fn check_user(
- postgrest: Arc<PostgresWrapper>,
- uid: Option<i64>,
- uuid: Option<Uuid>,
- ) -> Result<(), Error> {
- let mut builder = postgrest.from(USER_TABLE);
- if let Some(uid) = uid {
- builder = builder.eq("uid", uid.to_string());
- } else if let Some(uuid) = uuid {
- builder = builder.eq("uuid", uuid.to_string());
- } else {
- anyhow::bail!("uid or uuid is required");
- }
- let exists = !builder
- .execute()
- .await?
- .error_for_status()?
- .get_value::<Vec<UidResponse>>()
- .await?
- .is_empty();
- if !exists {
- anyhow::bail!("user does not exist, uid: {:?}, uuid: {:?}", uid, uuid);
- }
- Ok(())
- }
|