rev_snapshot.rs 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. #![allow(clippy::all)]
  2. #![allow(dead_code)]
  3. #![allow(unused_variables)]
  4. use crate::{RevIdCounter, RevisionMergeable, RevisionObjectDeserializer, RevisionPersistence};
  5. use bytes::Bytes;
  6. use flowy_error::FlowyResult;
  7. use revision_model::Revision;
  8. use std::sync::atomic::AtomicI64;
  9. use std::sync::atomic::Ordering::SeqCst;
  10. use std::sync::Arc;
  11. pub trait RevisionSnapshotPersistence: Send + Sync {
  12. fn should_generate_snapshot_from_range(&self, start_rev_id: i64, current_rev_id: i64) -> bool {
  13. (current_rev_id - start_rev_id) >= AUTO_GEN_SNAPSHOT_PER_10_REVISION
  14. }
  15. fn write_snapshot(&self, rev_id: i64, data: Vec<u8>) -> FlowyResult<()>;
  16. fn read_snapshot(&self, rev_id: i64) -> FlowyResult<Option<RevisionSnapshotData>>;
  17. fn read_last_snapshot(&self) -> FlowyResult<Option<RevisionSnapshotData>>;
  18. }
  19. pub trait RevisionSnapshotDataGenerator: Send + Sync {
  20. fn generate_snapshot_data(&self) -> Option<RevisionSnapshotData>;
  21. }
  /// Minimum number of revisions that must separate the tracked start revision
  /// id from the current one before the default policy asks for a new snapshot.
  const AUTO_GEN_SNAPSHOT_PER_10_REVISION: i64 = 10;
  23. pub struct RevisionSnapshotController<Connection> {
  24. user_id: String,
  25. object_id: String,
  26. rev_snapshot_persistence: Arc<dyn RevisionSnapshotPersistence>,
  27. rev_snapshot_data: Option<Arc<dyn RevisionSnapshotDataGenerator>>,
  28. rev_id_counter: Arc<RevIdCounter>,
  29. rev_persistence: Arc<RevisionPersistence<Connection>>,
  30. rev_compress: Arc<dyn RevisionMergeable>,
  31. start_rev_id: AtomicI64,
  32. }
  33. impl<Connection> RevisionSnapshotController<Connection>
  34. where
  35. Connection: 'static,
  36. {
  37. pub fn new<D>(
  38. user_id: &str,
  39. object_id: &str,
  40. disk_cache: D,
  41. rev_id_counter: Arc<RevIdCounter>,
  42. revision_persistence: Arc<RevisionPersistence<Connection>>,
  43. revision_compress: Arc<dyn RevisionMergeable>,
  44. ) -> Self
  45. where
  46. D: RevisionSnapshotPersistence + 'static,
  47. {
  48. let rev_snapshot_persistence = Arc::new(disk_cache);
  49. Self {
  50. user_id: user_id.to_string(),
  51. object_id: object_id.to_string(),
  52. rev_snapshot_persistence,
  53. rev_id_counter,
  54. start_rev_id: AtomicI64::new(0),
  55. rev_snapshot_data: None,
  56. rev_persistence: revision_persistence,
  57. rev_compress: revision_compress,
  58. }
  59. }
  60. pub async fn set_snapshot_data_generator(
  61. &mut self,
  62. generator: Arc<dyn RevisionSnapshotDataGenerator>,
  63. ) {
  64. self.rev_snapshot_data = Some(generator);
  65. }
  66. pub async fn generate_snapshot(&self) {
  67. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  68. if let Err(e) = self
  69. .rev_snapshot_persistence
  70. .write_snapshot(rev_id, bytes.to_vec())
  71. {
  72. tracing::error!("Save snapshot failed: {}", e);
  73. }
  74. }
  75. }
  76. /// Find the nearest revision base on the passed-in rev_id
  77. #[tracing::instrument(level = "trace", skip_all)]
  78. pub fn restore_from_snapshot<B>(&self, rev_id: i64) -> Option<(B::Output, Revision)>
  79. where
  80. B: RevisionObjectDeserializer,
  81. {
  82. tracing::info!("[Restore] Try to find if {} has snapshot", self.object_id);
  83. let snapshot = self.rev_snapshot_persistence.read_last_snapshot().ok()??;
  84. let snapshot_rev_id = snapshot.rev_id;
  85. let revision = Revision::new(
  86. &self.object_id,
  87. snapshot.base_rev_id,
  88. snapshot.rev_id,
  89. snapshot.data,
  90. "".to_owned(),
  91. );
  92. tracing::info!(
  93. "[Restore] Try to restore from snapshot: {}, {}",
  94. snapshot.base_rev_id,
  95. snapshot.rev_id
  96. );
  97. let object = B::deserialize_revisions(&self.object_id, vec![revision.clone()]).ok()?;
  98. tracing::info!(
  99. "[Restore] Restore {} from snapshot with rev_id: {}",
  100. self.object_id,
  101. snapshot_rev_id
  102. );
  103. Some((object, revision))
  104. }
  105. pub fn generate_snapshot_if_need(&self) {
  106. let current_rev_id = self.rev_id_counter.value();
  107. let start_rev_id = self.get_start_rev_id();
  108. if current_rev_id <= start_rev_id {
  109. return;
  110. }
  111. if self
  112. .rev_snapshot_persistence
  113. .should_generate_snapshot_from_range(start_rev_id, current_rev_id)
  114. {
  115. if let Some((rev_id, bytes)) = self.generate_snapshot_data() {
  116. let disk_cache = self.rev_snapshot_persistence.clone();
  117. tokio::spawn(async move {
  118. let _ = disk_cache.write_snapshot(rev_id, bytes.to_vec());
  119. });
  120. }
  121. self.set_start_rev_id(current_rev_id);
  122. }
  123. }
  124. fn generate_snapshot_data(&self) -> Option<(i64, Bytes)> {
  125. let revisions = self
  126. .rev_persistence
  127. .load_all_records(&self.object_id)
  128. .map(|records| {
  129. records
  130. .into_iter()
  131. .map(|record| record.revision)
  132. .collect::<Vec<Revision>>()
  133. })
  134. .ok()?;
  135. if revisions.is_empty() {
  136. return None;
  137. }
  138. let data = self.rev_compress.combine_revisions(revisions).ok()?;
  139. let rev_id = self.rev_id_counter.value();
  140. Some((rev_id, data))
  141. }
  142. fn get_start_rev_id(&self) -> i64 {
  143. self.start_rev_id.load(SeqCst)
  144. }
  145. fn set_start_rev_id(&self, rev_id: i64) {
  146. let _ = self
  147. .start_rev_id
  148. .fetch_update(SeqCst, SeqCst, |_| Some(rev_id));
  149. }
  150. }
  151. impl<Connection> std::ops::Deref for RevisionSnapshotController<Connection> {
  152. type Target = Arc<dyn RevisionSnapshotPersistence>;
  153. fn deref(&self) -> &Self::Target {
  154. &self.rev_snapshot_persistence
  155. }
  156. }
  157. #[derive(Debug, PartialEq, Eq, Clone)]
  158. pub struct RevisionSnapshotData {
  159. pub rev_id: i64,
  160. pub base_rev_id: i64,
  161. pub timestamp: i64,
  162. pub data: Bytes,
  163. }