controller.rs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329
  1. use crate::{
  2. dart_notification::{send_anonymous_dart_notification, WorkspaceNotification},
  3. entities::trash::{RepeatedTrash, RepeatedTrashId, Trash, TrashId, TrashType},
  4. errors::{FlowyError, FlowyResult},
  5. module::{FolderCouldServiceV1, WorkspaceUser},
  6. services::persistence::{FolderPersistence, FolderPersistenceTransaction},
  7. };
  8. use std::{fmt::Formatter, sync::Arc};
  9. use tokio::sync::{broadcast, mpsc};
  10. pub struct TrashController {
  11. persistence: Arc<FolderPersistence>,
  12. notify: broadcast::Sender<TrashEvent>,
  13. cloud_service: Arc<dyn FolderCouldServiceV1>,
  14. user: Arc<dyn WorkspaceUser>,
  15. }
  16. impl TrashController {
  17. pub fn new(
  18. persistence: Arc<FolderPersistence>,
  19. cloud_service: Arc<dyn FolderCouldServiceV1>,
  20. user: Arc<dyn WorkspaceUser>,
  21. ) -> Self {
  22. let (tx, _) = broadcast::channel(10);
  23. Self {
  24. persistence,
  25. notify: tx,
  26. cloud_service,
  27. user,
  28. }
  29. }
  30. #[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
  31. pub async fn putback(&self, trash_id: &str) -> FlowyResult<()> {
  32. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  33. let trash = self
  34. .persistence
  35. .begin_transaction(|transaction| {
  36. let mut repeated_trash = transaction.read_trash(Some(trash_id.to_owned()))?;
  37. let _ = transaction.delete_trash(Some(vec![trash_id.to_owned()]))?;
  38. notify_trash_changed(transaction.read_trash(None)?);
  39. if repeated_trash.is_empty() {
  40. return Err(FlowyError::internal().context("Try to put back trash is not exists"));
  41. }
  42. Ok(repeated_trash.pop().unwrap())
  43. })
  44. .await?;
  45. let identifier = TrashId {
  46. id: trash.id,
  47. ty: trash.ty,
  48. };
  49. let _ = self.delete_trash_on_server(RepeatedTrashId {
  50. items: vec![identifier.clone()],
  51. delete_all: false,
  52. })?;
  53. tracing::Span::current().record("putback", &format!("{:?}", &identifier).as_str());
  54. let _ = self.notify.send(TrashEvent::Putback(vec![identifier].into(), tx));
  55. let _ = rx.recv().await.unwrap()?;
  56. Ok(())
  57. }
  58. #[tracing::instrument(level = "debug", skip(self) err)]
  59. pub async fn restore_all_trash(&self) -> FlowyResult<()> {
  60. let repeated_trash = self
  61. .persistence
  62. .begin_transaction(|transaction| {
  63. let trash = transaction.read_trash(None);
  64. let _ = transaction.delete_trash(None);
  65. trash
  66. })
  67. .await?;
  68. let identifiers: RepeatedTrashId = repeated_trash.items.clone().into();
  69. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  70. let _ = self.notify.send(TrashEvent::Putback(identifiers, tx));
  71. let _ = rx.recv().await;
  72. notify_trash_changed(RepeatedTrash { items: vec![] });
  73. let _ = self.delete_all_trash_on_server().await?;
  74. Ok(())
  75. }
  76. #[tracing::instrument(level = "debug", skip(self), err)]
  77. pub async fn delete_all_trash(&self) -> FlowyResult<()> {
  78. let repeated_trash = self
  79. .persistence
  80. .begin_transaction(|transaction| transaction.read_trash(None))
  81. .await?;
  82. let trash_identifiers: RepeatedTrashId = repeated_trash.items.clone().into();
  83. let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
  84. notify_trash_changed(RepeatedTrash { items: vec![] });
  85. let _ = self.delete_all_trash_on_server().await?;
  86. Ok(())
  87. }
  88. #[tracing::instrument(level = "debug", skip(self), err)]
  89. pub async fn delete(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
  90. let _ = self.delete_with_identifiers(trash_identifiers.clone()).await?;
  91. let repeated_trash = self
  92. .persistence
  93. .begin_transaction(|transaction| transaction.read_trash(None))
  94. .await?;
  95. notify_trash_changed(repeated_trash);
  96. let _ = self.delete_trash_on_server(trash_identifiers)?;
  97. Ok(())
  98. }
  99. #[tracing::instrument(level = "debug", skip(self), fields(delete_trash_ids), err)]
  100. pub async fn delete_with_identifiers(&self, trash_identifiers: RepeatedTrashId) -> FlowyResult<()> {
  101. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  102. tracing::Span::current().record("delete_trash_ids", &format!("{}", trash_identifiers).as_str());
  103. let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
  104. match rx.recv().await {
  105. None => {},
  106. Some(result) => match result {
  107. Ok(_) => {},
  108. Err(e) => log::error!("{}", e),
  109. },
  110. }
  111. let _ = self
  112. .persistence
  113. .begin_transaction(|transaction| {
  114. let ids = trash_identifiers
  115. .items
  116. .into_iter()
  117. .map(|item| item.id)
  118. .collect::<Vec<_>>();
  119. transaction.delete_trash(Some(ids))
  120. })
  121. .await?;
  122. Ok(())
  123. }
  124. // [[ transaction ]]
  125. // https://www.tutlane.com/tutorial/sqlite/sqlite-transactions-begin-commit-rollback
  126. // We can use these commands only when we are performing INSERT, UPDATE, and
  127. // DELETE operations. It’s not possible for us to use these commands to
  128. // CREATE and DROP tables operations because those are auto-commit in the
  129. // database.
  130. #[tracing::instrument(name = "add_trash", level = "debug", skip(self, trash), fields(trash_ids), err)]
  131. pub async fn add<T: Into<Trash>>(&self, trash: Vec<T>) -> Result<(), FlowyError> {
  132. let (tx, mut rx) = mpsc::channel::<FlowyResult<()>>(1);
  133. let repeated_trash = trash.into_iter().map(|t| t.into()).collect::<Vec<Trash>>();
  134. let identifiers = repeated_trash.iter().map(|t| t.into()).collect::<Vec<TrashId>>();
  135. tracing::Span::current().record(
  136. "trash_ids",
  137. &format!(
  138. "{:?}",
  139. identifiers
  140. .iter()
  141. .map(|identifier| format!("{:?}:{}", identifier.ty, identifier.id))
  142. .collect::<Vec<_>>()
  143. )
  144. .as_str(),
  145. );
  146. let _ = self
  147. .persistence
  148. .begin_transaction(|transaction| {
  149. let _ = transaction.create_trash(repeated_trash.clone())?;
  150. let _ = self.create_trash_on_server(repeated_trash);
  151. notify_trash_changed(transaction.read_trash(None)?);
  152. Ok(())
  153. })
  154. .await?;
  155. let _ = self.notify.send(TrashEvent::NewTrash(identifiers.into(), tx));
  156. let _ = rx.recv().await.unwrap()?;
  157. Ok(())
  158. }
  159. pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> { self.notify.subscribe() }
  160. pub async fn read_trash(&self) -> Result<RepeatedTrash, FlowyError> {
  161. let repeated_trash = self
  162. .persistence
  163. .begin_transaction(|transaction| transaction.read_trash(None))
  164. .await?;
  165. let _ = self.read_trash_on_server()?;
  166. Ok(repeated_trash)
  167. }
  168. pub fn read_trash_ids<'a>(
  169. &self,
  170. transaction: &'a (dyn FolderPersistenceTransaction + 'a),
  171. ) -> Result<Vec<String>, FlowyError> {
  172. let ids = transaction
  173. .read_trash(None)?
  174. .into_inner()
  175. .into_iter()
  176. .map(|item| item.id)
  177. .collect::<Vec<String>>();
  178. Ok(ids)
  179. }
  180. }
  181. impl TrashController {
  182. #[tracing::instrument(level = "trace", skip(self, trash), err)]
  183. fn create_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
  184. let token = self.user.token()?;
  185. let trash_identifiers = trash.into();
  186. let server = self.cloud_service.clone();
  187. // TODO: retry?
  188. let _ = tokio::spawn(async move {
  189. match server.create_trash(&token, trash_identifiers).await {
  190. Ok(_) => {},
  191. Err(e) => log::error!("Create trash failed: {:?}", e),
  192. }
  193. });
  194. Ok(())
  195. }
  196. #[tracing::instrument(level = "trace", skip(self, trash), err)]
  197. fn delete_trash_on_server<T: Into<RepeatedTrashId>>(&self, trash: T) -> FlowyResult<()> {
  198. let token = self.user.token()?;
  199. let trash_identifiers = trash.into();
  200. let server = self.cloud_service.clone();
  201. let _ = tokio::spawn(async move {
  202. match server.delete_trash(&token, trash_identifiers).await {
  203. Ok(_) => {},
  204. Err(e) => log::error!("Delete trash failed: {:?}", e),
  205. }
  206. });
  207. Ok(())
  208. }
  209. #[tracing::instrument(level = "trace", skip(self), err)]
  210. fn read_trash_on_server(&self) -> FlowyResult<()> {
  211. let token = self.user.token()?;
  212. let server = self.cloud_service.clone();
  213. let persistence = self.persistence.clone();
  214. tokio::spawn(async move {
  215. match server.read_trash(&token).await {
  216. Ok(repeated_trash) => {
  217. tracing::debug!("Remote trash count: {}", repeated_trash.items.len());
  218. let result = persistence
  219. .begin_transaction(|transaction| {
  220. let _ = transaction.create_trash(repeated_trash.items.clone())?;
  221. transaction.read_trash(None)
  222. })
  223. .await;
  224. match result {
  225. Ok(repeated_trash) => {
  226. notify_trash_changed(repeated_trash);
  227. },
  228. Err(e) => log::error!("Save trash failed: {:?}", e),
  229. }
  230. },
  231. Err(e) => log::error!("Read trash failed: {:?}", e),
  232. }
  233. });
  234. Ok(())
  235. }
  236. #[tracing::instrument(level = "trace", skip(self), err)]
  237. async fn delete_all_trash_on_server(&self) -> FlowyResult<()> {
  238. let token = self.user.token()?;
  239. let server = self.cloud_service.clone();
  240. server.delete_trash(&token, RepeatedTrashId::all()).await
  241. }
  242. }
  243. #[tracing::instrument(skip(repeated_trash), fields(n_trash))]
  244. fn notify_trash_changed(repeated_trash: RepeatedTrash) {
  245. tracing::Span::current().record("n_trash", &repeated_trash.len());
  246. send_anonymous_dart_notification(WorkspaceNotification::TrashUpdated)
  247. .payload(repeated_trash)
  248. .send();
  249. }
  250. #[derive(Clone)]
  251. pub enum TrashEvent {
  252. NewTrash(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  253. Putback(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  254. Delete(RepeatedTrashId, mpsc::Sender<FlowyResult<()>>),
  255. }
  256. impl std::fmt::Debug for TrashEvent {
  257. fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
  258. match self {
  259. TrashEvent::NewTrash(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  260. TrashEvent::Putback(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  261. TrashEvent::Delete(identifiers, _) => f.write_str(&format!("{:?}", identifiers)),
  262. }
  263. }
  264. }
  265. impl TrashEvent {
  266. pub fn select(self, s: TrashType) -> Option<TrashEvent> {
  267. match self {
  268. TrashEvent::Putback(mut identifiers, sender) => {
  269. identifiers.items.retain(|item| item.ty == s);
  270. if identifiers.items.is_empty() {
  271. None
  272. } else {
  273. Some(TrashEvent::Putback(identifiers, sender))
  274. }
  275. },
  276. TrashEvent::Delete(mut identifiers, sender) => {
  277. identifiers.items.retain(|item| item.ty == s);
  278. if identifiers.items.is_empty() {
  279. None
  280. } else {
  281. Some(TrashEvent::Delete(identifiers, sender))
  282. }
  283. },
  284. TrashEvent::NewTrash(mut identifiers, sender) => {
  285. identifiers.items.retain(|item| item.ty == s);
  286. if identifiers.items.is_empty() {
  287. None
  288. } else {
  289. Some(TrashEvent::NewTrash(identifiers, sender))
  290. }
  291. },
  292. }
  293. }
  294. }