cache.rs

use crate::{
    core::revision::{
        disk::{DocumentRevisionDiskCache, RevisionChangeset, RevisionTableState, SQLitePersistence},
        memory::{DocumentRevisionMemoryCache, RevisionMemoryCacheDelegate},
    },
    errors::FlowyError,
};
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyResult};
use std::{
    borrow::Cow,
    sync::{
        atomic::{AtomicI64, Ordering::SeqCst},
        Arc,
    },
};
use tokio::task::spawn_blocking;

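/// Caches document revisions in memory with a SQLite-backed disk cache.
///
/// New revisions land in the [`DocumentRevisionMemoryCache`] first; records flagged
/// with `write_to_disk` are persisted by the `RevisionMemoryCacheDelegate`
/// implementation below, and reads fall back to the disk cache when a revision is
/// no longer resident in memory.
///
/// A rough usage sketch (the real call sites live elsewhere in the crate; the
/// `revision` value is assumed to be produced by the caller):
///
/// ```ignore
/// let cache = DocumentRevisionCache::new(user_id, doc_id, pool);
/// let record = cache.add(revision, RevisionState::Local, true).await?;
/// let latest = cache.latest_revision().await;
/// ```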
pub struct DocumentRevisionCache {
    doc_id: String,
    disk_cache: Arc<dyn DocumentRevisionDiskCache<Error = FlowyError>>,
    memory_cache: Arc<DocumentRevisionMemoryCache>,
    latest_rev_id: AtomicI64,
}

impl DocumentRevisionCache {
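    /// Builds a cache for `doc_id`, wiring a [`SQLitePersistence`] disk cache to a
    /// [`DocumentRevisionMemoryCache`] that uses the same persistence as its delegate.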
    pub fn new(user_id: &str, doc_id: &str, pool: Arc<ConnectionPool>) -> DocumentRevisionCache {
        let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
        let memory_cache = Arc::new(DocumentRevisionMemoryCache::new(doc_id, Arc::new(disk_cache.clone())));
        let doc_id = doc_id.to_owned();
        Self {
            doc_id,
            disk_cache,
            memory_cache,
            latest_rev_id: AtomicI64::new(0),
        }
    }

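    /// Adds a new revision record and tracks it as the latest revision.
    ///
    /// Fails if a revision with the same `rev_id` is already in the memory cache.
    /// When `write_to_disk` is true, the record is persisted on a later checkpoint
    /// tick rather than immediately.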
    pub async fn add(
        &self,
        revision: Revision,
        state: RevisionState,
        write_to_disk: bool,
    ) -> FlowyResult<RevisionRecord> {
        if self.memory_cache.contains(&revision.rev_id) {
            return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state)));
        }
        let rev_id = revision.rev_id;
        let record = RevisionRecord {
            revision,
            state,
            write_to_disk,
        };
        self.memory_cache.add(Cow::Borrowed(&record)).await;
        self.set_latest_rev_id(rev_id);
        Ok(record)
    }

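    /// Marks the revision identified by `rev_id` as acknowledged in the memory cache.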
    pub async fn ack(&self, rev_id: i64) { self.memory_cache.ack(&rev_id).await; }

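    /// Returns a single revision record, reading from the memory cache first and
    /// falling back to the disk cache on a miss.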
    pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
        match self.memory_cache.get(&rev_id).await {
            None => match self.disk_cache.read_revision_records(&self.doc_id, Some(vec![rev_id])) {
                Ok(mut records) => {
                    if !records.is_empty() {
                        assert_eq!(records.len(), 1);
                    }
                    records.pop()
                },
                Err(e) => {
                    tracing::error!("{}", e);
                    None
                },
            },
            Some(revision) => Some(revision),
        }
    }

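    /// Reads all revision records persisted on disk for `doc_id`.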
    pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
        self.disk_cache.read_revision_records(doc_id, None)
    }

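    /// Returns the revision most recently recorded by [`Self::add`].
    ///
    /// Panics if that revision can no longer be found in either cache.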
    pub async fn latest_revision(&self) -> Revision {
        let rev_id = self.latest_rev_id.load(SeqCst);
        self.get(rev_id).await.unwrap().revision
    }

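    /// Returns the revisions covering `range`, reading from the disk cache on a
    /// blocking task when the memory cache does not hold the whole range.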
    pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
        let mut records = self.memory_cache.get_with_range(&range).await?;
        let range_len = range.len() as usize;
        if records.len() != range_len {
            let disk_cache = self.disk_cache.clone();
            let doc_id = self.doc_id.clone();
            records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&doc_id, &range))
                .await
                .map_err(internal_error)??;
            if records.len() != range_len {
                log::error!("Revision count does not match the requested range");
            }
        }
        Ok(records
            .into_iter()
            .map(|record| record.revision)
            .collect::<Vec<Revision>>())
    }

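    /// Replaces all cached revisions of `doc_id` with `revisions`, resetting both
    /// the memory cache and the disk cache. The new records are marked
    /// `RevisionState::Local` and are not re-persisted by the checkpoint tick.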
    #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
    pub async fn reset_document(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
        let revision_records = revisions
            .into_iter()
            .map(|revision| RevisionRecord {
                revision,
                state: RevisionState::Local,
                write_to_disk: false,
            })
            .collect::<Vec<_>>();
        let _ = self.memory_cache.reset_with_revisions(&revision_records).await?;
        let _ = self.disk_cache.reset_document(doc_id, revision_records)?;
        Ok(())
    }

    #[inline]
    fn set_latest_rev_id(&self, rev_id: i64) {
        let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
    }
}

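/// Bridges the memory cache to SQLite: checkpoint ticks persist the records flagged
/// for disk, and acknowledgements update the matching rows to `RevisionTableState::Ack`.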
impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
    #[tracing::instrument(level = "trace", skip(self, records), fields(checkpoint_result), err)]
    fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        records.retain(|record| record.write_to_disk);
        if !records.is_empty() {
            tracing::Span::current().record(
                "checkpoint_result",
                &format!("{} records were saved", records.len()).as_str(),
            );
            let _ = self.write_revision_records(records, &conn)?;
        }
        Ok(())
    }

    fn receive_ack(&self, doc_id: &str, rev_id: i64) {
        let changeset = RevisionChangeset {
            doc_id: doc_id.to_string(),
            rev_id: rev_id.into(),
            state: RevisionTableState::Ack,
        };
        match self.update_revision_record(vec![changeset]) {
            Ok(_) => {},
            Err(e) => tracing::error!("{}", e),
        }
    }
}

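/// A revision paired with its synchronization state and a flag controlling whether
/// the record should be written to disk on the next checkpoint.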
#[derive(Clone)]
pub struct RevisionRecord {
    pub revision: Revision,
    pub state: RevisionState,
    pub write_to_disk: bool,
}

impl RevisionRecord {
    pub fn ack(&mut self) { self.state = RevisionState::Ack; }
}