migration.rs 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576
  1. use crate::services::delta_migration::DeltaRevisionMigration;
  2. use crate::services::rev_sqlite::{DeltaRevisionSql, SQLiteDocumentRevisionPersistence};
  3. use crate::DocumentDatabase;
  4. use bytes::Bytes;
  5. use flowy_client_sync::make_operations_from_revisions;
  6. use flowy_error::FlowyResult;
  7. use flowy_revision_persistence::{RevisionDiskCache, SyncRecord};
  8. use flowy_sqlite::kv::KV;
  9. use lib_infra::util::md5;
  10. use revision_model::Revision;
  11. use std::sync::Arc;
  /// Version tag combined with the user id to build the key-value flag that
  /// records whether the V1 document migration has already run.
  const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION";
  13. pub(crate) struct DocumentMigration {
  14. user_id: String,
  15. database: Arc<dyn DocumentDatabase>,
  16. }
  17. impl DocumentMigration {
  18. pub fn new(user_id: &str, database: Arc<dyn DocumentDatabase>) -> Self {
  19. let user_id = user_id.to_owned();
  20. Self { user_id, database }
  21. }
  22. pub fn run_v1_migration(&self) -> FlowyResult<()> {
  23. let key = migration_flag_key(&self.user_id, V1_MIGRATION);
  24. if KV::get_bool(&key) {
  25. return Ok(());
  26. }
  27. let pool = self.database.db_pool()?;
  28. let conn = &*pool.get()?;
  29. let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
  30. let documents = DeltaRevisionSql::read_all_documents(&self.user_id, conn)?;
  31. tracing::debug!("[Migration]: try migrate {} documents", documents.len());
  32. for revisions in documents {
  33. if revisions.is_empty() {
  34. continue;
  35. }
  36. let document_id = revisions.first().unwrap().object_id.clone();
  37. if let Ok(delta) = make_operations_from_revisions(revisions) {
  38. match DeltaRevisionMigration::run(delta) {
  39. Ok(transaction) => {
  40. let bytes = Bytes::from(transaction.to_bytes()?);
  41. let md5 = format!("{:x}", md5::compute(&bytes));
  42. let revision = Revision::new(&document_id, 0, 1, bytes, md5);
  43. let record = SyncRecord::new(revision);
  44. match disk_cache.create_revision_records(vec![record]) {
  45. Ok(_) => {},
  46. Err(err) => {
  47. tracing::error!(
  48. "[Document Migration]: Save revisions to disk failed {:?}",
  49. err
  50. );
  51. },
  52. }
  53. },
  54. Err(err) => {
  55. tracing::error!(
  56. "[Document Migration]: Migrate revisions to transaction failed {:?}",
  57. err
  58. );
  59. },
  60. }
  61. }
  62. }
  63. //
  64. KV::set_bool(&key, true);
  65. Ok(())
  66. }
  67. }
  68. fn migration_flag_key(user_id: &str, version: &str) -> String {
  69. md5(format!("{}{}", user_id, version,))
  70. }