view_controller.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405
  1. use std::{collections::HashSet, sync::Arc};
  2. use futures::{FutureExt, StreamExt};
  3. use flowy_database::SqliteConnection;
  4. use flowy_document::{
  5. entities::doc::{DocDelta, DocIdentifier},
  6. module::FlowyDocument,
  7. };
  8. use crate::{
  9. entities::{
  10. trash::{TrashIdentifiers, TrashType},
  11. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewIdentifier},
  12. },
  13. errors::{internal_error, WorkspaceError, WorkspaceResult},
  14. module::{WorkspaceDatabase, WorkspaceUser},
  15. notify::{send_dart_notification, WorkspaceNotification},
  16. services::{server::Server, TrashCan, TrashEvent},
  17. sql_tables::view::{ViewTable, ViewTableChangeset, ViewTableSql},
  18. };
  19. use flowy_infra::kv::KV;
  20. use flowy_workspace_infra::entities::share::{ExportData, ExportParams};
  /// Key under which the id of the latest view is kept in `KV`.
  const LATEST_VIEW_ID: &str = "latest_view_id";
  22. pub(crate) struct ViewController {
  23. user: Arc<dyn WorkspaceUser>,
  24. server: Server,
  25. database: Arc<dyn WorkspaceDatabase>,
  26. trash_can: Arc<TrashCan>,
  27. document: Arc<FlowyDocument>,
  28. }
  29. impl ViewController {
  30. pub(crate) fn new(
  31. user: Arc<dyn WorkspaceUser>,
  32. database: Arc<dyn WorkspaceDatabase>,
  33. server: Server,
  34. trash_can: Arc<TrashCan>,
  35. document: Arc<FlowyDocument>,
  36. ) -> Self {
  37. Self {
  38. user,
  39. server,
  40. database,
  41. trash_can,
  42. document,
  43. }
  44. }
  45. pub(crate) fn init(&self) -> Result<(), WorkspaceError> {
  46. let _ = self.document.init()?;
  47. self.listen_trash_can_event();
  48. Ok(())
  49. }
  50. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  51. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  52. let view = self.create_view_on_server(params.clone()).await?;
  53. self.create_view(view).await
  54. }
  55. pub(crate) async fn create_view(&self, view: View) -> Result<View, WorkspaceError> {
  56. let conn = &*self.database.db_connection()?;
  57. let trash_can = self.trash_can.clone();
  58. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  59. let _ = self.save_view(view.clone(), conn)?;
  60. let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
  61. Ok(())
  62. })?;
  63. Ok(view)
  64. }
  65. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), WorkspaceError> {
  66. let view_table = ViewTable::new(view);
  67. let _ = ViewTableSql::create_view(view_table, conn)?;
  68. Ok(())
  69. }
  70. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  71. pub(crate) async fn read_view(&self, params: ViewIdentifier) -> Result<View, WorkspaceError> {
  72. let conn = self.database.db_connection()?;
  73. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  74. let trash_ids = self.trash_can.trash_ids(&conn)?;
  75. if trash_ids.contains(&view_table.id) {
  76. return Err(WorkspaceError::record_not_found());
  77. }
  78. let view: View = view_table.into();
  79. let _ = self.read_view_on_server(params);
  80. Ok(view)
  81. }
  82. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, WorkspaceError> {
  83. let conn = &*self.database.db_connection()?;
  84. let mut view_tables = vec![];
  85. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  86. for view_id in ids {
  87. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  88. }
  89. Ok(())
  90. })?;
  91. Ok(view_tables)
  92. }
  93. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  94. pub(crate) async fn open_view(&self, params: DocIdentifier) -> Result<DocDelta, WorkspaceError> {
  95. let doc_id = params.doc_id.clone();
  96. let edit_context = self.document.open(params, self.database.db_pool()?).await?;
  97. KV::set_str(LATEST_VIEW_ID, doc_id);
  98. Ok(edit_context.delta().await.map_err(internal_error)?)
  99. }
  100. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  101. pub(crate) async fn close_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  102. let _ = self.document.close(params).await?;
  103. Ok(())
  104. }
  105. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  106. pub(crate) async fn delete_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  107. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  108. if view_id == params.doc_id {
  109. let _ = KV::remove(LATEST_VIEW_ID);
  110. }
  111. }
  112. let _ = self.document.close(params).await?;
  113. Ok(())
  114. }
  115. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  116. pub(crate) async fn duplicate_view(&self, params: DocIdentifier) -> Result<(), WorkspaceError> {
  117. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  118. let delta_data = self
  119. .document
  120. .read_document_data(params, self.database.db_pool()?)
  121. .await?;
  122. let duplicate_params = CreateViewParams {
  123. belong_to_id: view.belong_to_id.clone(),
  124. name: format!("{}_copy", &view.name),
  125. desc: view.desc.clone(),
  126. thumbnail: "".to_owned(),
  127. view_type: view.view_type.clone(),
  128. data: delta_data.data,
  129. };
  130. let _ = self.create_view_from_params(duplicate_params).await?;
  131. Ok(())
  132. }
  133. #[tracing::instrument(level = "debug", skip(self, params), err)]
  134. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, WorkspaceError> {
  135. let doc_identifier: DocIdentifier = params.doc_id.into();
  136. let doc = self
  137. .document
  138. .read_document_data(doc_identifier, self.database.db_pool()?)
  139. .await?;
  140. Ok(ExportData {
  141. data: doc.data,
  142. export_type: params.export_type,
  143. })
  144. }
  145. // belong_to_id will be the app_id or view_id.
  146. #[tracing::instrument(level = "debug", skip(self), err)]
  147. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, WorkspaceError> {
  148. // TODO: read from server
  149. let conn = self.database.db_connection()?;
  150. let repeated_view = read_local_belonging_view(belong_to_id, self.trash_can.clone(), &conn)?;
  151. Ok(repeated_view)
  152. }
  153. #[tracing::instrument(level = "debug", skip(self, params), err)]
  154. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, WorkspaceError> {
  155. let conn = &*self.database.db_connection()?;
  156. let changeset = ViewTableChangeset::new(params.clone());
  157. let view_id = changeset.id.clone();
  158. let updated_view = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  159. let _ = ViewTableSql::update_view(changeset, conn)?;
  160. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  161. Ok(view)
  162. })?;
  163. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  164. .payload(updated_view.clone())
  165. .send();
  166. //
  167. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_can.clone(), conn)?;
  168. let _ = self.update_view_on_server(params);
  169. Ok(updated_view)
  170. }
  171. pub(crate) async fn apply_doc_delta(&self, params: DocDelta) -> Result<DocDelta, WorkspaceError> {
  172. let doc = self.document.apply_doc_delta(params).await?;
  173. Ok(doc)
  174. }
  175. pub(crate) fn latest_visit_view(&self) -> WorkspaceResult<Option<View>> {
  176. match KV::get_str(LATEST_VIEW_ID) {
  177. None => Ok(None),
  178. Some(view_id) => {
  179. let conn = self.database.db_connection()?;
  180. let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
  181. Ok(Some(view_table.into()))
  182. },
  183. }
  184. }
  185. pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
  186. }
  187. impl ViewController {
  188. #[tracing::instrument(skip(self), err)]
  189. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, WorkspaceError> {
  190. let token = self.user.token()?;
  191. let view = self.server.create_view(&token, params).await?;
  192. Ok(view)
  193. }
  194. #[tracing::instrument(skip(self), err)]
  195. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), WorkspaceError> {
  196. let token = self.user.token()?;
  197. let server = self.server.clone();
  198. tokio::spawn(async move {
  199. match server.update_view(&token, params).await {
  200. Ok(_) => {},
  201. Err(e) => {
  202. // TODO: retry?
  203. log::error!("Update view failed: {:?}", e);
  204. },
  205. }
  206. });
  207. Ok(())
  208. }
  209. #[tracing::instrument(skip(self), err)]
  210. fn read_view_on_server(&self, params: ViewIdentifier) -> Result<(), WorkspaceError> {
  211. let token = self.user.token()?;
  212. let server = self.server.clone();
  213. let pool = self.database.db_pool()?;
  214. // Opti: retry?
  215. tokio::spawn(async move {
  216. match server.read_view(&token, params).await {
  217. Ok(Some(view)) => match pool.get() {
  218. Ok(conn) => {
  219. let view_table = ViewTable::new(view.clone());
  220. let result = ViewTableSql::create_view(view_table, &conn);
  221. match result {
  222. Ok(_) => {
  223. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  224. .payload(view.clone())
  225. .send();
  226. },
  227. Err(e) => log::error!("Save view failed: {:?}", e),
  228. }
  229. },
  230. Err(e) => log::error!("Require db connection failed: {:?}", e),
  231. },
  232. Ok(None) => {},
  233. Err(e) => log::error!("Read view failed: {:?}", e),
  234. }
  235. });
  236. Ok(())
  237. }
  238. fn listen_trash_can_event(&self) {
  239. let mut rx = self.trash_can.subscribe();
  240. let database = self.database.clone();
  241. let document = self.document.clone();
  242. let trash_can = self.trash_can.clone();
  243. let _ = tokio::spawn(async move {
  244. loop {
  245. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  246. match result {
  247. Ok(event) => event.select(TrashType::View),
  248. Err(_e) => None,
  249. }
  250. }));
  251. match stream.next().await {
  252. Some(event) => {
  253. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  254. },
  255. None => {},
  256. }
  257. }
  258. });
  259. }
  260. }
  261. #[tracing::instrument(level = "trace", skip(database, document, trash_can))]
  262. async fn handle_trash_event(
  263. database: Arc<dyn WorkspaceDatabase>,
  264. document: Arc<FlowyDocument>,
  265. trash_can: Arc<TrashCan>,
  266. event: TrashEvent,
  267. ) {
  268. let db_result = database.db_connection();
  269. match event {
  270. TrashEvent::NewTrash(identifiers, ret) => {
  271. let result = || {
  272. let conn = &*db_result?;
  273. let view_tables = get_view_table_from(identifiers, conn)?;
  274. for view_table in view_tables {
  275. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  276. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  277. }
  278. Ok::<(), WorkspaceError>(())
  279. };
  280. let _ = ret.send(result()).await;
  281. },
  282. TrashEvent::Putback(identifiers, ret) => {
  283. let result = || {
  284. let conn = &*db_result?;
  285. let view_tables = get_view_table_from(identifiers, conn)?;
  286. for view_table in view_tables {
  287. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  288. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  289. }
  290. Ok::<(), WorkspaceError>(())
  291. };
  292. let _ = ret.send(result()).await;
  293. },
  294. TrashEvent::Delete(identifiers, ret) => {
  295. let result = || {
  296. let conn = &*db_result?;
  297. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  298. let mut notify_ids = HashSet::new();
  299. for identifier in identifiers.items {
  300. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  301. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  302. let _ = document.delete(identifier.id.clone().into())?;
  303. notify_ids.insert(view_table.belong_to_id);
  304. }
  305. for notify_id in notify_ids {
  306. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  307. }
  308. Ok(())
  309. })?;
  310. Ok::<(), WorkspaceError>(())
  311. };
  312. let _ = ret.send(result()).await;
  313. },
  314. }
  315. }
  316. fn get_view_table_from(
  317. identifiers: TrashIdentifiers,
  318. conn: &SqliteConnection,
  319. ) -> Result<Vec<ViewTable>, WorkspaceError> {
  320. let mut view_tables = vec![];
  321. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  322. for identifier in identifiers.items {
  323. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  324. view_tables.push(view_table);
  325. }
  326. Ok(())
  327. })?;
  328. Ok(view_tables)
  329. }
  330. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  331. let view: View = view_table.into();
  332. send_dart_notification(&view.id, notification).payload(view).send();
  333. }
  334. #[tracing::instrument(skip(belong_to_id, trash_can, conn), fields(view_count), err)]
  335. fn notify_views_changed(belong_to_id: &str, trash_can: Arc<TrashCan>, conn: &SqliteConnection) -> WorkspaceResult<()> {
  336. let repeated_view = read_local_belonging_view(belong_to_id, trash_can.clone(), conn)?;
  337. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  338. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  339. .payload(repeated_view)
  340. .send();
  341. Ok(())
  342. }
  343. fn read_local_belonging_view(
  344. belong_to_id: &str,
  345. trash_can: Arc<TrashCan>,
  346. conn: &SqliteConnection,
  347. ) -> WorkspaceResult<RepeatedView> {
  348. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  349. let trash_ids = trash_can.trash_ids(conn)?;
  350. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  351. let views = view_tables
  352. .into_iter()
  353. .map(|view_table| view_table.into())
  354. .collect::<Vec<View>>();
  355. Ok(RepeatedView { items: views })
  356. }