sync.rs 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. use crate::services::doc::revision::RevisionRecord;
  2. use dashmap::DashMap;
  3. use flowy_error::{FlowyError, FlowyResult};
  4. use lib_ot::errors::OTError;
  5. use std::{collections::VecDeque, sync::Arc};
  6. use tokio::sync::RwLock;
  7. pub struct RevisionSyncSeq {
  8. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  9. local_revs: Arc<RwLock<VecDeque<i64>>>,
  10. }
  11. impl std::default::Default for RevisionSyncSeq {
  12. fn default() -> Self {
  13. let local_revs = Arc::new(RwLock::new(VecDeque::new()));
  14. RevisionSyncSeq {
  15. revs_map: Arc::new(DashMap::new()),
  16. local_revs,
  17. }
  18. }
  19. }
  20. impl RevisionSyncSeq {
  21. pub fn new() -> Self { RevisionSyncSeq::default() }
  22. pub async fn add_revision(&self, record: RevisionRecord) -> Result<(), OTError> {
  23. // The last revision's rev_id must be greater than the new one.
  24. if let Some(rev_id) = self.local_revs.read().await.back() {
  25. if *rev_id >= record.revision.rev_id {
  26. return Err(OTError::revision_id_conflict()
  27. .context(format!("The new revision's id must be greater than {}", rev_id)));
  28. }
  29. }
  30. self.local_revs.write().await.push_back(record.revision.rev_id);
  31. self.revs_map.insert(record.revision.rev_id, record);
  32. Ok(())
  33. }
  34. pub async fn ack_revision(&self, rev_id: &i64) -> FlowyResult<()> {
  35. if let Some(pop_rev_id) = self.next_sync_rev_id().await {
  36. if &pop_rev_id != rev_id {
  37. let desc = format!(
  38. "The ack rev_id:{} is not equal to the current rev_id:{}",
  39. rev_id, pop_rev_id
  40. );
  41. // tracing::error!("{}", desc);
  42. return Err(FlowyError::internal().context(desc));
  43. }
  44. tracing::debug!("pop revision {}", pop_rev_id);
  45. self.revs_map.remove(&pop_rev_id);
  46. let _ = self.local_revs.write().await.pop_front();
  47. }
  48. Ok(())
  49. }
  50. pub async fn next_sync_revision(&self) -> Option<(i64, RevisionRecord)> {
  51. match self.local_revs.read().await.front() {
  52. None => None,
  53. Some(rev_id) => self.revs_map.get(rev_id).map(|r| (*r.key(), r.value().clone())),
  54. }
  55. }
  56. pub async fn next_sync_rev_id(&self) -> Option<i64> { self.local_revs.read().await.front().copied() }
  57. }
  /// Accessors compiled only when the `flowy_unit_test` feature is enabled.
  #[cfg(feature = "flowy_unit_test")]
  impl RevisionSyncSeq {
      /// Hands out another reference-counted handle to the record map.
      pub fn revs_map(&self) -> Arc<DashMap<i64, RevisionRecord>> {
          Arc::clone(&self.revs_map)
      }

      /// Hands out another reference-counted handle to the queue of revision ids.
      pub fn pending_revs(&self) -> Arc<RwLock<VecDeque<i64>>> {
          Arc::clone(&self.local_revs)
      }
  }