memory.rs 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. use crate::services::doc::RevisionRecord;
  2. use dashmap::DashMap;
  3. use flowy_error::FlowyError;
  4. use lib_infra::future::FutureResult;
  5. use lib_ot::revision::RevisionRange;
  6. use std::sync::Arc;
  7. use tokio::sync::RwLock;
  8. pub(crate) trait RevisionMemoryCacheMissing: Send + Sync {
  9. fn get_revision_record(&self, doc_id: &str, rev_id: i64) -> Result<Option<RevisionRecord>, FlowyError>;
  10. fn get_revision_records_with_range(
  11. &self,
  12. doc_id: &str,
  13. range: RevisionRange,
  14. ) -> FutureResult<Vec<RevisionRecord>, FlowyError>;
  15. }
  16. pub(crate) struct RevisionMemoryCache {
  17. doc_id: String,
  18. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  19. rev_loader: Arc<dyn RevisionMemoryCacheMissing>,
  20. revs_order: Arc<RwLock<Vec<i64>>>,
  21. }
  22. // TODO: remove outdated revisions to reduce memory usage
  23. impl RevisionMemoryCache {
  24. pub(crate) fn new(doc_id: &str, rev_loader: Arc<dyn RevisionMemoryCacheMissing>) -> Self {
  25. RevisionMemoryCache {
  26. doc_id: doc_id.to_owned(),
  27. revs_map: Arc::new(DashMap::new()),
  28. rev_loader,
  29. revs_order: Arc::new(RwLock::new(vec![])),
  30. }
  31. }
  32. pub(crate) async fn is_empty(&self) -> bool { self.revs_order.read().await.is_empty() }
  33. pub(crate) fn contains(&self, rev_id: &i64) -> bool { self.revs_map.contains_key(rev_id) }
  34. pub(crate) async fn add_revision(&self, record: &RevisionRecord) {
  35. if let Some(rev_id) = self.revs_order.read().await.last() {
  36. if *rev_id >= record.revision.rev_id {
  37. tracing::error!("Duplicated revision added to memory_cache");
  38. return;
  39. }
  40. }
  41. self.revs_map.insert(record.revision.rev_id, record.clone());
  42. self.revs_order.write().await.push(record.revision.rev_id);
  43. }
  44. pub(crate) async fn get_revision(&self, rev_id: &i64) -> Option<RevisionRecord> {
  45. match self.revs_map.get(&rev_id).map(|r| r.value().clone()) {
  46. None => match self.rev_loader.get_revision_record(&self.doc_id, *rev_id) {
  47. Ok(revision) => revision,
  48. Err(e) => {
  49. tracing::error!("{}", e);
  50. None
  51. },
  52. },
  53. Some(revision) => Some(revision),
  54. }
  55. }
  56. pub(crate) async fn get_revisions_in_range(
  57. &self,
  58. range: &RevisionRange,
  59. ) -> Result<Vec<RevisionRecord>, FlowyError> {
  60. let range_len = range.len() as usize;
  61. let revs = range
  62. .iter()
  63. .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
  64. .collect::<Vec<RevisionRecord>>();
  65. if revs.len() == range_len {
  66. Ok(revs)
  67. } else {
  68. let revs = self
  69. .rev_loader
  70. .get_revision_records_with_range(&self.doc_id, range.clone())
  71. .await?;
  72. if revs.len() != range_len {
  73. log::error!("Revisions len is not equal to range required");
  74. }
  75. Ok(revs)
  76. }
  77. }
  78. }