view_controller.rs 14 KB


  1. use std::{collections::HashSet, sync::Arc};
  2. use futures::{FutureExt, StreamExt};
  3. use flowy_database::SqliteConnection;
  4. use flowy_document::{
  5. entities::doc::{DocDelta, DocIdentifier},
  6. module::FlowyDocument,
  7. };
  8. use crate::{
  9. entities::{
  10. trash::{TrashIdentifiers, TrashType},
  11. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewIdentifier},
  12. },
  13. errors::{internal_error, WorkspaceError, WorkspaceResult},
  14. module::{WorkspaceDatabase, WorkspaceUser},
  15. notify::{send_dart_notification, WorkspaceNotification},
  16. services::{server::Server, TrashCan, TrashEvent},
  17. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  18. };
  19. use flowy_workspace_infra::entities::share::{ExportData, ExportParams};
  20. pub(crate) struct ViewController {
  21. user: Arc<dyn WorkspaceUser>,
  22. server: Server,
  23. database: Arc<dyn WorkspaceDatabase>,
  24. trash_can: Arc<TrashCan>,
  25. document: Arc<FlowyDocument>,
  26. }
  27. impl ViewController {
  28. pub(crate) fn new(
  29. user: Arc<dyn WorkspaceUser>,
  30. database: Arc<dyn WorkspaceDatabase>,
  31. server: Server,
  32. trash_can: Arc<TrashCan>,
  33. document: Arc<FlowyDocument>,
  34. ) -> Self {
  35. Self {
  36. user,
  37. server,
  38. database,
  39. trash_can,
  40. document,
  41. }
  42. }
  43. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  44. let _ = self.document.init()?;
  45. self.listen_trash_can_event();
  46. Ok(())
  47. }
  48. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  49. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  50. let view = self.create_view_on_server(params.clone()).await?;
  51. self.create_view(view).await
  52. }
  53. pub(crate) async fn create_view(&self, view: View) -> Result<View, WorkspaceError> {
  54. let conn = &*self.database.db_connection()?;
  55. let trash_can = self.trash_can.clone();
  56. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  57. let _ = self.save_view(view.clone(), conn)?;
  58. let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
  59. Ok(())
  60. })?;
  61. Ok(view)
  62. }
  63. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  64. let view_table = ViewTable::new(view);
  65. let _ = ViewTableSql::create_view(view_table, conn)?;
  66. Ok(())
  67. }
  68. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  69. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  70. let conn = self.database.db_connection()?;
  71. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  72. let trash_ids = self.trash_can.trash_ids(&conn)?;
  73. if trash_ids.contains(&view_table.id) {
  74. return Err(WorkspaceError::record_not_found());
  75. }
  76. let view: View = view_table.into();
  77. let _ = self.read_view_on_server(params);
  78. Ok(view)
  79. }
  80. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  81. let conn = &*self.database.db_connection()?;
  82. let mut view_tables = vec![];
  83. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  84. for view_id in ids {
  85. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  86. }
  87. Ok(())
  88. })?;
  89. Ok(view_tables)
  90. }
  91. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  92. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  93. let edit_context = self.document.open(params, self.database.db_pool()?).await?;
  94. Ok(edit_context.delta().await.map_err(internal_error)?)
  95. }
  96. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  97. pub(crate) async fn close_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  98. let _ = self.document.close(params).await?;
  99. Ok(())
  100. }
  101. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  102. pub(crate) async fn duplicate_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  103. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  104. let delta_data = self
  105. .document
  106. .read_document_data(params, self.database.db_pool()?)
  107. .await?;
  108. let duplicate_params = CreateViewParams {
  109. belong_to_id: view.belong_to_id.clone(),
  110. name: format!("{}_copy", &view.name),
  111. desc: view.desc.clone(),
  112. thumbnail: "".to_owned(),
  113. view_type: view.view_type.clone(),
  114. data: delta_data.data,
  115. };
  116. let _ = self.create_view_from_params(duplicate_params).await?;
  117. Ok(())
  118. }
  119. #[tracing::instrument(level = "debug", skip(self, params), err)]
  120. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, WorkspaceError> {
  121. let doc_identifier: DocIdentifier = params.doc_id.into();
  122. let doc = self
  123. .document
  124. .read_document_data(doc_identifier, self.database.db_pool()?)
  125. .await?;
  126. Ok(ExportData {
  127. data: doc.data,
  128. export_type: params.export_type,
  129. })
  130. }
  131. // belong_to_id will be the app_id or view_id.
  132. #[tracing::instrument(level = "debug", skip(self), err)]
  133. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  134. // TODO: read from server
  135. let conn = self.database.db_connection()?;
  136. let repeated_view = read_local_belonging_view(belong_to_id, self.trash_can.clone(), &conn)?;
  137. Ok(repeated_view)
  138. }
  139. #[tracing::instrument(level = "debug", skip(self, params), err)]
  140. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  141. let conn = &*self.database.db_connection()?;
  142. let changeset = ViewTableChangeset::new(params.clone());
  143. let view_id = changeset.id.clone();
  144. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  145. let _ = ViewTableSql::update_view(changeset, conn)?;
  146. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  147. Ok(view)
  148. })?;
  149. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  150. .payload(updated_view.clone())
  151. .send();
  152. //
  153. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_can.clone(), conn)?;
  154. let _ = self.update_view_on_server(params);
  155. Ok(updated_view)
  156. }
  157. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  158. let doc = self.document.apply_doc_delta(params).await?;
  159. Ok(doc)
  160. }
  161. }
  162. impl ViewController {
  163. #[tracing::instrument(skip(self), err)]
  164. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  165. let token = self.user.token()?;
  166. let view = self.server.create_view(&token, params).await?;
  167. Ok(view)
  168. }
  169. #[tracing::instrument(skip(self), err)]
  170. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  171. let token = self.user.token()?;
  172. let server = self.server.clone();
  173. tokio::spawn(async move {
  174. match server.update_view(&token, params).await {
  175. Ok(_) => {},
  176. Err(e) => {
  177. // TODO: retry?
  178. log::error!("Update view failed: {:?}", e);
  179. },
  180. }
  181. });
  182. Ok(())
  183. }
  184. #[tracing::instrument(skip(self), err)]
  185. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  186. let token = self.user.token()?;
  187. let server = self.server.clone();
  188. let pool = self.database.db_pool()?;
  189. // Opti: retry?
  190. tokio::spawn(async move {
  191. match server.read_view(&token, params).await {
  192. Ok(Some(view)) => match pool.get() {
  193. Ok(conn) => {
  194. let view_table = ViewTable::new(view.clone());
  195. let result = ViewTableSql::create_view(view_table, &conn);
  196. match result {
  197. Ok(_) => {
  198. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  199. .payload(view.clone())
  200. .send();
  201. },
  202. Err(e) => log::error!("Save view failed: {:?}", e),
  203. }
  204. },
  205. Err(e) => log::error!("Require db connection failed: {:?}", e),
  206. },
  207. Ok(None) => {},
  208. Err(e) => log::error!("Read view failed: {:?}", e),
  209. }
  210. });
  211. Ok(())
  212. }
  213. fn listen_trash_can_event(&self) {
  214. let mut rx = self.trash_can.subscribe();
  215. let database = self.database.clone();
  216. let document = self.document.clone();
  217. let trash_can = self.trash_can.clone();
  218. let _ = tokio::spawn(async move {
  219. loop {
  220. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  221. match result {
  222. Ok(event) => event.select(TrashType::View),
  223. Err(_e) => None,
  224. }
  225. }));
  226. match stream.next().await {
  227. Some(event) => {
  228. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  229. },
  230. None => {},
  231. }
  232. }
  233. });
  234. }
  235. }
  236. #[tracing::instrument(level = "trace", skip(database, document, trash_can))]
  237. async fn handle_trash_event(
  238. database: Arc<dyn WorkspaceDatabase>,
  239. document: Arc<FlowyDocument>,
  240. trash_can: Arc<TrashCan>,
  241. event: TrashEvent,
  242. ) {
  243. let db_result = database.db_connection();
  244. match event {
  245. TrashEvent::NewTrash(identifiers, ret) => {
  246. let result = || {
  247. let conn = &*db_result?;
  248. let view_tables = get_view_table_from(identifiers, conn)?;
  249. for view_table in view_tables {
  250. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  251. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  252. }
  253. Ok::<(), WorkspaceError>(())
  254. };
  255. let _ = ret.send(result()).await;
  256. },
  257. TrashEvent::Putback(identifiers, ret) => {
  258. let result = || {
  259. let conn = &*db_result?;
  260. let view_tables = get_view_table_from(identifiers, conn)?;
  261. for view_table in view_tables {
  262. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  263. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  264. }
  265. Ok::<(), WorkspaceError>(())
  266. };
  267. let _ = ret.send(result()).await;
  268. },
  269. TrashEvent::Delete(identifiers, ret) => {
  270. let result = || {
  271. let conn = &*db_result?;
  272. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  273. let mut notify_ids = HashSet::new();
  274. for identifier in identifiers.items {
  275. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  276. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  277. let _ = document.delete(identifier.id.clone().into())?;
  278. notify_ids.insert(view_table.belong_to_id);
  279. }
  280. for notify_id in notify_ids {
  281. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  282. }
  283. Ok(())
  284. })?;
  285. Ok::<(), WorkspaceError>(())
  286. };
  287. let _ = ret.send(result()).await;
  288. },
  289. }
  290. }
  291. fn get_view_table_from(
  292. identifiers: TrashIdentifiers,
  293. conn: &SqliteConnection,
  294. ) -> Result<Vec<ViewTable>, WorkspaceError> {
  295. let mut view_tables = vec![];
  296. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  297. for identifier in identifiers.items {
  298. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  299. view_tables.push(view_table);
  300. }
  301. Ok(())
  302. })?;
  303. Ok(view_tables)
  304. }
  305. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  306. let view: View = view_table.into();
  307. send_dart_notification(&view.id, notification).payload(view).send();
  308. }
  309. #[tracing::instrument(skip(belong_to_id, trash_can, conn), fields(view_count), err)]
  310. fn notify_views_changed(belong_to_id: &str, trash_can: Arc<TrashCan>, conn: &SqliteConnection) -> WorkspaceResult<()> {
  311. let repeated_view = read_local_belonging_view(belong_to_id, trash_can.clone(), conn)?;
  312. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  313. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  314. .payload(repeated_view)
  315. .send();
  316. Ok(())
  317. }
  318. fn read_local_belonging_view(
  319. belong_to_id: &str,
  320. trash_can: Arc<TrashCan>,
  321. conn: &SqliteConnection,
  322. ) -> WorkspaceResult<RepeatedView> {
  323. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  324. let trash_ids = trash_can.trash_ids(conn)?;
  325. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  326. let views = view_tables
  327. .into_iter()
  328. .map(|view_table| view_table.into())
  329. .collect::<Vec<View>>();
  330. Ok(RepeatedView { items: views })
  331. }