// workspace_controller.rs
  1. use crate::{
  2. entities::{
  3. app::{App, RepeatedApp},
  4. workspace::*,
  5. },
  6. errors::*,
  7. module::{WorkspaceDatabase, WorkspaceUser},
  8. notify::*,
  9. services::{helper::spawn, server::Server, AppController, TrashCan, ViewController},
  10. sql_tables::workspace::{WorkspaceTable, WorkspaceTableChangeset, WorkspaceTableSql},
  11. };
  12. use flowy_database::SqliteConnection;
  13. use flowy_infra::kv::KV;
  14. use std::sync::Arc;
  15. pub struct WorkspaceController {
  16. pub user: Arc<dyn WorkspaceUser>,
  17. pub(crate) workspace_sql: Arc<WorkspaceTableSql>,
  18. pub(crate) view_controller: Arc<ViewController>,
  19. pub(crate) database: Arc<dyn WorkspaceDatabase>,
  20. pub(crate) app_controller: Arc<AppController>,
  21. pub(crate) trash_can: Arc<TrashCan>,
  22. server: Server,
  23. }
  24. impl WorkspaceController {
  25. pub(crate) fn new(
  26. user: Arc<dyn WorkspaceUser>,
  27. database: Arc<dyn WorkspaceDatabase>,
  28. app_controller: Arc<AppController>,
  29. view_controller: Arc<ViewController>,
  30. trash_can: Arc<TrashCan>,
  31. server: Server,
  32. ) -> Self {
  33. let workspace_sql = Arc::new(WorkspaceTableSql {});
  34. Self {
  35. user,
  36. workspace_sql,
  37. database,
  38. app_controller,
  39. view_controller,
  40. trash_can,
  41. server,
  42. }
  43. }
  44. pub fn init(&self) -> Result<(), WorkspaceError> {
  45. let _ = self.trash_can.init()?;
  46. let _ = self.view_controller.init()?;
  47. Ok(())
  48. }
  49. pub(crate) async fn create_workspace(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  50. let workspace = self.create_workspace_on_server(params.clone()).await?;
  51. let user_id = self.user.user_id()?;
  52. let token = self.user.token()?;
  53. let workspace_table = WorkspaceTable::new(workspace.clone(), &user_id);
  54. let conn = &*self.database.db_connection()?;
  55. //[[immediate_transaction]]
  56. // https://sqlite.org/lang_transaction.html
  57. // IMMEDIATE cause the database connection to start a new write immediately,
  58. // without waiting for a write statement. The BEGIN IMMEDIATE might fail
  59. // with SQLITE_BUSY if another write transaction is already active on another
  60. // database connection.
  61. //
  62. // EXCLUSIVE is similar to IMMEDIATE in that a write transaction is started
  63. // immediately. EXCLUSIVE and IMMEDIATE are the same in WAL mode, but in
  64. // other journaling modes, EXCLUSIVE prevents other database connections from
  65. // reading the database while the transaction is underway.
  66. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  67. self.workspace_sql.create_workspace(workspace_table, conn)?;
  68. let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
  69. send_dart_notification(&token, WorkspaceNotification::UserCreateWorkspace)
  70. .payload(repeated_workspace)
  71. .send();
  72. Ok(())
  73. })?;
  74. Ok(workspace)
  75. }
  76. #[allow(dead_code)]
  77. pub(crate) async fn update_workspace(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
  78. let changeset = WorkspaceTableChangeset::new(params.clone());
  79. let workspace_id = changeset.id.clone();
  80. let conn = &*self.database.db_connection()?;
  81. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  82. let _ = self.workspace_sql.update_workspace(changeset, conn)?;
  83. let user_id = self.user.user_id()?;
  84. let workspace = self.read_local_workspace(workspace_id.clone(), &user_id, conn)?;
  85. send_dart_notification(&workspace_id, WorkspaceNotification::WorkspaceUpdated)
  86. .payload(workspace)
  87. .send();
  88. Ok(())
  89. })?;
  90. let _ = self.update_workspace_on_server(params)?;
  91. Ok(())
  92. }
  93. #[allow(dead_code)]
  94. pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
  95. let user_id = self.user.user_id()?;
  96. let token = self.user.token()?;
  97. let conn = &*self.database.db_connection()?;
  98. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  99. let _ = self.workspace_sql.delete_workspace(workspace_id, conn)?;
  100. let repeated_workspace = self.read_local_workspaces(None, &user_id, conn)?;
  101. send_dart_notification(&token, WorkspaceNotification::UserDeleteWorkspace)
  102. .payload(repeated_workspace)
  103. .send();
  104. Ok(())
  105. })?;
  106. let _ = self.delete_workspace_on_server(workspace_id)?;
  107. Ok(())
  108. }
  109. pub(crate) async fn open_workspace(&self, params: QueryWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  110. let user_id = self.user.user_id()?;
  111. let conn = self.database.db_connection()?;
  112. if let Some(workspace_id) = params.workspace_id.clone() {
  113. let workspace = self.read_local_workspace(workspace_id, &user_id, &*conn)?;
  114. set_current_workspace(&workspace.id);
  115. Ok(workspace)
  116. } else {
  117. return Err(WorkspaceError::workspace_id().context("Opened workspace id should not be empty"));
  118. }
  119. }
  120. pub(crate) async fn read_workspaces(
  121. &self,
  122. params: QueryWorkspaceParams,
  123. ) -> Result<RepeatedWorkspace, WorkspaceError> {
  124. let user_id = self.user.user_id()?;
  125. let workspaces =
  126. self.read_local_workspaces(params.workspace_id.clone(), &user_id, &*self.database.db_connection()?)?;
  127. let _ = self.read_workspaces_on_server(user_id.clone(), params.clone());
  128. Ok(workspaces)
  129. }
  130. pub(crate) async fn read_cur_workspace(&self) -> Result<Workspace, WorkspaceError> {
  131. let workspace_id = get_current_workspace()?;
  132. let user_id = self.user.user_id()?;
  133. let params = QueryWorkspaceParams {
  134. workspace_id: Some(workspace_id.clone()),
  135. };
  136. let workspace = self.read_local_workspace(workspace_id, &user_id, &*self.database.db_connection()?)?;
  137. let _ = self.read_workspaces_on_server(user_id.clone(), params)?;
  138. Ok(workspace)
  139. }
  140. pub(crate) async fn read_workspace_apps(&self) -> Result<RepeatedApp, WorkspaceError> {
  141. let workspace_id = get_current_workspace()?;
  142. let conn = self.database.db_connection()?;
  143. let apps = self.read_local_apps(&workspace_id, &*conn)?;
  144. // TODO: read from server
  145. Ok(RepeatedApp { items: apps })
  146. }
  147. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  148. fn read_local_workspaces(
  149. &self,
  150. workspace_id: Option<String>,
  151. user_id: &str,
  152. conn: &SqliteConnection,
  153. ) -> Result<RepeatedWorkspace, WorkspaceError> {
  154. let workspace_id = workspace_id.to_owned();
  155. let workspace_tables = self.workspace_sql.read_workspaces(workspace_id, user_id, conn)?;
  156. let mut workspaces = vec![];
  157. for table in workspace_tables {
  158. let apps = self.read_local_apps(&table.id, conn)?;
  159. let mut workspace: Workspace = table.into();
  160. workspace.apps.items = apps;
  161. workspaces.push(workspace);
  162. }
  163. Ok(RepeatedWorkspace { items: workspaces })
  164. }
  165. fn read_local_workspace(
  166. &self,
  167. workspace_id: String,
  168. user_id: &str,
  169. conn: &SqliteConnection,
  170. ) -> Result<Workspace, WorkspaceError> {
  171. // Opti: fetch single workspace from local db
  172. let mut repeated_workspace = self.read_local_workspaces(Some(workspace_id.clone()), user_id, conn)?;
  173. if repeated_workspace.is_empty() {
  174. return Err(WorkspaceError::record_not_found().context(format!("{} workspace not found", workspace_id)));
  175. }
  176. debug_assert_eq!(repeated_workspace.len(), 1);
  177. let workspace = repeated_workspace.drain(..1).collect::<Vec<Workspace>>().pop().unwrap();
  178. Ok(workspace)
  179. }
  180. #[tracing::instrument(level = "debug", skip(self, conn), err)]
  181. fn read_local_apps(&self, workspace_id: &str, conn: &SqliteConnection) -> Result<Vec<App>, WorkspaceError> {
  182. let apps = self
  183. .workspace_sql
  184. .read_apps_belong_to_workspace(workspace_id, conn)?
  185. .into_iter()
  186. .map(|app_table| app_table.into())
  187. .collect::<Vec<App>>();
  188. Ok(apps)
  189. }
  190. }
  191. impl WorkspaceController {
  192. fn token_with_server(&self) -> Result<(String, Server), WorkspaceError> {
  193. let token = self.user.token()?;
  194. let server = self.server.clone();
  195. Ok((token, server))
  196. }
  197. #[tracing::instrument(level = "debug", skip(self), err)]
  198. async fn create_workspace_on_server(&self, params: CreateWorkspaceParams) -> Result<Workspace, WorkspaceError> {
  199. let token = self.user.token()?;
  200. let workspace = self.server.create_workspace(&token, params).await?;
  201. Ok(workspace)
  202. }
  203. #[tracing::instrument(level = "debug", skip(self), err)]
  204. fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), WorkspaceError> {
  205. let (token, server) = self.token_with_server()?;
  206. spawn(async move {
  207. match server.update_workspace(&token, params).await {
  208. Ok(_) => {},
  209. Err(e) => {
  210. // TODO: retry?
  211. log::error!("Update workspace failed: {:?}", e);
  212. },
  213. }
  214. });
  215. Ok(())
  216. }
  217. #[tracing::instrument(level = "debug", skip(self), err)]
  218. fn delete_workspace_on_server(&self, workspace_id: &str) -> Result<(), WorkspaceError> {
  219. let params = DeleteWorkspaceParams {
  220. workspace_id: workspace_id.to_string(),
  221. };
  222. let (token, server) = self.token_with_server()?;
  223. spawn(async move {
  224. match server.delete_workspace(&token, params).await {
  225. Ok(_) => {},
  226. Err(e) => {
  227. // TODO: retry?
  228. log::error!("Delete workspace failed: {:?}", e);
  229. },
  230. }
  231. });
  232. Ok(())
  233. }
  234. #[tracing::instrument(level = "debug", skip(self), err)]
  235. fn read_workspaces_on_server(&self, user_id: String, params: QueryWorkspaceParams) -> Result<(), WorkspaceError> {
  236. let (token, server) = self.token_with_server()?;
  237. let workspace_sql = self.workspace_sql.clone();
  238. let app_ctrl = self.app_controller.clone();
  239. let view_ctrl = self.view_controller.clone();
  240. let conn = self.database.db_connection()?;
  241. spawn(async move {
  242. // Opti: handle the error and retry?
  243. let workspaces = server.read_workspace(&token, params).await?;
  244. let _ = (&*conn).immediate_transaction::<_, WorkspaceError, _>(|| {
  245. log::debug!("Save {} workspace", workspaces.len());
  246. for workspace in &workspaces.items {
  247. let mut m_workspace = workspace.clone();
  248. let apps = m_workspace.apps.into_inner();
  249. let workspace_table = WorkspaceTable::new(m_workspace, &user_id);
  250. let _ = workspace_sql.create_workspace(workspace_table, &*conn)?;
  251. log::debug!("Save {} apps", apps.len());
  252. for mut app in apps {
  253. let views = app.belongings.into_inner();
  254. match app_ctrl.save_app(app, &*conn) {
  255. Ok(_) => {},
  256. Err(e) => log::error!("create app failed: {:?}", e),
  257. }
  258. log::debug!("Save {} views", views.len());
  259. for view in views {
  260. match view_ctrl.save_view(view, &*conn) {
  261. Ok(_) => {},
  262. Err(e) => log::error!("create view failed: {:?}", e),
  263. }
  264. }
  265. }
  266. }
  267. Ok(())
  268. })?;
  269. send_dart_notification(&token, WorkspaceNotification::WorkspaceListUpdated)
  270. .payload(workspaces)
  271. .send();
  272. Result::<(), WorkspaceError>::Ok(())
  273. });
  274. Ok(())
  275. }
  276. }
  277. const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
  278. fn set_current_workspace(workspace: &str) { KV::set_str(CURRENT_WORKSPACE_ID, workspace.to_owned()); }
  279. fn get_current_workspace() -> Result<String, WorkspaceError> {
  280. match KV::get_str(CURRENT_WORKSPACE_ID) {
  281. None => Err(WorkspaceError::record_not_found()
  282. .context("Current workspace not found or should call open workspace first")),
  283. Some(workspace_id) => Ok(workspace_id),
  284. }
  285. }