controller.rs 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300
  1. use crate::{
  2. entities::{
  3. app::{App, CreateAppParams, *},
  4. trash::TrashType,
  5. },
  6. errors::*,
  7. module::{WorkspaceDatabase, WorkspaceUser},
  8. notify::*,
  9. services::{
  10. app::sql::{AppTable, AppTableChangeset, AppTableSql},
  11. server::Server,
  12. TrashController,
  13. TrashEvent,
  14. },
  15. };
  16. use flowy_database::SqliteConnection;
  17. use futures::{FutureExt, StreamExt};
  18. use std::{collections::HashSet, sync::Arc};
  19. pub(crate) struct AppController {
  20. user: Arc<dyn WorkspaceUser>,
  21. database: Arc<dyn WorkspaceDatabase>,
  22. trash_can: Arc<TrashController>,
  23. server: Server,
  24. }
  25. impl AppController {
  26. pub(crate) fn new(
  27. user: Arc<dyn WorkspaceUser>,
  28. database: Arc<dyn WorkspaceDatabase>,
  29. trash_can: Arc<TrashController>,
  30. server: Server,
  31. ) -> Self {
  32. Self {
  33. user,
  34. database,
  35. trash_can,
  36. server,
  37. }
  38. }
  39. pub fn init(&self) -> Result<(), FlowyError> {
  40. self.listen_trash_can_event();
  41. Ok(())
  42. }
  43. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name) err)]
  44. pub(crate) async fn create_app_from_params(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  45. let app = self.create_app_on_server(params).await?;
  46. self.create_app(app).await
  47. }
  48. pub(crate) async fn create_app(&self, app: App) -> Result<App, FlowyError> {
  49. let conn = &*self.database.db_connection()?;
  50. conn.immediate_transaction::<_, FlowyError, _>(|| {
  51. let _ = self.save_app(app.clone(), &*conn)?;
  52. let _ = notify_apps_changed(&app.workspace_id, self.trash_can.clone(), conn)?;
  53. Ok(())
  54. })?;
  55. Ok(app)
  56. }
  57. pub(crate) fn save_app(&self, app: App, conn: &SqliteConnection) -> Result<(), FlowyError> {
  58. let app_table = AppTable::new(app);
  59. let _ = AppTableSql::create_app(app_table, &*conn)?;
  60. Ok(())
  61. }
  62. pub(crate) async fn read_app(&self, params: AppIdentifier) -> Result<App, FlowyError> {
  63. let conn = self.database.db_connection()?;
  64. let app_table = AppTableSql::read_app(&params.app_id, &*conn)?;
  65. let trash_ids = self.trash_can.trash_ids(&conn)?;
  66. if trash_ids.contains(&app_table.id) {
  67. return Err(FlowyError::record_not_found());
  68. }
  69. let _ = self.read_app_on_server(params)?;
  70. Ok(app_table.into())
  71. }
  72. pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  73. let changeset = AppTableChangeset::new(params.clone());
  74. let app_id = changeset.id.clone();
  75. let conn = &*self.database.db_connection()?;
  76. conn.immediate_transaction::<_, FlowyError, _>(|| {
  77. let _ = AppTableSql::update_app(changeset, conn)?;
  78. let app: App = AppTableSql::read_app(&app_id, conn)?.into();
  79. send_dart_notification(&app_id, WorkspaceNotification::AppUpdated)
  80. .payload(app)
  81. .send();
  82. Ok(())
  83. })?;
  84. let _ = self.update_app_on_server(params)?;
  85. Ok(())
  86. }
  87. pub(crate) fn read_app_tables(&self, ids: Vec<String>) -> Result<Vec<AppTable>, FlowyError> {
  88. let conn = &*self.database.db_connection()?;
  89. let mut app_tables = vec![];
  90. conn.immediate_transaction::<_, FlowyError, _>(|| {
  91. for app_id in ids {
  92. app_tables.push(AppTableSql::read_app(&app_id, conn)?);
  93. }
  94. Ok(())
  95. })?;
  96. Ok(app_tables)
  97. }
  98. }
  99. impl AppController {
  100. #[tracing::instrument(level = "debug", skip(self), err)]
  101. async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  102. let token = self.user.token()?;
  103. let app = self.server.create_app(&token, params).await?;
  104. Ok(app)
  105. }
  106. #[tracing::instrument(level = "debug", skip(self), err)]
  107. fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  108. let token = self.user.token()?;
  109. let server = self.server.clone();
  110. tokio::spawn(async move {
  111. match server.update_app(&token, params).await {
  112. Ok(_) => {},
  113. Err(e) => {
  114. // TODO: retry?
  115. log::error!("Update app failed: {:?}", e);
  116. },
  117. }
  118. });
  119. Ok(())
  120. }
  121. #[tracing::instrument(level = "debug", skip(self), err)]
  122. fn read_app_on_server(&self, params: AppIdentifier) -> Result<(), FlowyError> {
  123. let token = self.user.token()?;
  124. let server = self.server.clone();
  125. let pool = self.database.db_pool()?;
  126. tokio::spawn(async move {
  127. // Opti: retry?
  128. match server.read_app(&token, params).await {
  129. Ok(Some(app)) => match pool.get() {
  130. Ok(conn) => {
  131. let app_table = AppTable::new(app.clone());
  132. let result = AppTableSql::create_app(app_table, &*conn);
  133. match result {
  134. Ok(_) => {
  135. send_dart_notification(&app.id, WorkspaceNotification::AppUpdated)
  136. .payload(app)
  137. .send();
  138. },
  139. Err(e) => log::error!("Save app failed: {:?}", e),
  140. }
  141. },
  142. Err(e) => log::error!("Require db connection failed: {:?}", e),
  143. },
  144. Ok(None) => {},
  145. Err(e) => log::error!("Read app failed: {:?}", e),
  146. }
  147. });
  148. Ok(())
  149. }
  150. fn listen_trash_can_event(&self) {
  151. let mut rx = self.trash_can.subscribe();
  152. let database = self.database.clone();
  153. let trash_can = self.trash_can.clone();
  154. let _ = tokio::spawn(async move {
  155. loop {
  156. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  157. match result {
  158. Ok(event) => event.select(TrashType::App),
  159. Err(_e) => None,
  160. }
  161. }));
  162. if let Some(event) = stream.next().await {
  163. handle_trash_event(database.clone(), trash_can.clone(), event).await
  164. }
  165. }
  166. });
  167. }
  168. }
  169. #[tracing::instrument(level = "trace", skip(database, trash_can))]
  170. async fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, trash_can: Arc<TrashController>, event: TrashEvent) {
  171. let db_result = database.db_connection();
  172. match event {
  173. TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => {
  174. let result = || {
  175. let conn = &*db_result?;
  176. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  177. for identifier in identifiers.items {
  178. let app_table = AppTableSql::read_app(&identifier.id, conn)?;
  179. let _ = notify_apps_changed(&app_table.workspace_id, trash_can.clone(), conn)?;
  180. }
  181. Ok(())
  182. })?;
  183. Ok::<(), FlowyError>(())
  184. };
  185. let _ = ret.send(result()).await;
  186. },
  187. TrashEvent::Delete(identifiers, ret) => {
  188. let result = || {
  189. let conn = &*db_result?;
  190. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  191. let mut notify_ids = HashSet::new();
  192. for identifier in identifiers.items {
  193. let app_table = AppTableSql::read_app(&identifier.id, conn)?;
  194. let _ = AppTableSql::delete_app(&identifier.id, conn)?;
  195. notify_ids.insert(app_table.workspace_id);
  196. }
  197. for notify_id in notify_ids {
  198. let _ = notify_apps_changed(&notify_id, trash_can.clone(), conn)?;
  199. }
  200. Ok(())
  201. })?;
  202. Ok::<(), FlowyError>(())
  203. };
  204. let _ = ret.send(result()).await;
  205. },
  206. }
  207. }
  208. #[tracing::instrument(skip(workspace_id, trash_can, conn), err)]
  209. fn notify_apps_changed(
  210. workspace_id: &str,
  211. trash_can: Arc<TrashController>,
  212. conn: &SqliteConnection,
  213. ) -> FlowyResult<()> {
  214. let repeated_app = read_local_workspace_apps(workspace_id, trash_can, conn)?;
  215. send_dart_notification(workspace_id, WorkspaceNotification::WorkspaceAppsChanged)
  216. .payload(repeated_app)
  217. .send();
  218. Ok(())
  219. }
  220. pub fn read_local_workspace_apps(
  221. workspace_id: &str,
  222. trash_controller: Arc<TrashController>,
  223. conn: &SqliteConnection,
  224. ) -> Result<RepeatedApp, FlowyError> {
  225. let mut app_tables = AppTableSql::read_workspace_apps(workspace_id, false, conn)?;
  226. let trash_ids = trash_controller.trash_ids(conn)?;
  227. app_tables.retain(|app_table| !trash_ids.contains(&app_table.id));
  228. let apps = app_tables.into_iter().map(|table| table.into()).collect::<Vec<App>>();
  229. Ok(RepeatedApp { items: apps })
  230. }
  231. // #[tracing::instrument(level = "debug", skip(self), err)]
  232. // pub(crate) async fn delete_app(&self, app_id: &str) -> Result<(),
  233. // FlowyError> { let conn = &*self.database.db_connection()?;
  234. // conn.immediate_transaction::<_, FlowyError, _>(|| {
  235. // let app = AppTableSql::delete_app(app_id, &*conn)?;
  236. // let apps = self.read_local_apps(&app.workspace_id, &*conn)?;
  237. // send_dart_notification(&app.workspace_id,
  238. // WorkspaceNotification::WorkspaceDeleteApp) .payload(apps)
  239. // .send();
  240. // Ok(())
  241. // })?;
  242. //
  243. // let _ = self.delete_app_on_server(app_id);
  244. // Ok(())
  245. // }
  246. //
  247. // #[tracing::instrument(level = "debug", skip(self), err)]
  248. // fn delete_app_on_server(&self, app_id: &str) -> Result<(), FlowyError> {
  249. // let token = self.user.token()?;
  250. // let server = self.server.clone();
  251. // let params = DeleteAppParams {
  252. // app_id: app_id.to_string(),
  253. // };
  254. // spawn(async move {
  255. // match server.delete_app(&token, params).await {
  256. // Ok(_) => {},
  257. // Err(e) => {
  258. // // TODO: retry?
  259. // log::error!("Delete app failed: {:?}", e);
  260. // },
  261. // }
  262. // });
  263. // // let action = RetryAction::new(self.server.clone(), self.user.clone(),
  264. // move // |token, server| { let params = params.clone();
  265. // // async move {
  266. // // match server.delete_app(&token, params).await {
  267. // // Ok(_) => {},
  268. // // Err(e) => log::error!("Delete app failed: {:?}", e),
  269. // // }
  270. // // Ok::<(), FlowyError>(())
  271. // // }
  272. // // });
  273. // //
  274. // // spawn_retry(500, 3, action);
  275. // Ok(())
  276. // }