database.rs 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. use flowy_database::{schema::user_table, DBConnection, Database};
  2. use flowy_error::FlowyError;
  3. use flowy_user_data_model::entities::{SignInResponse, SignUpResponse, UpdateUserParams, UserProfile};
  4. use lazy_static::lazy_static;
  5. use lib_sqlite::ConnectionPool;
  6. use once_cell::sync::Lazy;
  7. use parking_lot::{Mutex, RwLock};
  8. use std::{collections::HashMap, sync::Arc, time::Duration};
  9. lazy_static! {
  10. static ref DB: RwLock<Option<Database>> = RwLock::new(None);
  11. }
  12. pub(crate) struct UserDB {
  13. db_dir: String,
  14. }
  15. impl UserDB {
  16. pub(crate) fn new(db_dir: &str) -> Self {
  17. Self {
  18. db_dir: db_dir.to_owned(),
  19. }
  20. }
  21. fn open_user_db(&self, user_id: &str) -> Result<(), FlowyError> {
  22. if user_id.is_empty() {
  23. return Err(FlowyError::internal().context("user id is empty"));
  24. }
  25. tracing::info!("open user db {}", user_id);
  26. let dir = format!("{}/{}", self.db_dir, user_id);
  27. let db = flowy_database::init(&dir).map_err(|e| {
  28. log::error!("init user db failed, {:?}, user_id: {}", e, user_id);
  29. FlowyError::internal().context(e)
  30. })?;
  31. match DB_MAP.try_write_for(Duration::from_millis(300)) {
  32. None => Err(FlowyError::internal().context("Acquire write lock to save user db failed")),
  33. Some(mut write_guard) => {
  34. write_guard.insert(user_id.to_owned(), db);
  35. Ok(())
  36. },
  37. }
  38. }
  39. pub(crate) fn close_user_db(&self, user_id: &str) -> Result<(), FlowyError> {
  40. match DB_MAP.try_write_for(Duration::from_millis(300)) {
  41. None => Err(FlowyError::internal().context("Acquire write lock to close user db failed")),
  42. Some(mut write_guard) => {
  43. set_user_db_init(false, user_id);
  44. write_guard.remove(user_id);
  45. Ok(())
  46. },
  47. }
  48. }
  49. pub(crate) fn get_connection(&self, user_id: &str) -> Result<DBConnection, FlowyError> {
  50. let conn = self.get_pool(user_id)?.get()?;
  51. Ok(conn)
  52. }
  53. pub(crate) fn get_pool(&self, user_id: &str) -> Result<Arc<ConnectionPool>, FlowyError> {
  54. // Opti: INIT_LOCK try to lock the INIT_RECORD accesses. Because the write guard
  55. // can not nested in the read guard that will cause the deadlock.
  56. match INIT_LOCK.try_lock_for(Duration::from_millis(300)) {
  57. None => log::error!("get_pool fail"),
  58. Some(_) => {
  59. if !is_user_db_init(user_id) {
  60. let _ = self.open_user_db(user_id)?;
  61. set_user_db_init(true, user_id);
  62. }
  63. },
  64. }
  65. match DB_MAP.try_read_for(Duration::from_millis(300)) {
  66. None => Err(FlowyError::internal().context("Acquire read lock to read user db failed")),
  67. Some(read_guard) => match read_guard.get(user_id) {
  68. None => {
  69. Err(FlowyError::internal().context("Get connection failed. The database is not initialization"))
  70. },
  71. Some(database) => Ok(database.get_pool()),
  72. },
  73. }
  74. }
  75. }
  76. lazy_static! {
  77. static ref DB_MAP: RwLock<HashMap<String, Database>> = RwLock::new(HashMap::new());
  78. }
  79. static INIT_LOCK: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
  80. static INIT_RECORD: Lazy<Mutex<HashMap<String, bool>>> = Lazy::new(|| Mutex::new(HashMap::new()));
  81. fn set_user_db_init(is_init: bool, user_id: &str) {
  82. let mut record = INIT_RECORD.lock();
  83. record.insert(user_id.to_owned(), is_init);
  84. }
  85. fn is_user_db_init(user_id: &str) -> bool {
  86. match INIT_RECORD.lock().get(user_id) {
  87. None => false,
  88. Some(flag) => *flag,
  89. }
  90. }
  91. #[derive(Clone, Default, Queryable, Identifiable, Insertable)]
  92. #[table_name = "user_table"]
  93. pub struct UserTable {
  94. pub(crate) id: String,
  95. pub(crate) name: String,
  96. pub(crate) token: String,
  97. pub(crate) email: String,
  98. pub(crate) workspace: String, // deprecated
  99. }
  100. impl UserTable {
  101. pub fn new(id: String, name: String, email: String, token: String) -> Self {
  102. Self {
  103. id,
  104. name,
  105. email,
  106. token,
  107. workspace: "".to_owned(),
  108. }
  109. }
  110. pub fn set_workspace(mut self, workspace: String) -> Self {
  111. self.workspace = workspace;
  112. self
  113. }
  114. }
  115. impl std::convert::From<SignUpResponse> for UserTable {
  116. fn from(resp: SignUpResponse) -> Self { UserTable::new(resp.user_id, resp.name, resp.email, resp.token) }
  117. }
  118. impl std::convert::From<SignInResponse> for UserTable {
  119. fn from(resp: SignInResponse) -> Self { UserTable::new(resp.user_id, resp.name, resp.email, resp.token) }
  120. }
  121. impl std::convert::From<UserTable> for UserProfile {
  122. fn from(table: UserTable) -> Self {
  123. UserProfile {
  124. id: table.id,
  125. email: table.email,
  126. name: table.name,
  127. token: table.token,
  128. }
  129. }
  130. }
  131. #[derive(AsChangeset, Identifiable, Default, Debug)]
  132. #[table_name = "user_table"]
  133. pub struct UserTableChangeset {
  134. pub id: String,
  135. pub workspace: Option<String>, // deprecated
  136. pub name: Option<String>,
  137. pub email: Option<String>,
  138. }
  139. impl UserTableChangeset {
  140. pub fn new(params: UpdateUserParams) -> Self {
  141. UserTableChangeset {
  142. id: params.id,
  143. workspace: None,
  144. name: params.name,
  145. email: params.email,
  146. }
  147. }
  148. }