controller.rs 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452
  1. use crate::manager::{ViewDataProcessor, ViewDataProcessorMap};
  2. use crate::{
  3. dart_notification::{send_dart_notification, FolderNotification},
  4. entities::{
  5. trash::{RepeatedTrashId, TrashType},
  6. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
  7. },
  8. errors::{FlowyError, FlowyResult},
  9. event_map::{FolderCouldServiceV1, WorkspaceUser},
  10. services::{
  11. persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset},
  12. TrashController, TrashEvent,
  13. },
  14. };
  15. use bytes::Bytes;
  16. use flowy_database::kv::KV;
  17. use flowy_folder_data_model::entities::view::ViewDataType;
  18. use flowy_sync::entities::text_block_info::TextBlockId;
  19. use futures::{FutureExt, StreamExt};
  20. use lib_infra::uuid;
  21. use std::{collections::HashSet, sync::Arc};
/// Key under which the most recently opened view id is kept in the `KV` store.
const LATEST_VIEW_ID: &str = "latest_view_id";
/// Coordinates view operations across local persistence, the cloud service,
/// the trash controller and the per-data-type view processors.
pub(crate) struct ViewController {
    /// Source of the current user's id and auth token.
    user: Arc<dyn WorkspaceUser>,
    /// Remote service used to create, read and update views.
    cloud_service: Arc<dyn FolderCouldServiceV1>,
    /// Local store; every read and write goes through `begin_transaction`.
    persistence: Arc<FolderPersistence>,
    /// Supplies trashed ids (used to hide trashed views) and trash events.
    trash_controller: Arc<TrashController>,
    /// Lookup from `ViewDataType` to the processor that handles that type's content.
    data_processors: ViewDataProcessorMap,
}
  30. impl ViewController {
  31. pub(crate) fn new(
  32. user: Arc<dyn WorkspaceUser>,
  33. persistence: Arc<FolderPersistence>,
  34. cloud_service: Arc<dyn FolderCouldServiceV1>,
  35. trash_controller: Arc<TrashController>,
  36. data_processors: ViewDataProcessorMap,
  37. ) -> Self {
  38. Self {
  39. user,
  40. cloud_service,
  41. persistence,
  42. trash_controller,
  43. data_processors,
  44. }
  45. }
    /// Starts the background listener for trash events.
    ///
    /// Currently always returns `Ok(())`.
    pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
        self.listen_trash_can_event();
        Ok(())
    }
  50. #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
  51. pub(crate) async fn create_view_from_params(&self, mut params: CreateViewParams) -> Result<View, FlowyError> {
  52. let processor = self.get_data_processor(&params.data_type)?;
  53. let user_id = self.user.user_id()?;
  54. if params.data.is_empty() {
  55. let view_data = processor.create_default_view(&user_id, &params.view_id).await?;
  56. params.data = view_data.to_vec();
  57. } else {
  58. let delta_data = processor
  59. .process_create_view_data(&user_id, &params.view_id, params.data.clone())
  60. .await?;
  61. let _ = self
  62. .create_view(&params.view_id, params.data_type.clone(), delta_data)
  63. .await?;
  64. };
  65. let view = self.create_view_on_server(params).await?;
  66. let _ = self.create_view_on_local(view.clone()).await?;
  67. Ok(view)
  68. }
  69. #[tracing::instrument(level = "debug", skip(self, view_id, delta_data), err)]
  70. pub(crate) async fn create_view(
  71. &self,
  72. view_id: &str,
  73. data_type: ViewDataType,
  74. delta_data: Bytes,
  75. ) -> Result<(), FlowyError> {
  76. if delta_data.is_empty() {
  77. return Err(FlowyError::internal().context("The content of the view should not be empty"));
  78. }
  79. let user_id = self.user.user_id()?;
  80. let processor = self.get_data_processor(&data_type)?;
  81. let _ = processor.create_container(&user_id, view_id, delta_data).await?;
  82. Ok(())
  83. }
  84. pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
  85. let trash_controller = self.trash_controller.clone();
  86. self.persistence
  87. .begin_transaction(|transaction| {
  88. let belong_to_id = view.belong_to_id.clone();
  89. let _ = transaction.create_view(view)?;
  90. let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  91. Ok(())
  92. })
  93. .await
  94. }
  95. #[tracing::instrument(skip(self, view_id), fields(view_id = %view_id.value), err)]
  96. pub(crate) async fn read_view(&self, view_id: ViewId) -> Result<View, FlowyError> {
  97. let view = self
  98. .persistence
  99. .begin_transaction(|transaction| {
  100. let view = transaction.read_view(&view_id.value)?;
  101. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  102. if trash_ids.contains(&view.id) {
  103. return Err(FlowyError::record_not_found());
  104. }
  105. Ok(view)
  106. })
  107. .await?;
  108. let _ = self.read_view_on_server(view_id);
  109. Ok(view)
  110. }
  111. pub(crate) async fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<View>, FlowyError> {
  112. self.persistence
  113. .begin_transaction(|transaction| {
  114. let mut views = vec![];
  115. for view_id in ids {
  116. views.push(transaction.read_view(&view_id)?);
  117. }
  118. Ok(views)
  119. })
  120. .await
  121. }
  122. #[tracing::instrument(level = "debug", skip(self), err)]
  123. pub(crate) fn set_latest_view(&self, view_id: &str) -> Result<(), FlowyError> {
  124. KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
  125. Ok(())
  126. }
  127. #[tracing::instrument(level = "debug", skip(self), err)]
  128. pub(crate) async fn close_view(&self, view_id: &str) -> Result<(), FlowyError> {
  129. let processor = self.get_data_processor_from_view_id(view_id).await?;
  130. let _ = processor.close_container(view_id).await?;
  131. Ok(())
  132. }
  133. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)]
  134. pub(crate) async fn delete_view(&self, params: TextBlockId) -> Result<(), FlowyError> {
  135. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  136. if view_id == params.value {
  137. let _ = KV::remove(LATEST_VIEW_ID);
  138. }
  139. }
  140. let processor = self.get_data_processor_from_view_id(&params.value).await?;
  141. let _ = processor.delete_container(&params.value).await?;
  142. Ok(())
  143. }
  144. #[tracing::instrument(level = "debug", skip(self), err)]
  145. pub(crate) async fn duplicate_view(&self, view_id: &str) -> Result<(), FlowyError> {
  146. let view = self
  147. .persistence
  148. .begin_transaction(|transaction| transaction.read_view(view_id))
  149. .await?;
  150. let processor = self.get_data_processor(&view.data_type)?;
  151. let delta_bytes = processor.delta_bytes(view_id).await?;
  152. let duplicate_params = CreateViewParams {
  153. belong_to_id: view.belong_to_id.clone(),
  154. name: format!("{} (copy)", &view.name),
  155. desc: view.desc,
  156. thumbnail: view.thumbnail,
  157. data_type: view.data_type,
  158. data: delta_bytes.to_vec(),
  159. view_id: uuid(),
  160. plugin_type: view.plugin_type,
  161. };
  162. let _ = self.create_view_from_params(duplicate_params).await?;
  163. Ok(())
  164. }
  165. // belong_to_id will be the app_id or view_id.
  166. #[tracing::instrument(level = "debug", skip(self), err)]
  167. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
  168. self.persistence
  169. .begin_transaction(|transaction| {
  170. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  171. })
  172. .await
  173. }
  174. #[tracing::instrument(level = "debug", skip(self, params), err)]
  175. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
  176. let changeset = ViewChangeset::new(params.clone());
  177. let view_id = changeset.id.clone();
  178. let view = self
  179. .persistence
  180. .begin_transaction(|transaction| {
  181. let _ = transaction.update_view(changeset)?;
  182. let view = transaction.read_view(&view_id)?;
  183. send_dart_notification(&view_id, FolderNotification::ViewUpdated)
  184. .payload(view.clone())
  185. .send();
  186. let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?;
  187. Ok(view)
  188. })
  189. .await?;
  190. let _ = self.update_view_on_server(params);
  191. Ok(view)
  192. }
  193. pub(crate) async fn latest_visit_view(&self) -> FlowyResult<Option<View>> {
  194. match KV::get_str(LATEST_VIEW_ID) {
  195. None => Ok(None),
  196. Some(view_id) => {
  197. let view = self
  198. .persistence
  199. .begin_transaction(|transaction| transaction.read_view(&view_id))
  200. .await?;
  201. Ok(Some(view))
  202. }
  203. }
  204. }
  205. }
  206. impl ViewController {
  207. #[tracing::instrument(skip(self), err)]
  208. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  209. let token = self.user.token()?;
  210. let view = self.cloud_service.create_view(&token, params).await?;
  211. Ok(view)
  212. }
  213. #[tracing::instrument(skip(self), err)]
  214. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  215. let token = self.user.token()?;
  216. let server = self.cloud_service.clone();
  217. tokio::spawn(async move {
  218. match server.update_view(&token, params).await {
  219. Ok(_) => {}
  220. Err(e) => {
  221. // TODO: retry?
  222. log::error!("Update view failed: {:?}", e);
  223. }
  224. }
  225. });
  226. Ok(())
  227. }
  228. #[tracing::instrument(skip(self), err)]
  229. fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
  230. let token = self.user.token()?;
  231. let server = self.cloud_service.clone();
  232. let persistence = self.persistence.clone();
  233. // TODO: Retry with RetryAction?
  234. tokio::spawn(async move {
  235. match server.read_view(&token, params).await {
  236. Ok(Some(view)) => {
  237. match persistence
  238. .begin_transaction(|transaction| transaction.create_view(view.clone()))
  239. .await
  240. {
  241. Ok(_) => {
  242. send_dart_notification(&view.id, FolderNotification::ViewUpdated)
  243. .payload(view.clone())
  244. .send();
  245. }
  246. Err(e) => log::error!("Save view failed: {:?}", e),
  247. }
  248. }
  249. Ok(None) => {}
  250. Err(e) => log::error!("Read view failed: {:?}", e),
  251. }
  252. });
  253. Ok(())
  254. }
  255. fn listen_trash_can_event(&self) {
  256. let mut rx = self.trash_controller.subscribe();
  257. let persistence = self.persistence.clone();
  258. let data_processors = self.data_processors.clone();
  259. let trash_controller = self.trash_controller.clone();
  260. let _ = tokio::spawn(async move {
  261. loop {
  262. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  263. match result {
  264. Ok(event) => event.select(TrashType::TrashView),
  265. Err(_e) => None,
  266. }
  267. }));
  268. if let Some(event) = stream.next().await {
  269. handle_trash_event(
  270. persistence.clone(),
  271. data_processors.clone(),
  272. trash_controller.clone(),
  273. event,
  274. )
  275. .await
  276. }
  277. }
  278. });
  279. }
  280. async fn get_data_processor_from_view_id(
  281. &self,
  282. view_id: &str,
  283. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  284. let view = self
  285. .persistence
  286. .begin_transaction(|transaction| transaction.read_view(view_id))
  287. .await?;
  288. self.get_data_processor(&view.data_type)
  289. }
  290. #[inline]
  291. fn get_data_processor(&self, data_type: &ViewDataType) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  292. match self.data_processors.get(data_type) {
  293. None => Err(FlowyError::internal().context(format!(
  294. "Get data processor failed. Unknown view data type: {:?}",
  295. data_type
  296. ))),
  297. Some(processor) => Ok(processor.clone()),
  298. }
  299. }
  300. }
  301. #[tracing::instrument(level = "trace", skip(persistence, data_processors, trash_can))]
  302. async fn handle_trash_event(
  303. persistence: Arc<FolderPersistence>,
  304. data_processors: ViewDataProcessorMap,
  305. trash_can: Arc<TrashController>,
  306. event: TrashEvent,
  307. ) {
  308. match event {
  309. TrashEvent::NewTrash(identifiers, ret) => {
  310. let result = persistence
  311. .begin_transaction(|transaction| {
  312. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  313. for view in views {
  314. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  315. notify_dart(view, FolderNotification::ViewDeleted);
  316. }
  317. Ok(())
  318. })
  319. .await;
  320. let _ = ret.send(result).await;
  321. }
  322. TrashEvent::Putback(identifiers, ret) => {
  323. let result = persistence
  324. .begin_transaction(|transaction| {
  325. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  326. for view in views {
  327. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  328. notify_dart(view, FolderNotification::ViewRestored);
  329. }
  330. Ok(())
  331. })
  332. .await;
  333. let _ = ret.send(result).await;
  334. }
  335. TrashEvent::Delete(identifiers, ret) => {
  336. let result = || async {
  337. let views = persistence
  338. .begin_transaction(|transaction| {
  339. let mut notify_ids = HashSet::new();
  340. let mut views = vec![];
  341. for identifier in identifiers.items {
  342. let view = transaction.read_view(&identifier.id)?;
  343. let _ = transaction.delete_view(&view.id)?;
  344. notify_ids.insert(view.belong_to_id.clone());
  345. views.push(view);
  346. }
  347. for notify_id in notify_ids {
  348. let _ = notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  349. }
  350. Ok(views)
  351. })
  352. .await?;
  353. for view in views {
  354. match get_data_processor(data_processors.clone(), &view.data_type) {
  355. Ok(processor) => {
  356. let _ = processor.close_container(&view.id).await?;
  357. }
  358. Err(e) => {
  359. tracing::error!("{}", e)
  360. }
  361. }
  362. }
  363. Ok(())
  364. };
  365. let _ = ret.send(result().await).await;
  366. }
  367. }
  368. }
  369. fn get_data_processor(
  370. data_processors: ViewDataProcessorMap,
  371. data_type: &ViewDataType,
  372. ) -> FlowyResult<Arc<dyn ViewDataProcessor + Send + Sync>> {
  373. match data_processors.get(data_type) {
  374. None => Err(FlowyError::internal().context(format!(
  375. "Get data processor failed. Unknown view data type: {:?}",
  376. data_type
  377. ))),
  378. Some(processor) => Ok(processor.clone()),
  379. }
  380. }
  381. fn read_local_views_with_transaction<'a>(
  382. identifiers: RepeatedTrashId,
  383. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  384. ) -> Result<Vec<View>, FlowyError> {
  385. let mut views = vec![];
  386. for identifier in identifiers.items {
  387. let view = transaction.read_view(&identifier.id)?;
  388. views.push(view);
  389. }
  390. Ok(views)
  391. }
  392. fn notify_dart(view: View, notification: FolderNotification) {
  393. send_dart_notification(&view.id, notification).payload(view).send();
  394. }
  395. #[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)]
  396. fn notify_views_changed<'a>(
  397. belong_to_id: &str,
  398. trash_controller: Arc<TrashController>,
  399. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  400. ) -> FlowyResult<()> {
  401. let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?;
  402. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  403. send_dart_notification(belong_to_id, FolderNotification::AppViewsChanged)
  404. .payload(repeated_view)
  405. .send();
  406. Ok(())
  407. }
  408. fn read_belonging_views_on_local<'a>(
  409. belong_to_id: &str,
  410. trash_controller: Arc<TrashController>,
  411. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  412. ) -> FlowyResult<RepeatedView> {
  413. let mut views = transaction.read_views(belong_to_id)?;
  414. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  415. views.retain(|view_table| !trash_ids.contains(&view_table.id));
  416. Ok(RepeatedView { items: views })
  417. }