rev_queue.rs 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. use crate::{RevIdCounter, RevisionMergeable, RevisionPersistence};
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_error::FlowyError;
  5. use futures::stream::StreamExt;
  6. use revision_model::Revision;
  7. use std::sync::Arc;
  8. use tokio::sync::mpsc::{Receiver, Sender};
  9. use tokio::sync::oneshot;
  10. #[derive(Debug)]
  11. pub(crate) enum RevisionCommand {
  12. RevisionData {
  13. data: Bytes,
  14. object_md5: String,
  15. ret: Ret<i64>,
  16. },
  17. }
  18. /// [RevisionQueue] is used to keep the [RevisionCommand] processing in order.
  19. pub(crate) struct RevisionQueue<Connection> {
  20. object_id: String,
  21. rev_id_counter: Arc<RevIdCounter>,
  22. rev_persistence: Arc<RevisionPersistence<Connection>>,
  23. rev_compress: Arc<dyn RevisionMergeable>,
  24. receiver: Option<RevCommandReceiver>,
  25. }
  26. impl<Connection> RevisionQueue<Connection>
  27. where
  28. Connection: 'static,
  29. {
  30. pub fn new(
  31. object_id: String,
  32. rev_id_counter: Arc<RevIdCounter>,
  33. rev_persistence: Arc<RevisionPersistence<Connection>>,
  34. rev_compress: Arc<dyn RevisionMergeable>,
  35. receiver: RevCommandReceiver,
  36. ) -> Self {
  37. Self {
  38. object_id,
  39. rev_id_counter,
  40. rev_persistence,
  41. rev_compress,
  42. receiver: Some(receiver),
  43. }
  44. }
  45. pub async fn run(mut self) {
  46. let mut receiver = self.receiver.take().expect("Only take once");
  47. let object_id = self.object_id.clone();
  48. let stream = stream! {
  49. loop {
  50. match receiver.recv().await {
  51. Some(msg) => yield msg,
  52. None => {
  53. tracing::trace!("{}'s RevQueue exist", &object_id);
  54. break
  55. },
  56. }
  57. }
  58. };
  59. stream
  60. .for_each(|command| async {
  61. match self.handle_command(command).await {
  62. Ok(_) => {},
  63. Err(e) => tracing::error!("[RevQueue]: {}", e),
  64. }
  65. })
  66. .await;
  67. }
  68. async fn handle_command(&self, command: RevisionCommand) -> Result<(), FlowyError> {
  69. match command {
  70. RevisionCommand::RevisionData {
  71. data,
  72. object_md5: data_md5,
  73. ret,
  74. } => {
  75. let base_rev_id = self.rev_id_counter.value();
  76. let rev_id = self.rev_id_counter.next_id();
  77. let revision = Revision::new(&self.object_id, base_rev_id, rev_id, data, data_md5);
  78. let new_rev_id = self
  79. .rev_persistence
  80. .add_local_revision(revision, &self.rev_compress)
  81. .await?;
  82. self.rev_id_counter.set(new_rev_id);
  83. let _ = ret.send(Ok(new_rev_id));
  84. },
  85. }
  86. Ok(())
  87. }
  88. }
  89. pub(crate) type RevCommandSender = Sender<RevisionCommand>;
  90. pub(crate) type RevCommandReceiver = Receiver<RevisionCommand>;
  91. pub(crate) type Ret<T> = oneshot::Sender<Result<T, FlowyError>>;