rev_queue.rs 2.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. use crate::{RevIdCounter, RevisionMergeable, RevisionPersistence};
  2. use async_stream::stream;
  3. use bytes::Bytes;
  4. use flowy_error::FlowyError;
  5. use flowy_http_model::revision::Revision;
  6. use futures::stream::StreamExt;
  7. use std::sync::Arc;
  8. use tokio::sync::mpsc::{Receiver, Sender};
  9. use tokio::sync::oneshot;
  10. #[derive(Debug)]
  11. pub(crate) enum RevCommand {
  12. RevisionData {
  13. data: Bytes,
  14. object_md5: String,
  15. ret: Ret<i64>,
  16. },
  17. }
  18. pub(crate) struct RevQueue<Connection> {
  19. object_id: String,
  20. rev_id_counter: Arc<RevIdCounter>,
  21. rev_persistence: Arc<RevisionPersistence<Connection>>,
  22. rev_compress: Arc<dyn RevisionMergeable>,
  23. receiver: Option<RevCommandReceiver>,
  24. }
  25. impl<Connection> RevQueue<Connection>
  26. where
  27. Connection: 'static,
  28. {
  29. pub fn new(
  30. object_id: String,
  31. rev_id_counter: Arc<RevIdCounter>,
  32. rev_persistence: Arc<RevisionPersistence<Connection>>,
  33. rev_compress: Arc<dyn RevisionMergeable>,
  34. receiver: RevCommandReceiver,
  35. ) -> Self {
  36. Self {
  37. object_id,
  38. rev_id_counter,
  39. rev_persistence,
  40. rev_compress,
  41. receiver: Some(receiver),
  42. }
  43. }
  44. pub async fn run(mut self) {
  45. let mut receiver = self.receiver.take().expect("Only take once");
  46. let object_id = self.object_id.clone();
  47. let stream = stream! {
  48. loop {
  49. match receiver.recv().await {
  50. Some(msg) => yield msg,
  51. None => {
  52. tracing::trace!("{}'s RevQueue exist", &object_id);
  53. break
  54. },
  55. }
  56. }
  57. };
  58. stream
  59. .for_each(|command| async {
  60. match self.handle_command(command).await {
  61. Ok(_) => {}
  62. Err(e) => tracing::debug!("[RevQueue]: {}", e),
  63. }
  64. })
  65. .await;
  66. }
  67. async fn handle_command(&self, command: RevCommand) -> Result<(), FlowyError> {
  68. match command {
  69. RevCommand::RevisionData {
  70. data,
  71. object_md5: data_md5,
  72. ret,
  73. } => {
  74. let base_rev_id = self.rev_id_counter.value();
  75. let rev_id = self.rev_id_counter.next_id();
  76. let revision = Revision::new(&self.object_id, base_rev_id, rev_id, data, data_md5);
  77. let new_rev_id = self
  78. .rev_persistence
  79. .add_local_revision(revision, &self.rev_compress)
  80. .await?;
  81. self.rev_id_counter.set(new_rev_id);
  82. let _ = ret.send(Ok(new_rev_id));
  83. }
  84. }
  85. Ok(())
  86. }
  87. }
  88. pub(crate) type RevCommandSender = Sender<RevCommand>;
  89. pub(crate) type RevCommandReceiver = Receiver<RevCommand>;
  90. pub(crate) type Ret<T> = oneshot::Sender<Result<T, FlowyError>>;