//! view_controller.rs — controller for workspace views: local SQLite persistence,
//! server synchronisation, and reactions to trash-can events.
  1. use flowy_database::SqliteConnection;
  2. use flowy_document_infra::entities::doc::{DocDelta, DocIdentifier};
  3. use futures::{FutureExt, StreamExt};
  4. use std::{collections::HashSet, sync::Arc};
  5. use crate::{
  6. entities::{
  7. trash::{TrashIdentifiers, TrashType},
  8. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewIdentifier},
  9. },
  10. errors::{internal_error, WorkspaceError, WorkspaceResult},
  11. module::{WorkspaceDatabase, WorkspaceUser},
  12. notify::{send_dart_notification, WorkspaceNotification},
  13. services::{server::Server, TrashCan, TrashEvent},
  14. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  15. };
  16. use flowy_document::module::FlowyDocument;
  17. use flowy_infra::kv::KV;
  18. use flowy_workspace_infra::entities::share::{ExportData, ExportParams};
  /// Key under which the id of the most recently opened view is stored in the `KV` store.
  const LATEST_VIEW_ID: &str = "latest_view_id";
  /// Coordinates view operations across the local database, the remote server,
  /// the trash can, and the document module.
  pub(crate) struct ViewController {
      /// Supplies the auth token sent along with server requests.
      user: Arc<dyn WorkspaceUser>,
      /// Remote API used to create, read, and update views.
      server: Server,
      /// Hands out SQLite connections and the connection pool.
      database: Arc<dyn WorkspaceDatabase>,
      /// Provides the ids of trashed views and a subscription to trash events.
      trash_can: Arc<TrashCan>,
      /// Document module used to open, close, read, edit, and delete documents.
      document: Arc<FlowyDocument>,
  }
  27. impl ViewController {
  28. pub(crate) fn new(
  29. user: Arc<dyn WorkspaceUser>,
  30. database: Arc<dyn WorkspaceDatabase>,
  31. server: Server,
  32. trash_can: Arc<TrashCan>,
  33. document: Arc<FlowyDocument>,
  34. ) -> Self {
  35. Self {
  36. user,
  37. server,
  38. database,
  39. trash_can,
  40. document,
  41. }
  42. }
  43. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  44. let _ = self.document.init()?;
  45. self.listen_trash_can_event();
  46. Ok(())
  47. }
  48. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  49. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  50. let view = self.create_view_on_server(params.clone()).await?;
  51. self.create_view(view).await
  52. }
  53. pub(crate) async fn create_view(&self, view: View) -> Result<View, WorkspaceError> {
  54. let conn = &*self.database.db_connection()?;
  55. let trash_can = self.trash_can.clone();
  56. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  57. let _ = self.save_view(view.clone(), conn)?;
  58. let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
  59. Ok(())
  60. })?;
  61. Ok(view)
  62. }
  63. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  64. let view_table = ViewTable::new(view);
  65. let _ = ViewTableSql::create_view(view_table, conn)?;
  66. Ok(())
  67. }
  68. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  69. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  70. let conn = self.database.db_connection()?;
  71. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  72. let trash_ids = self.trash_can.trash_ids(&conn)?;
  73. if trash_ids.contains(&view_table.id) {
  74. return Err(WorkspaceError::record_not_found());
  75. }
  76. let view: View = view_table.into();
  77. let _ = self.read_view_on_server(params);
  78. Ok(view)
  79. }
  80. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  81. let conn = &*self.database.db_connection()?;
  82. let mut view_tables = vec![];
  83. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  84. for view_id in ids {
  85. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  86. }
  87. Ok(())
  88. })?;
  89. Ok(view_tables)
  90. }
  91. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  92. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  93. let doc_id = params.doc_id.clone();
  94. let edit_context = self.document.open(params).await?;
  95. KV::set_str(LATEST_VIEW_ID, doc_id);
  96. Ok(edit_context.delta().await.map_err(internal_error)?)
  97. }
  98. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  99. pub(crate) async fn close_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  100. let _ = self.document.close(params).await?;
  101. Ok(())
  102. }
  103. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  104. pub(crate) async fn delete_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  105. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  106. if view_id == params.doc_id {
  107. let _ = KV::remove(LATEST_VIEW_ID);
  108. }
  109. }
  110. let _ = self.document.close(params).await?;
  111. Ok(())
  112. }
  113. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  114. pub(crate) async fn duplicate_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  115. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  116. let delta_data = self
  117. .document
  118. .read_document_data(params, self.database.db_pool()?)
  119. .await?;
  120. let duplicate_params = CreateViewParams {
  121. belong_to_id: view.belong_to_id.clone(),
  122. name: format!("{} (copy)", &view.name),
  123. desc: view.desc.clone(),
  124. thumbnail: "".to_owned(),
  125. view_type: view.view_type.clone(),
  126. data: delta_data.data,
  127. };
  128. let _ = self.create_view_from_params(duplicate_params).await?;
  129. Ok(())
  130. }
  131. #[tracing::instrument(level = "debug", skip(self, params), err)]
  132. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, WorkspaceError> {
  133. let doc_identifier: DocIdentifier = params.doc_id.into();
  134. let doc = self
  135. .document
  136. .read_document_data(doc_identifier, self.database.db_pool()?)
  137. .await?;
  138. Ok(ExportData {
  139. data: doc.data,
  140. export_type: params.export_type,
  141. })
  142. }
  143. // belong_to_id will be the app_id or view_id.
  144. #[tracing::instrument(level = "debug", skip(self), err)]
  145. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  146. // TODO: read from server
  147. let conn = self.database.db_connection()?;
  148. let repeated_view = read_local_belonging_view(belong_to_id, self.trash_can.clone(), &conn)?;
  149. Ok(repeated_view)
  150. }
  151. #[tracing::instrument(level = "debug", skip(self, params), err)]
  152. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  153. let conn = &*self.database.db_connection()?;
  154. let changeset = ViewTableChangeset::new(params.clone());
  155. let view_id = changeset.id.clone();
  156. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  157. let _ = ViewTableSql::update_view(changeset, conn)?;
  158. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  159. Ok(view)
  160. })?;
  161. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  162. .payload(updated_view.clone())
  163. .send();
  164. //
  165. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_can.clone(), conn)?;
  166. let _ = self.update_view_on_server(params);
  167. Ok(updated_view)
  168. }
  169. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  170. let doc = self.document.apply_doc_delta(params).await?;
  171. Ok(doc)
  172. }
  173. pub(crate) fn latest_visit_view(&self) -> WorkspaceResult<Option<View>> {
  174. match KV::get_str(LATEST_VIEW_ID) {
  175. None => Ok(None),
  176. Some(view_id) => {
  177. let conn = self.database.db_connection()?;
  178. let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
  179. Ok(Some(view_table.into()))
  180. },
  181. }
  182. }
  183. pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
  184. }
  185. impl ViewController {
  186. #[tracing::instrument(skip(self), err)]
  187. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  188. let token = self.user.token()?;
  189. let view = self.server.create_view(&token, params).await?;
  190. Ok(view)
  191. }
  192. #[tracing::instrument(skip(self), err)]
  193. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  194. let token = self.user.token()?;
  195. let server = self.server.clone();
  196. tokio::spawn(async move {
  197. match server.update_view(&token, params).await {
  198. Ok(_) => {},
  199. Err(e) => {
  200. // TODO: retry?
  201. log::error!("Update view failed: {:?}", e);
  202. },
  203. }
  204. });
  205. Ok(())
  206. }
  207. #[tracing::instrument(skip(self), err)]
  208. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  209. let token = self.user.token()?;
  210. let server = self.server.clone();
  211. let pool = self.database.db_pool()?;
  212. // Opti: retry?
  213. tokio::spawn(async move {
  214. match server.read_view(&token, params).await {
  215. Ok(Some(view)) => match pool.get() {
  216. Ok(conn) => {
  217. let view_table = ViewTable::new(view.clone());
  218. let result = ViewTableSql::create_view(view_table, &conn);
  219. match result {
  220. Ok(_) => {
  221. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  222. .payload(view.clone())
  223. .send();
  224. },
  225. Err(e) => log::error!("Save view failed: {:?}", e),
  226. }
  227. },
  228. Err(e) => log::error!("Require db connection failed: {:?}", e),
  229. },
  230. Ok(None) => {},
  231. Err(e) => log::error!("Read view failed: {:?}", e),
  232. }
  233. });
  234. Ok(())
  235. }
  236. fn listen_trash_can_event(&self) {
  237. let mut rx = self.trash_can.subscribe();
  238. let database = self.database.clone();
  239. let document = self.document.clone();
  240. let trash_can = self.trash_can.clone();
  241. let _ = tokio::spawn(async move {
  242. loop {
  243. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  244. match result {
  245. Ok(event) => event.select(TrashType::View),
  246. Err(_e) => None,
  247. }
  248. }));
  249. match stream.next().await {
  250. Some(event) => {
  251. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  252. },
  253. None => {},
  254. }
  255. }
  256. });
  257. }
  258. }
  259. #[tracing::instrument(level = "trace", skip(database, document, trash_can))]
  260. async fn handle_trash_event(
  261. database: Arc<dyn WorkspaceDatabase>,
  262. document: Arc<FlowyDocument>,
  263. trash_can: Arc<TrashCan>,
  264. event: TrashEvent,
  265. ) {
  266. let db_result = database.db_connection();
  267. match event {
  268. TrashEvent::NewTrash(identifiers, ret) => {
  269. let result = || {
  270. let conn = &*db_result?;
  271. let view_tables = get_view_table_from(identifiers, conn)?;
  272. for view_table in view_tables {
  273. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  274. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  275. }
  276. Ok::<(), WorkspaceError>(())
  277. };
  278. let _ = ret.send(result()).await;
  279. },
  280. TrashEvent::Putback(identifiers, ret) => {
  281. let result = || {
  282. let conn = &*db_result?;
  283. let view_tables = get_view_table_from(identifiers, conn)?;
  284. for view_table in view_tables {
  285. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  286. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  287. }
  288. Ok::<(), WorkspaceError>(())
  289. };
  290. let _ = ret.send(result()).await;
  291. },
  292. TrashEvent::Delete(identifiers, ret) => {
  293. let result = || {
  294. let conn = &*db_result?;
  295. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  296. let mut notify_ids = HashSet::new();
  297. for identifier in identifiers.items {
  298. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  299. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  300. let _ = document.delete(identifier.id.clone().into())?;
  301. notify_ids.insert(view_table.belong_to_id);
  302. }
  303. for notify_id in notify_ids {
  304. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  305. }
  306. Ok(())
  307. })?;
  308. Ok::<(), WorkspaceError>(())
  309. };
  310. let _ = ret.send(result()).await;
  311. },
  312. }
  313. }
  314. fn get_view_table_from(
  315. identifiers: TrashIdentifiers,
  316. conn: &SqliteConnection,
  317. ) -> Result<Vec<ViewTable>, WorkspaceError> {
  318. let mut view_tables = vec![];
  319. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  320. for identifier in identifiers.items {
  321. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  322. view_tables.push(view_table);
  323. }
  324. Ok(())
  325. })?;
  326. Ok(view_tables)
  327. }
  328. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  329. let view: View = view_table.into();
  330. send_dart_notification(&view.id, notification).payload(view).send();
  331. }
  332. #[tracing::instrument(skip(belong_to_id, trash_can, conn), fields(view_count), err)]
  333. fn notify_views_changed(belong_to_id: &str, trash_can: Arc<TrashCan>, conn: &SqliteConnection) -> WorkspaceResult<()> {
  334. let repeated_view = read_local_belonging_view(belong_to_id, trash_can.clone(), conn)?;
  335. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  336. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  337. .payload(repeated_view)
  338. .send();
  339. Ok(())
  340. }
  341. fn read_local_belonging_view(
  342. belong_to_id: &str,
  343. trash_can: Arc<TrashCan>,
  344. conn: &SqliteConnection,
  345. ) -> WorkspaceResult<RepeatedView> {
  346. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  347. let trash_ids = trash_can.trash_ids(conn)?;
  348. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  349. let views = view_tables
  350. .into_iter()
  351. .map(|view_table| view_table.into())
  352. .collect::<Vec<View>>();
  353. Ok(RepeatedView { items: views })
  354. }