memory.rs 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144
  1. use crate::{RevisionRecord, REVISION_WRITE_INTERVAL_IN_MILLIS};
  2. use dashmap::DashMap;
  3. use flowy_collaboration::entities::revision::RevisionRange;
  4. use flowy_error::{FlowyError, FlowyResult};
  5. use std::{borrow::Cow, sync::Arc, time::Duration};
  6. use tokio::{sync::RwLock, task::JoinHandle};
  7. pub(crate) trait RevisionMemoryCacheDelegate: Send + Sync {
  8. fn checkpoint_tick(&self, records: Vec<RevisionRecord>) -> FlowyResult<()>;
  9. fn receive_ack(&self, object_id: &str, rev_id: i64);
  10. }
  11. pub(crate) struct RevisionMemoryCache {
  12. object_id: String,
  13. revs_map: Arc<DashMap<i64, RevisionRecord>>,
  14. delegate: Arc<dyn RevisionMemoryCacheDelegate>,
  15. pending_write_revs: Arc<RwLock<Vec<i64>>>,
  16. defer_save: RwLock<Option<JoinHandle<()>>>,
  17. }
  18. impl RevisionMemoryCache {
  19. pub(crate) fn new(object_id: &str, delegate: Arc<dyn RevisionMemoryCacheDelegate>) -> Self {
  20. RevisionMemoryCache {
  21. object_id: object_id.to_owned(),
  22. revs_map: Arc::new(DashMap::new()),
  23. delegate,
  24. pending_write_revs: Arc::new(RwLock::new(vec![])),
  25. defer_save: RwLock::new(None),
  26. }
  27. }
  28. pub(crate) fn contains(&self, rev_id: &i64) -> bool {
  29. self.revs_map.contains_key(rev_id)
  30. }
  31. pub(crate) async fn add<'a>(&'a self, record: Cow<'a, RevisionRecord>) {
  32. let record = match record {
  33. Cow::Borrowed(record) => record.clone(),
  34. Cow::Owned(record) => record,
  35. };
  36. let rev_id = record.revision.rev_id;
  37. self.revs_map.insert(rev_id, record);
  38. let mut write_guard = self.pending_write_revs.write().await;
  39. if !write_guard.contains(&rev_id) {
  40. write_guard.push(rev_id);
  41. drop(write_guard);
  42. self.make_checkpoint().await;
  43. }
  44. }
  45. pub(crate) async fn ack(&self, rev_id: &i64) {
  46. match self.revs_map.get_mut(rev_id) {
  47. None => {}
  48. Some(mut record) => record.ack(),
  49. }
  50. if self.pending_write_revs.read().await.contains(rev_id) {
  51. self.make_checkpoint().await;
  52. } else {
  53. // The revision must be saved on disk if the pending_write_revs
  54. // doesn't contains the rev_id.
  55. self.delegate.receive_ack(&self.object_id, *rev_id);
  56. }
  57. }
  58. pub(crate) async fn get(&self, rev_id: &i64) -> Option<RevisionRecord> {
  59. self.revs_map.get(rev_id).map(|r| r.value().clone())
  60. }
  61. pub(crate) fn remove(&self, rev_id: &i64) {
  62. let _ = self.revs_map.remove(rev_id);
  63. }
  64. pub(crate) fn remove_with_range(&self, range: &RevisionRange) {
  65. for rev_id in range.iter() {
  66. self.remove(&rev_id);
  67. }
  68. }
  69. pub(crate) async fn get_with_range(&self, range: &RevisionRange) -> Result<Vec<RevisionRecord>, FlowyError> {
  70. let revs = range
  71. .iter()
  72. .flat_map(|rev_id| self.revs_map.get(&rev_id).map(|record| record.clone()))
  73. .collect::<Vec<RevisionRecord>>();
  74. Ok(revs)
  75. }
  76. pub(crate) async fn reset_with_revisions(&self, revision_records: Vec<RevisionRecord>) {
  77. self.revs_map.clear();
  78. if let Some(handler) = self.defer_save.write().await.take() {
  79. handler.abort();
  80. }
  81. let mut write_guard = self.pending_write_revs.write().await;
  82. write_guard.clear();
  83. for record in revision_records {
  84. write_guard.push(record.revision.rev_id);
  85. self.revs_map.insert(record.revision.rev_id, record);
  86. }
  87. drop(write_guard);
  88. self.make_checkpoint().await;
  89. }
  90. async fn make_checkpoint(&self) {
  91. // https://github.com/async-graphql/async-graphql/blob/ed8449beec3d9c54b94da39bab33cec809903953/src/dataloader/mod.rs#L362
  92. if let Some(handler) = self.defer_save.write().await.take() {
  93. handler.abort();
  94. }
  95. if self.pending_write_revs.read().await.is_empty() {
  96. return;
  97. }
  98. let rev_map = self.revs_map.clone();
  99. let pending_write_revs = self.pending_write_revs.clone();
  100. let delegate = self.delegate.clone();
  101. *self.defer_save.write().await = Some(tokio::spawn(async move {
  102. tokio::time::sleep(Duration::from_millis(REVISION_WRITE_INTERVAL_IN_MILLIS)).await;
  103. let mut revs_write_guard = pending_write_revs.write().await;
  104. // It may cause performance issues because we hold the write lock of the
  105. // rev_order and the lock will be released after the checkpoint has been written
  106. // to the disk.
  107. //
  108. // Use saturating_sub and split_off ?
  109. // https://stackoverflow.com/questions/28952411/what-is-the-idiomatic-way-to-pop-the-last-n-elements-in-a-mutable-vec
  110. let mut save_records: Vec<RevisionRecord> = vec![];
  111. revs_write_guard.iter().for_each(|rev_id| match rev_map.get(rev_id) {
  112. None => {}
  113. Some(value) => {
  114. save_records.push(value.value().clone());
  115. }
  116. });
  117. if delegate.checkpoint_tick(save_records).is_ok() {
  118. revs_write_guard.clear();
  119. drop(revs_write_guard);
  120. }
  121. }));
  122. }
  123. }