mod.rs

mod disk;
mod memory;

use crate::cache::{
    disk::{RevisionChangeset, RevisionDiskCache, RevisionTableState, SQLitePersistence},
    memory::{RevisionMemoryCache, RevisionMemoryCacheDelegate},
};
use flowy_collaboration::entities::revision::{Revision, RevisionRange, RevisionState};
use flowy_database::ConnectionPool;
use flowy_error::{internal_error, FlowyError, FlowyResult};
use std::{
    borrow::Cow,
    sync::{
        atomic::{AtomicI64, Ordering::SeqCst},
        Arc,
    },
};
use tokio::task::spawn_blocking;
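
/// How often buffered revisions are flushed to disk, in milliseconds.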
pub const REVISION_WRITE_INTERVAL_IN_MILLIS: u64 = 600;
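
/// Revision cache layered over an in-memory cache and a SQLite-backed disk cache,
/// scoped to a single object (e.g. a document) identified by `object_id`.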
pub struct RevisionCache {
    object_id: String,
    disk_cache: Arc<dyn RevisionDiskCache<Error = FlowyError>>,
    memory_cache: Arc<RevisionMemoryCache>,
    latest_rev_id: AtomicI64,
}
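
/// Builds the SQLite-backed disk cache used to persist a user's revisions.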
pub fn mk_revision_disk_cache(
    user_id: &str,
    pool: Arc<ConnectionPool>,
) -> Arc<dyn RevisionDiskCache<Error = FlowyError>> {
    Arc::new(SQLitePersistence::new(user_id, pool))
}

impl RevisionCache {
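    /// Creates a cache for `object_id`; the in-memory cache uses the SQLite
    /// persistence as its write-back delegate.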
    pub fn new(user_id: &str, object_id: &str, pool: Arc<ConnectionPool>) -> RevisionCache {
        let disk_cache = Arc::new(SQLitePersistence::new(user_id, pool));
        let memory_cache = Arc::new(RevisionMemoryCache::new(object_id, Arc::new(disk_cache.clone())));
        let object_id = object_id.to_owned();
        Self {
            object_id,
            disk_cache,
            memory_cache,
            latest_rev_id: AtomicI64::new(0),
        }
    }
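
    /// Adds a new revision record, rejecting duplicates, and remembers its id as the
    /// latest revision id.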
    pub async fn add(&self, revision: Revision, state: RevisionState, write_to_disk: bool) -> FlowyResult<()> {
        if self.memory_cache.contains(&revision.rev_id) {
            return Err(FlowyError::internal().context(format!("Duplicate revision: {} {:?}", revision.rev_id, state)));
        }
        let state = state.as_ref().clone();
        let rev_id = revision.rev_id;
        let record = RevisionRecord {
            revision,
            state,
            write_to_disk,
        };
        self.memory_cache.add(Cow::Owned(record)).await;
        self.set_latest_rev_id(rev_id);
        Ok(())
    }
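
    /// Marks the revision identified by `rev_id` as acknowledged in the memory cache.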
    pub async fn ack(&self, rev_id: i64) {
        self.memory_cache.ack(&rev_id).await;
    }
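
    /// Looks up a single revision record, falling back to the disk cache when it is
    /// not in memory.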
    pub async fn get(&self, rev_id: i64) -> Option<RevisionRecord> {
        match self.memory_cache.get(&rev_id).await {
            None => match self
                .disk_cache
                .read_revision_records(&self.object_id, Some(vec![rev_id]))
            {
                Ok(mut records) => {
                    let record = records.pop()?;
                    assert!(records.is_empty());
                    Some(record)
                }
                Err(e) => {
                    tracing::error!("{}", e);
                    None
                }
            },
            Some(revision) => Some(revision),
        }
    }
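
    /// Reads every persisted revision record for `doc_id` from the disk cache.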
    pub fn batch_get(&self, doc_id: &str) -> FlowyResult<Vec<RevisionRecord>> {
        self.disk_cache.read_revision_records(doc_id, None)
    }
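
    /// Returns the revisions covered by `range`, reading from the disk cache when the
    /// memory cache cannot supply the whole range.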
    pub async fn revisions_in_range(&self, range: RevisionRange) -> FlowyResult<Vec<Revision>> {
        let mut records = self.memory_cache.get_with_range(&range).await?;
        let range_len = range.len() as usize;
        if records.len() != range_len {
            let disk_cache = self.disk_cache.clone();
            let doc_id = self.object_id.clone();
            records = spawn_blocking(move || disk_cache.read_revision_records_with_range(&doc_id, &range))
                .await
                .map_err(internal_error)??;
            if records.len() != range_len {
                tracing::error!("The number of revisions does not match the requested range");
            }
        }
        Ok(records
            .into_iter()
            .map(|record| record.revision)
            .collect::<Vec<Revision>>())
    }
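
    /// Replaces everything cached for `doc_id`, in memory and on disk, with `revisions`,
    /// marking each new record's state as `Sync`.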
    #[tracing::instrument(level = "debug", skip(self, doc_id, revisions))]
    pub async fn reset_with_revisions(&self, doc_id: &str, revisions: Vec<Revision>) -> FlowyResult<()> {
        let revision_records = revisions
            .to_vec()
            .into_iter()
            .map(|revision| RevisionRecord {
                revision,
                state: RevisionState::Sync,
                write_to_disk: false,
            })
            .collect::<Vec<_>>();
        let _ = self.memory_cache.reset_with_revisions(&revision_records).await?;
        let _ = self.disk_cache.reset_object(doc_id, revision_records)?;
        Ok(())
    }
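
    /// Records the id of the most recently added revision.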
    #[inline]
    fn set_latest_rev_id(&self, rev_id: i64) {
        let _ = self.latest_rev_id.fetch_update(SeqCst, SeqCst, |_e| Some(rev_id));
    }
}
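
/// The SQLite persistence doubles as the memory cache's delegate: it persists
/// checkpointed records and updates a revision's row when an ack arrives.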
impl RevisionMemoryCacheDelegate for Arc<SQLitePersistence> {
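    /// Writes the records still flagged with `write_to_disk` to SQLite.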
    #[tracing::instrument(level = "trace", skip(self, records), fields(checkpoint_result), err)]
    fn checkpoint_tick(&self, mut records: Vec<RevisionRecord>) -> FlowyResult<()> {
        let conn = &*self.pool.get().map_err(internal_error)?;
        records.retain(|record| record.write_to_disk);
        if !records.is_empty() {
            tracing::Span::current().record(
                "checkpoint_result",
                &format!("{} records were saved", records.len()).as_str(),
            );
            let _ = self.write_revision_records(records, conn)?;
        }
        Ok(())
    }
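
    /// Marks the revision's row as acknowledged in the revision table.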
    fn receive_ack(&self, object_id: &str, rev_id: i64) {
        let changeset = RevisionChangeset {
            object_id: object_id.to_string(),
            rev_id: rev_id.into(),
            state: RevisionTableState::Ack,
        };
        match self.update_revision_record(vec![changeset]) {
            Ok(_) => {}
            Err(e) => tracing::error!("{}", e),
        }
    }
}
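
/// A revision together with its sync state and whether it still needs to be written to disk.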
#[derive(Clone)]
pub struct RevisionRecord {
    pub revision: Revision,
    pub state: RevisionState,
    pub write_to_disk: bool,
}

impl RevisionRecord {
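    /// Marks this record as acknowledged.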
    pub fn ack(&mut self) {
        self.state = RevisionState::Ack;
    }
}