rev_snapshot.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. #![allow(clippy::all)]
  2. #![allow(dead_code)]
  3. #![allow(unused_variables)]
  4. use crate::{RevIdCounter, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence};
  5. use bytes::Bytes;
  6. use flowy_error::FlowyResult;
  7. use revision_model::Revision;
  8. use std::sync::atomic::AtomicI64;
  9. use std::sync::atomic::Ordering::SeqCst;
  10. use std::sync::Arc;
  11. pub trait RevisionSnapshotPersistence: Send + Sync {
  12. fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
  13. (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION
  14. }
  15. fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
  16. fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>>;
  17. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>>;
  18. }
  19. pub trait RevisionSnapshotDataGenerator: Send + Sync {
  20. fn generate_snapshot_data(&self) -> Option<RevisionSnapshotData>;
  21. }
  22. const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
  23. pub struct RevisionSnapshotController<Connection> {
  24. object_id: String,
  25. rev_snapshot_persistence: Arc<dyn RevisionSnapshotPersistence>,
  26. rev_snapshot_data: Option<Arc<dyn RevisionSnapshotDataGenerator>>,
  27. rev_id_counter: Arc<RevIdCounter>,
  28. rev_persistence: Arc<RevisionPersistence<Connection>>,
  29. rev_compress: Arc<dyn RevisionMergeable>,
  30. start_rev_id: AtomicI64,
  31. }
  32. impl<Connection> RevisionSnapshotController<Connection>
  33. where
  34. Connection: 'static,
  35. {
  36. pub fn new<D>(
  37. object_id: &str,
  38. disk_cache: D,
  39. rev_id_counter: Arc<RevIdCounter>,
  40. revision_persistence: Arc<RevisionPersistence<Connection>>,
  41. revision_compress: Arc<dyn RevisionMergeable>,
  42. ) -> Self
  43. where
  44. D: RevisionSnapshotPersistence + 'static,
  45. {
  46. let rev_snapshot_persistence = Arc::new(disk_cache);
  47. Self {
  48. object_id: object_id.to_string(),
  49. rev_snapshot_persistence,
  50. rev_id_counter,
  51. start_rev_id: AtomicI64::new(0),
  52. rev_snapshot_data: None,
  53. rev_persistence: revision_persistence,
  54. rev_compress: revision_compress,
  55. }
  56. }
  57. pub async fn set_snapshot_data_generator(
  58. &mut self,
  59. generator: Arc<dyn RevisionSnapshotDataGenerator>,
  60. ) {
  61. self.rev_snapshot_data = Some(generator);
  62. }
  63. pub async fn generate_snapshot(&self) {
  64. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  65. if let Err(e) = self
  66. .rev_snapshot_persistence
  67. .write_snapshot(rev_id, bytes.to_vec())
  68. {
  69. tracing::error!("Save snapshot failed: {}", e);
  70. }
  71. }
  72. }
  73. /// Find the nearest revision base on the passed-in rev_id
  74. #[tracing::instrument(level = "trace", skip_all)]
  75. pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)>
  76. where
  77. B: RevisionObjectDeserializer,
  78. {
  79. tracing::info!("[Restore] Try to find if {} has snapshot", self.object_id);
  80. let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
  81. let snapshot_rev_id = snapshot.rev_id;
  82. let revision = Revision::new(
  83. &self.object_id,
  84. snapshot.base_rev_id,
  85. snapshot.rev_id,
  86. snapshot.data,
  87. "".to_owned(),
  88. );
  89. tracing::info!(
  90. "[Restore] Try to restore from snapshot: {}, {}",
  91. snapshot.base_rev_id,
  92. snapshot.rev_id
  93. );
  94. let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
  95. tracing::info!(
  96. "[Restore] Restore {} from snapshot with rev_id: {}",
  97. self.object_id,
  98. snapshot_rev_id
  99. );
  100. Some((object, revision))
  101. }
  102. pub fn generate_snapshot_if_need(&self) {
  103. let current_rev_id = self.rev_id_counter.value();
  104. let start_rev_id = self.get_start_rev_id();
  105. if current_rev_id <= start_rev_id {
  106. return;
  107. }
  108. if self
  109. .rev_snapshot_persistence
  110. .should_generate_snapshot_from_range(start_rev_id, current_rev_id)
  111. {
  112. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  113. let disk_cache = self.rev_snapshot_persistence.clone();
  114. tokio::spawn(async move {
  115. let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec());
  116. });
  117. }
  118. self.set_start_rev_id(current_rev_id);
  119. }
  120. }
  121. fn generate_snapshot_data(&self) -> Option<(i64, Bytes)> {
  122. let revisions = self
  123. .rev_persistence
  124. .load_all_records(&self.object_id)
  125. .map(|records| {
  126. records
  127. .into_iter()
  128. .map(|record| record.revision)
  129. .collect::<Vec<Revision>>()
  130. })
  131. .ok()?;
  132. if revisions.is_empty() {
  133. return None;
  134. }
  135. let data = self.rev_compress.combine_revisions(revisions).ok()?;
  136. let rev_id = self.rev_id_counter.value();
  137. Some((rev_id, data))
  138. }
  139. fn get_start_rev_id(&self) -> i64 {
  140. self.start_rev_id.load(SeqCst)
  141. }
  142. fn set_start_rev_id(&self, rev_id: i64) {
  143. let _ = self
  144. .start_rev_id
  145. .fetch_update(SeqCst, SeqCst, |_| Some(rev_id));
  146. }
  147. }
  148. impl<Connection> std::ops::Deref for RevisionSnapshotController<Connection> {
  149. type Target = Arc<dyn RevisionSnapshotPersistence>;
  150. fn deref(&self) -> &Self::Target {
  151. &self.rev_snapshot_persistence
  152. }
  153. }
  154. #[derive(Debug, PartialEq, Eq, Clone)]
  155. pub struct RevisionSnapshotData {
  156. pub rev_id: i64,
  157. pub base_rev_id: i64,
  158. pub timestamp: i64,
  159. pub data: Bytes,
  160. }