rev_queue.rs 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #![allow(clippy::while_let_loop)]
  2. use crate::{RevIdCounter, RevisionMergeable, RevisionPersistence};
  3. use async_stream::stream;
  4. use bytes::Bytes;
  5. use flowy_error::FlowyError;
  6. use futures::stream::StreamExt;
  7. use revision_model::Revision;
  8. use std::sync::Arc;
  9. use tokio::sync::mpsc::{Receiver, Sender};
  10. use tokio::sync::oneshot;
  11. #[derive(Debug)]
  12. pub(crate) enum RevisionCommand {
  13. RevisionData {
  14. data: Bytes,
  15. object_md5: String,
  16. ret: Ret<i64>,
  17. },
  18. }
  19. /// [RevisionQueue] is used to keep the [RevisionCommand] processing in order.
  20. pub(crate) struct RevisionQueue<Connection> {
  21. object_id: String,
  22. rev_id_counter: Arc<RevIdCounter>,
  23. rev_persistence: Arc<RevisionPersistence<Connection>>,
  24. rev_compress: Arc<dyn RevisionMergeable>,
  25. receiver: Option<RevCommandReceiver>,
  26. }
  27. impl<Connection> RevisionQueue<Connection>
  28. where
  29. Connection: 'static,
  30. {
  31. pub fn new(
  32. object_id: String,
  33. rev_id_counter: Arc<RevIdCounter>,
  34. rev_persistence: Arc<RevisionPersistence<Connection>>,
  35. rev_compress: Arc<dyn RevisionMergeable>,
  36. receiver: RevCommandReceiver,
  37. ) -> Self {
  38. Self {
  39. object_id,
  40. rev_id_counter,
  41. rev_persistence,
  42. rev_compress,
  43. receiver: Some(receiver),
  44. }
  45. }
  46. pub async fn run(mut self) {
  47. let mut receiver = self.receiver.take().expect("Only take once");
  48. let object_id = self.object_id.clone();
  49. let stream = stream! {
  50. loop {
  51. match receiver.recv().await {
  52. Some(msg) => yield msg,
  53. None => {
  54. tracing::trace!("{}'s RevQueue exist", &object_id);
  55. break
  56. },
  57. }
  58. }
  59. };
  60. stream
  61. .for_each(|command| async {
  62. match self.handle_command(command).await {
  63. Ok(_) => {},
  64. Err(e) => tracing::error!("[RevQueue]: {}", e),
  65. }
  66. })
  67. .await;
  68. }
  69. async fn handle_command(&self, command: RevisionCommand) -> Result<(), FlowyError> {
  70. match command {
  71. RevisionCommand::RevisionData {
  72. data,
  73. object_md5: data_md5,
  74. ret,
  75. } => {
  76. let base_rev_id = self.rev_id_counter.value();
  77. let rev_id = self.rev_id_counter.next_id();
  78. let revision = Revision::new(&self.object_id, base_rev_id, rev_id, data, data_md5);
  79. let new_rev_id = self
  80. .rev_persistence
  81. .add_local_revision(revision, &self.rev_compress)
  82. .await?;
  83. self.rev_id_counter.set(new_rev_id);
  84. let _ = ret.send(Ok(new_rev_id));
  85. },
  86. }
  87. Ok(())
  88. }
  89. }
  90. pub(crate) type RevCommandSender = Sender<RevisionCommand>;
  91. pub(crate) type RevCommandReceiver = Receiver<RevisionCommand>;
  92. pub(crate) type Ret<T> = oneshot::Sender<Result<T, FlowyError>>;