controller.rs 8.3 KB


  1. use crate::entities::workspace::*;
  2. use crate::manager::FolderManager;
  3. use crate::{
  4. errors::*,
  5. event_map::{FolderCouldServiceV1, WorkspaceUser},
  6. notification::*,
  7. services::{
  8. persistence::{FolderPersistence, FolderPersistenceTransaction, WorkspaceChangeset},
  9. read_workspace_apps, TrashController,
  10. },
  11. };
  12. use flowy_sqlite::kv::KV;
  13. use folder_model::{AppRevision, WorkspaceRevision};
  14. use lib_dispatch::prelude::ToBytes;
  15. use std::sync::Arc;
  16. pub struct WorkspaceController {
  17. pub user: Arc<dyn WorkspaceUser>,
  18. persistence: Arc<FolderPersistence>,
  19. pub(crate) trash_controller: Arc<TrashController>,
  20. cloud_service: Arc<dyn FolderCouldServiceV1>,
  21. }
  22. impl WorkspaceController {
  23. pub(crate) fn new(
  24. user: Arc<dyn WorkspaceUser>,
  25. persistence: Arc<FolderPersistence>,
  26. trash_can: Arc<TrashController>,
  27. cloud_service: Arc<dyn FolderCouldServiceV1>,
  28. ) -> Self {
  29. Self {
  30. user,
  31. persistence,
  32. trash_controller: trash_can,
  33. cloud_service,
  34. }
  35. }
  36. pub(crate) async fn create_workspace_from_params(
  37. &self,
  38. params: CreateWorkspaceParams,
  39. ) -> Result<WorkspaceRevision, FlowyError> {
  40. let workspace = self.create_workspace_on_server(params.clone()).await?;
  41. let user_id = self.user.user_id()?;
  42. let workspaces = self
  43. .persistence
  44. .begin_transaction(|transaction| {
  45. transaction.create_workspace(&user_id, workspace.clone())?;
  46. transaction.read_workspaces(&user_id, None)
  47. })
  48. .await?
  49. .into_iter()
  50. .map(|workspace_rev| workspace_rev.into())
  51. .collect();
  52. let repeated_workspace = RepeatedWorkspacePB { items: workspaces };
  53. send_workspace_notification(FolderNotification::DidCreateWorkspace, repeated_workspace);
  54. set_current_workspace(&user_id, &workspace.id);
  55. Ok(workspace)
  56. }
  57. #[allow(dead_code)]
  58. pub(crate) async fn update_workspace(
  59. &self,
  60. params: UpdateWorkspaceParams,
  61. ) -> Result<(), FlowyError> {
  62. let changeset = WorkspaceChangeset::new(params.clone());
  63. let workspace_id = changeset.id.clone();
  64. let workspace = self
  65. .persistence
  66. .begin_transaction(|transaction| {
  67. transaction.update_workspace(changeset)?;
  68. let user_id = self.user.user_id()?;
  69. self.read_workspace(workspace_id.clone(), &user_id, &transaction)
  70. })
  71. .await?;
  72. send_workspace_notification(FolderNotification::DidUpdateWorkspace, workspace);
  73. self.update_workspace_on_server(params)?;
  74. Ok(())
  75. }
  76. #[allow(dead_code)]
  77. pub(crate) async fn delete_workspace(&self, workspace_id: &str) -> Result<(), FlowyError> {
  78. let user_id = self.user.user_id()?;
  79. let repeated_workspace = self
  80. .persistence
  81. .begin_transaction(|transaction| {
  82. transaction.delete_workspace(workspace_id)?;
  83. self.read_workspaces(None, &user_id, &transaction)
  84. })
  85. .await?;
  86. send_workspace_notification(FolderNotification::DidDeleteWorkspace, repeated_workspace);
  87. self.delete_workspace_on_server(workspace_id)?;
  88. Ok(())
  89. }
  90. pub(crate) async fn open_workspace(
  91. &self,
  92. params: WorkspaceIdPB,
  93. ) -> Result<WorkspacePB, FlowyError> {
  94. let user_id = self.user.user_id()?;
  95. if let Some(workspace_id) = params.value {
  96. let workspace = self
  97. .persistence
  98. .begin_transaction(|transaction| self.read_workspace(workspace_id, &user_id, &transaction))
  99. .await?;
  100. set_current_workspace(&user_id, &workspace.id);
  101. Ok(workspace)
  102. } else {
  103. Err(FlowyError::workspace_id().context("Opened workspace id should not be empty"))
  104. }
  105. }
  106. pub(crate) async fn read_current_workspace_apps(&self) -> Result<Vec<AppRevision>, FlowyError> {
  107. let user_id = self.user.user_id()?;
  108. let workspace_id = get_current_workspace(&user_id)?;
  109. let app_revs = self
  110. .persistence
  111. .begin_transaction(|transaction| {
  112. read_workspace_apps(&workspace_id, self.trash_controller.clone(), &transaction)
  113. })
  114. .await?;
  115. // TODO: read from server
  116. Ok(app_revs)
  117. }
  118. #[tracing::instrument(level = "debug", skip(self, transaction), err)]
  119. pub(crate) fn read_workspaces<'a>(
  120. &self,
  121. workspace_id: Option<String>,
  122. user_id: &str,
  123. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  124. ) -> Result<RepeatedWorkspacePB, FlowyError> {
  125. let workspace_id = workspace_id.to_owned();
  126. let trash_ids = self.trash_controller.read_trash_ids(transaction)?;
  127. let workspaces = transaction
  128. .read_workspaces(user_id, workspace_id)?
  129. .into_iter()
  130. .map(|mut workspace_rev| {
  131. workspace_rev
  132. .apps
  133. .retain(|app_rev| !trash_ids.contains(&app_rev.id));
  134. workspace_rev.into()
  135. })
  136. .collect();
  137. Ok(RepeatedWorkspacePB { items: workspaces })
  138. }
  139. pub(crate) fn read_workspace<'a>(
  140. &self,
  141. workspace_id: String,
  142. user_id: &str,
  143. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  144. ) -> Result<WorkspacePB, FlowyError> {
  145. let mut workspaces = self
  146. .read_workspaces(Some(workspace_id.clone()), user_id, transaction)?
  147. .items;
  148. if workspaces.is_empty() {
  149. return Err(
  150. FlowyError::record_not_found().context(format!("{} workspace not found", workspace_id)),
  151. );
  152. }
  153. debug_assert_eq!(workspaces.len(), 1);
  154. let workspace = workspaces
  155. .drain(..1)
  156. .collect::<Vec<WorkspacePB>>()
  157. .pop()
  158. .unwrap();
  159. Ok(workspace)
  160. }
  161. }
  162. impl WorkspaceController {
  163. async fn create_workspace_on_server(
  164. &self,
  165. params: CreateWorkspaceParams,
  166. ) -> Result<WorkspaceRevision, FlowyError> {
  167. let token = self.user.token()?;
  168. self.cloud_service.create_workspace(&token, params).await
  169. }
  170. fn update_workspace_on_server(&self, params: UpdateWorkspaceParams) -> Result<(), FlowyError> {
  171. let (token, server) = (self.user.token()?, self.cloud_service.clone());
  172. tokio::spawn(async move {
  173. match server.update_workspace(&token, params).await {
  174. Ok(_) => {},
  175. Err(e) => {
  176. // TODO: retry?
  177. log::error!("Update workspace failed: {:?}", e);
  178. },
  179. }
  180. });
  181. Ok(())
  182. }
  183. fn delete_workspace_on_server(&self, workspace_id: &str) -> Result<(), FlowyError> {
  184. let params = WorkspaceIdPB {
  185. value: Some(workspace_id.to_string()),
  186. };
  187. let (token, server) = (self.user.token()?, self.cloud_service.clone());
  188. tokio::spawn(async move {
  189. match server.delete_workspace(&token, params).await {
  190. Ok(_) => {},
  191. Err(e) => {
  192. // TODO: retry?
  193. log::error!("Delete workspace failed: {:?}", e);
  194. },
  195. }
  196. });
  197. Ok(())
  198. }
  199. }
  200. pub async fn notify_workspace_setting_did_change(
  201. folder_manager: &Arc<FolderManager>,
  202. view_id: &str,
  203. ) -> FlowyResult<()> {
  204. let user_id = folder_manager.user.user_id()?;
  205. let workspace_id = get_current_workspace(&user_id)?;
  206. let workspace_setting = folder_manager
  207. .persistence
  208. .begin_transaction(|transaction| {
  209. let workspace = folder_manager.workspace_controller.read_workspace(
  210. workspace_id.clone(),
  211. &user_id,
  212. &transaction,
  213. )?;
  214. let setting = match transaction.read_view(view_id) {
  215. Ok(latest_view) => WorkspaceSettingPB {
  216. workspace,
  217. latest_view: Some(latest_view.into()),
  218. },
  219. Err(_) => WorkspaceSettingPB {
  220. workspace,
  221. latest_view: None,
  222. },
  223. };
  224. Ok(setting)
  225. })
  226. .await?;
  227. send_workspace_notification(
  228. FolderNotification::DidUpdateWorkspaceSetting,
  229. workspace_setting,
  230. );
  231. Ok(())
  232. }
  233. /// The [CURRENT_WORKSPACE] represents as the current workspace that opened by the
  234. /// user. Only one workspace can be opened at a time.
  235. const CURRENT_WORKSPACE: &str = "current-workspace";
  236. fn send_workspace_notification<T: ToBytes>(ty: FolderNotification, payload: T) {
  237. send_notification(CURRENT_WORKSPACE, ty)
  238. .payload(payload)
  239. .send();
  240. }
  241. const CURRENT_WORKSPACE_ID: &str = "current_workspace_id";
  242. pub fn set_current_workspace(_user_id: &str, workspace_id: &str) {
  243. KV::set_str(CURRENT_WORKSPACE_ID, workspace_id.to_owned());
  244. }
  245. pub fn clear_current_workspace(_user_id: &str) {
  246. let _ = KV::remove(CURRENT_WORKSPACE_ID);
  247. }
  248. pub fn get_current_workspace(_user_id: &str) -> Result<String, FlowyError> {
  249. match KV::get_str(CURRENT_WORKSPACE_ID) {
  250. None => Err(
  251. FlowyError::record_not_found()
  252. .context("Current workspace not found or should call open workspace first"),
  253. ),
  254. Some(workspace_id) => Ok(workspace_id),
  255. }
  256. }