controller.rs 16 KB


  1. use bytes::Bytes;
  2. use flowy_collaboration::entities::{
  3. doc::{DocumentDelta, DocumentId},
  4. revision::{RepeatedRevision, Revision},
  5. };
  6. use flowy_database::SqliteConnection;
  7. use futures::{FutureExt, StreamExt};
  8. use std::{collections::HashSet, sync::Arc};
  9. use crate::{
  10. entities::{
  11. trash::{RepeatedTrashId, TrashType},
  12. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
  13. },
  14. errors::{FlowyError, FlowyResult},
  15. module::{WorkspaceCloudService, WorkspaceDatabase, WorkspaceUser},
  16. notify::{send_dart_notification, WorkspaceNotification},
  17. services::{
  18. view::sql::{ViewTable, ViewTableChangeset, ViewTableSql},
  19. TrashController,
  20. TrashEvent,
  21. },
  22. };
  23. use flowy_core_data_model::entities::share::{ExportData, ExportParams};
  24. use flowy_database::kv::KV;
  25. use flowy_document::context::DocumentContext;
  26. use lib_infra::uuid_string;
  27. const LATEST_VIEW_ID: &str = "latest_view_id";
  28. pub(crate) struct ViewController {
  29. user: Arc<dyn WorkspaceUser>,
  30. cloud_service: Arc<dyn WorkspaceCloudService>,
  31. database: Arc<dyn WorkspaceDatabase>,
  32. trash_controller: Arc<TrashController>,
  33. document_ctx: Arc<DocumentContext>,
  34. }
  35. impl ViewController {
  36. pub(crate) fn new(
  37. user: Arc<dyn WorkspaceUser>,
  38. database: Arc<dyn WorkspaceDatabase>,
  39. cloud_service: Arc<dyn WorkspaceCloudService>,
  40. trash_can: Arc<TrashController>,
  41. document_ctx: Arc<DocumentContext>,
  42. ) -> Self {
  43. Self {
  44. user,
  45. cloud_service,
  46. database,
  47. trash_controller: trash_can,
  48. document_ctx,
  49. }
  50. }
  51. pub(crate) fn init(&self) -> Result<(), FlowyError> {
  52. let _ = self.document_ctx.init()?;
  53. self.listen_trash_can_event();
  54. Ok(())
  55. }
  56. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  57. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  58. let delta_data = Bytes::from(params.view_data.clone());
  59. let user_id = self.user.user_id()?;
  60. let repeated_revision: RepeatedRevision =
  61. Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
  62. let _ = self
  63. .document_ctx
  64. .controller
  65. .save_document(&params.view_id, repeated_revision)
  66. .await?;
  67. let view = self.create_view_on_server(params).await?;
  68. let _ = self.create_view_on_local(view.clone()).await?;
  69. Ok(view)
  70. }
  71. pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
  72. let conn = &*self.database.db_connection()?;
  73. let trash_can = self.trash_controller.clone();
  74. conn.immediate_transaction::<_, FlowyError, _>(|| {
  75. let belong_to_id = view.belong_to_id.clone();
  76. let _ = self.save_view(view, conn)?;
  77. let _ = notify_views_changed(&belong_to_id, trash_can, &conn)?;
  78. Ok(())
  79. })?;
  80. Ok(())
  81. }
  82. pub(crate) fn save_view(&self, view: View, conn: &SqliteConnection) -> Result<(), FlowyError> {
  83. let view_table = ViewTable::new(view);
  84. let _ = ViewTableSql::create_view(view_table, conn)?;
  85. Ok(())
  86. }
  87. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  88. pub(crate) async fn read_view(&self, params: ViewId) -> Result<View, FlowyError> {
  89. let conn = self.database.db_connection()?;
  90. let view_table = ViewTableSql::read_view(&params.view_id, &*conn)?;
  91. let trash_ids = self.trash_controller.read_trash_ids(&conn)?;
  92. if trash_ids.contains(&view_table.id) {
  93. return Err(FlowyError::record_not_found());
  94. }
  95. let view: View = view_table.into();
  96. let _ = self.read_view_on_server(params);
  97. Ok(view)
  98. }
  99. pub(crate) fn read_view_tables(&self, ids: Vec<String>) -> Result<Vec<ViewTable>, FlowyError> {
  100. let conn = &*self.database.db_connection()?;
  101. let mut view_tables = vec![];
  102. conn.immediate_transaction::<_, FlowyError, _>(|| {
  103. for view_id in ids {
  104. view_tables.push(ViewTableSql::read_view(&view_id, conn)?);
  105. }
  106. Ok(())
  107. })?;
  108. Ok(view_tables)
  109. }
  110. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  111. pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
  112. let doc_id = params.doc_id.clone();
  113. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  114. KV::set_str(LATEST_VIEW_ID, doc_id.clone());
  115. let document_json = editor.document_json().await?;
  116. Ok(DocumentDelta {
  117. doc_id,
  118. delta_json: document_json,
  119. })
  120. }
  121. #[tracing::instrument(level = "debug", skip(self, params), err)]
  122. pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  123. let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
  124. Ok(())
  125. }
  126. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  127. pub(crate) async fn delete_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  128. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  129. if view_id == params.doc_id {
  130. let _ = KV::remove(LATEST_VIEW_ID);
  131. }
  132. }
  133. let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
  134. Ok(())
  135. }
  136. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  137. pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  138. let view: View = ViewTableSql::read_view(&params.doc_id, &*self.database.db_connection()?)?.into();
  139. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  140. let document_json = editor.document_json().await?;
  141. let duplicate_params = CreateViewParams {
  142. belong_to_id: view.belong_to_id.clone(),
  143. name: format!("{} (copy)", &view.name),
  144. desc: view.desc.clone(),
  145. thumbnail: "".to_owned(),
  146. view_type: view.view_type.clone(),
  147. view_data: document_json,
  148. view_id: uuid_string(),
  149. };
  150. let _ = self.create_view_from_params(duplicate_params).await?;
  151. Ok(())
  152. }
  153. #[tracing::instrument(level = "debug", skip(self, params), err)]
  154. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
  155. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  156. let delta_json = editor.document_json().await?;
  157. Ok(ExportData {
  158. data: delta_json,
  159. export_type: params.export_type,
  160. })
  161. }
  162. // belong_to_id will be the app_id or view_id.
  163. #[tracing::instrument(level = "debug", skip(self), err)]
  164. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
  165. // TODO: read from server
  166. let conn = self.database.db_connection()?;
  167. let repeated_view = read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &conn)?;
  168. Ok(repeated_view)
  169. }
  170. #[tracing::instrument(level = "debug", skip(self, params), err)]
  171. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
  172. let conn = &*self.database.db_connection()?;
  173. let changeset = ViewTableChangeset::new(params.clone());
  174. let view_id = changeset.id.clone();
  175. let updated_view = conn.immediate_transaction::<_, FlowyError, _>(|| {
  176. let _ = ViewTableSql::update_view(changeset, conn)?;
  177. let view: View = ViewTableSql::read_view(&view_id, conn)?.into();
  178. Ok(view)
  179. })?;
  180. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  181. .payload(updated_view.clone())
  182. .send();
  183. //
  184. let _ = notify_views_changed(&updated_view.belong_to_id, self.trash_controller.clone(), conn)?;
  185. let _ = self.update_view_on_server(params);
  186. Ok(updated_view)
  187. }
  188. pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
  189. let doc = self.document_ctx.controller.receive_local_delta(params).await?;
  190. Ok(doc)
  191. }
  192. pub(crate) fn latest_visit_view(&self) -> FlowyResult<Option<View>> {
  193. match KV::get_str(LATEST_VIEW_ID) {
  194. None => Ok(None),
  195. Some(view_id) => {
  196. let conn = self.database.db_connection()?;
  197. let view_table = ViewTableSql::read_view(&view_id, &*conn)?;
  198. Ok(Some(view_table.into()))
  199. },
  200. }
  201. }
  202. pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
  203. }
  204. impl ViewController {
  205. #[tracing::instrument(skip(self), err)]
  206. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  207. let token = self.user.token()?;
  208. let view = self.cloud_service.create_view(&token, params).await?;
  209. Ok(view)
  210. }
  211. #[tracing::instrument(skip(self), err)]
  212. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  213. let token = self.user.token()?;
  214. let server = self.cloud_service.clone();
  215. tokio::spawn(async move {
  216. match server.update_view(&token, params).await {
  217. Ok(_) => {},
  218. Err(e) => {
  219. // TODO: retry?
  220. log::error!("Update view failed: {:?}", e);
  221. },
  222. }
  223. });
  224. Ok(())
  225. }
  226. #[tracing::instrument(skip(self), err)]
  227. fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
  228. let token = self.user.token()?;
  229. let server = self.cloud_service.clone();
  230. let pool = self.database.db_pool()?;
  231. // TODO: Retry with RetryAction?
  232. tokio::spawn(async move {
  233. match server.read_view(&token, params).await {
  234. Ok(Some(view)) => match pool.get() {
  235. Ok(conn) => {
  236. let view_table = ViewTable::new(view.clone());
  237. let result = ViewTableSql::create_view(view_table, &conn);
  238. match result {
  239. Ok(_) => {
  240. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  241. .payload(view.clone())
  242. .send();
  243. },
  244. Err(e) => log::error!("Save view failed: {:?}", e),
  245. }
  246. },
  247. Err(e) => log::error!("Require db connection failed: {:?}", e),
  248. },
  249. Ok(None) => {},
  250. Err(e) => log::error!("Read view failed: {:?}", e),
  251. }
  252. });
  253. Ok(())
  254. }
  255. fn listen_trash_can_event(&self) {
  256. let mut rx = self.trash_controller.subscribe();
  257. let database = self.database.clone();
  258. let document = self.document_ctx.clone();
  259. let trash_can = self.trash_controller.clone();
  260. let _ = tokio::spawn(async move {
  261. loop {
  262. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  263. match result {
  264. Ok(event) => event.select(TrashType::View),
  265. Err(_e) => None,
  266. }
  267. }));
  268. if let Some(event) = stream.next().await {
  269. handle_trash_event(database.clone(), document.clone(), trash_can.clone(), event).await
  270. }
  271. }
  272. });
  273. }
  274. }
  275. #[tracing::instrument(level = "trace", skip(database, context, trash_can))]
  276. async fn handle_trash_event(
  277. database: Arc<dyn WorkspaceDatabase>,
  278. context: Arc<DocumentContext>,
  279. trash_can: Arc<TrashController>,
  280. event: TrashEvent,
  281. ) {
  282. let db_result = database.db_connection();
  283. match event {
  284. TrashEvent::NewTrash(identifiers, ret) => {
  285. let result = || {
  286. let conn = &*db_result?;
  287. let view_tables = read_view_tables(identifiers, conn)?;
  288. for view_table in view_tables {
  289. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  290. notify_dart(view_table, WorkspaceNotification::ViewDeleted);
  291. }
  292. Ok::<(), FlowyError>(())
  293. };
  294. let _ = ret.send(result()).await;
  295. },
  296. TrashEvent::Putback(identifiers, ret) => {
  297. let result = || {
  298. let conn = &*db_result?;
  299. let view_tables = read_view_tables(identifiers, conn)?;
  300. for view_table in view_tables {
  301. let _ = notify_views_changed(&view_table.belong_to_id, trash_can.clone(), conn)?;
  302. notify_dart(view_table, WorkspaceNotification::ViewRestored);
  303. }
  304. Ok::<(), FlowyError>(())
  305. };
  306. let _ = ret.send(result()).await;
  307. },
  308. TrashEvent::Delete(identifiers, ret) => {
  309. let result = || {
  310. let conn = &*db_result?;
  311. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  312. let mut notify_ids = HashSet::new();
  313. for identifier in identifiers.items {
  314. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  315. let _ = ViewTableSql::delete_view(&identifier.id, conn)?;
  316. let _ = context.controller.delete(&identifier.id)?;
  317. notify_ids.insert(view_table.belong_to_id);
  318. }
  319. for notify_id in notify_ids {
  320. let _ = notify_views_changed(&notify_id, trash_can.clone(), conn)?;
  321. }
  322. Ok(())
  323. })?;
  324. Ok::<(), FlowyError>(())
  325. };
  326. let _ = ret.send(result()).await;
  327. },
  328. }
  329. }
  330. fn read_view_tables(identifiers: RepeatedTrashId, conn: &SqliteConnection) -> Result<Vec<ViewTable>, FlowyError> {
  331. let mut view_tables = vec![];
  332. let _ = conn.immediate_transaction::<_, FlowyError, _>(|| {
  333. for identifier in identifiers.items {
  334. let view_table = ViewTableSql::read_view(&identifier.id, conn)?;
  335. view_tables.push(view_table);
  336. }
  337. Ok(())
  338. })?;
  339. Ok(view_tables)
  340. }
  341. fn notify_dart(view_table: ViewTable, notification: WorkspaceNotification) {
  342. let view: View = view_table.into();
  343. send_dart_notification(&view.id, notification).payload(view).send();
  344. }
  345. #[tracing::instrument(skip(belong_to_id, trash_controller, conn), fields(view_count), err)]
  346. fn notify_views_changed(
  347. belong_to_id: &str,
  348. trash_controller: Arc<TrashController>,
  349. conn: &SqliteConnection,
  350. ) -> FlowyResult<()> {
  351. let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), conn)?;
  352. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  353. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  354. .payload(repeated_view)
  355. .send();
  356. Ok(())
  357. }
  358. fn read_belonging_views_on_local(
  359. belong_to_id: &str,
  360. trash_controller: Arc<TrashController>,
  361. conn: &SqliteConnection,
  362. ) -> FlowyResult<RepeatedView> {
  363. let mut view_tables = ViewTableSql::read_views(belong_to_id, conn)?;
  364. let trash_ids = trash_controller.read_trash_ids(conn)?;
  365. view_tables.retain(|view_table| !trash_ids.contains(&view_table.id));
  366. let views = view_tables
  367. .into_iter()
  368. .map(|view_table| view_table.into())
  369. .collect::<Vec<View>>();
  370. Ok(RepeatedView { items: views })
  371. }