controller.rs 20 KB


  1. pub use crate::entities::view::ViewDataFormatPB;
  2. use crate::entities::{AppPB, DeletedViewPB, ViewInfoPB, ViewLayoutTypePB};
  3. use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
  4. use crate::{
  5. dart_notification::{send_dart_notification, FolderNotification},
  6. entities::{
  7. trash::{RepeatedTrashIdPB, TrashType},
  8. view::{CreateViewParams, RepeatedViewPB, UpdateViewParams, ViewIdPB, ViewPB},
  9. },
  10. errors::{FlowyError, FlowyResult},
  11. event_map::{FolderCouldServiceV1, WorkspaceUser},
  12. services::{
  13. persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset},
  14. TrashController, TrashEvent,
  15. },
  16. };
  17. use bytes::Bytes;
  18. use flowy_database::kv::KV;
  19. use flowy_http_model::document::DocumentId;
  20. use folder_rev_model::{gen_view_id, ViewRevision};
  21. use futures::{FutureExt, StreamExt};
  22. use std::{collections::HashSet, sync::Arc};
  23. const LATEST_VIEW_ID: &str = "latest_view_id";
  24. pub(crate) struct ViewController {
  25. user: Arc<dyn WorkspaceUser>,
  26. cloud_service: Arc<dyn FolderCouldServiceV1>,
  27. persistence: Arc<FolderPersistence>,
  28. trash_controller: Arc<TrashController>,
  29. data_processors: ViewDataProcessorMap,
  30. }
  31. impl ViewController {
  32. pub(crate) fn new(
  33. user: Arc<dyn WorkspaceUser>,
  34. persistence: Arc<FolderPersistence>,
  35. cloud_service: Arc<dyn FolderCouldServiceV1>,
  36. trash_controller: Arc<TrashController>,
  37. data_processors: ViewDataProcessorMap,
  38. ) -> Self {
  39. Self {
  40. user,
  41. cloud_service,
  42. persistence,
  43. trash_controller,
  44. data_processors,
  45. }
  46. }
  47. pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
  48. self.listen_trash_can_event();
  49. Ok(())
  50. }
  51. #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
  52. pub(crate) async fn create_view_from_params(
  53. &self,
  54. mut params: CreateViewParams,
  55. ) -> Result<ViewRevision, FlowyError> {
  56. let processor = self.get_data_processor(params.data_format.clone())?;
  57. let user_id = self.user.user_id()?;
  58. if params.view_content_data.is_empty() {
  59. tracing::trace!("Create view with build-in data");
  60. let view_data = processor
  61. .create_default_view(
  62. &user_id,
  63. &params.view_id,
  64. params.layout.clone(),
  65. params.data_format.clone(),
  66. )
  67. .await?;
  68. params.view_content_data = view_data.to_vec();
  69. } else {
  70. tracing::trace!("Create view with view data");
  71. let delta_data = processor
  72. .create_view_from_delta_data(
  73. &user_id,
  74. &params.view_id,
  75. params.view_content_data.clone(),
  76. params.layout.clone(),
  77. )
  78. .await?;
  79. self.create_view(
  80. &params.view_id,
  81. params.data_format.clone(),
  82. params.layout.clone(),
  83. delta_data,
  84. )
  85. .await?;
  86. };
  87. let view_rev = self.create_view_on_server(params).await?;
  88. self.create_view_on_local(view_rev.clone()).await?;
  89. Ok(view_rev)
  90. }
  91. #[tracing::instrument(level = "debug", skip(self, view_id, view_data), err)]
  92. pub(crate) async fn create_view(
  93. &self,
  94. view_id: &str,
  95. data_type: ViewDataFormatPB,
  96. layout_type: ViewLayoutTypePB,
  97. view_data: Bytes,
  98. ) -> Result<(), FlowyError> {
  99. if view_data.is_empty() {
  100. return Err(FlowyError::internal().context("The content of the view should not be empty"));
  101. }
  102. let user_id = self.user.user_id()?;
  103. let processor = self.get_data_processor(data_type)?;
  104. processor.create_view(&user_id, view_id, layout_type, view_data).await?;
  105. Ok(())
  106. }
  107. pub(crate) async fn create_view_on_local(&self, view_rev: ViewRevision) -> Result<(), FlowyError> {
  108. let trash_controller = self.trash_controller.clone();
  109. self.persistence
  110. .begin_transaction(|transaction| {
  111. let belong_to_id = view_rev.app_id.clone();
  112. transaction.create_view(view_rev)?;
  113. notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  114. Ok(())
  115. })
  116. .await
  117. }
  118. #[tracing::instrument(level = "debug", skip(self, view_id), err)]
  119. pub(crate) async fn read_view(&self, view_id: &str) -> Result<ViewRevision, FlowyError> {
  120. let view_rev = self
  121. .persistence
  122. .begin_transaction(|transaction| {
  123. let view = transaction.read_view(view_id)?;
  124. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  125. if trash_ids.contains(&view.id) {
  126. return Err(FlowyError::record_not_found());
  127. }
  128. Ok(view)
  129. })
  130. .await?;
  131. Ok(view_rev)
  132. }
  133. #[tracing::instrument(level = "debug", skip(self, view_id), fields(view_id = %view_id.value), err)]
  134. pub(crate) async fn read_view_pb(&self, view_id: ViewIdPB) -> Result<ViewInfoPB, FlowyError> {
  135. let view_info = self
  136. .persistence
  137. .begin_transaction(|transaction| {
  138. let view_rev = transaction.read_view(&view_id.value)?;
  139. let items: Vec<ViewPB> = view_rev
  140. .belongings
  141. .into_iter()
  142. .map(|view_rev| view_rev.into())
  143. .collect();
  144. let view_info = ViewInfoPB {
  145. id: view_rev.id,
  146. belong_to_id: view_rev.app_id,
  147. name: view_rev.name,
  148. desc: view_rev.desc,
  149. data_type: view_rev.data_format.into(),
  150. belongings: RepeatedViewPB { items },
  151. ext_data: view_rev.ext_data,
  152. };
  153. Ok(view_info)
  154. })
  155. .await?;
  156. Ok(view_info)
  157. }
  158. pub(crate) async fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<ViewRevision>, FlowyError> {
  159. self.persistence
  160. .begin_transaction(|transaction| {
  161. let mut views = vec![];
  162. for view_id in ids {
  163. views.push(transaction.read_view(&view_id)?);
  164. }
  165. Ok(views)
  166. })
  167. .await
  168. }
  169. #[tracing::instrument(level = "trace", skip(self), err)]
  170. pub(crate) fn set_latest_view(&self, view_id: &str) -> Result<(), FlowyError> {
  171. KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
  172. Ok(())
  173. }
  174. #[tracing::instrument(level = "trace", skip(self))]
  175. pub(crate) fn clear_latest_view(&self) {
  176. let _ = KV::remove(LATEST_VIEW_ID);
  177. }
  178. #[tracing::instrument(level = "debug", skip(self), err)]
  179. pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
  180. let processor = self.get_data_processor_from_view_id(view_id).await?;
  181. processor.close_view(view_id).await?;
  182. Ok(())
  183. }
  184. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)]
  185. pub(crate) async fn move_view_to_trash(&self, params: DocumentId) -> Result<(), FlowyError> {
  186. let view_id = params.value;
  187. if let Some(latest_view_id) = KV::get_str(LATEST_VIEW_ID) {
  188. if latest_view_id == view_id {
  189. let _ = KV::remove(LATEST_VIEW_ID);
  190. }
  191. }
  192. let deleted_view = self
  193. .persistence
  194. .begin_transaction(|transaction| {
  195. let view = transaction.read_view(&view_id)?;
  196. let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?;
  197. let index = views
  198. .iter()
  199. .position(|view| view.id == view_id)
  200. .map(|index| index as i32);
  201. Ok(DeletedViewPB {
  202. view_id: view_id.clone(),
  203. index,
  204. })
  205. })
  206. .await?;
  207. send_dart_notification(&view_id, FolderNotification::ViewMoveToTrash)
  208. .payload(deleted_view)
  209. .send();
  210. let processor = self.get_data_processor_from_view_id(&view_id).await?;
  211. processor.close_view(&view_id).await?;
  212. Ok(())
  213. }
  214. #[tracing::instrument(level = "debug", skip(self), err)]
  215. pub(crate) async fn move_view(&self, view_id: &str, from: usize, to: usize) -> Result<(), FlowyError> {
  216. self.persistence
  217. .begin_transaction(|transaction| {
  218. transaction.move_view(view_id, from, to)?;
  219. let view = transaction.read_view(view_id)?;
  220. notify_views_changed(&view.app_id, self.trash_controller.clone(), &transaction)?;
  221. Ok(())
  222. })
  223. .await?;
  224. Ok(())
  225. }
  226. #[tracing::instrument(level = "debug", skip(self), err)]
  227. pub(crate) async fn duplicate_view(&self, view: ViewPB) -> Result<(), FlowyError> {
  228. let view_rev = self
  229. .persistence
  230. .begin_transaction(|transaction| transaction.read_view(&view.id))
  231. .await?;
  232. let processor = self.get_data_processor(view_rev.data_format.clone())?;
  233. let view_data = processor.get_view_data(&view).await?;
  234. let duplicate_params = CreateViewParams {
  235. belong_to_id: view_rev.app_id.clone(),
  236. name: format!("{} (copy)", &view_rev.name),
  237. desc: view_rev.desc,
  238. thumbnail: view_rev.thumbnail,
  239. data_format: view_rev.data_format.into(),
  240. layout: view_rev.layout.into(),
  241. view_content_data: view_data.to_vec(),
  242. view_id: gen_view_id(),
  243. };
  244. let _ = self.create_view_from_params(duplicate_params).await?;
  245. Ok(())
  246. }
  247. // belong_to_id will be the app_id or view_id.
  248. #[tracing::instrument(level = "trace", skip(self), err)]
  249. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<Vec<ViewRevision>, FlowyError> {
  250. self.persistence
  251. .begin_transaction(|transaction| {
  252. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  253. })
  254. .await
  255. }
  256. #[tracing::instrument(level = "debug", skip(self, params), err)]
  257. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<ViewRevision, FlowyError> {
  258. let changeset = ViewChangeset::new(params.clone());
  259. let view_id = changeset.id.clone();
  260. let view_rev = self
  261. .persistence
  262. .begin_transaction(|transaction| {
  263. transaction.update_view(changeset)?;
  264. let view_rev = transaction.read_view(&view_id)?;
  265. let view: ViewPB = view_rev.clone().into();
  266. send_dart_notification(&view_id, FolderNotification::ViewUpdated)
  267. .payload(view)
  268. .send();
  269. notify_views_changed(&view_rev.app_id, self.trash_controller.clone(), &transaction)?;
  270. Ok(view_rev)
  271. })
  272. .await?;
  273. let _ = self.update_view_on_server(params);
  274. Ok(view_rev)
  275. }
  276. pub(crate) async fn latest_visit_view(&self) -> FlowyResult<Option<ViewRevision>> {
  277. match KV::get_str(LATEST_VIEW_ID) {
  278. None => Ok(None),
  279. Some(view_id) => {
  280. let view_rev = self
  281. .persistence
  282. .begin_transaction(|transaction| transaction.read_view(&view_id))
  283. .await?;
  284. Ok(Some(view_rev))
  285. }
  286. }
  287. }
  288. }
  289. impl ViewController {
  290. #[tracing::instrument(level = "debug", skip(self, params), err)]
  291. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<ViewRevision, FlowyError> {
  292. let token = self.user.token()?;
  293. let view_rev = self.cloud_service.create_view(&token, params).await?;
  294. Ok(view_rev)
  295. }
  296. #[tracing::instrument(level = "debug", skip(self), err)]
  297. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  298. let token = self.user.token()?;
  299. let server = self.cloud_service.clone();
  300. tokio::spawn(async move {
  301. match server.update_view(&token, params).await {
  302. Ok(_) => {}
  303. Err(e) => {
  304. // TODO: retry?
  305. log::error!("Update view failed: {:?}", e);
  306. }
  307. }
  308. });
  309. Ok(())
  310. }
  311. #[tracing::instrument(level = "debug", skip(self), err)]
  312. fn read_view_on_server(&self, params: ViewIdPB) -> Result<(), FlowyError> {
  313. let token = self.user.token()?;
  314. let server = self.cloud_service.clone();
  315. let persistence = self.persistence.clone();
  316. // TODO: Retry with RetryAction?
  317. tokio::spawn(async move {
  318. match server.read_view(&token, params).await {
  319. Ok(Some(view_rev)) => {
  320. match persistence
  321. .begin_transaction(|transaction| transaction.create_view(view_rev.clone()))
  322. .await
  323. {
  324. Ok(_) => {
  325. let view: ViewPB = view_rev.into();
  326. send_dart_notification(&view.id, FolderNotification::ViewUpdated)
  327. .payload(view)
  328. .send();
  329. }
  330. Err(e) => log::error!("Save view failed: {:?}", e),
  331. }
  332. }
  333. Ok(None) => {}
  334. Err(e) => log::error!("Read view failed: {:?}", e),
  335. }
  336. });
  337. Ok(())
  338. }
  339. fn listen_trash_can_event(&self) {
  340. let mut rx = self.trash_controller.subscribe();
  341. let persistence = self.persistence.clone();
  342. let data_processors = self.data_processors.clone();
  343. let trash_controller = self.trash_controller.clone();
  344. let _ = tokio::spawn(async move {
  345. loop {
  346. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  347. match result {
  348. Ok(event) => event.select(TrashType::TrashView),
  349. Err(_e) => None,
  350. }
  351. }));
  352. if let Some(event) = stream.next().await {
  353. handle_trash_event(
  354. persistence.clone(),
  355. data_processors.clone(),
  356. trash_controller.clone(),
  357. event,
  358. )
  359. .await
  360. }
  361. }
  362. });
  363. }
  364. async fn get_data_processor_from_view_id(
  365. &self,
  366. view_id: &str,
  367. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  368. let view = self
  369. .persistence
  370. .begin_transaction(|transaction| transaction.read_view(view_id))
  371. .await?;
  372. self.get_data_processor(view.data_format)
  373. }
  374. #[inline]
  375. fn get_data_processor<T: Into<ViewDataFormatPB>>(
  376. &self,
  377. data_type: T,
  378. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  379. let data_type = data_type.into();
  380. match self.data_processors.get(&data_type) {
  381. None => Err(FlowyError::internal().context(format!(
  382. "Get data processor failed. Unknown view data type: {:?}",
  383. data_type
  384. ))),
  385. Some(processor) => Ok(processor.clone()),
  386. }
  387. }
  388. }
  389. #[tracing::instrument(level = "trace", skip(persistence, data_processors, trash_can))]
  390. async fn handle_trash_event(
  391. persistence: Arc<FolderPersistence>,
  392. data_processors: ViewDataProcessorMap,
  393. trash_can: Arc<TrashController>,
  394. event: TrashEvent,
  395. ) {
  396. match event {
  397. TrashEvent::NewTrash(identifiers, ret) => {
  398. let result = persistence
  399. .begin_transaction(|transaction| {
  400. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  401. for view_rev in view_revs {
  402. notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  403. notify_dart(view_rev.into(), FolderNotification::ViewDeleted);
  404. }
  405. Ok(())
  406. })
  407. .await;
  408. let _ = ret.send(result).await;
  409. }
  410. TrashEvent::Putback(identifiers, ret) => {
  411. let result = persistence
  412. .begin_transaction(|transaction| {
  413. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  414. for view_rev in view_revs {
  415. notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  416. notify_dart(view_rev.into(), FolderNotification::ViewRestored);
  417. }
  418. Ok(())
  419. })
  420. .await;
  421. let _ = ret.send(result).await;
  422. }
  423. TrashEvent::Delete(identifiers, ret) => {
  424. let result = || async {
  425. let views = persistence
  426. .begin_transaction(|transaction| {
  427. let mut notify_ids = HashSet::new();
  428. let mut views = vec![];
  429. for identifier in identifiers.items {
  430. if let Ok(view_rev) = transaction.delete_view(&identifier.id) {
  431. notify_ids.insert(view_rev.app_id.clone());
  432. views.push(view_rev);
  433. }
  434. }
  435. for notify_id in notify_ids {
  436. notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  437. }
  438. Ok(views)
  439. })
  440. .await?;
  441. for view in views {
  442. let data_type = view.data_format.clone().into();
  443. match get_data_processor(data_processors.clone(), &data_type) {
  444. Ok(processor) => {
  445. processor.close_view(&view.id).await?;
  446. }
  447. Err(e) => tracing::error!("{}", e),
  448. }
  449. }
  450. Ok(())
  451. };
  452. let _ = ret.send(result().await).await;
  453. }
  454. }
  455. }
  456. fn get_data_processor(
  457. data_processors: ViewDataProcessorMap,
  458. data_type: &ViewDataFormatPB,
  459. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  460. match data_processors.get(data_type) {
  461. None => Err(FlowyError::internal().context(format!(
  462. "Get data processor failed. Unknown view data type: {:?}",
  463. data_type
  464. ))),
  465. Some(processor) => Ok(processor.clone()),
  466. }
  467. }
  468. fn read_local_views_with_transaction<'a>(
  469. identifiers: RepeatedTrashIdPB,
  470. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  471. ) -> Result<Vec<ViewRevision>, FlowyError> {
  472. let mut view_revs = vec![];
  473. for identifier in identifiers.items {
  474. view_revs.push(transaction.read_view(&identifier.id)?);
  475. }
  476. Ok(view_revs)
  477. }
  478. fn notify_dart(view: ViewPB, notification: FolderNotification) {
  479. send_dart_notification(&view.id, notification).payload(view).send();
  480. }
  481. #[tracing::instrument(
  482. level = "debug",
  483. skip(belong_to_id, trash_controller, transaction),
  484. fields(view_count),
  485. err
  486. )]
  487. fn notify_views_changed<'a>(
  488. belong_to_id: &str,
  489. trash_controller: Arc<TrashController>,
  490. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  491. ) -> FlowyResult<()> {
  492. let mut app_rev = transaction.read_app(belong_to_id)?;
  493. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  494. app_rev.belongings.retain(|view| !trash_ids.contains(&view.id));
  495. let app: AppPB = app_rev.into();
  496. send_dart_notification(belong_to_id, FolderNotification::AppUpdated)
  497. .payload(app)
  498. .send();
  499. Ok(())
  500. }
  501. fn read_belonging_views_on_local<'a>(
  502. belong_to_id: &str,
  503. trash_controller: Arc<TrashController>,
  504. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  505. ) -> FlowyResult<Vec<ViewRevision>> {
  506. let mut view_revs = transaction.read_views(belong_to_id)?;
  507. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  508. view_revs.retain(|view_table| !trash_ids.contains(&view_table.id));
  509. Ok(view_revs)
  510. }