view_controller.rs 10 KB


  1. use crate::{
  2. entities::view::{CreateViewParams, UpdateViewParams, View},
  3. errors::WorkspaceError,
  4. module::WorkspaceDatabase,
  5. notify::send_dart_notification,
  6. services::{helper::spawn, server::Server},
  7. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  8. };
  9. use crate::{
  10. entities::view::{DeleteViewParams, RepeatedView, ViewIdentifier},
  11. errors::internal_error,
  12. module::WorkspaceUser,
  13. notify::WorkspaceNotification,
  14. services::{TrashCan, TrashEvent},
  15. };
  16. use flowy_database::SqliteConnection;
  17. use flowy_document::{
  18. entities::doc::{CreateDocParams, DocDelta, DocIdentifier},
  19. module::FlowyDocument,
  20. };
  21. use crate::{entities::trash::TrashType, errors::WorkspaceResult};
  22. use futures::{FutureExt, StreamExt};
  23. use std::sync::Arc;
  24. pub(crate) struct ViewController {
  25. user: Arc<dyn WorkspaceUser>,
  26. server: Server,
  27. database: Arc<dyn WorkspaceDatabase>,
  28. trash_can: Arc<TrashCan>,
  29. document: Arc<FlowyDocument>,
  30. }
  31. impl ViewController {
  32. pub(crate) fn new(
  33. user: Arc<dyn WorkspaceUser>,
  34. database: Arc<dyn WorkspaceDatabase>,
  35. server: Server,
  36. trash_can: Arc<TrashCan>,
  37. document: Arc<FlowyDocument>,
  38. ) -> Self {
  39. Self {
  40. user,
  41. server,
  42. database,
  43. trash_can,
  44. document,
  45. }
  46. }
  47. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  48. let _ = self.document.init()?;
  49. self.listen_trash_can_event();
  50. Ok(())
  51. }
  52. #[tracing::instrument(level = "debug", skip(self, params), err)]
  53. pub(crate) async fn create_view(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  54. let view = self.create_view_on_server(params.clone()).await?;
  55. let conn = &*self.database.db_connection()?;
  56. let trash_can = self.trash_can.clone();
  57. // TODO: rollback anything created before if failed?
  58. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  59. let _ = self.save_view(view.clone(), conn)?;
  60. self.document.create(CreateDocParams::new(&view.id, params.data))?;
  61. let repeated_view = read_belonging_view(&view.belong_to_id, trash_can, &conn)?;
  62. send_dart_notification(&view.belong_to_id, WorkspaceNotification::AppViewsChanged)
  63. .payload(repeated_view)
  64. .send();
  65. Ok(())
  66. })?;
  67. Ok(view)
  68. }
  69. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  70. let view_table = ViewTable::new(view);
  71. let _ = ViewTableSql::create_view(view_table, conn)?;
  72. Ok(())
  73. }
  74. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  75. let conn = self.database.db_connection()?;
  76. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  77. let trash_ids = self.trash_can.trash_ids(&conn)?;
  78. if trash_ids.contains(&view_table.id) {
  79. return Err(WorkspaceError::record_not_found());
  80. }
  81. let view: View = view_table.into();
  82. let _ = self.read_view_on_server(params);
  83. Ok(view)
  84. }
  85. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  86. let conn = &*self.database.db_connection()?;
  87. let mut view_tables = vec![];
  88. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  89. for view_id in ids {
  90. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  91. }
  92. Ok(())
  93. })?;
  94. Ok(view_tables)
  95. }
  96. #[tracing::instrument(level = "debug", skip(self), err)]
  97. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  98. let edit_context = self.document.open(params, self.database.db_pool()?).await?;
  99. Ok(edit_context.delta().await.map_err(internal_error)?)
  100. }
  101. // belong_to_id will be the app_id or view_id.
  102. #[tracing::instrument(level = "debug", skip(self), err)]
  103. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  104. // TODO: read from server
  105. let conn = self.database.db_connection()?;
  106. let repeated_view = read_belonging_view(belong_to_id, self.trash_can.clone(), &conn)?;
  107. Ok(repeated_view)
  108. }
  109. #[tracing::instrument(level = "debug", skip(self, params), err)]
  110. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  111. let conn = &*self.database.db_connection()?;
  112. let changeset = ViewTableChangeset::new(params.clone());
  113. let view_id = changeset.id.clone();
  114. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  115. let _ = ViewTableSql::update_view(changeset, conn)?;
  116. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  117. Ok(view)
  118. })?;
  119. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  120. .payload(updated_view.clone())
  121. .send();
  122. let _ = self.update_view_on_server(params);
  123. Ok(updated_view)
  124. }
  125. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  126. let doc = self.document.apply_doc_delta(params).await?;
  127. Ok(doc)
  128. }
  129. }
  130. impl ViewController {
  131. #[tracing::instrument(skip(self), err)]
  132. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  133. let token = self.user.token()?;
  134. let view = self.server.create_view(&token, params).await?;
  135. Ok(view)
  136. }
  137. #[tracing::instrument(skip(self), err)]
  138. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  139. let token = self.user.token()?;
  140. let server = self.server.clone();
  141. spawn(async move {
  142. match server.update_view(&token, params).await {
  143. Ok(_) => {},
  144. Err(e) => {
  145. // TODO: retry?
  146. log::error!("Update view failed: {:?}", e);
  147. },
  148. }
  149. });
  150. Ok(())
  151. }
  152. // #[tracing::instrument(skip(self), err)]
  153. // fn delete_view_on_server(&self, view_ids: Vec<String>) -> Result<(),
  154. // WorkspaceError> { let token = self.user.token()?;
  155. // let server = self.server.clone();
  156. // let params = DeleteViewParams { view_ids };
  157. // spawn(async move {
  158. // match server.delete_view(&token, params).await {
  159. // Ok(_) => {},
  160. // Err(e) => {
  161. // // TODO: retry?
  162. // log::error!("Delete view failed: {:?}", e);
  163. // },
  164. // }
  165. // });
  166. // Ok(())
  167. // }
  168. #[tracing::instrument(skip(self), err)]
  169. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  170. let token = self.user.token()?;
  171. let server = self.server.clone();
  172. spawn(async move {
  173. match server.read_view(&token, params).await {
  174. Ok(_) => {},
  175. Err(e) => {
  176. // TODO: retry?
  177. log::error!("Read view failed: {:?}", e);
  178. },
  179. }
  180. });
  181. Ok(())
  182. }
  183. fn listen_trash_can_event(&self) {
  184. let mut rx = self.trash_can.subscribe();
  185. let database = self.database.clone();
  186. let document = self.document.clone();
  187. let trash_can = self.trash_can.clone();
  188. let _ = tokio::spawn(async move {
  189. loop {
  190. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  191. match result {
  192. Ok(event) => event.select(TrashType::View),
  193. Err(_) => None,
  194. }
  195. }));
  196. let event: Option<TrashEvent> = stream.next().await;
  197. match event {
  198. Some(event) => {
  199. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  200. },
  201. None => {},
  202. }
  203. }
  204. });
  205. }
  206. }
  207. async fn handle_trash_event(
  208. database: Arc<dyn WorkspaceDatabase>,
  209. document: Arc<FlowyDocument>,
  210. trash_can: Arc<TrashCan>,
  211. event: TrashEvent,
  212. ) {
  213. let db_result = database.db_connection();
  214. match event {
  215. TrashEvent::NewTrash(_, view_ids, ret) | TrashEvent::Putback(_, view_ids, ret) => {
  216. let result = || {
  217. let conn = &*db_result?;
  218. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  219. for view_id in view_ids {
  220. let _ = notify_view_num_did_change(&view_id, conn, trash_can.clone())?;
  221. }
  222. Ok(())
  223. })?;
  224. Ok::<(), WorkspaceError>(())
  225. };
  226. let _ = ret.send(result()).await;
  227. },
  228. TrashEvent::Delete(_, delete_ids, ret) => {
  229. let result = || {
  230. let conn = &*db_result?;
  231. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  232. for view_id in delete_ids {
  233. let _ = ViewTableSql::delete_view(&view_id, conn)?;
  234. let _ = document.delete(view_id.clone().into())?;
  235. let _ = notify_view_num_did_change(&view_id, conn, trash_can.clone())?;
  236. }
  237. Ok(())
  238. })?;
  239. Ok::<(), WorkspaceError>(())
  240. };
  241. let _ = ret.send(result()).await;
  242. },
  243. }
  244. }
  245. #[tracing::instrument(skip(conn, trash_can), err)]
  246. fn notify_view_num_did_change(view_id: &str, conn: &SqliteConnection, trash_can: Arc<TrashCan>) -> WorkspaceResult<()> {
  247. let view_table = ViewTableSql::read_view(view_id, conn)?;
  248. let repeated_view = read_belonging_view(&view_table.belong_to_id, trash_can, conn)?;
  249. send_dart_notification(&view_table.belong_to_id, WorkspaceNotification::AppViewsChanged)
  250. .payload(repeated_view)
  251. .send();
  252. Ok(())
  253. }
  254. fn read_belonging_view(
  255. belong_to_id: &str,
  256. trash_can: Arc<TrashCan>,
  257. conn: &SqliteConnection,
  258. ) -> WorkspaceResult<RepeatedView> {
  259. let mut repeated_view = ViewTableSql::read_views(belong_to_id, conn)?;
  260. let trash_ids = trash_can.trash_ids(conn)?;
  261. repeated_view.retain(|view| !trash_ids.contains(&view.id));
  262. Ok(repeated_view)
  263. }