controller.rs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. pub use crate::entities::view::ViewDataFormatPB;
  2. use crate::entities::{AppPB, DeletedViewPB, ViewLayoutTypePB};
  3. use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
  4. use crate::{
  5. entities::{
  6. trash::{RepeatedTrashIdPB, TrashType},
  7. view::{CreateViewParams, UpdateViewParams, ViewPB},
  8. },
  9. errors::{FlowyError, FlowyResult},
  10. event_map::{FolderCouldServiceV1, WorkspaceUser},
  11. notification::{send_notification, FolderNotification},
  12. services::{
  13. persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset},
  14. TrashController, TrashEvent,
  15. },
  16. };
  17. use bytes::Bytes;
  18. use flowy_sqlite::kv::KV;
  19. use folder_model::{gen_view_id, ViewRevision};
  20. use futures::{FutureExt, StreamExt};
  21. use std::{collections::HashSet, sync::Arc};
  /// Key used with the `KV` store to remember the id of the latest view.
  const LATEST_VIEW_ID: &str = "latest_view_id";
  23. pub(crate) struct ViewController {
  24. user: Arc<dyn WorkspaceUser>,
  25. cloud_service: Arc<dyn FolderCouldServiceV1>,
  26. persistence: Arc<FolderPersistence>,
  27. trash_controller: Arc<TrashController>,
  28. data_processors: ViewDataProcessorMap,
  29. }
  30. impl ViewController {
  31. pub(crate) fn new(
  32. user: Arc<dyn WorkspaceUser>,
  33. persistence: Arc<FolderPersistence>,
  34. cloud_service: Arc<dyn FolderCouldServiceV1>,
  35. trash_controller: Arc<TrashController>,
  36. data_processors: ViewDataProcessorMap,
  37. ) -> Self {
  38. Self {
  39. user,
  40. cloud_service,
  41. persistence,
  42. trash_controller,
  43. data_processors,
  44. }
  45. }
  46. pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
  47. self.listen_trash_can_event();
  48. Ok(())
  49. }
  50. #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
  51. pub(crate) async fn create_view_from_params(
  52. &self,
  53. mut params: CreateViewParams,
  54. ) -> Result<ViewRevision, FlowyError> {
  55. let processor = self.get_data_processor(params.data_format.clone())?;
  56. let user_id = self.user.user_id()?;
  57. if params.view_content_data.is_empty() {
  58. tracing::trace!("Create view with build-in data");
  59. let view_data = processor
  60. .create_default_view(
  61. &user_id,
  62. &params.view_id,
  63. params.layout.clone(),
  64. params.data_format.clone(),
  65. )
  66. .await?;
  67. params.view_content_data = view_data.to_vec();
  68. } else {
  69. tracing::trace!("Create view with view data");
  70. let delta_data = processor
  71. .create_view_from_delta_data(
  72. &user_id,
  73. &params.view_id,
  74. params.view_content_data.clone(),
  75. params.layout.clone(),
  76. )
  77. .await?;
  78. self.create_view(
  79. &params.view_id,
  80. params.data_format.clone(),
  81. params.layout.clone(),
  82. delta_data,
  83. )
  84. .await?;
  85. };
  86. let view_rev = self.create_view_on_server(params).await?;
  87. self.create_view_on_local(view_rev.clone()).await?;
  88. Ok(view_rev)
  89. }
  90. #[tracing::instrument(level = "debug", skip(self, view_id, view_data), err)]
  91. pub(crate) async fn create_view(
  92. &self,
  93. view_id: &str,
  94. data_type: ViewDataFormatPB,
  95. layout_type: ViewLayoutTypePB,
  96. view_data: Bytes,
  97. ) -> Result<(), FlowyError> {
  98. if view_data.is_empty() {
  99. return Err(FlowyError::internal().context("The content of the view should not be empty"));
  100. }
  101. let user_id = self.user.user_id()?;
  102. let processor = self.get_data_processor(data_type)?;
  103. processor.create_view(&user_id, view_id, layout_type, view_data).await?;
  104. Ok(())
  105. }
  106. pub(crate) async fn create_view_on_local(&self, view_rev: ViewRevision) -> Result<(), FlowyError> {
  107. let trash_controller = self.trash_controller.clone();
  108. self.persistence
  109. .begin_transaction(|transaction| {
  110. let belong_to_id = view_rev.app_id.clone();
  111. transaction.create_view(view_rev)?;
  112. notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  113. Ok(())
  114. })
  115. .await
  116. }
  117. #[tracing::instrument(level = "debug", skip(self, view_id), err)]
  118. pub(crate) async fn read_view(&self, view_id: &str) -> Result<ViewRevision, FlowyError> {
  119. let view_rev = self
  120. .persistence
  121. .begin_transaction(|transaction| {
  122. let view = transaction.read_view(view_id)?;
  123. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  124. if trash_ids.contains(&view.id) {
  125. return Err(FlowyError::record_not_found());
  126. }
  127. Ok(view)
  128. })
  129. .await?;
  130. Ok(view_rev)
  131. }
  132. pub(crate) async fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<ViewRevision>, FlowyError> {
  133. self.persistence
  134. .begin_transaction(|transaction| {
  135. let mut views = vec![];
  136. for view_id in ids {
  137. views.push(transaction.read_view(&view_id)?);
  138. }
  139. Ok(views)
  140. })
  141. .await
  142. }
  143. #[tracing::instrument(level = "trace", skip(self), err)]
  144. pub(crate) fn set_latest_view(&self, view_id: &str) -> Result<(), FlowyError> {
  145. KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
  146. Ok(())
  147. }
  148. #[tracing::instrument(level = "trace", skip(self))]
  149. pub(crate) fn clear_latest_view(&self) {
  150. let _ = KV::remove(LATEST_VIEW_ID);
  151. }
  152. #[tracing::instrument(level = "debug", skip(self), err)]
  153. pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
  154. let processor = self.get_data_processor_from_view_id(view_id).await?;
  155. processor.close_view(view_id).await?;
  156. Ok(())
  157. }
  158. #[tracing::instrument(level = "debug", skip(self), err)]
  159. pub(crate) async fn move_view_to_trash(&self, view_id: &str) -> Result<(), FlowyError> {
  160. if let Some(latest_view_id) = KV::get_str(LATEST_VIEW_ID) {
  161. if latest_view_id == view_id {
  162. let _ = KV::remove(LATEST_VIEW_ID);
  163. }
  164. }
  165. let deleted_view = self
  166. .persistence
  167. .begin_transaction(|transaction| {
  168. let view = transaction.read_view(view_id)?;
  169. let views = read_belonging_views_on_local(&view.app_id, self.trash_controller.clone(), &transaction)?;
  170. let index = views
  171. .iter()
  172. .position(|view| view.id == view_id)
  173. .map(|index| index as i32);
  174. Ok(DeletedViewPB {
  175. view_id: view_id.to_owned(),
  176. index,
  177. })
  178. })
  179. .await?;
  180. send_notification(view_id, FolderNotification::DidMoveViewToTrash)
  181. .payload(deleted_view)
  182. .send();
  183. let processor = self.get_data_processor_from_view_id(view_id).await?;
  184. processor.close_view(view_id).await?;
  185. Ok(())
  186. }
  187. #[tracing::instrument(level = "debug", skip(self), err)]
  188. pub(crate) async fn move_view(&self, view_id: &str, from: usize, to: usize) -> Result<(), FlowyError> {
  189. self.persistence
  190. .begin_transaction(|transaction| {
  191. transaction.move_view(view_id, from, to)?;
  192. let view = transaction.read_view(view_id)?;
  193. notify_views_changed(&view.app_id, self.trash_controller.clone(), &transaction)?;
  194. Ok(())
  195. })
  196. .await?;
  197. Ok(())
  198. }
  199. #[tracing::instrument(level = "debug", skip(self), err)]
  200. pub(crate) async fn duplicate_view(&self, view: ViewPB) -> Result<(), FlowyError> {
  201. let view_rev = self
  202. .persistence
  203. .begin_transaction(|transaction| transaction.read_view(&view.id))
  204. .await?;
  205. let processor = self.get_data_processor(view_rev.data_format.clone())?;
  206. let view_data = processor.get_view_data(&view).await?;
  207. let duplicate_params = CreateViewParams {
  208. belong_to_id: view_rev.app_id.clone(),
  209. name: format!("{} (copy)", &view_rev.name),
  210. desc: view_rev.desc,
  211. thumbnail: view_rev.thumbnail,
  212. data_format: view_rev.data_format.into(),
  213. layout: view_rev.layout.into(),
  214. view_content_data: view_data.to_vec(),
  215. view_id: gen_view_id(),
  216. };
  217. let _ = self.create_view_from_params(duplicate_params).await?;
  218. Ok(())
  219. }
  220. // belong_to_id will be the app_id or view_id.
  221. #[tracing::instrument(level = "trace", skip(self), err)]
  222. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<Vec<ViewRevision>, FlowyError> {
  223. self.persistence
  224. .begin_transaction(|transaction| {
  225. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  226. })
  227. .await
  228. }
  229. #[tracing::instrument(level = "debug", skip(self, params), err)]
  230. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<ViewRevision, FlowyError> {
  231. let changeset = ViewChangeset::new(params.clone());
  232. let view_id = changeset.id.clone();
  233. let view_rev = self
  234. .persistence
  235. .begin_transaction(|transaction| {
  236. transaction.update_view(changeset)?;
  237. let view_rev = transaction.read_view(&view_id)?;
  238. let view: ViewPB = view_rev.clone().into();
  239. send_notification(&view_id, FolderNotification::DidUpdateView)
  240. .payload(view)
  241. .send();
  242. notify_views_changed(&view_rev.app_id, self.trash_controller.clone(), &transaction)?;
  243. Ok(view_rev)
  244. })
  245. .await?;
  246. let _ = self.update_view_on_server(params);
  247. Ok(view_rev)
  248. }
  249. pub(crate) async fn latest_visit_view(&self) -> FlowyResult<Option<ViewRevision>> {
  250. match KV::get_str(LATEST_VIEW_ID) {
  251. None => Ok(None),
  252. Some(view_id) => {
  253. let view_rev = self
  254. .persistence
  255. .begin_transaction(|transaction| transaction.read_view(&view_id))
  256. .await?;
  257. Ok(Some(view_rev))
  258. }
  259. }
  260. }
  261. }
  262. impl ViewController {
  263. #[tracing::instrument(level = "debug", skip(self, params), err)]
  264. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<ViewRevision, FlowyError> {
  265. let token = self.user.token()?;
  266. let view_rev = self.cloud_service.create_view(&token, params).await?;
  267. Ok(view_rev)
  268. }
  269. #[tracing::instrument(level = "debug", skip(self), err)]
  270. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  271. let token = self.user.token()?;
  272. let server = self.cloud_service.clone();
  273. tokio::spawn(async move {
  274. match server.update_view(&token, params).await {
  275. Ok(_) => {}
  276. Err(e) => {
  277. // TODO: retry?
  278. log::error!("Update view failed: {:?}", e);
  279. }
  280. }
  281. });
  282. Ok(())
  283. }
  284. fn listen_trash_can_event(&self) {
  285. let mut rx = self.trash_controller.subscribe();
  286. let persistence = self.persistence.clone();
  287. let data_processors = self.data_processors.clone();
  288. let trash_controller = self.trash_controller.clone();
  289. let _ = tokio::spawn(async move {
  290. loop {
  291. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  292. match result {
  293. Ok(event) => event.select(TrashType::TrashView),
  294. Err(_e) => None,
  295. }
  296. }));
  297. if let Some(event) = stream.next().await {
  298. handle_trash_event(
  299. persistence.clone(),
  300. data_processors.clone(),
  301. trash_controller.clone(),
  302. event,
  303. )
  304. .await
  305. }
  306. }
  307. });
  308. }
  309. async fn get_data_processor_from_view_id(
  310. &self,
  311. view_id: &str,
  312. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  313. let view = self
  314. .persistence
  315. .begin_transaction(|transaction| transaction.read_view(view_id))
  316. .await?;
  317. self.get_data_processor(view.data_format)
  318. }
  319. #[inline]
  320. fn get_data_processor<T: Into<ViewDataFormatPB>>(
  321. &self,
  322. data_type: T,
  323. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  324. let data_type = data_type.into();
  325. match self.data_processors.get(&data_type) {
  326. None => Err(FlowyError::internal().context(format!(
  327. "Get data processor failed. Unknown view data type: {:?}",
  328. data_type
  329. ))),
  330. Some(processor) => Ok(processor.clone()),
  331. }
  332. }
  333. }
  334. #[tracing::instrument(level = "trace", skip(persistence, data_processors, trash_can))]
  335. async fn handle_trash_event(
  336. persistence: Arc<FolderPersistence>,
  337. data_processors: ViewDataProcessorMap,
  338. trash_can: Arc<TrashController>,
  339. event: TrashEvent,
  340. ) {
  341. match event {
  342. TrashEvent::NewTrash(identifiers, ret) => {
  343. let result = persistence
  344. .begin_transaction(|transaction| {
  345. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  346. for view_rev in view_revs {
  347. notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  348. notify_dart(view_rev.into(), FolderNotification::DidDeleteView);
  349. }
  350. Ok(())
  351. })
  352. .await;
  353. let _ = ret.send(result).await;
  354. }
  355. TrashEvent::Putback(identifiers, ret) => {
  356. let result = persistence
  357. .begin_transaction(|transaction| {
  358. let view_revs = read_local_views_with_transaction(identifiers, &transaction)?;
  359. for view_rev in view_revs {
  360. notify_views_changed(&view_rev.app_id, trash_can.clone(), &transaction)?;
  361. notify_dart(view_rev.into(), FolderNotification::DidRestoreView);
  362. }
  363. Ok(())
  364. })
  365. .await;
  366. let _ = ret.send(result).await;
  367. }
  368. TrashEvent::Delete(identifiers, ret) => {
  369. let result = || async {
  370. let views = persistence
  371. .begin_transaction(|transaction| {
  372. let mut notify_ids = HashSet::new();
  373. let mut views = vec![];
  374. for identifier in identifiers.items {
  375. if let Ok(view_rev) = transaction.delete_view(&identifier.id) {
  376. notify_ids.insert(view_rev.app_id.clone());
  377. views.push(view_rev);
  378. }
  379. }
  380. for notify_id in notify_ids {
  381. notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  382. }
  383. Ok(views)
  384. })
  385. .await?;
  386. for view in views {
  387. let data_type = view.data_format.clone().into();
  388. match get_data_processor(data_processors.clone(), &data_type) {
  389. Ok(processor) => {
  390. processor.close_view(&view.id).await?;
  391. }
  392. Err(e) => tracing::error!("{}", e),
  393. }
  394. }
  395. Ok(())
  396. };
  397. let _ = ret.send(result().await).await;
  398. }
  399. }
  400. }
  401. fn get_data_processor(
  402. data_processors: ViewDataProcessorMap,
  403. data_type: &ViewDataFormatPB,
  404. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  405. match data_processors.get(data_type) {
  406. None => Err(FlowyError::internal().context(format!(
  407. "Get data processor failed. Unknown view data type: {:?}",
  408. data_type
  409. ))),
  410. Some(processor) => Ok(processor.clone()),
  411. }
  412. }
  413. fn read_local_views_with_transaction<'a>(
  414. identifiers: RepeatedTrashIdPB,
  415. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  416. ) -> Result<Vec<ViewRevision>, FlowyError> {
  417. let mut view_revs = vec![];
  418. for identifier in identifiers.items {
  419. view_revs.push(transaction.read_view(&identifier.id)?);
  420. }
  421. Ok(view_revs)
  422. }
  423. fn notify_dart(view: ViewPB, notification: FolderNotification) {
  424. send_notification(&view.id, notification).payload(view).send();
  425. }
  426. #[tracing::instrument(
  427. level = "debug",
  428. skip(belong_to_id, trash_controller, transaction),
  429. fields(view_count),
  430. err
  431. )]
  432. fn notify_views_changed<'a>(
  433. belong_to_id: &str,
  434. trash_controller: Arc<TrashController>,
  435. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  436. ) -> FlowyResult<()> {
  437. let mut app_rev = transaction.read_app(belong_to_id)?;
  438. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  439. app_rev.belongings.retain(|view| !trash_ids.contains(&view.id));
  440. let app: AppPB = app_rev.into();
  441. send_notification(belong_to_id, FolderNotification::DidUpdateApp)
  442. .payload(app)
  443. .send();
  444. Ok(())
  445. }
  446. fn read_belonging_views_on_local<'a>(
  447. belong_to_id: &str,
  448. trash_controller: Arc<TrashController>,
  449. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  450. ) -> FlowyResult<Vec<ViewRevision>> {
  451. let mut view_revs = transaction.read_views(belong_to_id)?;
  452. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  453. view_revs.retain(|view_table| !trash_ids.contains(&view_table.id));
  454. Ok(view_revs)
  455. }