view_controller.rs 9.5 KB


  1. use crate::{
  2. entities::view::{CreateViewParams, UpdateViewParams, View},
  3. errors::WorkspaceError,
  4. module::WorkspaceDatabase,
  5. notify::send_dart_notification,
  6. services::{helper::spawn, server::Server},
  7. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  8. };
  9. use crate::{
  10. entities::view::{DeleteViewParams, RepeatedView, ViewIdentifier},
  11. errors::internal_error,
  12. module::WorkspaceUser,
  13. notify::WorkspaceNotification,
  14. services::{TrashCan, TrashEvent},
  15. };
  16. use flowy_database::SqliteConnection;
  17. use flowy_document::{
  18. entities::doc::{CreateDocParams, DocDelta, DocIdentifier},
  19. module::FlowyDocument,
  20. };
  21. use crate::{entities::trash::TrashType, errors::WorkspaceResult};
  22. use futures::{FutureExt, StreamExt};
  23. use std::sync::Arc;
  24. pub(crate) struct ViewController {
  25. user: Arc<dyn WorkspaceUser>,
  26. server: Server,
  27. database: Arc<dyn WorkspaceDatabase>,
  28. trash_can: Arc<TrashCan>,
  29. document: Arc<FlowyDocument>,
  30. }
  31. impl ViewController {
  32. pub(crate) fn new(
  33. user: Arc<dyn WorkspaceUser>,
  34. database: Arc<dyn WorkspaceDatabase>,
  35. server: Server,
  36. trash_can: Arc<TrashCan>,
  37. document: Arc<FlowyDocument>,
  38. ) -> Self {
  39. Self {
  40. user,
  41. server,
  42. database,
  43. trash_can,
  44. document,
  45. }
  46. }
  47. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  48. let _ = self.document.init()?;
  49. self.listen_trash_can_event();
  50. Ok(())
  51. }
  52. #[tracing::instrument(level = "debug", skip(self, params), err)]
  53. pub(crate) async fn create_view(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  54. let view = self.create_view_on_server(params.clone()).await?;
  55. let conn = &*self.database.db_connection()?;
  56. // TODO: rollback anything created before if failed?
  57. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  58. let _ = self.save_view(view.clone(), conn)?;
  59. self.document.create(CreateDocParams::new(&view.id, params.data))?;
  60. let repeated_view = ViewTableSql::read_views(&view.belong_to_id, conn)?;
  61. send_dart_notification(&view.belong_to_id, WorkspaceNotification::AppViewsChanged)
  62. .payload(repeated_view)
  63. .send();
  64. Ok(())
  65. })?;
  66. Ok(view)
  67. }
  68. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  69. let view_table = ViewTable::new(view);
  70. let _ = ViewTableSql::create_view(view_table, conn)?;
  71. Ok(())
  72. }
  73. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  74. let conn = self.database.db_connection()?;
  75. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  76. let view: View = view_table.into();
  77. let _ = self.read_view_on_server(params);
  78. Ok(view)
  79. }
  80. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  81. let conn = &*self.database.db_connection()?;
  82. let mut view_tables = vec![];
  83. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  84. for view_id in ids {
  85. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  86. }
  87. Ok(())
  88. })?;
  89. Ok(view_tables)
  90. }
  91. #[tracing::instrument(level = "debug", skip(self), err)]
  92. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  93. let edit_context = self.document.open(params, self.database.db_pool()?).await?;
  94. Ok(edit_context.delta().await.map_err(internal_error)?)
  95. }
  96. // belong_to_id will be the app_id or view_id.
  97. #[tracing::instrument(level = "debug", skip(self), err)]
  98. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  99. // TODO: read from server
  100. let conn = self.database.db_connection()?;
  101. let repeated_view = ViewTableSql::read_views(belong_to_id, &*conn)?;
  102. Ok(repeated_view)
  103. }
  104. #[tracing::instrument(level = "debug", skip(self, params), err)]
  105. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  106. let conn = &*self.database.db_connection()?;
  107. let changeset = ViewTableChangeset::new(params.clone());
  108. let view_id = changeset.id.clone();
  109. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  110. let _ = ViewTableSql::update_view(changeset, conn)?;
  111. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  112. Ok(view)
  113. })?;
  114. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  115. .payload(updated_view.clone())
  116. .send();
  117. let _ = self.update_view_on_server(params);
  118. Ok(updated_view)
  119. }
  120. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  121. let doc = self.document.apply_doc_delta(params).await?;
  122. Ok(doc)
  123. }
  124. }
  125. impl ViewController {
  126. #[tracing::instrument(skip(self), err)]
  127. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  128. let token = self.user.token()?;
  129. let view = self.server.create_view(&token, params).await?;
  130. Ok(view)
  131. }
  132. #[tracing::instrument(skip(self), err)]
  133. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  134. let token = self.user.token()?;
  135. let server = self.server.clone();
  136. spawn(async move {
  137. match server.update_view(&token, params).await {
  138. Ok(_) => {},
  139. Err(e) => {
  140. // TODO: retry?
  141. log::error!("Update view failed: {:?}", e);
  142. },
  143. }
  144. });
  145. Ok(())
  146. }
  147. #[tracing::instrument(skip(self), err)]
  148. fn delete_view_on_server(&self, view_ids: Vec<String>) -> Result<(), WorkspaceError> {
  149. let token = self.user.token()?;
  150. let server = self.server.clone();
  151. let params = DeleteViewParams { view_ids };
  152. spawn(async move {
  153. match server.delete_view(&token, params).await {
  154. Ok(_) => {},
  155. Err(e) => {
  156. // TODO: retry?
  157. log::error!("Delete view failed: {:?}", e);
  158. },
  159. }
  160. });
  161. Ok(())
  162. }
  163. #[tracing::instrument(skip(self), err)]
  164. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  165. let token = self.user.token()?;
  166. let server = self.server.clone();
  167. spawn(async move {
  168. match server.read_view(&token, params).await {
  169. Ok(_) => {},
  170. Err(e) => {
  171. // TODO: retry?
  172. log::error!("Read view failed: {:?}", e);
  173. },
  174. }
  175. });
  176. Ok(())
  177. }
  178. fn listen_trash_can_event(&self) {
  179. let mut rx = self.trash_can.subscribe();
  180. let database = self.database.clone();
  181. let document = self.document.clone();
  182. let _ = tokio::spawn(async move {
  183. loop {
  184. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  185. match result {
  186. Ok(event) => event.select(TrashType::View),
  187. Err(_) => None,
  188. }
  189. }));
  190. let event: Option<TrashEvent> = stream.next().await;
  191. match event {
  192. Some(event) => handle_trash_event(database.clone(), document.clone(), event),
  193. None => {},
  194. }
  195. }
  196. });
  197. }
  198. }
  199. fn handle_trash_event(database: Arc<dyn WorkspaceDatabase>, document: Arc<FlowyDocument>, event: TrashEvent) {
  200. let db_result = database.db_connection();
  201. match event {
  202. TrashEvent::NewTrash(_, view_ids, ret) | TrashEvent::Putback(_, view_ids, ret) => {
  203. let result = || {
  204. let conn = &*db_result?;
  205. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  206. for view_id in view_ids {
  207. let _ = notify_view_num_did_change(&view_id, conn)?;
  208. }
  209. Ok(())
  210. })?;
  211. Ok::<(), WorkspaceError>(())
  212. };
  213. let _ = ret.send(result());
  214. },
  215. TrashEvent::Delete(_, delete_ids, ret) => {
  216. let result = || {
  217. let conn = &*db_result?;
  218. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  219. for view_id in delete_ids {
  220. let _ = ViewTableSql::delete_view(&view_id, conn)?;
  221. let _ = document.delete(view_id.clone().into())?;
  222. let _ = notify_view_num_did_change(&view_id, conn)?;
  223. }
  224. Ok(())
  225. })?;
  226. Ok::<(), WorkspaceError>(())
  227. };
  228. let _ = ret.send(result());
  229. },
  230. }
  231. }
  232. fn notify_view_num_did_change(view_id: &str, conn: &SqliteConnection) -> WorkspaceResult<()> {
  233. let view_table = ViewTableSql::read_view(view_id, conn)?;
  234. let repeated_view = ViewTableSql::read_views(&view_table.belong_to_id, conn)?;
  235. send_dart_notification(&view_table.belong_to_id, WorkspaceNotification::AppViewsChanged)
  236. .payload(repeated_view)
  237. .send();
  238. Ok(())
  239. }