// migration.rs

use crate::manager::GridUser;
use crate::services::persistence::GridDatabase;
use flowy_database::kv::KV;
use flowy_error::FlowyResult;
use flowy_grid_data_model::revision::GridRevision;
use flowy_revision::disk::{RevisionRecord, SQLiteGridRevisionPersistence};
use flowy_revision::{mk_grid_block_revision_disk_cache, RevisionLoader, RevisionPersistence};
use flowy_sync::client_grid::{make_grid_rev_json_str, GridRevisionPad};
use flowy_sync::entities::revision::Revision;
use lib_ot::core::PlainTextDeltaBuilder;
use serde::{Deserialize, Serialize};
use std::fmt;
use std::str::FromStr;
use std::sync::Arc;
  14. pub(crate) struct GridMigration {
  15. user: Arc<dyn GridUser>,
  16. database: Arc<dyn GridDatabase>,
  17. }
  18. impl GridMigration {
  19. pub fn new(user: Arc<dyn GridUser>, database: Arc<dyn GridDatabase>) -> Self {
  20. Self { user, database }
  21. }
  22. pub async fn migration_grid_if_need(&self, grid_id: &str) -> FlowyResult<()> {
  23. match KV::get_str(grid_id) {
  24. None => {
  25. let _ = self.reset_grid_rev(grid_id).await?;
  26. let _ = self.save_migrate_record(grid_id)?;
  27. }
  28. Some(s) => {
  29. let mut record = MigrationGridRecord::from_str(&s)?;
  30. let empty_json = self.empty_grid_rev_json()?;
  31. if record.len < empty_json.len() {
  32. let _ = self.reset_grid_rev(grid_id).await?;
  33. record.len = empty_json.len();
  34. KV::set_str(grid_id, record.to_string());
  35. }
  36. }
  37. }
  38. Ok(())
  39. }
  40. async fn reset_grid_rev(&self, grid_id: &str) -> FlowyResult<()> {
  41. let user_id = self.user.user_id()?;
  42. let pool = self.database.db_pool()?;
  43. let grid_rev_pad = self.get_grid_revision_pad(grid_id).await?;
  44. let json = grid_rev_pad.json_str()?;
  45. let delta_data = PlainTextDeltaBuilder::new().insert(&json).build().to_delta_bytes();
  46. let revision = Revision::initial_revision(&user_id, grid_id, delta_data);
  47. let record = RevisionRecord::new(revision);
  48. //
  49. let disk_cache = mk_grid_block_revision_disk_cache(&user_id, pool);
  50. let _ = disk_cache.delete_and_insert_records(grid_id, None, vec![record]);
  51. Ok(())
  52. }
  53. fn save_migrate_record(&self, grid_id: &str) -> FlowyResult<()> {
  54. let empty_json_str = self.empty_grid_rev_json()?;
  55. let record = MigrationGridRecord {
  56. grid_id: grid_id.to_owned(),
  57. len: empty_json_str.len(),
  58. };
  59. KV::set_str(grid_id, record.to_string());
  60. Ok(())
  61. }
  62. fn empty_grid_rev_json(&self) -> FlowyResult<String> {
  63. let empty_grid_rev = GridRevision::default();
  64. let empty_json = make_grid_rev_json_str(&empty_grid_rev)?;
  65. Ok(empty_json)
  66. }
  67. async fn get_grid_revision_pad(&self, grid_id: &str) -> FlowyResult<GridRevisionPad> {
  68. let pool = self.database.db_pool()?;
  69. let user_id = self.user.user_id()?;
  70. let disk_cache = SQLiteGridRevisionPersistence::new(&user_id, pool);
  71. let rev_persistence = Arc::new(RevisionPersistence::new(&user_id, grid_id, disk_cache));
  72. let (revisions, _) = RevisionLoader {
  73. object_id: grid_id.to_owned(),
  74. user_id,
  75. cloud: None,
  76. rev_persistence,
  77. }
  78. .load()
  79. .await?;
  80. let pad = GridRevisionPad::from_revisions(revisions)?;
  81. Ok(pad)
  82. }
  83. }
  84. #[derive(Serialize, Deserialize)]
  85. struct MigrationGridRecord {
  86. grid_id: String,
  87. len: usize,
  88. }
  89. impl FromStr for MigrationGridRecord {
  90. type Err = serde_json::Error;
  91. fn from_str(s: &str) -> Result<Self, Self::Err> {
  92. serde_json::from_str::<MigrationGridRecord>(s)
  93. }
  94. }
  95. impl ToString for MigrationGridRecord {
  96. fn to_string(&self) -> String {
  97. serde_json::to_string(self).unwrap_or_else(|_| "".to_string())
  98. }
  99. }