reset.rs 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. use crate::disk::{RevisionDiskCache, SyncRecord};
  2. use crate::{RevisionLoader, RevisionPersistence, RevisionPersistenceConfiguration};
  3. use bytes::Bytes;
  4. use flowy_error::{FlowyError, FlowyResult};
  5. use flowy_http_model::revision::Revision;
  6. use serde::{Deserialize, Serialize};
  7. use std::str::FromStr;
  8. use std::sync::Arc;
  9. pub trait RevisionResettable {
  10. fn target_id(&self) -> &str;
  11. // String in json format
  12. fn reset_data(&self, revisions: Vec<Revision>) -> FlowyResult<Bytes>;
  13. // String in json format
  14. fn default_target_rev_str(&self) -> FlowyResult<String>;
  15. fn read_record(&self) -> Option<String>;
  16. fn set_record(&self, record: String);
  17. }
  18. pub struct RevisionStructReset<T, C> {
  19. user_id: String,
  20. target: T,
  21. disk_cache: Arc<dyn RevisionDiskCache<C, Error = FlowyError>>,
  22. }
  23. impl<T, C> RevisionStructReset<T, C>
  24. where
  25. T: RevisionResettable,
  26. C: 'static,
  27. {
  28. pub fn new(user_id: &str, object: T, disk_cache: Arc<dyn RevisionDiskCache<C, Error = FlowyError>>) -> Self {
  29. Self {
  30. user_id: user_id.to_owned(),
  31. target: object,
  32. disk_cache,
  33. }
  34. }
  35. pub async fn run(&self) -> FlowyResult<()> {
  36. match self.target.read_record() {
  37. None => {
  38. self.reset_object().await?;
  39. self.save_migrate_record()?;
  40. }
  41. Some(s) => {
  42. let mut record = MigrationObjectRecord::from_str(&s).map_err(|e| FlowyError::serde().context(e))?;
  43. let rev_str = self.target.default_target_rev_str()?;
  44. if record.len < rev_str.len() {
  45. self.reset_object().await?;
  46. record.len = rev_str.len();
  47. self.target.set_record(record.to_string());
  48. }
  49. }
  50. }
  51. Ok(())
  52. }
  53. async fn reset_object(&self) -> FlowyResult<()> {
  54. let configuration = RevisionPersistenceConfiguration::new(2, false);
  55. let rev_persistence = Arc::new(RevisionPersistence::from_disk_cache(
  56. &self.user_id,
  57. self.target.target_id(),
  58. self.disk_cache.clone(),
  59. configuration,
  60. ));
  61. let revisions = RevisionLoader {
  62. object_id: self.target.target_id().to_owned(),
  63. user_id: self.user_id.clone(),
  64. cloud: None,
  65. rev_persistence,
  66. }
  67. .load_revisions()
  68. .await?;
  69. let bytes = self.target.reset_data(revisions)?;
  70. let revision = Revision::initial_revision(self.target.target_id(), bytes);
  71. let record = SyncRecord::new(revision);
  72. tracing::trace!("Reset {} revision record object", self.target.target_id());
  73. let _ = self
  74. .disk_cache
  75. .delete_and_insert_records(self.target.target_id(), None, vec![record]);
  76. Ok(())
  77. }
  78. fn save_migrate_record(&self) -> FlowyResult<()> {
  79. let rev_str = self.target.default_target_rev_str()?;
  80. let record = MigrationObjectRecord {
  81. object_id: self.target.target_id().to_owned(),
  82. len: rev_str.len(),
  83. };
  84. self.target.set_record(record.to_string());
  85. Ok(())
  86. }
  87. }
  88. #[derive(Serialize, Deserialize)]
  89. struct MigrationObjectRecord {
  90. object_id: String,
  91. len: usize,
  92. }
  93. impl FromStr for MigrationObjectRecord {
  94. type Err = serde_json::Error;
  95. fn from_str(s: &str) -> Result<Self, Self::Err> {
  96. serde_json::from_str::<MigrationObjectRecord>(s)
  97. }
  98. }
  99. impl ToString for MigrationObjectRecord {
  100. fn to_string(&self) -> String {
  101. serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
  102. }
  103. }