controller.rs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554
  1. pub use crate::entities::view::ViewDataFormatPB;
  2. use crate::entities::{AppPB, DeletedViewPB, ViewInfoPB, ViewLayoutTypePB};
  3. use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
  4. use crate::{
  5. dart_notification::{send_dart_notification, FolderNotification},
  6. entities::{
  7. trash::{RepeatedTrashIdPB, TrashType},
  8. view::{CreateViewParams, RepeatedViewPB, UpdateViewParams, ViewIdPB, ViewPB},
  9. },
  10. errors::{FlowyError, FlowyResult},
  11. event_map::{FolderCouldServiceV1, WorkspaceUser},
  12. services::{
  13. persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset},
  14. TrashController, TrashEvent,
  15. },
  16. };
  17. use bytes::Bytes;
  18. use flowy_database::kv::KV;
  19. use flowy_sync::entities::document::DocumentIdPB;
  20. use folder_rev_model::{gen_view_id, ViewRevision};
  21. use futures::{FutureExt, StreamExt};
  22. use std::{collections::HashSet, sync::Arc};
  23. const LATEST_VIEW_ID: &str = "latest_view_id";
  24. pub(crate) struct ViewController {
  25. user: Arc<dyn WorkspaceUser>,
  26. cloud_service: Arc<dyn FolderCouldServiceV1>,
  27. persistence: Arc<FolderPersistence>,
  28. trash_controller: Arc<TrashController>,
  29. data_processors: ViewDataProcessorMap,
  30. }
  31. impl ViewController {
  32. pub(crate) fn new(
  33. user: Arc<dyn WorkspaceUser>,
  34. persistence: Arc<FolderPersistence>,
  35. cloud_service: Arc<dyn FolderCouldServiceV1>,
  36. trash_controller: Arc<TrashController>,
  37. data_processors: ViewDataProcessorMap,
  38. ) -> Self {
  39. Self {
  40. user,
  41. cloud_service,
  42. persistence,
  43. trash_controller,
  44. data_processors,
  45. }
  46. }
  47. pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
  48. self.listen_trash_can_event();
  49. Ok(())
  50. }
  51. #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
  52. pub(crate) async fn create_view_from_params(
  53. &self,
  54. mut params: CreateViewParams,
  55. ) -> Result<ViewRevision, FlowyError> {
  56. let processor = self.get_data_processor(params.data_format.clone())?;
  57. let user_id = self.user.user_id()?;
  58. if params.view_content_data.is_empty() {
  59. tracing::trace!("Create view with build-in data");
  60. let view_data = processor
  61. .create_default_view(
  62. &user_id,
  63. &params.view_id,
  64. params.layout.clone(),
  65. params.data_format.clone(),
  66. )
  67. .await?;
  68. params.view_content_data = view_data.to_vec();
  69. } else {
  70. tracing::trace!("Create view with view data");
  71. let delta_data = processor
  72. .create_view_from_delta_data(
  73. &user_id,
  74. &params.view_id,
  75. params.view_content_data.clone(),
  76. params.layout.clone(),
  77. )
  78. .await?;
  79. let _ = self
  80. .create_view(
  81. &params.view_id,
  82. params.data_format.clone(),
  83. params.layout.clone(),
  84. delta_data,
  85. )
  86. .await?;
  87. };
  88. let view_rev = self.create_view_on_server(params).await?;
  89. let _ = self.create_view_on_local(view_rev.clone()).await?;
  90. Ok(view_rev)
  91. }
  92. #[tracing::instrument(level = "debug", skip(self, view_id, view_data), err)]
  93. pub(crate) async fn create_view(
  94. &self,
  95. view_id: &str,
  96. data_type: ViewDataFormatPB,
  97. layout_type: ViewLayoutTypePB,
  98. view_data: Bytes,
  99. ) -> Result<(), FlowyError> {
  100. if view_data.is_empty() {
  101. return Err(FlowyError::internal().context("The content of the view should not be empty"));
  102. }
  103. let user_id = self.user.user_id()?;
  104. let processor = self.get_data_processor(data_type)?;
  105. let _ = processor.create_view(&user_id, view_id, layout_type, view_data).await?;
  106. Ok(())
  107. }
  108. pub(crate) async fn create_view_on_local(&self, view_rev: ViewRevision) -> Result<(), FlowyError> {
  109. let trash_controller = self.trash_controller.clone();
  110. self.persistence
  111. .begin_transaction(|transaction| {
  112. let belong_to_id = view_rev.app_id.clone();
  113. let _ = transaction.create_view(view_rev)?;
  114. let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  115. Ok(())
  116. })
  117. .await
  118. }
  119. #[tracing::instrument(level = "debug", skip(self, view_id), err)]
  120. pub(crate) async fn read_view(&self, view_id: &str) -> Result<ViewRevision, FlowyError> {
  121. let view_rev = self
  122. .persistence
  123. .begin_transaction(|transaction| {
  124. let view = transaction.read_view(view_id)?;
  125. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  126. if trash_ids.contains(&view.id) {
  127. return Err(FlowyError::record_not_found());
  128. }
  129. Ok(view)
  130. })
  131. .await?;
  132. Ok(view_rev)
  133. }
  134. #[tracing::instrument(level = "debug", skip(self, view_id), fields(view_id = %view_id.value), err)]
  135. pub(crate) async fn read_view_pb(&self, view_id: ViewIdPB) -> Result<ViewInfoPB, FlowyError> {
  136. let view_info = self
  137. .persistence
  138. .begin_transaction(|transaction| {
  139. let view_rev = transaction.read_view(&view_id.value)?;
  140. let items: Vec<ViewPB> = view_rev
  141. .belongings
  142. .into_iter()
  143. .map(|view_rev| view_rev.into())
  144. .collect();
  145. let view_info = ViewInfoPB {
  146. id: view_rev.id,
  147. belong_to_id: view_rev.app_id,
  148. name: view_rev.name,
  149. desc: view_rev.desc,
  150. data_type: view_rev.data_format.into(),
  151. belongings: RepeatedViewPB { items },
  152. ext_data: view_rev.ext_data,
  153. };
  154. Ok(view_info)
  155. })
  156. .await?;
  157. Ok(view_info)
  158. }
  159. pub(crate) async fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<ViewRevision>, FlowyError> {
  160. self.persistence
  161. .begin_transaction(|transaction| {
  162. let mut views = vec![];
  163. for view_id in ids {
  164. views.push(transaction.read_view(&view_id)?);
  165. }
  166. Ok(views)
  167. })
  168. .await
  169. }
  170. #[tracing::instrument(level = "trace", skip(self), err)]
  171. pub(crate) fn set_latest_view(&self, view_id: &str) -> Result<(), FlowyError> {
  172. KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
  173. Ok(())
  174. }
  175. #[tracing::instrument(level = "debug", skip(self), err)]
  176. pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
  177. let processor = self.get_data_processor_from_view_id(view_id).await?;
  178. let _ = processor.close_view(view_id).await?;
  179. Ok(())
  180. }
  181. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)]
  182. pub(crate) async fn move_view_to_trash(&self, params: DocumentIdPB) -> Result<(), FlowyError> {
  183. let view_id = params.value;
  184. if let Some(latest_view_id) = KV::get_str(LATEST_VIEW_ID) {
  185. if latest_view_id == view_id {
  186. let _ = KV::remove(LATEST_VIEW_ID);
  187. }
  188. }
  189. let deleted_view = self
  190. .persistence
  191. .begin_transaction(|transaction| {
  192. let view = transaction.read_view(&view_id)?;
  193. let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?;
  194. let index = views
  195. .iter()
  196. .position(|view| view.id == view_id)
  197. .map(|index| index as i32);
  198. Ok(DeletedViewPB {
  199. view_id: view_id.clone(),
  200. index,
  201. })
  202. })
  203. .await?;
  204. send_dart_notification(&view_id, FolderNotification::ViewMoveToTrash)
  205. .payload(deleted_view)
  206. .send();
  207. let processor = self.get_data_processor_from_view_id(&view_id).await?;
  208. let _ = processor.close_view(&view_id).await?;
  209. Ok(())
  210. }
  211. #[tracing::instrument(level = "debug", skip(self), err)]
  212. pub(crate) async fn move_view(&self, view_id: &str, from: usize, to: usize) -> Result<(), FlowyError> {
  213. let _ = self
  214. .persistence
  215. .begin_transaction(|transaction| {
  216. let _ = transaction.move_view(view_id, from, to)?;
  217. let view = transaction.read_view(view_id)?;
  218. let _ = notify_views_changed(&view.app_id, self.trash_controller.clone(), &transaction)?;
  219. Ok(())
  220. })
  221. .await?;
  222. Ok(())
  223. }
  224. #[tracing::instrument(level = "debug", skip(self), err)]
  225. pub(crate) async fn duplicate_view(&self, view: ViewPB) -> Result<(), FlowyError> {
  226. let view_rev = self
  227. .persistence
  228. .begin_transaction(|transaction| transaction.read_view(&view.id))
  229. .await?;
  230. let processor = self.get_data_processor(view_rev.data_format.clone())?;
  231. let view_data = processor.get_view_data(&view).await?;
  232. let duplicate_params = CreateViewParams {
  233. belong_to_id: view_rev.app_id.clone(),
  234. name: format!("{} (copy)", &view_rev.name),
  235. desc: view_rev.desc,
  236. thumbnail: view_rev.thumbnail,
  237. data_format: view_rev.data_format.into(),
  238. layout: view_rev.layout.into(),
  239. view_content_data: view_data.to_vec(),
  240. view_id: gen_view_id(),
  241. };
  242. let _ = self.create_view_from_params(duplicate_params).await?;
  243. Ok(())
  244. }
  245. // belong_to_id will be the app_id or view_id.
  246. #[tracing::instrument(level = "trace", skip(self), err)]
  247. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<Vec<ViewRevision>, FlowyError> {
  248. self.persistence
  249. .begin_transaction(|transaction| {
  250. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  251. })
  252. .await
  253. }
  254. #[tracing::instrument(level = "debug", skip(self, params), err)]
  255. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<ViewRevision, FlowyError> {
  256. let changeset = ViewChangeset::new(params.clone());
  257. let view_id = changeset.id.clone();
  258. let view_rev = self
  259. .persistence
  260. .begin_transaction(|transaction| {
  261. let _ = transaction.update_view(changeset)?;
  262. let view_rev = transaction.read_view(&view_id)?;
  263. let view: ViewPB = view_rev.clone().into();
  264. send_dart_notification(&view_id, FolderNotification::ViewUpdated)
  265. .payload(view)
  266. .send();
  267. let _ = notify_views_changed(&view_rev.app_id, self.trash_controller.clone(), &transaction)?;
  268. Ok(view_rev)
  269. })
  270. .await?;
  271. let _ = self.update_view_on_server(params);
  272. Ok(view_rev)
  273. }
  274. pub(crate) async fn latest_visit_view(&self) -> FlowyResult<Option<ViewRevision>> {
  275. match KV::get_str(LATEST_VIEW_ID) {
  276. None => Ok(None),
  277. Some(view_id) => {
  278. let view_rev = self
  279. .persistence
  280. .begin_transaction(|transaction| transaction.read_view(&view_id))
  281. .await?;
  282. Ok(Some(view_rev))
  283. }
  284. }
  285. }
  286. }
  287. impl ViewController {
  288. #[tracing::instrument(level = "debug", skip(self, params), err)]
  289. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<ViewRevision, FlowyError> {
  290. let token = self.user.token()?;
  291. let view_rev = self.cloud_service.create_view(&token, params).await?;
  292. Ok(view_rev)
  293. }
  294. #[tracing::instrument(level = "debug", skip(self), err)]
  295. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  296. let token = self.user.token()?;
  297. let server = self.cloud_service.clone();
  298. tokio::spawn(async move {
  299. match server.update_view(&token, params).await {
  300. Ok(_) => {}
  301. Err(e) => {
  302. // TODO: retry?
  303. log::error!("Update view failed: {:?}", e);
  304. }
  305. }
  306. });
  307. Ok(())
  308. }
  309. #[tracing::instrument(level = "debug", skip(self), err)]
  310. fn read_view_on_server(&self, params: ViewIdPB) -> Result<(), FlowyError> {
  311. let token = self.user.token()?;
  312. let server = self.cloud_service.clone();
  313. let persistence = self.persistence.clone();
  314. // TODO: Retry with RetryAction?
  315. tokio::spawn(async move {
  316. match server.read_view(&token, params).await {
  317. Ok(Some(view_rev)) => {
  318. match persistence
  319. .begin_transaction(|transaction| transaction.create_view(view_rev.clone()))
  320. .await
  321. {
  322. Ok(_) => {
  323. let view: ViewPB = view_rev.into();
  324. send_dart_notification(&view.id, FolderNotification::ViewUpdated)
  325. .payload(view)
  326. .send();
  327. }
  328. Err(e) => log::error!("Save view failed: {:?}", e),
  329. }
  330. }
  331. Ok(None) => {}
  332. Err(e) => log::error!("Read view failed: {:?}", e),
  333. }
  334. });
  335. Ok(())
  336. }
  337. fn listen_trash_can_event(&self) {
  338. let mut rx = self.trash_controller.subscribe();
  339. let persistence = self.persistence.clone();
  340. let data_processors = self.data_processors.clone();
  341. let trash_controller = self.trash_controller.clone();
  342. let _ = tokio::spawn(async move {
  343. loop {
  344. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  345. match result {
  346. Ok(event) => event.select(TrashType::TrashView),
  347. Err(_e) => None,
  348. }
  349. }));
  350. if let Some(event) = stream.next().await {
  351. handle_trash_event(
  352. persistence.clone(),
  353. data_processors.clone(),
  354. trash_controller.clone(),
  355. event,
  356. )
  357. .await
  358. }
  359. }
  360. });
  361. }
  362. async fn get_data_processor_from_view_id(
  363. &self,
  364. view_id: &str,
  365. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  366. let view = self
  367. .persistence
  368. .begin_transaction(|transaction| transaction.read_view(view_id))
  369. .await?;
  370. self.get_data_processor(view.data_format)
  371. }
  372. #[inline]
  373. fn get_data_processor<T: Into<ViewDataFormatPB>>(
  374. &self,
  375. data_type: T,
  376. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  377. let data_type = data_type.into();
  378. match self.data_processors.get(&data_type) {
  379. None => Err(FlowyError::internal().context(format!(
  380. "Get data processor failed. Unknown view data type: {:?}",
  381. data_type
  382. ))),
  383. Some(processor) => Ok(processor.clone()),
  384. }
  385. }
  386. }
  387. #[tracing::instrument(level = "trace", skip(persistence, data_processors, trash_can))]
  388. async fn handle_trash_event(
  389. persistence: Arc<FolderPersistence>,
  390. data_processors: ViewDataProcessorMap,
  391. trash_can: Arc<TrashController>,
  392. event: TrashEvent,
  393. ) {
  394. match event {
  395. TrashEvent::NewTrash(identifiers, ret) => {
  396. let result = persistence
  397. .begin_transaction(|transaction| {
  398. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  399. for view_rev in view_revs {
  400. let _ = notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  401. notify_dart(view_rev.into(), FolderNotification::ViewDeleted);
  402. }
  403. Ok(())
  404. })
  405. .await;
  406. let _ = ret.send(result).await;
  407. }
  408. TrashEvent::Putback(identifiers, ret) => {
  409. let result = persistence
  410. .begin_transaction(|transaction| {
  411. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  412. for view_rev in view_revs {
  413. let _ = notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  414. notify_dart(view_rev.into(), FolderNotification::ViewRestored);
  415. }
  416. Ok(())
  417. })
  418. .await;
  419. let _ = ret.send(result).await;
  420. }
  421. TrashEvent::Delete(identifiers, ret) => {
  422. let result = || async {
  423. let views = persistence
  424. .begin_transaction(|transaction| {
  425. let mut notify_ids = HashSet::new();
  426. let mut views = vec![];
  427. for identifier in identifiers.items {
  428. if let Ok(view_rev) = transaction.delete_view(&identifier.id) {
  429. notify_ids.insert(view_rev.app_id.clone());
  430. views.push(view_rev);
  431. }
  432. }
  433. for notify_id in notify_ids {
  434. let _ = notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  435. }
  436. Ok(views)
  437. })
  438. .await?;
  439. for view in views {
  440. let data_type = view.data_format.clone().into();
  441. match get_data_processor(data_processors.clone(), &data_type) {
  442. Ok(processor) => {
  443. let _ = processor.close_view(&view.id).await?;
  444. }
  445. Err(e) => tracing::error!("{}", e),
  446. }
  447. }
  448. Ok(())
  449. };
  450. let _ = ret.send(result().await).await;
  451. }
  452. }
  453. }
  454. fn get_data_processor(
  455. data_processors: ViewDataProcessorMap,
  456. data_type: &ViewDataFormatPB,
  457. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  458. match data_processors.get(data_type) {
  459. None => Err(FlowyError::internal().context(format!(
  460. "Get data processor failed. Unknown view data type: {:?}",
  461. data_type
  462. ))),
  463. Some(processor) => Ok(processor.clone()),
  464. }
  465. }
  466. fn read_local_views_with_transaction<'a>(
  467. identifiers: RepeatedTrashIdPB,
  468. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  469. ) -> Result<Vec<ViewRevision>, FlowyError> {
  470. let mut view_revs = vec![];
  471. for identifier in identifiers.items {
  472. view_revs.push(transaction.read_view(&identifier.id)?);
  473. }
  474. Ok(view_revs)
  475. }
  476. fn notify_dart(view: ViewPB, notification: FolderNotification) {
  477. send_dart_notification(&view.id, notification).payload(view).send();
  478. }
  479. #[tracing::instrument(
  480. level = "debug",
  481. skip(belong_to_id, trash_controller, transaction),
  482. fields(view_count),
  483. err
  484. )]
  485. fn notify_views_changed<'a>(
  486. belong_to_id: &str,
  487. trash_controller: Arc<TrashController>,
  488. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  489. ) -> FlowyResult<()> {
  490. let mut app_rev = transaction.read_app(belong_to_id)?;
  491. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  492. app_rev.belongings.retain(|view| !trash_ids.contains(&view.id));
  493. let app: AppPB = app_rev.into();
  494. send_dart_notification(belong_to_id, FolderNotification::AppUpdated)
  495. .payload(app)
  496. .send();
  497. Ok(())
  498. }
  499. fn read_belonging_views_on_local<'a>(
  500. belong_to_id: &str,
  501. trash_controller: Arc<TrashController>,
  502. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  503. ) -> FlowyResult<Vec<ViewRevision>> {
  504. let mut view_revs = transaction.read_views(belong_to_id)?;
  505. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  506. view_revs.retain(|view_table| !trash_ids.contains(&view_table.id));
  507. Ok(view_revs)
  508. }