controller.rs 12 KB


  1. use crate::{
  2. dart_notification::{send_anonymous_dart_notification, FolderNotification},
  3. entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType},
  4. errors::{FlowyError, FlowyResult},
  5. event_map::{FolderCouldServiceV1, WorkspaceUser},
  6. services::persistence::{FolderPersistence, FolderPersistenceTransaction},
  7. };
  8. use flowy_folder_data_model::revision::TrashRevision;
  9. use std::{fmt::Formatter, sync::Arc};
  10. use tokio::sync::{broadcast, mpsc};
  11. pub struct TrashController {
  12. persistence: Arc<FolderPersistence>,
  13. notify: broadcast::Sender<TrashEvent>,
  14. cloud_service: Arc<dyn FolderCouldServiceV1>,
  15. user: Arc<dyn WorkspaceUser>,
  16. }
  17. impl TrashController {
  18. pub fn new(
  19. persistence: Arc<FolderPersistence>,
  20. cloud_service: Arc<dyn FolderCouldServiceV1>,
  21. user: Arc<dyn WorkspaceUser>,
  22. ) -> Self {
  23. let (tx, _) = broadcast::channel(10);
  24. Self {
  25. persistence,
  26. notify: tx,
  27. cloud_service,
  28. user,
  29. }
  30. }
  31. #[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
  32. pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> {
  33. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  34. let trash = self
  35. .persistence
  36. .begin_transaction(|transaction| {
  37. let mut repeated_trash = transaction.read_trash(Some(trash_id.to_owned()))?;
  38. let _ = transaction.delete_trash(Some(vec![trash_id.to_owned()]))?;
  39. notify_trash_changed(transaction.read_trash(None)?);
  40. if repeated_trash.is_empty() {
  41. return Err(FlowyError::internal().context("Try to put back trash is not exists"));
  42. }
  43. Ok(repeated_trash.pop().unwrap())
  44. })
  45. .await?;
  46. let identifier = TrashId {
  47. id: trash.id,
  48. ty: trash.ty.into(),
  49. };
  50. let _ = self.delete_trash_on_server(RepeatedTrashId {
  51. items: vec![identifier.clone()],
  52. delete_all: false,
  53. })?;
  54. tracing::Span::current().record("putback", &format!("{:?}", &identifier).as_str());
  55. let _ = self.notify.send(TrashEvent::Putback(vec![identifier].into(), tx));
  56. let _ = rx.recv().await.unwrap()?;
  57. Ok(())
  58. }
  59. #[tracing::instrument(level = "debug", skip(self) err)]
  60. pub async fn restore_all_trash(&self) -> FlowyResult<()> {
  61. let trash_identifier: RepeatedTrashId = self
  62. .persistence
  63. .begin_transaction(|transaction| {
  64. let trash = transaction.read_trash(None);
  65. let _ = transaction.delete_trash(None);
  66. trash
  67. })
  68. .await?
  69. .into();
  70. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  71. let _ = self.notify.send(TrashEvent::Putback(trash_identifier, tx));
  72. let _ = rx.recv().await;
  73. notify_trash_changed(RepeatedTrash { items: vec![] });
  74. let _ = self.delete_all_trash_on_server().await?;
  75. Ok(())
  76. }
  77. #[tracing::instrument(level = "debug", skip(self), err)]
  78. pub async fn delete_all_trash(&self) -> FlowyResult<()> {
  79. let all_trash_identifiers: RepeatedTrashId = self
  80. .persistence
  81. .begin_transaction(|transaction| transaction.read_trash(None))
  82. .await?
  83. .into();
  84. let _ = self.delete_with_identifiers(all_trash_identifiers).await?;
  85. notify_trash_changed(RepeatedTrash { items: vec![] });
  86. let _ = self.delete_all_trash_on_server().await?;
  87. Ok(())
  88. }
  89. #[tracing::instrument(level = "debug", skip(self), err)]
  90. pub async fn delete(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
  91. let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
  92. let trash_revs = self
  93. .persistence
  94. .begin_transaction(|transaction| transaction.read_trash(None))
  95. .await?;
  96. notify_trash_changed(trash_revs);
  97. let _ = self.delete_trash_on_server(trash_identifiers)?;
  98. Ok(())
  99. }
  100. #[tracing::instrument(level = "debug", skip(self), fields(delete_trash_ids), err)]
  101. pub async fn delete_with_identifiers(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
  102. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  103. tracing::Span::current().record("delete_trash_ids", &format!("{}", trash_identifiers).as_str());
  104. let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
  105. match rx.recv().await {
  106. None => {}
  107. Some(result) => match result {
  108. Ok(_) => {}
  109. Err(e) => log::error!("{}", e),
  110. },
  111. }
  112. let _ = self
  113. .persistence
  114. .begin_transaction(|transaction| {
  115. let ids = trash_identifiers
  116. .items
  117. .into_iter()
  118. .map(|item| item.id)
  119. .collect::<Vec<_>>();
  120. transaction.delete_trash(Some(ids))
  121. })
  122. .await?;
  123. Ok(())
  124. }
    // [[ transaction ]]
    // https://www.tutlane.com/tutorial/sqlite/sqlite-transactions-begin-commit-rollback
    // The transaction commands described there can be used only while
    // performing INSERT, UPDATE, and DELETE operations. They cannot be used
    // for CREATE TABLE and DROP TABLE operations, because those are
    // auto-committed by the database.
  131. #[tracing::instrument(name = "add_trash", level = "debug", skip(self, trash), fields(trash_ids), err)]
  132. pub async fn add<T: Into<TrashRevision>>(&self, trash: Vec<T>) -> Result<(), FlowyError> {
  133. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  134. let trash_revs: Vec<TrashRevision> = trash.into_iter().map(|t| t.into()).collect();
  135. let identifiers = trash_revs.iter().map(|t| t.into()).collect::<Vec<TrashId>>();
  136. tracing::Span::current().record(
  137. "trash_ids",
  138. &format!(
  139. "{:?}",
  140. identifiers
  141. .iter()
  142. .map(|identifier| format!("{:?}:{}", identifier.ty, identifier.id))
  143. .collect::<Vec<_>>()
  144. )
  145. .as_str(),
  146. );
  147. let _ = self
  148. .persistence
  149. .begin_transaction(|transaction| {
  150. let _ = transaction.create_trash(trash_revs.clone())?;
  151. let _ = self.create_trash_on_server(trash_revs);
  152. notify_trash_changed(transaction.read_trash(None)?);
  153. Ok(())
  154. })
  155. .await?;
  156. let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx));
  157. let _ = rx.recv().await.unwrap()?;
  158. Ok(())
  159. }
  160. pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> {
  161. self.notify.subscribe()
  162. }
  163. pub async fn read_trash(&self) -> Result<RepeatedTrash, FlowyError> {
  164. let items: Vec<Trash> = self
  165. .persistence
  166. .begin_transaction(|transaction| transaction.read_trash(None))
  167. .await?
  168. .into_iter()
  169. .map(|trash_rev| trash_rev.into())
  170. .collect();
  171. let _ = self.read_trash_on_server()?;
  172. Ok(RepeatedTrash { items })
  173. }
  174. pub fn read_trash_ids<'a>(
  175. &self,
  176. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  177. ) -> Result<Vec<String>, FlowyError> {
  178. let ids = transaction
  179. .read_trash(None)?
  180. .into_iter()
  181. .map(|item| item.id)
  182. .collect::<Vec<String>>();
  183. Ok(ids)
  184. }
  185. }
  186. impl TrashController {
  187. #[tracing::instrument(level = "trace", skip(self, trash), err)]
  188. fn create_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
  189. let token = self.user.token()?;
  190. let trash_identifiers = trash.into();
  191. let server = self.cloud_service.clone();
  192. // TODO: retry?
  193. let _ = tokio::spawn(async move {
  194. match server.create_trash(&token, trash_identifiers).await {
  195. Ok(_) => {}
  196. Err(e) => log::error!("Create trash failed: {:?}", e),
  197. }
  198. });
  199. Ok(())
  200. }
  201. #[tracing::instrument(level = "trace", skip(self, trash), err)]
  202. fn delete_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
  203. let token = self.user.token()?;
  204. let trash_identifiers = trash.into();
  205. let server = self.cloud_service.clone();
  206. let _ = tokio::spawn(async move {
  207. match server.delete_trash(&token, trash_identifiers).await {
  208. Ok(_) => {}
  209. Err(e) => log::error!("Delete trash failed: {:?}", e),
  210. }
  211. });
  212. Ok(())
  213. }
  214. #[tracing::instrument(level = "trace", skip(self), err)]
  215. fn read_trash_on_server(&self) -> FlowyResult<()> {
  216. let token = self.user.token()?;
  217. let server = self.cloud_service.clone();
  218. let persistence = self.persistence.clone();
  219. tokio::spawn(async move {
  220. match server.read_trash(&token).await {
  221. Ok(trash_rev) => {
  222. tracing::debug!("Remote trash count: {}", trash_rev.len());
  223. let result = persistence
  224. .begin_transaction(|transaction| {
  225. let _ = transaction.create_trash(trash_rev.clone())?;
  226. transaction.read_trash(None)
  227. })
  228. .await;
  229. match result {
  230. Ok(trash_revs) => {
  231. notify_trash_changed(trash_revs);
  232. }
  233. Err(e) => log::error!("Save trash failed: {:?}", e),
  234. }
  235. }
  236. Err(e) => log::error!("Read trash failed: {:?}", e),
  237. }
  238. });
  239. Ok(())
  240. }
  241. #[tracing::instrument(level = "trace", skip(self), err)]
  242. async fn delete_all_trash_on_server(&self) -> FlowyResult<()> {
  243. let token = self.user.token()?;
  244. let server = self.cloud_service.clone();
  245. server.delete_trash(&token, RepeatedTrashId::all()).await
  246. }
  247. }
  248. #[tracing::instrument(level = "debug", skip(repeated_trash), fields(n_trash))]
  249. fn notify_trash_changed<T: Into<RepeatedTrash>>(repeated_trash: T) {
  250. let repeated_trash = repeated_trash.into();
  251. tracing::Span::current().record("n_trash", &repeated_trash.len());
  252. send_anonymous_dart_notification(FolderNotification::TrashUpdated)
  253. .payload(repeated_trash)
  254. .send();
  255. }
  256. #[derive(Clone)]
  257. pub enum TrashEvent {
  258. NewTrash(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  259. Putback(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  260. Delete(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  261. }
  262. impl std::fmt::Debug for TrashEvent {
  263. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  264. match self {
  265. TrashEvent::NewTrash(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  266. TrashEvent::Putback(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  267. TrashEvent::Delete(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  268. }
  269. }
  270. }
  271. impl TrashEvent {
  272. pub fn select(self, s: TrashType) -> Option<TrashEvent> {
  273. match self {
  274. TrashEvent::Putback(mut identifiers, sender) => {
  275. identifiers.items.retain(|item| item.ty == s);
  276. if identifiers.items.is_empty() {
  277. None
  278. } else {
  279. Some(TrashEvent::Putback(identifiers, sender))
  280. }
  281. }
  282. TrashEvent::Delete(mut identifiers, sender) => {
  283. identifiers.items.retain(|item| item.ty == s);
  284. if identifiers.items.is_empty() {
  285. None
  286. } else {
  287. Some(TrashEvent::Delete(identifiers, sender))
  288. }
  289. }
  290. TrashEvent::NewTrash(mut identifiers, sender) => {
  291. identifiers.items.retain(|item| item.ty == s);
  292. if identifiers.items.is_empty() {
  293. None
  294. } else {
  295. Some(TrashEvent::NewTrash(identifiers, sender))
  296. }
  297. }
  298. }
  299. }
  300. }