// rev_snapshot.rs — revision snapshot persistence trait and snapshot controller.
  1. #![allow(clippy::all)]
  2. #![allow(dead_code)]
  3. #![allow(unused_variables)]
  4. use crate::{RevIdCounter, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence};
  5. use bytes::Bytes;
  6. use flowy_error::FlowyResult;
  7. use revision_model::Revision;
  8. use std::sync::atomic::AtomicI64;
  9. use std::sync::atomic::Ordering::SeqCst;
  10. use std::sync::Arc;
  11. pub trait RevisionSnapshotPersistence: Send + Sync {
  12. fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
  13. (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION
  14. }
  15. fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
  16. fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>>;
  17. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>>;
  18. // fn generate_snapshot_data(&self) -> Option<RevisionSnapshotData>;
  19. }
  20. const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
  21. pub struct RevisionSnapshotController<Connection> {
  22. user_id: String,
  23. object_id: String,
  24. rev_snapshot_persistence: Arc<dyn RevisionSnapshotPersistence>,
  25. rev_id_counter: Arc<RevIdCounter>,
  26. rev_persistence: Arc<RevisionPersistence<Connection>>,
  27. rev_compress: Arc<dyn RevisionMergeable>,
  28. start_rev_id: AtomicI64,
  29. }
  30. impl<Connection> RevisionSnapshotController<Connection>
  31. where
  32. Connection: 'static,
  33. {
  34. pub fn new<D>(
  35. user_id: &str,
  36. object_id: &str,
  37. disk_cache: D,
  38. rev_id_counter: Arc<RevIdCounter>,
  39. revision_persistence: Arc<RevisionPersistence<Connection>>,
  40. revision_compress: Arc<dyn RevisionMergeable>,
  41. ) -> Self
  42. where
  43. D: RevisionSnapshotPersistence + 'static,
  44. {
  45. let rev_snapshot_persistence = Arc::new(disk_cache);
  46. Self {
  47. user_id: user_id.to_string(),
  48. object_id: object_id.to_string(),
  49. rev_snapshot_persistence,
  50. rev_id_counter,
  51. start_rev_id: AtomicI64::new(0),
  52. rev_persistence: revision_persistence,
  53. rev_compress: revision_compress,
  54. }
  55. }
  56. pub async fn generate_snapshot(&self) {
  57. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  58. if let Err(e) = self.rev_snapshot_persistence.write_snapshot(rev_id, bytes.to_vec()) {
  59. tracing::error!("Save snapshot failed: {}", e);
  60. }
  61. }
  62. }
  63. /// Find the nearest revision base on the passed-in rev_id
  64. #[tracing::instrument(level = "trace", skip_all)]
  65. pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)>
  66. where
  67. B: RevisionObjectDeserializer,
  68. {
  69. tracing::info!("Try to find if {} has snapshot", self.object_id);
  70. let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
  71. let snapshot_rev_id = snapshot.rev_id;
  72. let revision = Revision::new(
  73. &self.object_id,
  74. snapshot.base_rev_id,
  75. snapshot.rev_id,
  76. snapshot.data,
  77. "".to_owned(),
  78. );
  79. tracing::info!(
  80. "Try to restore from snapshot: {}, {}",
  81. snapshot.base_rev_id,
  82. snapshot.rev_id
  83. );
  84. let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
  85. tracing::info!(
  86. "Restore {} from snapshot with rev_id: {}",
  87. self.object_id,
  88. snapshot_rev_id
  89. );
  90. Some((object, revision))
  91. }
  92. pub fn generate_snapshot_if_need(&self) {
  93. let current_rev_id = self.rev_id_counter.value();
  94. let start_rev_id = self.get_start_rev_id();
  95. if current_rev_id <= start_rev_id {
  96. return;
  97. }
  98. if self
  99. .rev_snapshot_persistence
  100. .should_generate_snapshot_from_range(start_rev_id, current_rev_id)
  101. {
  102. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  103. let disk_cache = self.rev_snapshot_persistence.clone();
  104. tokio::spawn(async move {
  105. let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec());
  106. });
  107. }
  108. self.set_start_rev_id(current_rev_id);
  109. }
  110. }
  111. fn generate_snapshot_data(&self) -> Option<(i64, Bytes)> {
  112. let revisions = self
  113. .rev_persistence
  114. .load_all_records(&self.object_id)
  115. .map(|records| {
  116. records
  117. .into_iter()
  118. .map(|record| record.revision)
  119. .collect::<Vec<Revision>>()
  120. })
  121. .ok()?;
  122. if revisions.is_empty() {
  123. return None;
  124. }
  125. let data = self.rev_compress.combine_revisions(revisions).ok()?;
  126. let rev_id = self.rev_id_counter.value();
  127. Some((rev_id, data))
  128. }
  129. fn get_start_rev_id(&self) -> i64 {
  130. self.start_rev_id.load(SeqCst)
  131. }
  132. fn set_start_rev_id(&self, rev_id: i64) {
  133. let _ = self.start_rev_id.fetch_update(SeqCst, SeqCst, |_| Some(rev_id));
  134. }
  135. }
  136. impl<Connection> std::ops::Deref for RevisionSnapshotController<Connection> {
  137. type Target = Arc<dyn RevisionSnapshotPersistence>;
  138. fn deref(&self) -> &Self::Target {
  139. &self.rev_snapshot_persistence
  140. }
  141. }
  142. #[derive(Debug, PartialEq, Eq, Clone)]
  143. pub struct RevisionSnapshotData {
  144. pub rev_id: i64,
  145. pub base_rev_id: i64,
  146. pub timestamp: i64,
  147. pub data: Bytes,
  148. }