trash_can.rs 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. use crate::{
  2. entities::trash::{RepeatedTrash, Trash, TrashIdentifier, TrashIdentifiers, TrashType},
  3. errors::{WorkspaceError, WorkspaceResult},
  4. module::{WorkspaceDatabase, WorkspaceUser},
  5. notify::{send_anonymous_dart_notification, WorkspaceNotification},
  6. services::{helper::spawn, server::Server},
  7. sql_tables::trash::TrashTableSql,
  8. };
  9. use crossbeam_utils::thread;
  10. use flowy_database::SqliteConnection;
  11. use std::sync::Arc;
  12. use tokio::sync::{broadcast, mpsc};
  13. pub struct TrashCan {
  14. pub database: Arc<dyn WorkspaceDatabase>,
  15. notify: broadcast::Sender<TrashEvent>,
  16. server: Server,
  17. user: Arc<dyn WorkspaceUser>,
  18. }
  19. impl TrashCan {
  20. pub fn new(database: Arc<dyn WorkspaceDatabase>, server: Server, user: Arc<dyn WorkspaceUser>) -> Self {
  21. let (tx, _) = broadcast::channel(10);
  22. Self {
  23. database,
  24. notify: tx,
  25. server,
  26. user,
  27. }
  28. }
  29. pub(crate) fn init(&self) -> Result<(), WorkspaceError> { Ok(()) }
  30. pub fn read_trash(&self, conn: &SqliteConnection) -> Result<RepeatedTrash, WorkspaceError> {
  31. let repeated_trash = TrashTableSql::read_all(&*conn)?;
  32. let _ = self.read_trash_on_server()?;
  33. Ok(repeated_trash)
  34. }
  35. pub fn trash_ids(&self, conn: &SqliteConnection) -> Result<Vec<String>, WorkspaceError> {
  36. let ids = TrashTableSql::read_all(&*conn)?
  37. .take_items()
  38. .into_iter()
  39. .map(|item| item.id)
  40. .collect::<Vec<String>>();
  41. Ok(ids)
  42. }
  43. #[tracing::instrument(level = "debug", skip(self), fields(putback) err)]
  44. pub async fn putback(&self, trash_id: &str) -> WorkspaceResult<()> {
  45. let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
  46. let trash_table = TrashTableSql::read(trash_id, &*self.database.db_connection()?)?;
  47. let _ = thread::scope(|_s| {
  48. let conn = self.database.db_connection()?;
  49. let _ = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  50. let _ = TrashTableSql::delete_trash(trash_id, &*conn)?;
  51. notify_trash_num_changed(TrashTableSql::read_all(&conn)?);
  52. Ok(())
  53. })?;
  54. Ok::<(), WorkspaceError>(())
  55. })
  56. .unwrap()?;
  57. let identifier = TrashIdentifier {
  58. id: trash_table.id,
  59. ty: trash_table.ty.into(),
  60. };
  61. let _ = self.delete_trash_on_server(TrashIdentifiers {
  62. items: vec![identifier.clone()],
  63. })?;
  64. tracing::Span::current().record("putback", &format!("{:?}", &identifier).as_str());
  65. let _ = self.notify.send(TrashEvent::Putback(vec![identifier].into(), tx));
  66. let _ = rx.recv().await.unwrap()?;
  67. Ok(())
  68. }
  69. #[tracing::instrument(level = "debug", skip(self) err)]
  70. pub fn restore_all(&self) -> WorkspaceResult<()> { Ok(()) }
  71. #[tracing::instrument(level = "debug", skip(self) err)]
  72. pub fn delete_all(&self) -> WorkspaceResult<()> { Ok(()) }
  73. #[tracing::instrument(level = "debug", skip(self) err)]
  74. pub async fn delete(&self, trash_identifiers: TrashIdentifiers) -> WorkspaceResult<()> {
  75. let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
  76. let _ = self.notify.send(TrashEvent::Delete(trash_identifiers.clone(), tx));
  77. let _ = rx.recv().await.unwrap()?;
  78. let conn = self.database.db_connection()?;
  79. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  80. for trash_identifier in &trash_identifiers.items {
  81. let _ = TrashTableSql::delete_trash(&trash_identifier.id, &conn)?;
  82. }
  83. Ok(())
  84. })?;
  85. let _ = self.delete_trash_on_server(trash_identifiers)?;
  86. Ok(())
  87. }
  88. // [[ transaction ]]
  89. // https://www.tutlane.com/tutorial/sqlite/sqlite-transactions-begin-commit-rollback
  90. // We can use these commands only when we are performing INSERT, UPDATE, and
  91. // DELETE operations. It’s not possible for us to use these commands to
  92. // CREATE and DROP tables operations because those are auto-commit in the
  93. // database.
  94. #[tracing::instrument(level = "debug", skip(self, trash), err)]
  95. pub async fn add<T: Into<Trash>>(&self, trash: Vec<T>) -> Result<(), WorkspaceError> {
  96. let (tx, mut rx) = mpsc::channel::<WorkspaceResult<()>>(1);
  97. let trash = trash.into_iter().map(|t| t.into()).collect::<Vec<Trash>>();
  98. let mut items = vec![];
  99. let _ = thread::scope(|_s| {
  100. let conn = self.database.db_connection()?;
  101. conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  102. for t in &trash {
  103. log::debug!("create trash: {:?}", t);
  104. items.push(TrashIdentifier {
  105. id: t.id.clone(),
  106. ty: t.ty.clone(),
  107. });
  108. let _ = TrashTableSql::create_trash(t.clone().into(), &*conn)?;
  109. }
  110. self.create_trash_on_server(trash);
  111. notify_trash_num_changed(TrashTableSql::read_all(&conn)?);
  112. Ok(())
  113. })?;
  114. Ok::<(), WorkspaceError>(())
  115. })
  116. .unwrap()?;
  117. let _ = self.notify.send(TrashEvent::NewTrash(items.into(), tx));
  118. let _ = rx.recv().await.unwrap()?;
  119. Ok(())
  120. }
  121. pub fn subscribe(&self) -> broadcast::Receiver<TrashEvent> { self.notify.subscribe() }
  122. }
  123. impl TrashCan {
  124. #[tracing::instrument(level = "debug", skip(self, trash), err)]
  125. fn create_trash_on_server<T: Into<TrashIdentifiers>>(&self, trash: T) -> WorkspaceResult<()> {
  126. let token = self.user.token()?;
  127. let trash_identifiers = trash.into();
  128. let server = self.server.clone();
  129. // TODO: retry?
  130. let _ = tokio::spawn(async move {
  131. match server.create_trash(&token, trash_identifiers).await {
  132. Ok(_) => {},
  133. Err(e) => log::error!("Create trash failed: {:?}", e),
  134. }
  135. });
  136. Ok(())
  137. }
  138. #[tracing::instrument(level = "debug", skip(self, trash), err)]
  139. fn delete_trash_on_server<T: Into<TrashIdentifiers>>(&self, trash: T) -> WorkspaceResult<()> {
  140. let token = self.user.token()?;
  141. let trash_identifiers = trash.into();
  142. let server = self.server.clone();
  143. let _ = tokio::spawn(async move {
  144. match server.delete_trash(&token, trash_identifiers).await {
  145. Ok(_) => {},
  146. Err(e) => log::error!("Delete trash failed: {:?}", e),
  147. }
  148. });
  149. Ok(())
  150. }
  151. #[tracing::instrument(level = "debug", skip(self), err)]
  152. fn read_trash_on_server(&self) -> WorkspaceResult<()> {
  153. let token = self.user.token()?;
  154. let server = self.server.clone();
  155. let pool = self.database.db_pool()?;
  156. spawn(async move {
  157. match server.read_trash(&token).await {
  158. Ok(repeated_trash) => {
  159. match pool.get() {
  160. Ok(conn) => {
  161. let result = conn.immediate_transaction::<_, WorkspaceError, _>(|| {
  162. for trash in &repeated_trash.items {
  163. let _ = TrashTableSql::create_trash(trash.clone().into(), &*conn)?;
  164. }
  165. Ok(())
  166. });
  167. match result {
  168. Ok(_) => {
  169. // FIXME: User may modify the trash(add/putback) before the flying request comes
  170. // back that will cause the trash list to be outdated.
  171. // TODO: impl with operation transform
  172. notify_trash_num_changed(repeated_trash);
  173. },
  174. Err(e) => log::error!("Save trash failed: {:?}", e),
  175. }
  176. },
  177. Err(e) => log::error!("Require db connection failed: {:?}", e),
  178. }
  179. },
  180. Err(e) => log::error!("Read trash failed: {:?}", e),
  181. }
  182. });
  183. Ok(())
  184. }
  185. }
  186. #[tracing::instrument(skip(repeated_trash), fields(trash_count))]
  187. fn notify_trash_num_changed(repeated_trash: RepeatedTrash) {
  188. tracing::Span::current().record("trash_count", &repeated_trash.len());
  189. send_anonymous_dart_notification(WorkspaceNotification::TrashUpdated)
  190. .payload(repeated_trash)
  191. .send();
  192. }
  193. #[derive(Clone)]
  194. pub enum TrashEvent {
  195. NewTrash(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
  196. Putback(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
  197. Delete(TrashIdentifiers, mpsc::Sender<WorkspaceResult<()>>),
  198. }
  199. impl TrashEvent {
  200. pub fn select(self, s: TrashType) -> Option<TrashEvent> {
  201. match self {
  202. TrashEvent::Putback(mut identifiers, sender) => {
  203. identifiers.items.retain(|item| item.ty == s);
  204. if identifiers.items.is_empty() {
  205. None
  206. } else {
  207. Some(TrashEvent::Putback(identifiers, sender))
  208. }
  209. },
  210. TrashEvent::Delete(mut identifiers, sender) => {
  211. identifiers.items.retain(|item| item.ty == s);
  212. if identifiers.items.is_empty() {
  213. None
  214. } else {
  215. Some(TrashEvent::Delete(identifiers, sender))
  216. }
  217. },
  218. TrashEvent::NewTrash(mut identifiers, sender) => {
  219. identifiers.items.retain(|item| item.ty == s);
  220. if identifiers.items.is_empty() {
  221. None
  222. } else {
  223. Some(TrashEvent::NewTrash(identifiers, sender))
  224. }
  225. },
  226. }
  227. }
  228. }