user_session.rs 7.7 KB


  1. use crate::{
  2. entities::{SignInParams, SignUpParams, UpdateUserParams, UserDetail},
  3. errors::{ErrorBuilder, ErrorCode, UserError},
  4. services::user::{construct_user_server, database::UserDB, UserServerAPI},
  5. sql_tables::{UserTable, UserTableChangeset},
  6. };
  7. use flowy_database::{
  8. query_dsl::*,
  9. schema::{user_table, user_table::dsl},
  10. DBConnection,
  11. ExpressionMethods,
  12. UserDatabaseConnection,
  13. };
  14. use crate::{entities::UserToken, services::server::Server};
  15. use flowy_infra::kv::KVStore;
  16. use flowy_sqlite::ConnectionPool;
  17. use parking_lot::RwLock;
  18. use serde::{Deserialize, Serialize};
  19. use serde_json::Error;
  20. use std::sync::Arc;
  21. pub struct UserSessionConfig {
  22. root_dir: String,
  23. }
  24. impl UserSessionConfig {
  25. pub fn new(root_dir: &str) -> Self {
  26. Self {
  27. root_dir: root_dir.to_owned(),
  28. }
  29. }
  30. }
  31. pub struct UserSession {
  32. database: UserDB,
  33. config: UserSessionConfig,
  34. #[allow(dead_code)]
  35. server: Server,
  36. session: RwLock<Option<Session>>,
  37. }
  38. impl UserSession {
  39. pub fn new(config: UserSessionConfig) -> Self {
  40. let db = UserDB::new(&config.root_dir);
  41. let server = construct_user_server();
  42. Self {
  43. database: db,
  44. config,
  45. server,
  46. session: RwLock::new(None),
  47. }
  48. }
  49. pub fn get_db_connection(&self) -> Result<DBConnection, UserError> {
  50. let user_id = self.get_session()?.user_id;
  51. self.database.get_connection(&user_id)
  52. }
  53. // The caller will be not 'Sync' before of the return value,
  54. // PooledConnection<ConnectionManager> is not sync. You can use
  55. // db_connection_pool function to require the ConnectionPool that is 'Sync'.
  56. //
  57. // let pool = self.db_connection_pool()?;
  58. // let conn: PooledConnection<ConnectionManager> = pool.get()?;
  59. pub fn db_connection_pool(&self) -> Result<Arc<ConnectionPool>, UserError> {
  60. let user_id = self.get_session()?.user_id;
  61. self.database.get_pool(&user_id)
  62. }
  63. pub async fn sign_in(&self, params: SignInParams) -> Result<UserTable, UserError> {
  64. let resp = self.server.sign_in(params).await?;
  65. let session = Session::new(&resp.uid, &resp.token);
  66. let _ = self.set_session(Some(session))?;
  67. let user_table = self.save_user(resp.into()).await?;
  68. Ok(user_table)
  69. }
  70. pub async fn sign_up(&self, params: SignUpParams) -> Result<UserTable, UserError> {
  71. let resp = self.server.sign_up(params).await?;
  72. let session = Session::new(&resp.uid, &resp.token);
  73. let _ = self.set_session(Some(session))?;
  74. let user_table = self.save_user(resp.into()).await?;
  75. Ok(user_table)
  76. }
  77. pub async fn sign_out(&self) -> Result<(), UserError> {
  78. let session = self.get_session()?;
  79. match self.server.sign_out(&session.token).await {
  80. Ok(_) => {},
  81. Err(e) => log::error!("Sign out failed: {:?}", e),
  82. }
  83. let conn = self.get_db_connection()?;
  84. let _ =
  85. diesel::delete(dsl::user_table.filter(dsl::id.eq(&session.user_id))).execute(&*conn)?;
  86. let _ = self.server.sign_out(&session.token);
  87. let _ = self.database.close_user_db(&session.user_id)?;
  88. let _ = self.set_session(None)?;
  89. Ok(())
  90. }
  91. pub async fn update_user(&self, params: UpdateUserParams) -> Result<(), UserError> {
  92. let changeset = UserTableChangeset::new(params);
  93. let conn = self.get_db_connection()?;
  94. diesel_update_table!(user_table, changeset, conn);
  95. Ok(())
  96. }
  97. pub async fn user_detail(&self) -> Result<UserDetail, UserError> {
  98. let user_id = self.get_session()?.user_id;
  99. let user = dsl::user_table
  100. .filter(user_table::id.eq(&user_id))
  101. .first::<UserTable>(&*(self.get_db_connection()?))?;
  102. let server = self.server.clone();
  103. let token = user.token.clone();
  104. tokio::spawn(async move {
  105. match server.get_user_detail(&token).await {
  106. Ok(user_detail) => {
  107. //
  108. log::info!("{:?}", user_detail);
  109. },
  110. Err(e) => {
  111. //
  112. log::info!("{:?}", e);
  113. },
  114. }
  115. })
  116. .await;
  117. Ok(UserDetail::from(user))
  118. }
  119. pub fn user_dir(&self) -> Result<String, UserError> {
  120. let session = self.get_session()?;
  121. Ok(format!("{}/{}", self.config.root_dir, session.user_id))
  122. }
  123. pub fn user_id(&self) -> Result<String, UserError> { Ok(self.get_session()?.user_id) }
  124. // pub fn user_token(&self) -> Result<String, UserError> {
  125. // let user_detail = self.user_detail()?;
  126. // Ok(user_detail.token)
  127. // }
  128. }
  129. impl UserSession {
  130. async fn save_user(&self, user: UserTable) -> Result<UserTable, UserError> {
  131. let conn = self.get_db_connection()?;
  132. let _ = diesel::insert_into(user_table::table)
  133. .values(user.clone())
  134. .execute(&*conn)?;
  135. Ok(user)
  136. }
  137. fn set_session(&self, session: Option<Session>) -> Result<(), UserError> {
  138. log::trace!("Update user session: {:?}", session);
  139. match &session {
  140. None => KVStore::set_str(SESSION_CACHE_KEY, "".to_string()),
  141. Some(session) => KVStore::set_str(SESSION_CACHE_KEY, session.clone().into()),
  142. }
  143. let mut write_guard = self.session.write();
  144. *write_guard = session;
  145. Ok(())
  146. }
  147. fn get_session(&self) -> Result<Session, UserError> {
  148. let mut session = {
  149. let read_guard = self.session.read();
  150. (*read_guard).clone()
  151. };
  152. if session.is_none() {
  153. match KVStore::get_str(SESSION_CACHE_KEY) {
  154. None => {},
  155. Some(s) => {
  156. session = Some(Session::from(s));
  157. let _ = self.set_session(session.clone())?;
  158. },
  159. }
  160. }
  161. match session {
  162. None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()),
  163. Some(session) => Ok(session),
  164. }
  165. }
  166. }
  167. pub async fn update_user(
  168. server: Server,
  169. pool: Arc<ConnectionPool>,
  170. params: UpdateUserParams,
  171. ) -> Result<(), UserError> {
  172. let changeset = UserTableChangeset::new(params);
  173. let conn = pool.get()?;
  174. diesel_update_table!(user_table, changeset, conn);
  175. Ok(())
  176. }
  177. pub fn current_user_id() -> Result<String, UserError> {
  178. match KVStore::get_str(SESSION_CACHE_KEY) {
  179. None => Err(ErrorBuilder::new(ErrorCode::UserNotLoginYet).build()),
  180. Some(user_id) => Ok(user_id),
  181. }
  182. }
  183. impl UserDatabaseConnection for UserSession {
  184. fn get_connection(&self) -> Result<DBConnection, String> {
  185. self.get_db_connection().map_err(|e| format!("{:?}", e))
  186. }
  187. }
  188. const SESSION_CACHE_KEY: &str = "session_cache_key";
  189. #[derive(Debug, Clone, Default, Serialize, Deserialize)]
  190. struct Session {
  191. user_id: String,
  192. token: String,
  193. }
  194. impl Session {
  195. pub fn new(user_id: &str, token: &str) -> Self {
  196. Self {
  197. user_id: user_id.to_owned(),
  198. token: token.to_owned(),
  199. }
  200. }
  201. }
  202. impl std::convert::From<String> for Session {
  203. fn from(s: String) -> Self {
  204. match serde_json::from_str(&s) {
  205. Ok(s) => s,
  206. Err(e) => {
  207. log::error!("Deserialize string to Session failed: {:?}", e);
  208. Session::default()
  209. },
  210. }
  211. }
  212. }
  213. impl std::convert::Into<String> for Session {
  214. fn into(self) -> String {
  215. match serde_json::to_string(&self) {
  216. Ok(s) => s,
  217. Err(e) => {
  218. log::error!("Serialize session to string failed: {:?}", e);
  219. "".to_string()
  220. },
  221. }
  222. }
  223. }