controller.rs 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. use flowy_collaboration::entities::doc::{DocumentDelta, DocumentId};
  2. use flowy_database::SqliteConnection;
  3. use futures::{FutureExt, StreamExt};
  4. use std::{collections::HashSet, sync::Arc};
  5. use crate::{
  6. entities::{
  7. trash::{RepeatedTrashId, TrashType},
  8. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
  9. },
  10. errors::{FlowyError, FlowyResult},
  11. module::{WorkspaceDatabase, WorkspaceUser},
  12. notify::{send_dart_notification, WorkspaceNotification},
  13. services::{
  14. server::Server,
  15. view::sql::{ViewTable, ViewTableChangeset, ViewTableSql},
  16. TrashController,
  17. TrashEvent,
  18. },
  19. };
  20. use flowy_core_data_model::entities::share::{ExportData, ExportParams};
  21. use flowy_database::kv::KV;
  22. use flowy_document::context::DocumentContext;
  23. use lib_infra::uuid_string;
  24. const LATEST_VIEW_ID: &str = "latest_view_id";
  25. pub(crate) struct ViewController {
  26. user: Arc<dyn WorkspaceUser>,
  27. server: Server,
  28. database: Arc<dyn WorkspaceDatabase>,
  29. trash_controller: Arc<TrashController>,
  30. document_ctx: Arc<DocumentContext>,
  31. }
  32. impl ViewController {
  33. pub(crate) fn new(
  34. user: Arc<dyn WorkspaceUser>,
  35. database: Arc<dyn WorkspaceDatabase>,
  36. server: Server,
  37. trash_can: Arc<TrashController>,
  38. document_ctx: Arc<DocumentContext>,
  39. ) -> Self {
  40. Self {
  41. user,
  42. server,
  43. database,
  44. trash_controller: trash_can,
  45. document_ctx,
  46. }
  47. }
  48. pub(crate) fn init(&self) -> Result<(), FlowyError> {
  49. let _ = self.document_ctx.init()?;
  50. self.listen_trash_can_event();
  51. Ok(())
  52. }
  53. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  54. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  55. let view = self.create_view_on_server(params).await?;
  56. let view = self.create_view_on_local(view).await?;
  57. Ok(view)
  58. }
  59. pub(crate) async fn create_view_on_local(&self, view: View) -> Result<View, FlowyError> {
  60. let conn = &*self.database.db_connection()?;
  61. let trash_can = self.trash_controller.clone();
  62. conn.immediate_transaction::<_, FlowyError, _>(|| {
  63. let _ = self.save_view(view.clone(), conn)?;
  64. let _ = notify_views_changed(&view.belong_to_id, trash_can, &conn)?;
  65. Ok(())
  66. })?;
  67. Ok(view)
  68. }
  69. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> {
  70. let view_table = ViewTable::new(view);
  71. let _ = ViewTableSql::create_view(view_table, conn)?;
  72. Ok(())
  73. }
  74. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  75. pub(crate) async fn read_view(&self, params: ViewId) -> Result<View, FlowyError> {
  76. let conn = self.database.db_connection()?;
  77. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  78. let trash_ids = self.trash_controller.read_trash_ids(&conn)?;
  79. if trash_ids.contains(&view_table.id) {
  80. return Err(FlowyError::record_not_found());
  81. }
  82. let view: View = view_table.into();
  83. let _ = self.read_view_on_server(params);
  84. Ok(view)
  85. }
  86. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, FlowyError> {
  87. let conn = &*self.database.db_connection()?;
  88. let mut view_tables = vec![];
  89. conn.immediate_transaction::<_, FlowyError, _>(|| {
  90. for view_id in ids {
  91. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  92. }
  93. Ok(())
  94. })?;
  95. Ok(view_tables)
  96. }
  97. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  98. pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
  99. let doc_id = params.doc_id.clone();
  100. let db_pool = self.database.db_pool()?;
  101. let editor = self.document_ctx.controller.open(&params.doc_id, db_pool).await?;
  102. KV::set_str(LATEST_VIEW_ID, doc_id.clone());
  103. let document_json = editor.document_json().await?;
  104. Ok(DocumentDelta {
  105. doc_id,
  106. delta_json: document_json,
  107. })
  108. }
  109. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  110. pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  111. let _ = self.document_ctx.controller.close(&params.doc_id)?;
  112. Ok(())
  113. }
  114. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  115. pub(crate) async fn delete_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  116. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  117. if view_id == params.doc_id {
  118. let _ = KV::remove(LATEST_VIEW_ID);
  119. }
  120. }
  121. let _ = self.document_ctx.controller.close(&params.doc_id)?;
  122. Ok(())
  123. }
  124. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  125. pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  126. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  127. let editor = self
  128. .document_ctx
  129. .controller
  130. .open(&params.doc_id, self.database.db_pool()?)
  131. .await?;
  132. let document_json = editor.document_json().await?;
  133. let duplicate_params = CreateViewParams {
  134. belong_to_id: view.belong_to_id.clone(),
  135. name: format!("{} (copy)", &view.name),
  136. desc: view.desc.clone(),
  137. thumbnail: "".to_owned(),
  138. view_type: view.view_type.clone(),
  139. view_data: document_json,
  140. view_id: uuid_string(),
  141. };
  142. let _ = self.create_view_from_params(duplicate_params).await?;
  143. Ok(())
  144. }
  145. #[tracing::instrument(level = "debug", skip(self, params), err)]
  146. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
  147. let editor = self
  148. .document_ctx
  149. .controller
  150. .open(&params.doc_id, self.database.db_pool()?)
  151. .await?;
  152. let delta_json = editor.document_json().await?;
  153. Ok(ExportData {
  154. data: delta_json,
  155. export_type: params.export_type,
  156. })
  157. }
  158. // belong_to_id will be the app_id or view_id.
  159. #[tracing::instrument(level = "debug", skip(self), err)]
  160. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
  161. // TODO: read from server
  162. let conn = self.database.db_connection()?;
  163. let repeated_view = read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &conn)?;
  164. Ok(repeated_view)
  165. }
  166. #[tracing::instrument(level = "debug", skip(self, params), err)]
  167. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
  168. let conn = &*self.database.db_connection()?;
  169. let changeset = ViewTableChangeset::new(params.clone());
  170. let view_id = changeset.id.clone();
  171. let updated_view = conn.immediate_transaction::<_, FlowyError, _>(|| {
  172. let _ = ViewTableSql::update_view(changeset, conn)?;
  173. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  174. Ok(view)
  175. })?;
  176. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  177. .payload(updated_view.clone())
  178. .send();
  179. //
  180. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_controller.clone(), conn)?;
  181. let _ = self.update_view_on_server(params);
  182. Ok(updated_view)
  183. }
  184. pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
  185. let db_pool = self.document_ctx.user.db_pool()?;
  186. let doc = self
  187. .document_ctx
  188. .controller
  189. .apply_document_delta(params, db_pool)
  190. .await?;
  191. Ok(doc)
  192. }
  193. pub(crate) fn latest_visit_view(&self) -> FlowyResult<Option<View>> {
  194. match KV::get_str(LATEST_VIEW_ID) {
  195. None => Ok(None),
  196. Some(view_id) => {
  197. let conn = self.database.db_connection()?;
  198. let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
  199. Ok(Some(view_table.into()))
  200. },
  201. }
  202. }
  203. pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
  204. }
  205. impl ViewController {
  206. #[tracing::instrument(skip(self), err)]
  207. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  208. let token = self.user.token()?;
  209. let view = self.server.create_view(&token, params).await?;
  210. Ok(view)
  211. }
  212. #[tracing::instrument(skip(self), err)]
  213. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  214. let token = self.user.token()?;
  215. let server = self.server.clone();
  216. tokio::spawn(async move {
  217. match server.update_view(&token, params).await {
  218. Ok(_) => {},
  219. Err(e) => {
  220. // TODO: retry?
  221. log::error!("Update view failed: {:?}", e);
  222. },
  223. }
  224. });
  225. Ok(())
  226. }
  227. #[tracing::instrument(skip(self), err)]
  228. fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
  229. let token = self.user.token()?;
  230. let server = self.server.clone();
  231. let pool = self.database.db_pool()?;
  232. // Opti: retry?
  233. tokio::spawn(async move {
  234. match server.read_view(&token, params).await {
  235. Ok(Some(view)) => match pool.get() {
  236. Ok(conn) => {
  237. let view_table = ViewTable::new(view.clone());
  238. let result = ViewTableSql::create_view(view_table, &conn);
  239. match result {
  240. Ok(_) => {
  241. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  242. .payload(view.clone())
  243. .send();
  244. },
  245. Err(e) => log::error!("Save view failed: {:?}", e),
  246. }
  247. },
  248. Err(e) => log::error!("Require db connection failed: {:?}", e),
  249. },
  250. Ok(None) => {},
  251. Err(e) => log::error!("Read view failed: {:?}", e),
  252. }
  253. });
  254. Ok(())
  255. }
  256. fn listen_trash_can_event(&self) {
  257. let mut rx = self.trash_controller.subscribe();
  258. let database = self.database.clone();
  259. let document = self.document_ctx.clone();
  260. let trash_can = self.trash_controller.clone();
  261. let _ = tokio::spawn(async move {
  262. loop {
  263. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  264. match result {
  265. Ok(event) => event.select(TrashType::View),
  266. Err(_e) => None,
  267. }
  268. }));
  269. if let Some(event) = stream.next().await {
  270. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  271. }
  272. }
  273. });
  274. }
  275. }
  276. #[tracing::instrument(level = "trace", skip(database, context, trash_can))]
  277. async fn handle_trash_event(
  278. database: Arc<dyn WorkspaceDatabase>,
  279. context: Arc<DocumentContext>,
  280. trash_can: Arc<TrashController>,
  281. event: TrashEvent,
  282. ) {
  283. let db_result = database.db_connection();
  284. match event {
  285. TrashEvent::NewTrash(identifiers, ret) => {
  286. let result = || {
  287. let conn = &*db_result?;
  288. let view_tables = read_view_tables(identifiers, conn)?;
  289. for view_table in view_tables {
  290. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  291. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  292. }
  293. Ok::<(), FlowyError>(())
  294. };
  295. let _ = ret.send(result()).await;
  296. },
  297. TrashEvent::Putback(identifiers, ret) => {
  298. let result = || {
  299. let conn = &*db_result?;
  300. let view_tables = read_view_tables(identifiers, conn)?;
  301. for view_table in view_tables {
  302. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  303. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  304. }
  305. Ok::<(), FlowyError>(())
  306. };
  307. let _ = ret.send(result()).await;
  308. },
  309. TrashEvent::Delete(identifiers, ret) => {
  310. let result = || {
  311. let conn = &*db_result?;
  312. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  313. let mut notify_ids = HashSet::new();
  314. for identifier in identifiers.items {
  315. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  316. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  317. let _ = context.controller.delete(&identifier.id)?;
  318. notify_ids.insert(view_table.belong_to_id);
  319. }
  320. for notify_id in notify_ids {
  321. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  322. }
  323. Ok(())
  324. })?;
  325. Ok::<(), FlowyError>(())
  326. };
  327. let _ = ret.send(result()).await;
  328. },
  329. }
  330. }
  331. fn read_view_tables(identifiers: RepeatedTrashId, conn: &SqliteConnection) -> Result<Vec<ViewTable>, FlowyError> {
  332. let mut view_tables = vec![];
  333. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  334. for identifier in identifiers.items {
  335. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  336. view_tables.push(view_table);
  337. }
  338. Ok(())
  339. })?;
  340. Ok(view_tables)
  341. }
  342. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  343. let view: View = view_table.into();
  344. send_dart_notification(&view.id, notification).payload(view).send();
  345. }
  346. #[tracing::instrument(skip(belong_to_id, trash_controller, conn), fields(view_count), err)]
  347. fn notify_views_changed(
  348. belong_to_id: &str,
  349. trash_controller: Arc<TrashController>,
  350. conn: &SqliteConnection,
  351. ) -> FlowyResult<()> {
  352. let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), conn)?;
  353. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  354. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  355. .payload(repeated_view)
  356. .send();
  357. Ok(())
  358. }
  359. fn read_belonging_views_on_local(
  360. belong_to_id: &str,
  361. trash_controller: Arc<TrashController>,
  362. conn: &SqliteConnection,
  363. ) -> FlowyResult<RepeatedView> {
  364. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  365. let trash_ids = trash_controller.read_trash_ids(conn)?;
  366. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  367. let views = view_tables
  368. .into_iter()
  369. .map(|view_table| view_table.into())
  370. .collect::<Vec<View>>();
  371. Ok(RepeatedView { items: views })
  372. }