view_controller.rs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. use std::{collections::HashSet, sync::Arc};
  2. use futures::{FutureExt, StreamExt};
  3. use flowy_database::SqliteConnection;
  4. use flowy_document::{
  5. entities::doc::{DocDelta, DocIdentifier},
  6. module::FlowyDocument,
  7. };
  8. use crate::{
  9. entities::{
  10. trash::{TrashIdentifiers, TrashType},
  11. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewIdentifier},
  12. },
  13. errors::{internal_error, WorkspaceError, WorkspaceResult},
  14. module::{WorkspaceDatabase, WorkspaceUser},
  15. notify::{send_dart_notification, WorkspaceNotification},
  16. services::{server::Server, TrashCan, TrashEvent},
  17. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  18. };
  19. use flowy_workspace_infra::entities::share::{ExportData, ExportParams, ExportRequest};
  20. pub(crate) struct ViewController {
  21. user: Arc<dyn WorkspaceUser>,
  22. server: Server,
  23. database: Arc<dyn WorkspaceDatabase>,
  24. trash_can: Arc<TrashCan>,
  25. document: Arc<FlowyDocument>,
  26. }
  27. impl ViewController {
  28. pub(crate) fn new(
  29. user: Arc<dyn WorkspaceUser>,
  30. database: Arc<dyn WorkspaceDatabase>,
  31. server: Server,
  32. trash_can: Arc<TrashCan>,
  33. document: Arc<FlowyDocument>,
  34. ) -> Self {
  35. Self {
  36. user,
  37. server,
  38. database,
  39. trash_can,
  40. document,
  41. }
  42. }
  43. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  44. let _ = self.document.init()?;
  45. self.listen_trash_can_event();
  46. Ok(())
  47. }
  48. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  49. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  50. let view = self.create_view_on_server(params.clone()).await?;
  51. self.create_view(view).await
  52. }
  53. pub(crate) async fn create_view(&self, view: View) -> Result<View, WorkspaceError> {
  54. let conn = &*self.database.db_connection()?;
  55. let trash_can = self.trash_can.clone();
  56. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  57. let _ = self.save_view(view.clone(), conn)?;
  58. let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
  59. Ok(())
  60. })?;
  61. Ok(view)
  62. }
  63. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  64. let view_table = ViewTable::new(view);
  65. let _ = ViewTableSql::create_view(view_table, conn)?;
  66. Ok(())
  67. }
  68. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  69. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  70. let conn = self.database.db_connection()?;
  71. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  72. let trash_ids = self.trash_can.trash_ids(&conn)?;
  73. if trash_ids.contains(&view_table.id) {
  74. return Err(WorkspaceError::record_not_found());
  75. }
  76. let view: View = view_table.into();
  77. let _ = self.read_view_on_server(params);
  78. Ok(view)
  79. }
  80. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  81. let conn = &*self.database.db_connection()?;
  82. let mut view_tables = vec![];
  83. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  84. for view_id in ids {
  85. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  86. }
  87. Ok(())
  88. })?;
  89. Ok(view_tables)
  90. }
  91. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  92. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  93. let edit_context = self.document.open(params, self.database.db_pool()?).await?;
  94. Ok(edit_context.delta().await.map_err(internal_error)?)
  95. }
  96. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  97. pub(crate) async fn close_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  98. let _ = self.document.close(params).await?;
  99. Ok(())
  100. }
  101. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  102. pub(crate) async fn duplicate_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  103. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  104. let delta_data = self
  105. .document
  106. .read_document_data(params, self.database.db_pool()?)
  107. .await?;
  108. let duplicate_params = CreateViewParams {
  109. belong_to_id: view.belong_to_id.clone(),
  110. name: format!("{}_copy", &view.name),
  111. desc: view.desc.clone(),
  112. thumbnail: "".to_owned(),
  113. view_type: view.view_type.clone(),
  114. data: delta_data.data,
  115. };
  116. let _ = self.create_view_from_params(duplicate_params).await?;
  117. Ok(())
  118. }
  119. #[tracing::instrument(level = "debug", skip(self, params), err)]
  120. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, WorkspaceError> {
  121. unimplemented!()
  122. }
  123. // belong_to_id will be the app_id or view_id.
  124. #[tracing::instrument(level = "debug", skip(self), err)]
  125. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  126. // TODO: read from server
  127. let conn = self.database.db_connection()?;
  128. let repeated_view = read_local_belonging_view(belong_to_id, self.trash_can.clone(), &conn)?;
  129. Ok(repeated_view)
  130. }
  131. #[tracing::instrument(level = "debug", skip(self, params), err)]
  132. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  133. let conn = &*self.database.db_connection()?;
  134. let changeset = ViewTableChangeset::new(params.clone());
  135. let view_id = changeset.id.clone();
  136. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  137. let _ = ViewTableSql::update_view(changeset, conn)?;
  138. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  139. Ok(view)
  140. })?;
  141. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  142. .payload(updated_view.clone())
  143. .send();
  144. //
  145. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_can.clone(), conn)?;
  146. let _ = self.update_view_on_server(params);
  147. Ok(updated_view)
  148. }
  149. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  150. let doc = self.document.apply_doc_delta(params).await?;
  151. Ok(doc)
  152. }
  153. }
  154. impl ViewController {
  155. #[tracing::instrument(skip(self), err)]
  156. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  157. let token = self.user.token()?;
  158. let view = self.server.create_view(&token, params).await?;
  159. Ok(view)
  160. }
  161. #[tracing::instrument(skip(self), err)]
  162. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  163. let token = self.user.token()?;
  164. let server = self.server.clone();
  165. tokio::spawn(async move {
  166. match server.update_view(&token, params).await {
  167. Ok(_) => {},
  168. Err(e) => {
  169. // TODO: retry?
  170. log::error!("Update view failed: {:?}", e);
  171. },
  172. }
  173. });
  174. Ok(())
  175. }
  176. #[tracing::instrument(skip(self), err)]
  177. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  178. let token = self.user.token()?;
  179. let server = self.server.clone();
  180. let pool = self.database.db_pool()?;
  181. // Opti: retry?
  182. tokio::spawn(async move {
  183. match server.read_view(&token, params).await {
  184. Ok(Some(view)) => match pool.get() {
  185. Ok(conn) => {
  186. let view_table = ViewTable::new(view.clone());
  187. let result = ViewTableSql::create_view(view_table, &conn);
  188. match result {
  189. Ok(_) => {
  190. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  191. .payload(view.clone())
  192. .send();
  193. },
  194. Err(e) => log::error!("Save view failed: {:?}", e),
  195. }
  196. },
  197. Err(e) => log::error!("Require db connection failed: {:?}", e),
  198. },
  199. Ok(None) => {},
  200. Err(e) => log::error!("Read view failed: {:?}", e),
  201. }
  202. });
  203. Ok(())
  204. }
  205. fn listen_trash_can_event(&self) {
  206. let mut rx = self.trash_can.subscribe();
  207. let database = self.database.clone();
  208. let document = self.document.clone();
  209. let trash_can = self.trash_can.clone();
  210. let _ = tokio::spawn(async move {
  211. loop {
  212. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  213. match result {
  214. Ok(event) => event.select(TrashType::View),
  215. Err(_e) => None,
  216. }
  217. }));
  218. match stream.next().await {
  219. Some(event) => {
  220. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  221. },
  222. None => {},
  223. }
  224. }
  225. });
  226. }
  227. }
  228. #[tracing::instrument(level = "trace", skip(database, document, trash_can))]
  229. async fn handle_trash_event(
  230. database: Arc<dyn WorkspaceDatabase>,
  231. document: Arc<FlowyDocument>,
  232. trash_can: Arc<TrashCan>,
  233. event: TrashEvent,
  234. ) {
  235. let db_result = database.db_connection();
  236. match event {
  237. TrashEvent::NewTrash(identifiers, ret) => {
  238. let result = || {
  239. let conn = &*db_result?;
  240. let view_tables = get_view_table_from(identifiers, conn)?;
  241. for view_table in view_tables {
  242. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  243. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  244. }
  245. Ok::<(), WorkspaceError>(())
  246. };
  247. let _ = ret.send(result()).await;
  248. },
  249. TrashEvent::Putback(identifiers, ret) => {
  250. let result = || {
  251. let conn = &*db_result?;
  252. let view_tables = get_view_table_from(identifiers, conn)?;
  253. for view_table in view_tables {
  254. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  255. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  256. }
  257. Ok::<(), WorkspaceError>(())
  258. };
  259. let _ = ret.send(result()).await;
  260. },
  261. TrashEvent::Delete(identifiers, ret) => {
  262. let result = || {
  263. let conn = &*db_result?;
  264. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  265. let mut notify_ids = HashSet::new();
  266. for identifier in identifiers.items {
  267. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  268. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  269. let _ = document.delete(identifier.id.clone().into())?;
  270. notify_ids.insert(view_table.belong_to_id);
  271. }
  272. for notify_id in notify_ids {
  273. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  274. }
  275. Ok(())
  276. })?;
  277. Ok::<(), WorkspaceError>(())
  278. };
  279. let _ = ret.send(result()).await;
  280. },
  281. }
  282. }
  283. fn get_view_table_from(
  284. identifiers: TrashIdentifiers,
  285. conn: &SqliteConnection,
  286. ) -> Result<Vec<ViewTable>, WorkspaceError> {
  287. let mut view_tables = vec![];
  288. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  289. for identifier in identifiers.items {
  290. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  291. view_tables.push(view_table);
  292. }
  293. Ok(())
  294. })?;
  295. Ok(view_tables)
  296. }
  297. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  298. let view: View = view_table.into();
  299. send_dart_notification(&view.id, notification).payload(view).send();
  300. }
  301. #[tracing::instrument(skip(belong_to_id, trash_can, conn), fields(view_count), err)]
  302. fn notify_views_changed(belong_to_id: &str, trash_can: Arc<TrashCan>, conn: &SqliteConnection) -> WorkspaceResult<()> {
  303. let repeated_view = read_local_belonging_view(belong_to_id, trash_can.clone(), conn)?;
  304. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  305. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  306. .payload(repeated_view)
  307. .send();
  308. Ok(())
  309. }
  310. fn read_local_belonging_view(
  311. belong_to_id: &str,
  312. trash_can: Arc<TrashCan>,
  313. conn: &SqliteConnection,
  314. ) -> WorkspaceResult<RepeatedView> {
  315. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  316. let trash_ids = trash_can.trash_ids(conn)?;
  317. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  318. let views = view_tables
  319. .into_iter()
  320. .map(|view_table| view_table.into())
  321. .collect::<Vec<View>>();
  322. Ok(RepeatedView { items: views })
  323. }