// controller.rs
  1. use bytes::Bytes;
  2. use flowy_collaboration::entities::{
  3. doc::{DocumentDelta, DocumentId},
  4. revision::{RepeatedRevision, Revision},
  5. };
  6. use futures::{FutureExt, StreamExt};
  7. use std::{collections::HashSet, sync::Arc};
  8. use crate::{
  9. dart_notification::{send_dart_notification, WorkspaceNotification},
  10. entities::{
  11. trash::{RepeatedTrashId, TrashType},
  12. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
  13. },
  14. errors::{FlowyError, FlowyResult},
  15. module::{WorkspaceCloudService, WorkspaceUser},
  16. services::{
  17. persistence::{FlowyCorePersistence, FlowyCorePersistenceTransaction, ViewChangeset},
  18. TrashController,
  19. TrashEvent,
  20. },
  21. };
  22. use flowy_core_data_model::entities::share::{ExportData, ExportParams};
  23. use flowy_database::kv::KV;
  24. use flowy_document::context::DocumentContext;
  25. use lib_infra::uuid_string;
  /// Key in the `KV` store under which the id of the most recently opened view
  /// is kept (written by `open_view` / `set_latest_view`, read by
  /// `latest_visit_view`, cleared by `delete_view`).
  const LATEST_VIEW_ID: &str = "latest_view_id";
  27. pub(crate) struct ViewController {
  28. user: Arc<dyn WorkspaceUser>,
  29. cloud_service: Arc<dyn WorkspaceCloudService>,
  30. persistence: Arc<FlowyCorePersistence>,
  31. trash_controller: Arc<TrashController>,
  32. document_ctx: Arc<DocumentContext>,
  33. }
  34. impl ViewController {
  35. pub(crate) fn new(
  36. user: Arc<dyn WorkspaceUser>,
  37. persistence: Arc<FlowyCorePersistence>,
  38. cloud_service: Arc<dyn WorkspaceCloudService>,
  39. trash_can: Arc<TrashController>,
  40. document_ctx: Arc<DocumentContext>,
  41. ) -> Self {
  42. Self {
  43. user,
  44. cloud_service,
  45. persistence,
  46. trash_controller: trash_can,
  47. document_ctx,
  48. }
  49. }
  50. pub(crate) fn init(&self) -> Result<(), FlowyError> {
  51. let _ = self.document_ctx.init()?;
  52. self.listen_trash_can_event();
  53. Ok(())
  54. }
  55. #[tracing::instrument(level = "debug", skip(self, params), fields(name = %params.name), err)]
  56. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  57. let delta_data = Bytes::from(params.view_data.clone());
  58. let user_id = self.user.user_id()?;
  59. let repeated_revision: RepeatedRevision =
  60. Revision::initial_revision(&user_id, &params.view_id, delta_data).into();
  61. let _ = self
  62. .document_ctx
  63. .controller
  64. .save_document(&params.view_id, repeated_revision)
  65. .await?;
  66. let view = self.create_view_on_server(params).await?;
  67. let _ = self.create_view_on_local(view.clone()).await?;
  68. Ok(view)
  69. }
  70. pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
  71. let trash_controller = self.trash_controller.clone();
  72. self.persistence.begin_transaction(|transaction| {
  73. let belong_to_id = view.belong_to_id.clone();
  74. let _ = transaction.create_view(view)?;
  75. let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  76. Ok(())
  77. })
  78. }
  79. #[tracing::instrument(skip(self, params), fields(view_id = %params.view_id), err)]
  80. pub(crate) async fn read_view(&self, params: ViewId) -> Result<View, FlowyError> {
  81. let view = self.persistence.begin_transaction(|transaction| {
  82. let view = transaction.read_view(&params.view_id)?;
  83. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  84. if trash_ids.contains(&view.id) {
  85. return Err(FlowyError::record_not_found());
  86. }
  87. Ok(view)
  88. })?;
  89. let _ = self.read_view_on_server(params);
  90. Ok(view)
  91. }
  92. pub(crate) fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<View>, FlowyError> {
  93. self.persistence.begin_transaction(|transaction| {
  94. let mut views = vec![];
  95. for view_id in ids {
  96. views.push(transaction.read_view(&view_id)?);
  97. }
  98. Ok(views)
  99. })
  100. }
  101. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  102. pub(crate) async fn open_view(&self, params: DocumentId) -> Result<DocumentDelta, FlowyError> {
  103. let doc_id = params.doc_id.clone();
  104. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  105. KV::set_str(LATEST_VIEW_ID, doc_id.clone());
  106. let document_json = editor.document_json().await?;
  107. Ok(DocumentDelta {
  108. doc_id,
  109. delta_json: document_json,
  110. })
  111. }
  112. #[tracing::instrument(level = "debug", skip(self, params), err)]
  113. pub(crate) async fn close_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  114. let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
  115. Ok(())
  116. }
  117. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.doc_id), err)]
  118. pub(crate) async fn delete_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  119. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  120. if view_id == params.doc_id {
  121. let _ = KV::remove(LATEST_VIEW_ID);
  122. }
  123. }
  124. let _ = self.document_ctx.controller.close_document(&params.doc_id)?;
  125. Ok(())
  126. }
  127. #[tracing::instrument(level = "debug", skip(self, params), fields(doc_id = %params.doc_id), err)]
  128. pub(crate) async fn duplicate_view(&self, params: DocumentId) -> Result<(), FlowyError> {
  129. let view = self
  130. .persistence
  131. .begin_transaction(|transaction| transaction.read_view(&params.doc_id))?;
  132. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  133. let document_json = editor.document_json().await?;
  134. let duplicate_params = CreateViewParams {
  135. belong_to_id: view.belong_to_id.clone(),
  136. name: format!("{} (copy)", &view.name),
  137. desc: view.desc.clone(),
  138. thumbnail: "".to_owned(),
  139. view_type: view.view_type.clone(),
  140. view_data: document_json,
  141. view_id: uuid_string(),
  142. };
  143. let _ = self.create_view_from_params(duplicate_params).await?;
  144. Ok(())
  145. }
  146. #[tracing::instrument(level = "debug", skip(self, params), err)]
  147. pub(crate) async fn export_doc(&self, params: ExportParams) -> Result<ExportData, FlowyError> {
  148. let editor = self.document_ctx.controller.open_document(&params.doc_id).await?;
  149. let delta_json = editor.document_json().await?;
  150. Ok(ExportData {
  151. data: delta_json,
  152. export_type: params.export_type,
  153. })
  154. }
  155. // belong_to_id will be the app_id or view_id.
  156. #[tracing::instrument(level = "debug", skip(self), err)]
  157. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
  158. self.persistence.begin_transaction(|transaction| {
  159. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  160. })
  161. }
  162. #[tracing::instrument(level = "debug", skip(self, params), err)]
  163. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
  164. let changeset = ViewChangeset::new(params.clone());
  165. let view_id = changeset.id.clone();
  166. let view = self.persistence.begin_transaction(|transaction| {
  167. let _ = transaction.update_view(changeset)?;
  168. let view = transaction.read_view(&view_id)?;
  169. send_dart_notification(&view_id, WorkspaceNotification::ViewUpdated)
  170. .payload(view.clone())
  171. .send();
  172. let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?;
  173. Ok(view)
  174. })?;
  175. let _ = self.update_view_on_server(params);
  176. Ok(view)
  177. }
  178. pub(crate) async fn receive_document_delta(&self, params: DocumentDelta) -> Result<DocumentDelta, FlowyError> {
  179. let doc = self.document_ctx.controller.receive_local_delta(params).await?;
  180. Ok(doc)
  181. }
  182. pub(crate) fn latest_visit_view(&self) -> FlowyResult<Option<View>> {
  183. match KV::get_str(LATEST_VIEW_ID) {
  184. None => Ok(None),
  185. Some(view_id) => {
  186. let view = self
  187. .persistence
  188. .begin_transaction(|transaction| transaction.read_view(&view_id))?;
  189. Ok(Some(view))
  190. },
  191. }
  192. }
  193. pub(crate) fn set_latest_view(&self, view: &View) { KV::set_str(LATEST_VIEW_ID, view.id.clone()); }
  194. }
  195. impl ViewController {
  196. #[tracing::instrument(skip(self), err)]
  197. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  198. let token = self.user.token()?;
  199. let view = self.cloud_service.create_view(&token, params).await?;
  200. Ok(view)
  201. }
  202. #[tracing::instrument(skip(self), err)]
  203. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  204. let token = self.user.token()?;
  205. let server = self.cloud_service.clone();
  206. tokio::spawn(async move {
  207. match server.update_view(&token, params).await {
  208. Ok(_) => {},
  209. Err(e) => {
  210. // TODO: retry?
  211. log::error!("Update view failed: {:?}", e);
  212. },
  213. }
  214. });
  215. Ok(())
  216. }
  217. #[tracing::instrument(skip(self), err)]
  218. fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
  219. let token = self.user.token()?;
  220. let server = self.cloud_service.clone();
  221. let persistence = self.persistence.clone();
  222. // TODO: Retry with RetryAction?
  223. tokio::spawn(async move {
  224. match server.read_view(&token, params).await {
  225. Ok(Some(view)) => {
  226. match persistence.begin_transaction(|transaction| transaction.create_view(view.clone())) {
  227. Ok(_) => {
  228. send_dart_notification(&view.id, WorkspaceNotification::ViewUpdated)
  229. .payload(view.clone())
  230. .send();
  231. },
  232. Err(e) => log::error!("Save view failed: {:?}", e),
  233. }
  234. },
  235. Ok(None) => {},
  236. Err(e) => log::error!("Read view failed: {:?}", e),
  237. }
  238. });
  239. Ok(())
  240. }
  241. fn listen_trash_can_event(&self) {
  242. let mut rx = self.trash_controller.subscribe();
  243. let persistence = self.persistence.clone();
  244. let document = self.document_ctx.clone();
  245. let trash_controller = self.trash_controller.clone();
  246. let _ = tokio::spawn(async move {
  247. loop {
  248. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  249. match result {
  250. Ok(event) => event.select(TrashType::View),
  251. Err(_e) => None,
  252. }
  253. }));
  254. if let Some(event) = stream.next().await {
  255. handle_trash_event(persistence.clone(), document.clone(), trash_controller.clone(), event).await
  256. }
  257. }
  258. });
  259. }
  260. }
  261. #[tracing::instrument(level = "trace", skip(persistence, context, trash_can))]
  262. async fn handle_trash_event(
  263. persistence: Arc<FlowyCorePersistence>,
  264. context: Arc<DocumentContext>,
  265. trash_can: Arc<TrashController>,
  266. event: TrashEvent,
  267. ) {
  268. match event {
  269. TrashEvent::NewTrash(identifiers, ret) => {
  270. let result = persistence.begin_transaction(|transaction| {
  271. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  272. for view in views {
  273. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  274. notify_dart(view, WorkspaceNotification::ViewDeleted);
  275. }
  276. Ok(())
  277. });
  278. let _ = ret.send(result).await;
  279. },
  280. TrashEvent::Putback(identifiers, ret) => {
  281. let result = persistence.begin_transaction(|transaction| {
  282. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  283. for view in views {
  284. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  285. notify_dart(view, WorkspaceNotification::ViewRestored);
  286. }
  287. Ok(())
  288. });
  289. let _ = ret.send(result).await;
  290. },
  291. TrashEvent::Delete(identifiers, ret) => {
  292. let result = persistence.begin_transaction(|transaction| {
  293. let mut notify_ids = HashSet::new();
  294. for identifier in identifiers.items {
  295. let view = transaction.read_view(&identifier.id)?;
  296. let _ = transaction.delete_view(&identifier.id)?;
  297. let _ = context.controller.delete(&identifier.id)?;
  298. notify_ids.insert(view.belong_to_id);
  299. }
  300. for notify_id in notify_ids {
  301. let _ = notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  302. }
  303. Ok(())
  304. });
  305. let _ = ret.send(result).await;
  306. },
  307. }
  308. }
  309. fn read_local_views_with_transaction<'a>(
  310. identifiers: RepeatedTrashId,
  311. transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
  312. ) -> Result<Vec<View>, FlowyError> {
  313. let mut views = vec![];
  314. for identifier in identifiers.items {
  315. let view = transaction.read_view(&identifier.id)?;
  316. views.push(view);
  317. }
  318. Ok(views)
  319. }
  320. fn notify_dart(view: View, notification: WorkspaceNotification) {
  321. send_dart_notification(&view.id, notification).payload(view).send();
  322. }
  323. #[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)]
  324. fn notify_views_changed<'a>(
  325. belong_to_id: &str,
  326. trash_controller: Arc<TrashController>,
  327. transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
  328. ) -> FlowyResult<()> {
  329. let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?;
  330. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  331. send_dart_notification(&belong_to_id, WorkspaceNotification::AppViewsChanged)
  332. .payload(repeated_view)
  333. .send();
  334. Ok(())
  335. }
  336. fn read_belonging_views_on_local<'a>(
  337. belong_to_id: &str,
  338. trash_controller: Arc<TrashController>,
  339. transaction: &'a (dyn FlowyCorePersistenceTransaction + 'a),
  340. ) -> FlowyResult<RepeatedView> {
  341. let mut views = transaction.read_views(belong_to_id)?;
  342. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  343. views.retain(|view_table| !trash_ids.contains(&view_table.id));
  344. Ok(RepeatedView { items: views })
  345. }