controller.rs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. use crate::{
  2. dart_notification::*,
  3. entities::{
  4. app::{App, CreateAppParams, *},
  5. trash::TrashType,
  6. },
  7. errors::*,
  8. event_map::{FolderCouldServiceV1, WorkspaceUser},
  9. services::{
  10. persistence::{AppChangeset, FolderPersistence, FolderPersistenceTransaction},
  11. TrashController, TrashEvent,
  12. },
  13. };
  14. use futures::{FutureExt, StreamExt};
  15. use std::{collections::HashSet, sync::Arc};
  16. pub(crate) struct AppController {
  17. user: Arc<dyn WorkspaceUser>,
  18. persistence: Arc<FolderPersistence>,
  19. trash_controller: Arc<TrashController>,
  20. cloud_service: Arc<dyn FolderCouldServiceV1>,
  21. }
  22. impl AppController {
  23. pub(crate) fn new(
  24. user: Arc<dyn WorkspaceUser>,
  25. persistence: Arc<FolderPersistence>,
  26. trash_can: Arc<TrashController>,
  27. cloud_service: Arc<dyn FolderCouldServiceV1>,
  28. ) -> Self {
  29. Self {
  30. user,
  31. persistence,
  32. trash_controller: trash_can,
  33. cloud_service,
  34. }
  35. }
  36. pub fn initialize(&self) -> Result<(), FlowyError> {
  37. self.listen_trash_controller_event();
  38. Ok(())
  39. }
  40. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name) err)]
  41. pub(crate) async fn create_app_from_params(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  42. let app = self.create_app_on_server(params).await?;
  43. self.create_app_on_local(app).await
  44. }
  45. pub(crate) async fn create_app_on_local(&self, app: App) -> Result<App, FlowyError> {
  46. let _ = self
  47. .persistence
  48. .begin_transaction(|transaction| {
  49. let _ = transaction.create_app(app.clone())?;
  50. let _ = notify_apps_changed(&app.workspace_id, self.trash_controller.clone(), &transaction)?;
  51. Ok(())
  52. })
  53. .await?;
  54. Ok(app)
  55. }
  56. pub(crate) async fn read_app(&self, params: AppId) -> Result<App, FlowyError> {
  57. let app = self
  58. .persistence
  59. .begin_transaction(|transaction| {
  60. let app = transaction.read_app(&params.app_id)?;
  61. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  62. if trash_ids.contains(&app.id) {
  63. return Err(FlowyError::record_not_found());
  64. }
  65. Ok(app)
  66. })
  67. .await?;
  68. let _ = self.read_app_on_server(params)?;
  69. Ok(app)
  70. }
  71. pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  72. let changeset = AppChangeset::new(params.clone());
  73. let app_id = changeset.id.clone();
  74. let app = self
  75. .persistence
  76. .begin_transaction(|transaction| {
  77. let _ = transaction.update_app(changeset)?;
  78. let app = transaction.read_app(&app_id)?;
  79. Ok(app)
  80. })
  81. .await?;
  82. send_dart_notification(&app_id, FolderNotification::AppUpdated)
  83. .payload(app)
  84. .send();
  85. let _ = self.update_app_on_server(params)?;
  86. Ok(())
  87. }
  88. pub(crate) async fn read_local_apps(&self, ids: Vec<String>) -> Result<Vec<App>, FlowyError> {
  89. let apps = self
  90. .persistence
  91. .begin_transaction(|transaction| {
  92. let mut apps = vec![];
  93. for id in ids {
  94. apps.push(transaction.read_app(&id)?);
  95. }
  96. Ok(apps)
  97. })
  98. .await?;
  99. Ok(apps)
  100. }
  101. }
  102. impl AppController {
  103. #[tracing::instrument(level = "trace", skip(self), err)]
  104. async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  105. let token = self.user.token()?;
  106. let app = self.cloud_service.create_app(&token, params).await?;
  107. Ok(app)
  108. }
  109. #[tracing::instrument(level = "trace", skip(self), err)]
  110. fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  111. let token = self.user.token()?;
  112. let server = self.cloud_service.clone();
  113. tokio::spawn(async move {
  114. match server.update_app(&token, params).await {
  115. Ok(_) => {}
  116. Err(e) => {
  117. // TODO: retry?
  118. log::error!("Update app failed: {:?}", e);
  119. }
  120. }
  121. });
  122. Ok(())
  123. }
  124. #[tracing::instrument(level = "trace", skip(self), err)]
  125. fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> {
  126. let token = self.user.token()?;
  127. let server = self.cloud_service.clone();
  128. let persistence = self.persistence.clone();
  129. tokio::spawn(async move {
  130. match server.read_app(&token, params).await {
  131. Ok(Some(app)) => {
  132. match persistence
  133. .begin_transaction(|transaction| transaction.create_app(app.clone()))
  134. .await
  135. {
  136. Ok(_) => {
  137. send_dart_notification(&app.id, FolderNotification::AppUpdated)
  138. .payload(app)
  139. .send();
  140. }
  141. Err(e) => log::error!("Save app failed: {:?}", e),
  142. }
  143. }
  144. Ok(None) => {}
  145. Err(e) => log::error!("Read app failed: {:?}", e),
  146. }
  147. });
  148. Ok(())
  149. }
  150. fn listen_trash_controller_event(&self) {
  151. let mut rx = self.trash_controller.subscribe();
  152. let persistence = self.persistence.clone();
  153. let trash_controller = self.trash_controller.clone();
  154. let _ = tokio::spawn(async move {
  155. loop {
  156. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  157. match result {
  158. Ok(event) => event.select(TrashType::App),
  159. Err(_e) => None,
  160. }
  161. }));
  162. if let Some(event) = stream.next().await {
  163. handle_trash_event(persistence.clone(), trash_controller.clone(), event).await
  164. }
  165. }
  166. });
  167. }
  168. }
  169. #[tracing::instrument(level = "trace", skip(persistence, trash_controller))]
  170. async fn handle_trash_event(
  171. persistence: Arc<FolderPersistence>,
  172. trash_controller: Arc<TrashController>,
  173. event: TrashEvent,
  174. ) {
  175. match event {
  176. TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => {
  177. let result = persistence
  178. .begin_transaction(|transaction| {
  179. for identifier in identifiers.items {
  180. let app = transaction.read_app(&identifier.id)?;
  181. let _ = notify_apps_changed(&app.workspace_id, trash_controller.clone(), &transaction)?;
  182. }
  183. Ok(())
  184. })
  185. .await;
  186. let _ = ret.send(result).await;
  187. }
  188. TrashEvent::Delete(identifiers, ret) => {
  189. let result = persistence
  190. .begin_transaction(|transaction| {
  191. let mut notify_ids = HashSet::new();
  192. for identifier in identifiers.items {
  193. let app = transaction.read_app(&identifier.id)?;
  194. let _ = transaction.delete_app(&identifier.id)?;
  195. notify_ids.insert(app.workspace_id);
  196. }
  197. for notify_id in notify_ids {
  198. let _ = notify_apps_changed(&notify_id, trash_controller.clone(), &transaction)?;
  199. }
  200. Ok(())
  201. })
  202. .await;
  203. let _ = ret.send(result).await;
  204. }
  205. }
  206. }
  207. #[tracing::instrument(skip(workspace_id, trash_controller, transaction), err)]
  208. fn notify_apps_changed<'a>(
  209. workspace_id: &str,
  210. trash_controller: Arc<TrashController>,
  211. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  212. ) -> FlowyResult<()> {
  213. let repeated_app = read_local_workspace_apps(workspace_id, trash_controller, transaction)?;
  214. send_dart_notification(workspace_id, FolderNotification::WorkspaceAppsChanged)
  215. .payload(repeated_app)
  216. .send();
  217. Ok(())
  218. }
  219. pub fn read_local_workspace_apps<'a>(
  220. workspace_id: &str,
  221. trash_controller: Arc<TrashController>,
  222. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  223. ) -> Result<RepeatedApp, FlowyError> {
  224. let mut apps = transaction.read_workspace_apps(workspace_id)?;
  225. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  226. apps.retain(|app| !trash_ids.contains(&app.id));
  227. Ok(RepeatedApp { items: apps })
  228. }