// database.rs
  1. use crate::entities::{SignInResponse, SignUpResponse, UpdateUserProfileParams, UserProfilePB};
  2. use flowy_database::ConnectionPool;
  3. use flowy_database::{schema::user_table, DBConnection, Database};
  4. use flowy_error::{ErrorCode, FlowyError};
  5. use lazy_static::lazy_static;
  6. use parking_lot::RwLock;
  7. use std::path::PathBuf;
  8. use std::{collections::HashMap, sync::Arc, time::Duration};
  /// Entry point for opening per-user databases.
  ///
  /// Each user's database is opened under `db_dir/<user_id>`; the opened
  /// databases themselves are cached in the file-level `DB_MAP`, not in this
  /// struct.
  #[derive(Debug)]
  pub struct UserDB {
      /// Root directory under which one sub-directory per user id is used.
      db_dir: String,
  }
  12. impl UserDB {
  13. pub fn new(db_dir: &str) -> Self {
  14. Self {
  15. db_dir: db_dir.to_owned(),
  16. }
  17. }
  18. #[tracing::instrument(level = "trace", skip(self))]
  19. fn open_user_db_if_need(&self, user_id: &str) -> Result<Arc<ConnectionPool>, FlowyError> {
  20. if user_id.is_empty() {
  21. return Err(ErrorCode::UserIdIsEmpty.into());
  22. }
  23. if let Some(database) = DB_MAP.read().get(user_id) {
  24. return Ok(database.get_pool());
  25. }
  26. let mut write_guard = DB_MAP.write();
  27. // The Write guard acquire exclusive access that will guarantee the user db only initialize once.
  28. match write_guard.get(user_id) {
  29. None => {}
  30. Some(database) => return Ok(database.get_pool()),
  31. }
  32. let mut dir = PathBuf::new();
  33. dir.push(&self.db_dir);
  34. dir.push(user_id);
  35. let dir = dir.to_str().unwrap().to_owned();
  36. tracing::trace!("open user db {} at path: {}", user_id, dir);
  37. let db = flowy_database::init(&dir).map_err(|e| {
  38. log::error!("open user: {} db failed, {:?}", user_id, e);
  39. FlowyError::internal().context(e)
  40. })?;
  41. let pool = db.get_pool();
  42. write_guard.insert(user_id.to_owned(), db);
  43. drop(write_guard);
  44. Ok(pool)
  45. }
  46. pub(crate) fn close_user_db(&self, user_id: &str) -> Result<(), FlowyError> {
  47. match DB_MAP.try_write_for(Duration::from_millis(300)) {
  48. None => Err(FlowyError::internal().context("Acquire write lock to close user db failed")),
  49. Some(mut write_guard) => {
  50. write_guard.remove(user_id);
  51. Ok(())
  52. }
  53. }
  54. }
  55. pub(crate) fn get_connection(&self, user_id: &str) -> Result<DBConnection, FlowyError> {
  56. let conn = self.get_pool(user_id)?.get()?;
  57. Ok(conn)
  58. }
  59. pub(crate) fn get_pool(&self, user_id: &str) -> Result<Arc<ConnectionPool>, FlowyError> {
  60. let pool = self.open_user_db_if_need(user_id)?;
  61. Ok(pool)
  62. }
  63. }
  64. lazy_static! {
  65. static ref DB_MAP: RwLock<HashMap<String, Database>> = RwLock::new(HashMap::new());
  66. }
  67. #[derive(Clone, Default, Queryable, Identifiable, Insertable)]
  68. #[table_name = "user_table"]
  69. pub struct UserTable {
  70. pub(crate) id: String,
  71. pub(crate) name: String,
  72. pub(crate) token: String,
  73. pub(crate) email: String,
  74. pub(crate) workspace: String, // deprecated
  75. pub(crate) icon_url: String,
  76. }
  77. impl UserTable {
  78. pub fn new(id: String, name: String, email: String, token: String) -> Self {
  79. Self {
  80. id,
  81. name,
  82. email,
  83. token,
  84. icon_url: "".to_owned(),
  85. workspace: "".to_owned(),
  86. }
  87. }
  88. pub fn set_workspace(mut self, workspace: String) -> Self {
  89. self.workspace = workspace;
  90. self
  91. }
  92. }
  93. impl std::convert::From<SignUpResponse> for UserTable {
  94. fn from(resp: SignUpResponse) -> Self {
  95. UserTable::new(resp.user_id, resp.name, resp.email, resp.token)
  96. }
  97. }
  98. impl std::convert::From<SignInResponse> for UserTable {
  99. fn from(resp: SignInResponse) -> Self {
  100. UserTable::new(resp.user_id, resp.name, resp.email, resp.token)
  101. }
  102. }
  103. impl std::convert::From<UserTable> for UserProfilePB {
  104. fn from(table: UserTable) -> Self {
  105. UserProfilePB {
  106. id: table.id,
  107. email: table.email,
  108. name: table.name,
  109. token: table.token,
  110. icon_url: table.icon_url,
  111. }
  112. }
  113. }
  114. #[derive(AsChangeset, Identifiable, Default, Debug)]
  115. #[table_name = "user_table"]
  116. pub struct UserTableChangeset {
  117. pub id: String,
  118. pub workspace: Option<String>, // deprecated
  119. pub name: Option<String>,
  120. pub email: Option<String>,
  121. pub icon_url: Option<String>,
  122. }
  123. impl UserTableChangeset {
  124. pub fn new(params: UpdateUserProfileParams) -> Self {
  125. UserTableChangeset {
  126. id: params.id,
  127. workspace: None,
  128. name: params.name,
  129. email: params.email,
  130. icon_url: params.icon_url,
  131. }
  132. }
  133. }