// user_session.rs (6.8 KB)
  1. use crate::{
  2. entities::{SignInParams, SignUpParams, UpdateUserParams, UpdateUserRequest, UserDetail},
  3. errors::{ErrorBuilder, UserErrCode, UserError},
  4. event::UserEvent::*,
  5. services::{
  6. user::{construct_server, database::UserDB, UserServer},
  7. workspace::WorkspaceAction,
  8. },
  9. sql_tables::{UserTable, UserTableChangeset},
  10. };
  11. use bytes::Bytes;
  12. use flowy_database::{
  13. query_dsl::*,
  14. schema::{user_table, user_table::dsl},
  15. DBConnection,
  16. ExpressionMethods,
  17. UserDatabaseConnection,
  18. };
  19. use flowy_dispatch::prelude::{EventDispatch, ModuleRequest, ToBytes};
  20. use flowy_infra::kv::KVStore;
  21. use std::sync::{Arc, RwLock};
  /// Name passed to `create_workspace` when a user's default workspace is created.
  const DEFAULT_WORKSPACE_NAME: &str = "My workspace";
  /// Description passed to `create_workspace` for the default workspace.
  const DEFAULT_WORKSPACE_DESC: &str = "This is your first workspace";
  /// Suffix appended to a user id to build the key-value flag that records
  /// whether that user's default workspace was already requested.
  const DEFAULT_WORKSPACE: &str = "Default_Workspace";
  /// Static configuration for a [`UserSession`].
  #[derive(Debug, Clone)]
  pub struct UserSessionConfig {
      /// Root directory handed to the user database and used as the parent of
      /// each per-user directory.
      root_dir: String,
  }

  impl UserSessionConfig {
      /// Creates a configuration whose root directory is an owned copy of `root_dir`.
      pub fn new(root_dir: &str) -> Self {
          Self {
              root_dir: root_dir.to_owned(),
          }
      }
  }
  35. pub struct UserSession {
  36. database: UserDB,
  37. config: UserSessionConfig,
  38. workspace: Arc<dyn WorkspaceAction + Send + Sync>,
  39. server: Arc<dyn UserServer + Send + Sync>,
  40. user_id: RwLock<Option<String>>,
  41. }
  42. impl UserSession {
  43. pub fn new<R>(config: UserSessionConfig, workspace: Arc<R>) -> Self
  44. where
  45. R: 'static + WorkspaceAction + Send + Sync,
  46. {
  47. let db = UserDB::new(&config.root_dir);
  48. let server = construct_server();
  49. Self {
  50. database: db,
  51. config,
  52. workspace,
  53. server,
  54. user_id: RwLock::new(None),
  55. }
  56. }
  57. pub fn get_db_connection(&self) -> Result<DBConnection, UserError> {
  58. let user_id = self.get_user_id()?;
  59. self.database.get_connection(&user_id)
  60. }
  61. pub async fn sign_in(&self, params: SignInParams) -> Result<UserTable, UserError> {
  62. let resp = self.server.sign_in(params).await?;
  63. let _ = self.set_user_id(Some(resp.uid.clone()))?;
  64. let user_table = self.save_user(resp.into()).await?;
  65. Ok(user_table)
  66. }
  67. pub async fn sign_up(&self, params: SignUpParams) -> Result<UserTable, UserError> {
  68. let resp = self.server.sign_up(params).await?;
  69. let _ = self.set_user_id(Some(resp.uid.clone()))?;
  70. let user_table = self.save_user(resp.into()).await?;
  71. Ok(user_table)
  72. }
  73. pub fn sign_out(&self) -> Result<(), UserError> {
  74. let user_id = self.get_user_id()?;
  75. let conn = self.get_db_connection()?;
  76. let _ = diesel::delete(dsl::user_table.filter(dsl::id.eq(&user_id))).execute(&*conn)?;
  77. let _ = self.server.sign_out(&user_id);
  78. let _ = self.database.close_user_db(&user_id)?;
  79. let _ = self.set_user_id(None)?;
  80. Ok(())
  81. }
  82. async fn save_user(&self, mut user: UserTable) -> Result<UserTable, UserError> {
  83. if user.workspace.is_empty() {
  84. log::info!("Try to create user default workspace");
  85. let workspace_id = self.create_default_workspace_if_need(&user.id).await?;
  86. user.workspace = workspace_id;
  87. }
  88. let conn = self.get_db_connection()?;
  89. let _ = diesel::insert_into(user_table::table)
  90. .values(user.clone())
  91. .execute(&*conn)?;
  92. Ok(user)
  93. }
  94. pub fn update_user(&self, params: UpdateUserParams) -> Result<UserDetail, UserError> {
  95. let changeset = UserTableChangeset::new(params);
  96. let conn = self.get_db_connection()?;
  97. diesel_update_table!(user_table, changeset, conn);
  98. let user_detail = self.user_detail()?;
  99. Ok(user_detail)
  100. }
  101. pub fn user_detail(&self) -> Result<UserDetail, UserError> {
  102. let user_id = self.get_user_id()?;
  103. let user = dsl::user_table
  104. .filter(user_table::id.eq(&user_id))
  105. .first::<UserTable>(&*(self.get_db_connection()?))?;
  106. let _ = self.server.get_user_info(&user_id);
  107. Ok(UserDetail::from(user))
  108. }
  109. pub fn set_user_id(&self, user_id: Option<String>) -> Result<(), UserError> {
  110. log::trace!("Set user id: {:?}", user_id);
  111. KVStore::set_str(USER_ID_CACHE_KEY, user_id.clone().unwrap_or("".to_owned()));
  112. match self.user_id.write() {
  113. Ok(mut write_guard) => {
  114. *write_guard = user_id;
  115. Ok(())
  116. },
  117. Err(e) => Err(ErrorBuilder::new(UserErrCode::WriteCurrentIdFailed)
  118. .error(e)
  119. .build()),
  120. }
  121. }
  122. pub fn get_user_dir(&self) -> Result<String, UserError> {
  123. let user_id = self.get_user_id()?;
  124. Ok(format!("{}/{}", self.config.root_dir, user_id))
  125. }
  126. pub fn get_user_id(&self) -> Result<String, UserError> {
  127. let mut user_id = {
  128. let read_guard = self.user_id.read().map_err(|e| {
  129. ErrorBuilder::new(UserErrCode::ReadCurrentIdFailed)
  130. .error(e)
  131. .build()
  132. })?;
  133. (*read_guard).clone()
  134. };
  135. if user_id.is_none() {
  136. user_id = KVStore::get_str(USER_ID_CACHE_KEY);
  137. let _ = self.set_user_id(user_id.clone())?;
  138. }
  139. match user_id {
  140. None => Err(ErrorBuilder::new(UserErrCode::UserNotLoginYet).build()),
  141. Some(user_id) => Ok(user_id),
  142. }
  143. }
  144. pub async fn set_current_workspace(&self, workspace_id: &str) -> Result<(), UserError> {
  145. let user_id = self.get_user_id()?;
  146. let payload: Bytes = UpdateUserRequest::new(&user_id)
  147. .workspace(workspace_id)
  148. .into_bytes()
  149. .unwrap();
  150. let request = ModuleRequest::new(UpdateUser).payload(payload);
  151. let _ = EventDispatch::async_send(request)
  152. .await
  153. .parse::<UserDetail, UserError>()
  154. .unwrap()?;
  155. Ok(())
  156. }
  157. async fn create_default_workspace_if_need(&self, user_id: &str) -> Result<String, UserError> {
  158. let key = format!("{}{}", user_id, DEFAULT_WORKSPACE);
  159. if KVStore::get_bool(&key).unwrap_or(false) {
  160. return Err(ErrorBuilder::new(UserErrCode::DefaultWorkspaceAlreadyExist).build());
  161. }
  162. KVStore::set_bool(&key, true);
  163. log::debug!("Create user:{} default workspace", user_id);
  164. let workspace_id = self
  165. .workspace
  166. .create_workspace(DEFAULT_WORKSPACE_NAME, DEFAULT_WORKSPACE_DESC, user_id)
  167. .await?;
  168. Ok(workspace_id)
  169. }
  170. }
  171. pub fn current_user_id() -> Result<String, UserError> {
  172. match KVStore::get_str(USER_ID_CACHE_KEY) {
  173. None => Err(ErrorBuilder::new(UserErrCode::UserNotLoginYet).build()),
  174. Some(user_id) => Ok(user_id),
  175. }
  176. }
  177. impl UserDatabaseConnection for UserSession {
  178. fn get_connection(&self) -> Result<DBConnection, String> {
  179. self.get_db_connection().map_err(|e| format!("{:?}", e))
  180. }
  181. }
  /// Key-value store key under which the current user id is persisted.
  const USER_ID_CACHE_KEY: &str = "user_id";