controller.rs 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. use crate::{
  2. entities::{
  3. app::{App, CreateAppParams, *},
  4. trash::TrashType,
  5. },
  6. errors::*,
  7. module::{WorkspaceDatabase, WorkspaceUser},
  8. notify::*,
  9. services::{
  10. app::sql::{AppTable, AppTableChangeset, AppTableSql},
  11. server::Server,
  12. TrashController, TrashEvent,
  13. },
  14. };
  15. use flowy_database::SqliteConnection;
  16. use futures::{FutureExt, StreamExt};
  17. use std::{collections::HashSet, sync::Arc};
  18. pub(crate) struct AppController {
  19. user: Arc<dyn WorkspaceUser>,
  20. database: Arc<dyn WorkspaceDatabase>,
  21. trash_can: Arc<TrashController>,
  22. server: Server,
  23. }
  24. impl AppController {
  25. pub(crate) fn new(
  26. user: Arc<dyn WorkspaceUser>,
  27. database: Arc<dyn WorkspaceDatabase>,
  28. trash_can: Arc<TrashController>,
  29. server: Server,
  30. ) -> Self {
  31. Self {
  32. user,
  33. database,
  34. trash_can,
  35. server,
  36. }
  37. }
  38. pub fn init(&self) -> Result<(), FlowyError> {
  39. self.listen_trash_controller_event();
  40. Ok(())
  41. }
  42. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name) err)]
  43. pub(crate) async fn create_app_from_params(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  44. let app = self.create_app_on_server(params).await?;
  45. self.create_app_on_local(app).await
  46. }
  47. pub(crate) async fn create_app_on_local(&self, app: App) -> Result<App, FlowyError> {
  48. let conn = &*self.database.db_connection()?;
  49. conn.immediate_transaction::<_, FlowyError, _>(|| {
  50. let _ = self.save_app(app.clone(), &*conn)?;
  51. let _ = notify_apps_changed(&app.workspace_id, self.trash_can.clone(), conn)?;
  52. Ok(())
  53. })?;
  54. Ok(app)
  55. }
  56. pub(crate) fn save_app(&self, app: App, conn: &SqliteConnection) -> Result<(), FlowyError> {
  57. let app_table = AppTable::new(app);
  58. let _ = AppTableSql::create_app(app_table, &*conn)?;
  59. Ok(())
  60. }
  61. pub(crate) async fn read_app(&self, params: AppId) -> Result<App, FlowyError> {
  62. let conn = self.database.db_connection()?;
  63. let app_table = AppTableSql::read_app(&params.app_id, &*conn)?;
  64. let trash_ids = self.trash_can.read_trash_ids(&conn)?;
  65. if trash_ids.contains(&app_table.id) {
  66. return Err(FlowyError::record_not_found());
  67. }
  68. let _ = self.read_app_on_server(params)?;
  69. Ok(app_table.into())
  70. }
  71. pub(crate) async fn update_app(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  72. let changeset = AppTableChangeset::new(params.clone());
  73. let app_id = changeset.id.clone();
  74. let conn = &*self.database.db_connection()?;
  75. conn.immediate_transaction::<_, FlowyError, _>(|| {
  76. let _ = AppTableSql::update_app(changeset, conn)?;
  77. let app: App = AppTableSql::read_app(&app_id, conn)?.into();
  78. send_dart_notification(&app_id, WorkspaceNotification::AppUpdated)
  79. .payload(app)
  80. .send();
  81. Ok(())
  82. })?;
  83. let _ = self.update_app_on_server(params)?;
  84. Ok(())
  85. }
  86. pub(crate) fn read_app_tables(&self, ids: Vec<String>) -> Result<Vec<AppTable>, FlowyError> {
  87. let conn = &*self.database.db_connection()?;
  88. let mut app_tables = vec![];
  89. conn.immediate_transaction::<_, FlowyError, _>(|| {
  90. for app_id in ids {
  91. app_tables.push(AppTableSql::read_app(&app_id, conn)?);
  92. }
  93. Ok(())
  94. })?;
  95. Ok(app_tables)
  96. }
  97. }
  98. impl AppController {
  99. #[tracing::instrument(level = "debug", skip(self), err)]
  100. async fn create_app_on_server(&self, params: CreateAppParams) -> Result<App, FlowyError> {
  101. let token = self.user.token()?;
  102. let app = self.server.create_app(&token, params).await?;
  103. Ok(app)
  104. }
  105. #[tracing::instrument(level = "debug", skip(self), err)]
  106. fn update_app_on_server(&self, params: UpdateAppParams) -> Result<(), FlowyError> {
  107. let token = self.user.token()?;
  108. let server = self.server.clone();
  109. tokio::spawn(async move {
  110. match server.update_app(&token, params).await {
  111. Ok(_) => {}
  112. Err(e) => {
  113. // TODO: retry?
  114. log::error!("Update app failed: {:?}", e);
  115. }
  116. }
  117. });
  118. Ok(())
  119. }
  120. #[tracing::instrument(level = "debug", skip(self), err)]
  121. fn read_app_on_server(&self, params: AppId) -> Result<(), FlowyError> {
  122. let token = self.user.token()?;
  123. let server = self.server.clone();
  124. let pool = self.database.db_pool()?;
  125. tokio::spawn(async move {
  126. // Opti: retry?
  127. match server.read_app(&token, params).await {
  128. Ok(Some(app)) => match pool.get() {
  129. Ok(conn) => {
  130. let app_table = AppTable::new(app.clone());
  131. let result = AppTableSql::create_app(app_table, &*conn);
  132. match result {
  133. Ok(_) => {
  134. send_dart_notification(&app.id, WorkspaceNotification::AppUpdated)
  135. .payload(app)
  136. .send();
  137. }
  138. Err(e) => log::error!("Save app failed: {:?}", e),
  139. }
  140. }
  141. Err(e) => log::error!("Require db connection failed: {:?}", e),
  142. },
  143. Ok(None) => {}
  144. Err(e) => log::error!("Read app failed: {:?}", e),
  145. }
  146. });
  147. Ok(())
  148. }
  149. fn listen_trash_controller_event(&self) {
  150. let mut rx = self.trash_can.subscribe();
  151. let database = self.database.clone();
  152. let trash_can = self.trash_can.clone();
  153. let _ = tokio::spawn(async move {
  154. loop {
  155. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  156. match result {
  157. Ok(event) => event.select(TrashType::App),
  158. Err(_e) => None,
  159. }
  160. }));
  161. if let Some(event) = stream.next().await {
  162. handle_trash_event(database.clone(), trash_can.clone(), event).await
  163. }
  164. }
  165. });
  166. }
  167. }
  168. #[tracing::instrument(level = "trace", skip(database, trash_can))]
  169. async fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, trash_can: Arc<TrashController>, event: TrashEvent) {
  170. let db_result = database.db_connection();
  171. match event {
  172. TrashEvent::NewTrash(identifiers, ret) | TrashEvent::Putback(identifiers, ret) => {
  173. let result = || {
  174. let conn = &*db_result?;
  175. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  176. for identifier in identifiers.items {
  177. let app_table = AppTableSql::read_app(&identifier.id, conn)?;
  178. let _ = notify_apps_changed(&app_table.workspace_id, trash_can.clone(), conn)?;
  179. }
  180. Ok(())
  181. })?;
  182. Ok::<(), FlowyError>(())
  183. };
  184. let _ = ret.send(result()).await;
  185. }
  186. TrashEvent::Delete(identifiers, ret) => {
  187. let result = || {
  188. let conn = &*db_result?;
  189. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  190. let mut notify_ids = HashSet::new();
  191. for identifier in identifiers.items {
  192. let app_table = AppTableSql::read_app(&identifier.id, conn)?;
  193. let _ = AppTableSql::delete_app(&identifier.id, conn)?;
  194. notify_ids.insert(app_table.workspace_id);
  195. }
  196. for notify_id in notify_ids {
  197. let _ = notify_apps_changed(&notify_id, trash_can.clone(), conn)?;
  198. }
  199. Ok(())
  200. })?;
  201. Ok::<(), FlowyError>(())
  202. };
  203. let _ = ret.send(result()).await;
  204. }
  205. }
  206. }
  207. #[tracing::instrument(skip(workspace_id, trash_can, conn), err)]
  208. fn notify_apps_changed(
  209. workspace_id: &str,
  210. trash_can: Arc<TrashController>,
  211. conn: &SqliteConnection,
  212. ) -> FlowyResult<()> {
  213. let repeated_app = read_local_workspace_apps(workspace_id, trash_can, conn)?;
  214. send_dart_notification(workspace_id, WorkspaceNotification::WorkspaceAppsChanged)
  215. .payload(repeated_app)
  216. .send();
  217. Ok(())
  218. }
  219. pub fn read_local_workspace_apps(
  220. workspace_id: &str,
  221. trash_controller: Arc<TrashController>,
  222. conn: &SqliteConnection,
  223. ) -> Result<RepeatedApp, FlowyError> {
  224. let mut app_tables = AppTableSql::read_workspace_apps(workspace_id, false, conn)?;
  225. let trash_ids = trash_controller.read_trash_ids(conn)?;
  226. app_tables.retain(|app_table| !trash_ids.contains(&app_table.id));
  227. let apps = app_tables.into_iter().map(|table| table.into()).collect::<Vec<App>>();
  228. Ok(RepeatedApp { items: apps })
  229. }