rev_snapshot.rs 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #![allow(clippy::all)]
  2. #![allow(dead_code)]
  3. #![allow(unused_variables)]
  4. use crate::{RevIdCounter, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence};
  5. use bytes::Bytes;
  6. use flowy_error::FlowyResult;
  7. use flowy_http_model::revision::Revision;
  8. use std::sync::atomic::AtomicI64;
  9. use std::sync::atomic::Ordering::SeqCst;
  10. use std::sync::Arc;
  11. pub trait RevisionSnapshotDiskCache: Send + Sync {
  12. fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
  13. (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION
  14. }
  15. fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
  16. fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>>;
  17. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>>;
  18. }
  19. /// Do nothing but just used to clam the rust compiler about the generic parameter `SP` of `RevisionManager`
  20. ///
  21. pub struct PhantomSnapshotPersistence();
  22. impl RevisionSnapshotDiskCache for PhantomSnapshotPersistence {
  23. fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()> {
  24. Ok(())
  25. }
  26. fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshot>> {
  27. Ok(None)
  28. }
  29. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshot>> {
  30. Ok(None)
  31. }
  32. }
/// Minimum number of revisions between the start of the snapshot window and the
/// current revision before the default `should_generate_snapshot_from_range`
/// requests a new snapshot.
const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
/// Builds snapshots of a single object's revisions and writes them to a
/// [`RevisionSnapshotDiskCache`].
pub struct RevisionSnapshotController<Connection> {
    /// Owner of the object; stored but not read by any method visible in this file.
    user_id: String,
    /// Id of the object whose revisions are snapshotted.
    object_id: String,
    /// Snapshot store; shared (via `Arc` clones) with background write tasks.
    rev_snapshot_persistence: Arc<dyn RevisionSnapshotDiskCache>,
    /// Supplies the current revision id.
    rev_id_counter: Arc<RevIdCounter>,
    /// Source of the revision records combined into a snapshot.
    rev_persistence: Arc<RevisionPersistence<Connection>>,
    /// Merges the loaded revisions into the snapshot payload.
    rev_compress: Arc<dyn RevisionMergeable>,
    /// Revision id at which the current snapshot window started; atomic so it can be
    /// updated through `&self`.
    start_rev_id: AtomicI64,
}
  43. impl<Connection> RevisionSnapshotController<Connection>
  44. where
  45. Connection: 'static,
  46. {
  47. pub fn new<D>(
  48. user_id: &str,
  49. object_id: &str,
  50. disk_cache: D,
  51. rev_id_counter: Arc<RevIdCounter>,
  52. revision_persistence: Arc<RevisionPersistence<Connection>>,
  53. revision_compress: Arc<dyn RevisionMergeable>,
  54. ) -> Self
  55. where
  56. D: RevisionSnapshotDiskCache + 'static,
  57. {
  58. let disk_cache = Arc::new(disk_cache);
  59. Self {
  60. user_id: user_id.to_string(),
  61. object_id: object_id.to_string(),
  62. rev_snapshot_persistence: disk_cache,
  63. rev_id_counter,
  64. start_rev_id: AtomicI64::new(0),
  65. rev_persistence: revision_persistence,
  66. rev_compress: revision_compress,
  67. }
  68. }
  69. pub async fn generate_snapshot(&self) {
  70. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  71. if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) {
  72. tracing::error!("Save snapshot failed: {}", e);
  73. }
  74. }
  75. }
  76. /// Find the nearest revision base on the passed-in rev_id
  77. #[tracing::instrument(level = "trace", skip_all)]
  78. pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)>
  79. where
  80. B: RevisionObjectDeserializer,
  81. {
  82. tracing::info!("Try to find if {} has snapshot", self.object_id);
  83. let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
  84. let snapshot_rev_id = snapshot.rev_id;
  85. let revision = Revision::new(
  86. &self.object_id,
  87. snapshot.base_rev_id,
  88. snapshot.rev_id,
  89. snapshot.data,
  90. "".to_owned(),
  91. );
  92. tracing::info!(
  93. "Try to restore from snapshot: {}, {}",
  94. snapshot.base_rev_id,
  95. snapshot.rev_id
  96. );
  97. let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
  98. tracing::info!(
  99. "Restore {} from snapshot with rev_id: {}",
  100. self.object_id,
  101. snapshot_rev_id
  102. );
  103. Some((object, revision))
  104. }
  105. pub fn generate_snapshot_if_need(&self) {
  106. let current_rev_id = self.rev_id_counter.value();
  107. let start_rev_id = self.get_start_rev_id();
  108. if current_rev_id <= start_rev_id {
  109. return;
  110. }
  111. if self
  112. .rev_snapshot_persistence
  113. .should_generate_snapshot_from_range(start_rev_id, current_rev_id)
  114. {
  115. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  116. let disk_cache = self.rev_snapshot_persistence.clone();
  117. tokio::spawn(async move {
  118. let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec());
  119. });
  120. }
  121. self.set_start_rev_id(current_rev_id);
  122. }
  123. }
  124. fn generate_snapshot_data(&self) -> Option<(i64, Bytes)> {
  125. let revisions = self
  126. .rev_persistence
  127. .load_all_records(&self.object_id)
  128. .map(|records| {
  129. records
  130. .into_iter()
  131. .map(|record| record.revision)
  132. .collect::<Vec<Revision>>()
  133. })
  134. .ok()?;
  135. if revisions.is_empty() {
  136. return None;
  137. }
  138. let data = self.rev_compress.combine_revisions(revisions).ok()?;
  139. let rev_id = self.rev_id_counter.value();
  140. Some((rev_id, data))
  141. }
  142. fn get_start_rev_id(&self) -> i64 {
  143. self.start_rev_id.load(SeqCst)
  144. }
  145. fn set_start_rev_id(&self, rev_id: i64) {
  146. let _ = self.start_rev_id.fetch_update(SeqCst, SeqCst, |_| Some(rev_id));
  147. }
  148. }
  149. impl<Connection> std::ops::Deref for RevisionSnapshotController<Connection> {
  150. type Target = Arc<dyn RevisionSnapshotDiskCache>;
  151. fn deref(&self) -> &Self::Target {
  152. &self.rev_snapshot_persistence
  153. }
  154. }
/// A snapshot as returned by a [`RevisionSnapshotDiskCache`] read.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RevisionSnapshot {
    /// Revision id the snapshot corresponds to.
    pub rev_id: i64,
    /// Used as the base revision id of the `Revision` rebuilt during restore
    /// (presumably the revision the snapshot was built on — TODO confirm).
    pub base_rev_id: i64,
    /// Snapshot time; unit and epoch are not visible here — TODO confirm.
    pub timestamp: i64,
    /// Snapshot payload; becomes the data of the restored `Revision`.
    pub data: Bytes,
}