controller.rs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. use bytes::Bytes;
  2. use flowy_collaboration::entities::{
  3. document_info::{BlockDelta, BlockId},
  4. revision::{RepeatedRevision, Revision},
  5. };
  6. use flowy_collaboration::client_document::default::initial_quill_delta_string;
  7. use futures::{FutureExt, StreamExt};
  8. use std::collections::HashMap;
  9. use std::{collections::HashSet, sync::Arc};
  10. use crate::manager::DataProcessorMap;
  11. use crate::{
  12. dart_notification::{send_dart_notification, FolderNotification},
  13. entities::{
  14. trash::{RepeatedTrashId, TrashType},
  15. view::{CreateViewParams, RepeatedView, UpdateViewParams, View, ViewId},
  16. },
  17. errors::{FlowyError, FlowyResult},
  18. event_map::{FolderCouldServiceV1, WorkspaceUser},
  19. services::{
  20. persistence::{FolderPersistence, FolderPersistenceTransaction, ViewChangeset},
  21. TrashController, TrashEvent,
  22. },
  23. };
  24. use flowy_block::BlockManager;
  25. use flowy_database::kv::KV;
  26. use flowy_folder_data_model::entities::view::ViewDataType;
  27. use lib_infra::uuid;
  28. const LATEST_VIEW_ID: &str = "latest_view_id";
  29. pub(crate) struct ViewController {
  30. user: Arc<dyn WorkspaceUser>,
  31. cloud_service: Arc<dyn FolderCouldServiceV1>,
  32. persistence: Arc<FolderPersistence>,
  33. trash_controller: Arc<TrashController>,
  34. data_processors: DataProcessorMap,
  35. block_manager: Arc<BlockManager>,
  36. }
  37. impl ViewController {
  38. pub(crate) fn new(
  39. user: Arc<dyn WorkspaceUser>,
  40. persistence: Arc<FolderPersistence>,
  41. cloud_service: Arc<dyn FolderCouldServiceV1>,
  42. trash_controller: Arc<TrashController>,
  43. data_processors: DataProcessorMap,
  44. block_manager: Arc<BlockManager>,
  45. ) -> Self {
  46. Self {
  47. user,
  48. cloud_service,
  49. persistence,
  50. trash_controller,
  51. data_processors,
  52. block_manager,
  53. }
  54. }
  55. pub(crate) fn initialize(&self) -> Result<(), FlowyError> {
  56. let _ = self.block_manager.init()?;
  57. self.listen_trash_can_event();
  58. Ok(())
  59. }
  60. #[tracing::instrument(level = "trace", skip(self, params), fields(name = %params.name), err)]
  61. pub(crate) async fn create_view_from_params(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  62. let view_data = if params.data.is_empty() {
  63. initial_quill_delta_string()
  64. } else {
  65. params.data.clone()
  66. };
  67. let _ = self.create_view(&params.view_id, Bytes::from(view_data)).await?;
  68. let view = self.create_view_on_server(params).await?;
  69. let _ = self.create_view_on_local(view.clone()).await?;
  70. Ok(view)
  71. }
  72. #[tracing::instrument(level = "debug", skip(self, view_id, delta_data), err)]
  73. pub(crate) async fn create_view(&self, view_id: &str, delta_data: Bytes) -> Result<(), FlowyError> {
  74. if delta_data.is_empty() {
  75. return Err(FlowyError::internal().context("The content of the view should not be empty"));
  76. }
  77. let user_id = self.user.user_id()?;
  78. let repeated_revision: RepeatedRevision = Revision::initial_revision(&user_id, view_id, delta_data).into();
  79. let _ = self.block_manager.create_block(view_id, repeated_revision).await?;
  80. Ok(())
  81. }
  82. pub(crate) async fn create_view_on_local(&self, view: View) -> Result<(), FlowyError> {
  83. let trash_controller = self.trash_controller.clone();
  84. self.persistence
  85. .begin_transaction(|transaction| {
  86. let belong_to_id = view.belong_to_id.clone();
  87. let _ = transaction.create_view(view)?;
  88. let _ = notify_views_changed(&belong_to_id, trash_controller, &transaction)?;
  89. Ok(())
  90. })
  91. .await
  92. }
  93. #[tracing::instrument(skip(self, view_id), fields(view_id = %view_id.value), err)]
  94. pub(crate) async fn read_view(&self, view_id: ViewId) -> Result<View, FlowyError> {
  95. let view = self
  96. .persistence
  97. .begin_transaction(|transaction| {
  98. let view = transaction.read_view(&view_id.value)?;
  99. let trash_ids = self.trash_controller.read_trash_ids(&transaction)?;
  100. if trash_ids.contains(&view.id) {
  101. return Err(FlowyError::record_not_found());
  102. }
  103. Ok(view)
  104. })
  105. .await?;
  106. let _ = self.read_view_on_server(view_id);
  107. Ok(view)
  108. }
  109. pub(crate) async fn read_local_views(&self, ids: Vec<String>) -> Result<Vec<View>, FlowyError> {
  110. self.persistence
  111. .begin_transaction(|transaction| {
  112. let mut views = vec![];
  113. for view_id in ids {
  114. views.push(transaction.read_view(&view_id)?);
  115. }
  116. Ok(views)
  117. })
  118. .await
  119. }
  120. #[tracing::instrument(level = "debug", skip(self), err)]
  121. pub(crate) async fn open_view(&self, view_id: &str) -> Result<BlockDelta, FlowyError> {
  122. let editor = self.block_manager.open_block(view_id).await?;
  123. let delta_str = editor.delta_str().await?;
  124. KV::set_str(LATEST_VIEW_ID, view_id.to_owned());
  125. Ok(BlockDelta {
  126. block_id: view_id.to_string(),
  127. delta_str,
  128. })
  129. }
  130. #[tracing::instrument(level = "debug", skip(self), err)]
  131. pub(crate) async fn close_view(&self, doc_id: &str) -> Result<(), FlowyError> {
  132. let _ = self.block_manager.close_block(doc_id)?;
  133. Ok(())
  134. }
  135. #[tracing::instrument(level = "debug", skip(self,params), fields(doc_id = %params.value), err)]
  136. pub(crate) async fn delete_view(&self, params: BlockId) -> Result<(), FlowyError> {
  137. if let Some(view_id) = KV::get_str(LATEST_VIEW_ID) {
  138. if view_id == params.value {
  139. let _ = KV::remove(LATEST_VIEW_ID);
  140. }
  141. }
  142. let _ = self.block_manager.close_block(&params.value)?;
  143. Ok(())
  144. }
  145. #[tracing::instrument(level = "debug", skip(self), err)]
  146. pub(crate) async fn duplicate_view(&self, view_id: &str) -> Result<(), FlowyError> {
  147. let view = self
  148. .persistence
  149. .begin_transaction(|transaction| transaction.read_view(view_id))
  150. .await?;
  151. let editor = self.block_manager.open_block(view_id).await?;
  152. let delta_str = editor.delta_str().await?;
  153. let duplicate_params = CreateViewParams {
  154. belong_to_id: view.belong_to_id.clone(),
  155. name: format!("{} (copy)", &view.name),
  156. desc: view.desc,
  157. thumbnail: view.thumbnail,
  158. data_type: view.data_type,
  159. data: delta_str,
  160. view_id: uuid(),
  161. ext_data: view.ext_data,
  162. plugin_type: view.plugin_type,
  163. };
  164. let _ = self.create_view_from_params(duplicate_params).await?;
  165. Ok(())
  166. }
  167. // belong_to_id will be the app_id or view_id.
  168. #[tracing::instrument(level = "debug", skip(self), err)]
  169. pub(crate) async fn read_views_belong_to(&self, belong_to_id: &str) -> Result<RepeatedView, FlowyError> {
  170. self.persistence
  171. .begin_transaction(|transaction| {
  172. read_belonging_views_on_local(belong_to_id, self.trash_controller.clone(), &transaction)
  173. })
  174. .await
  175. }
  176. #[tracing::instrument(level = "debug", skip(self, params), err)]
  177. pub(crate) async fn update_view(&self, params: UpdateViewParams) -> Result<View, FlowyError> {
  178. let changeset = ViewChangeset::new(params.clone());
  179. let view_id = changeset.id.clone();
  180. let view = self
  181. .persistence
  182. .begin_transaction(|transaction| {
  183. let _ = transaction.update_view(changeset)?;
  184. let view = transaction.read_view(&view_id)?;
  185. send_dart_notification(&view_id, FolderNotification::ViewUpdated)
  186. .payload(view.clone())
  187. .send();
  188. let _ = notify_views_changed(&view.belong_to_id, self.trash_controller.clone(), &transaction)?;
  189. Ok(view)
  190. })
  191. .await?;
  192. let _ = self.update_view_on_server(params);
  193. Ok(view)
  194. }
  195. pub(crate) async fn latest_visit_view(&self) -> FlowyResult<Option<View>> {
  196. match KV::get_str(LATEST_VIEW_ID) {
  197. None => Ok(None),
  198. Some(view_id) => {
  199. let view = self
  200. .persistence
  201. .begin_transaction(|transaction| transaction.read_view(&view_id))
  202. .await?;
  203. Ok(Some(view))
  204. }
  205. }
  206. }
  207. pub(crate) fn set_latest_view(&self, view: &View) {
  208. KV::set_str(LATEST_VIEW_ID, view.id.clone());
  209. }
  210. }
  211. impl ViewController {
  212. #[tracing::instrument(skip(self), err)]
  213. async fn create_view_on_server(&self, params: CreateViewParams) -> Result<View, FlowyError> {
  214. let token = self.user.token()?;
  215. let view = self.cloud_service.create_view(&token, params).await?;
  216. Ok(view)
  217. }
  218. #[tracing::instrument(skip(self), err)]
  219. fn update_view_on_server(&self, params: UpdateViewParams) -> Result<(), FlowyError> {
  220. let token = self.user.token()?;
  221. let server = self.cloud_service.clone();
  222. tokio::spawn(async move {
  223. match server.update_view(&token, params).await {
  224. Ok(_) => {}
  225. Err(e) => {
  226. // TODO: retry?
  227. log::error!("Update view failed: {:?}", e);
  228. }
  229. }
  230. });
  231. Ok(())
  232. }
  233. #[tracing::instrument(skip(self), err)]
  234. fn read_view_on_server(&self, params: ViewId) -> Result<(), FlowyError> {
  235. let token = self.user.token()?;
  236. let server = self.cloud_service.clone();
  237. let persistence = self.persistence.clone();
  238. // TODO: Retry with RetryAction?
  239. tokio::spawn(async move {
  240. match server.read_view(&token, params).await {
  241. Ok(Some(view)) => {
  242. match persistence
  243. .begin_transaction(|transaction| transaction.create_view(view.clone()))
  244. .await
  245. {
  246. Ok(_) => {
  247. send_dart_notification(&view.id, FolderNotification::ViewUpdated)
  248. .payload(view.clone())
  249. .send();
  250. }
  251. Err(e) => log::error!("Save view failed: {:?}", e),
  252. }
  253. }
  254. Ok(None) => {}
  255. Err(e) => log::error!("Read view failed: {:?}", e),
  256. }
  257. });
  258. Ok(())
  259. }
  260. fn listen_trash_can_event(&self) {
  261. let mut rx = self.trash_controller.subscribe();
  262. let persistence = self.persistence.clone();
  263. let block_manager = self.block_manager.clone();
  264. let trash_controller = self.trash_controller.clone();
  265. let _ = tokio::spawn(async move {
  266. loop {
  267. let mut stream = Box::pin(rx.recv().into_stream().filter_map(|result| async move {
  268. match result {
  269. Ok(event) => event.select(TrashType::TrashView),
  270. Err(_e) => None,
  271. }
  272. }));
  273. if let Some(event) = stream.next().await {
  274. handle_trash_event(
  275. persistence.clone(),
  276. block_manager.clone(),
  277. trash_controller.clone(),
  278. event,
  279. )
  280. .await
  281. }
  282. }
  283. });
  284. }
  285. }
  286. #[tracing::instrument(level = "trace", skip(persistence, block_manager, trash_can))]
  287. async fn handle_trash_event(
  288. persistence: Arc<FolderPersistence>,
  289. block_manager: Arc<BlockManager>,
  290. trash_can: Arc<TrashController>,
  291. event: TrashEvent,
  292. ) {
  293. match event {
  294. TrashEvent::NewTrash(identifiers, ret) => {
  295. let result = persistence
  296. .begin_transaction(|transaction| {
  297. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  298. for view in views {
  299. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  300. notify_dart(view, FolderNotification::ViewDeleted);
  301. }
  302. Ok(())
  303. })
  304. .await;
  305. let _ = ret.send(result).await;
  306. }
  307. TrashEvent::Putback(identifiers, ret) => {
  308. let result = persistence
  309. .begin_transaction(|transaction| {
  310. let views = read_local_views_with_transaction(identifiers, &transaction)?;
  311. for view in views {
  312. let _ = notify_views_changed(&view.belong_to_id, trash_can.clone(), &transaction)?;
  313. notify_dart(view, FolderNotification::ViewRestored);
  314. }
  315. Ok(())
  316. })
  317. .await;
  318. let _ = ret.send(result).await;
  319. }
  320. TrashEvent::Delete(identifiers, ret) => {
  321. let result = persistence
  322. .begin_transaction(|transaction| {
  323. let mut notify_ids = HashSet::new();
  324. for identifier in identifiers.items {
  325. let view = transaction.read_view(&identifier.id)?;
  326. let _ = transaction.delete_view(&identifier.id)?;
  327. let _ = block_manager.delete_block(&identifier.id)?;
  328. notify_ids.insert(view.belong_to_id);
  329. }
  330. for notify_id in notify_ids {
  331. let _ = notify_views_changed(&notify_id, trash_can.clone(), &transaction)?;
  332. }
  333. Ok(())
  334. })
  335. .await;
  336. let _ = ret.send(result).await;
  337. }
  338. }
  339. }
  340. fn read_local_views_with_transaction<'a>(
  341. identifiers: RepeatedTrashId,
  342. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  343. ) -> Result<Vec<View>, FlowyError> {
  344. let mut views = vec![];
  345. for identifier in identifiers.items {
  346. let view = transaction.read_view(&identifier.id)?;
  347. views.push(view);
  348. }
  349. Ok(views)
  350. }
  351. fn notify_dart(view: View, notification: FolderNotification) {
  352. send_dart_notification(&view.id, notification).payload(view).send();
  353. }
  354. #[tracing::instrument(skip(belong_to_id, trash_controller, transaction), fields(view_count), err)]
  355. fn notify_views_changed<'a>(
  356. belong_to_id: &str,
  357. trash_controller: Arc<TrashController>,
  358. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  359. ) -> FlowyResult<()> {
  360. let repeated_view = read_belonging_views_on_local(belong_to_id, trash_controller.clone(), transaction)?;
  361. tracing::Span::current().record("view_count", &format!("{}", repeated_view.len()).as_str());
  362. send_dart_notification(belong_to_id, FolderNotification::AppViewsChanged)
  363. .payload(repeated_view)
  364. .send();
  365. Ok(())
  366. }
  367. fn read_belonging_views_on_local<'a>(
  368. belong_to_id: &str,
  369. trash_controller: Arc<TrashController>,
  370. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  371. ) -> FlowyResult<RepeatedView> {
  372. let mut views = transaction.read_views(belong_to_id)?;
  373. let trash_ids = trash_controller.read_trash_ids(transaction)?;
  374. views.retain(|view_table| !trash_ids.contains(&view_table.id));
  375. Ok(RepeatedView { items: views })
  376. }