workspace_controller.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. use crate::{
  2. entities::{
  3. app::{App, RepeatedApp},
  4. workspace::*,
  5. },
  6. errors::*,
  7. module::{WorkspaceDatabase, WorkspaceUser},
  8. observable::*,
  9. services::{helper::spawn, server::Server, AppController},
  10. sql_tables::{
  11. app::{AppTable, AppTableSql},
  12. view::{ViewTable, ViewTableSql},
  13. workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
  14. },
  15. };
  16. use flowy_database::SqliteConnection;
  17. use flowy_infra::kv::KV;
  18. use std::sync::Arc;
  19. pub(crate) struct WorkspaceController {
  20. pub user: Arc<dyn WorkspaceUser>,
  21. pub workspace_sql: Arc<WorkspaceTableSql>,
  22. pub app_sql: Arc<AppTableSql>,
  23. pub view_sql: Arc<ViewTableSql>,
  24. pub database: Arc<dyn WorkspaceDatabase>,
  25. pub app_controller: Arc<AppController>,
  26. server: Server,
  27. }
  28. impl WorkspaceController {
  29. pub(crate) fn new(
  30. user: Arc<dyn WorkspaceUser>,
  31. database: Arc<dyn WorkspaceDatabase>,
  32. app_controller: Arc<AppController>,
  33. server: Server,
  34. ) -> Self {
  35. let workspace_sql = Arc::new(WorkspaceTableSql {});
  36. let app_sql = Arc::new(AppTableSql {});
  37. let view_sql = Arc::new(ViewTableSql {});
  38. Self {
  39. user,
  40. workspace_sql,
  41. app_sql,
  42. view_sql,
  43. database,
  44. app_controller,
  45. server,
  46. }
  47. }
  48. pub(crate) async fn create_workspace(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  49. let workspace = self.create_workspace_on_server(params.clone()).await?;
  50. let user_id = self.user.user_id()?;
  51. let token = self.user.token()?;
  52. let workspace_table = WorkspaceTable::new(workspace.clone(), &user_id);
  53. let conn = &*self.database.db_connection()?;
  54. //[[immediate_transaction]]
  55. // https://sqlite.org/lang_transaction.html
  56. // IMMEDIATE cause the database connection to start a new write immediately,
  57. // without waiting for a write statement. The BEGIN IMMEDIATE might fail
  58. // with SQLITE_BUSY if another write transaction is already active on another
  59. // database connection.
  60. //
  61. // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
  62. // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
  63. // other journaling modes, EXCLUSIVE prevents other database connections from
  64. // reading the database while the transaction is underway.
  65. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  66. self.workspace_sql.create_workspace(workspace_table, conn)?;
  67. let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
  68. notify(&token, WorkspaceObservable::UserCreateWorkspace)
  69. .payload(repeated_workspace)
  70. .send();
  71. Ok(())
  72. })?;
  73. Ok(workspace)
  74. }
  75. pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
  76. let changeset = WorkspaceTableChangeset::new(params.clone());
  77. let workspace_id = changeset.id.clone();
  78. let conn = &*self.database.db_connection()?;
  79. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  80. let _ = self.workspace_sql.update_workspace(changeset, conn)?;
  81. let user_id = self.user.user_id()?;
  82. let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
  83. notify(&workspace_id, WorkspaceObservable::WorkspaceUpdated)
  84. .payload(workspace)
  85. .send();
  86. Ok(())
  87. })?;
  88. let _ = self.update_workspace_on_server(params)?;
  89. Ok(())
  90. }
  91. pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
  92. let user_id = self.user.user_id()?;
  93. let token = self.user.token()?;
  94. let conn = &*self.database.db_connection()?;
  95. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  96. let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
  97. let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
  98. notify(&token, WorkspaceObservable::UserDeleteWorkspace)
  99. .payload(repeated_workspace)
  100. .send();
  101. Ok(())
  102. })?;
  103. let _ = self.delete_workspace_on_server(workspace_id)?;
  104. Ok(())
  105. }
  106. pub(crate) async fn open_workspace(&self, params: QueryWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  107. let user_id = self.user.user_id()?;
  108. let conn = self.database.db_connection()?;
  109. if let Some(workspace_id) = params.workspace_id.clone() {
  110. let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
  111. set_current_workspace(&workspace.id);
  112. Ok(workspace)
  113. } else {
  114. return Err(ErrorBuilder::new(ErrorCode::WorkspaceIdInvalid)
  115. .msg("Opened workspace id should not be empty")
  116. .build());
  117. }
  118. }
  119. pub(crate) async fn read_workspaces(&self, params: QueryWorkspaceParams) -> Result<RepeatedWorkspace, WorkspaceError> {
  120. let user_id = self.user.user_id()?;
  121. let workspaces = self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*self.database.db_connection()?)?;
  122. let _ = self.read_workspaces_on_server(user_id.clone(), params.clone());
  123. Ok(workspaces)
  124. }
  125. pub(crate) async fn read_cur_workspace(&self) -> Result<Workspace, WorkspaceError> {
  126. let workspace_id = get_current_workspace()?;
  127. let user_id = self.user.user_id()?;
  128. let params = QueryWorkspaceParams {
  129. workspace_id: Some(workspace_id.clone()),
  130. };
  131. let workspace = self.read_local_workspace(workspace_id, &user_id, &*self.database.db_connection()?)?;
  132. let _ = self.read_workspaces_on_server(user_id.clone(), params)?;
  133. Ok(workspace)
  134. }
  135. pub(crate) async fn read_workspace_apps(&self) -> Result<RepeatedApp, WorkspaceError> {
  136. let workspace_id = get_current_workspace()?;
  137. let conn = self.database.db_connection()?;
  138. let apps = self.read_local_apps(&workspace_id, &*conn)?;
  139. // TODO: read from server
  140. Ok(RepeatedApp { items: apps })
  141. }
  142. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  143. fn read_local_workspaces(
  144. &self,
  145. workspace_id: Option<String>,
  146. user_id: &str,
  147. conn: &SqliteConnection,
  148. ) -> Result<RepeatedWorkspace, WorkspaceError> {
  149. let workspace_id = workspace_id.to_owned();
  150. let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?;
  151. let mut workspaces = vec![];
  152. for table in workspace_tables {
  153. let apps = self.read_local_apps(&table.id, conn)?;
  154. let mut workspace: Workspace = table.into();
  155. workspace.apps.items = apps;
  156. workspaces.push(workspace);
  157. }
  158. Ok(RepeatedWorkspace { items: workspaces })
  159. }
  160. fn read_local_workspace(&self, workspace_id: String, user_id: &str, conn: &SqliteConnection) -> Result<Workspace, WorkspaceError> {
  161. // Opti: fetch single workspace from local db
  162. let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id), user_id, conn)?;
  163. if repeated_workspace.is_empty() {
  164. return Err(ErrorBuilder::new(ErrorCode::RecordNotFound).build());
  165. }
  166. debug_assert_eq!(repeated_workspace.len(), 1);
  167. let workspace = repeated_workspace.drain(..1).collect::<Vec<Workspace>>().pop().unwrap();
  168. Ok(workspace)
  169. }
  170. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  171. fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<Vec<App>, WorkspaceError> {
  172. let apps = self
  173. .workspace_sql
  174. .read_apps_belong_to_workspace(workspace_id, conn)?
  175. .into_iter()
  176. .map(|app_table| app_table.into())
  177. .collect::<Vec<App>>();
  178. Ok(apps)
  179. }
  180. }
  181. impl WorkspaceController {
  182. fn token_with_server(&self) -> Result<(String, Server), WorkspaceError> {
  183. let token = self.user.token()?;
  184. let server = self.server.clone();
  185. Ok((token, server))
  186. }
  187. #[tracing::instrument(level = "debug", skip(self), err)]
  188. async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  189. let token = self.user.token()?;
  190. let workspace = self.server.create_workspace(&token, params).await?;
  191. Ok(workspace)
  192. }
  193. #[tracing::instrument(level = "debug", skip(self), err)]
  194. fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
  195. let (token, server) = self.token_with_server()?;
  196. spawn(async move {
  197. match server.update_workspace(&token, params).await {
  198. Ok(_) => {},
  199. Err(e) => {
  200. // TODO: retry?
  201. log::error!("Update workspace failed: {:?}", e);
  202. },
  203. }
  204. });
  205. Ok(())
  206. }
  207. #[tracing::instrument(level = "debug", skip(self), err)]
  208. fn delete_workspace_on_server(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
  209. let params = DeleteWorkspaceParams {
  210. workspace_id: workspace_id.to_string(),
  211. };
  212. let (token, server) = self.token_with_server()?;
  213. spawn(async move {
  214. match server.delete_workspace(&token, params).await {
  215. Ok(_) => {},
  216. Err(e) => {
  217. // TODO: retry?
  218. log::error!("Delete workspace failed: {:?}", e);
  219. },
  220. }
  221. });
  222. Ok(())
  223. }
  224. #[tracing::instrument(level = "debug", skip(self), err)]
  225. fn read_workspaces_on_server(&self, user_id: String, params: QueryWorkspaceParams) -> Result<(), WorkspaceError> {
  226. let (token, server) = self.token_with_server()?;
  227. let workspace_sql = self.workspace_sql.clone();
  228. let app_sql = self.app_sql.clone();
  229. let view_sql = self.view_sql.clone();
  230. let conn = self.database.db_connection()?;
  231. spawn(async move {
  232. // Opti: handle the error and retry?
  233. let workspaces = server.read_workspace(&token, params).await?;
  234. let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
  235. log::debug!("Save {} workspace", workspaces.len());
  236. for workspace in &workspaces.items {
  237. let mut m_workspace = workspace.clone();
  238. let apps = m_workspace.apps.take_items();
  239. let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
  240. let _ = workspace_sql.create_workspace(workspace_table, &*conn)?;
  241. log::debug!("Save {} apps", apps.len());
  242. for mut app in apps {
  243. let views = app.belongings.take_items();
  244. match app_sql.create_app(AppTable::new(app), &*conn) {
  245. Ok(_) => {},
  246. Err(e) => log::error!("create app failed: {:?}", e),
  247. }
  248. log::debug!("Save {} views", views.len());
  249. for view in views {
  250. match view_sql.create_view(ViewTable::new(view), &*conn) {
  251. Ok(_) => {},
  252. Err(e) => log::error!("create view failed: {:?}", e),
  253. }
  254. }
  255. }
  256. }
  257. Ok(())
  258. })?;
  259. notify(&token, WorkspaceObservable::WorkspaceListUpdated).payload(workspaces).send();
  260. Result::<(), WorkspaceError>::Ok(())
  261. });
  262. Ok(())
  263. }
  264. }
  265. const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
  266. fn set_current_workspace(workspace: &str) { KV::set_str(CURRENT_WORKSPACE_ID, workspace.to_owned()); }
  267. fn get_current_workspace() -> Result<String, WorkspaceError> {
  268. match KV::get_str(CURRENT_WORKSPACE_ID) {
  269. None => Err(ErrorBuilder::new(ErrorCode::CurrentWorkspaceNotFound).build()),
  270. Some(workspace_id) => Ok(workspace_id),
  271. }
  272. }