migration.rs 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. use crate::editor::DeltaRevisionMigration;
  2. use crate::DocumentDatabase;
  3. use bytes::Bytes;
  4. use flowy_database::kv::KV;
  5. use flowy_error::FlowyResult;
  6. use flowy_revision::disk::{DeltaRevisionSql, RevisionDiskCache, RevisionRecord, SQLiteDocumentRevisionPersistence};
  7. use flowy_sync::entities::revision::{md5, Revision};
  8. use flowy_sync::util::make_operations_from_revisions;
  9. use std::sync::Arc;
  /// Version tag for the v1 document migration. It is combined with the user id
  /// by `migration_flag_key` to form the key of the "migration already ran" flag.
  10. const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION";
  11. pub(crate) struct DocumentMigration {
  12. user_id: String,
  13. database: Arc<dyn DocumentDatabase>,
  14. }
  15. impl DocumentMigration {
  16. pub fn new(user_id: &str, database: Arc<dyn DocumentDatabase>) -> Self {
  17. let user_id = user_id.to_owned();
  18. Self { user_id, database }
  19. }
  20. pub fn run_v1_migration(&self) -> FlowyResult<()> {
  21. let key = migration_flag_key(&self.user_id, V1_MIGRATION);
  22. if KV::get_bool(&key) {
  23. return Ok(());
  24. }
  25. let pool = self.database.db_pool()?;
  26. let conn = &*pool.get()?;
  27. let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
  28. let documents = DeltaRevisionSql::read_all_documents(&self.user_id, conn)?;
  29. tracing::debug!("[Document Migration]: try migrate {} documents", documents.len());
  30. for revisions in documents {
  31. if revisions.is_empty() {
  32. continue;
  33. }
  34. let document_id = revisions.first().unwrap().object_id.clone();
  35. match make_operations_from_revisions(revisions) {
  36. Ok(delta) => match DeltaRevisionMigration::run(delta) {
  37. Ok(transaction) => {
  38. let bytes = Bytes::from(transaction.to_bytes()?);
  39. let md5 = format!("{:x}", md5::compute(&bytes));
  40. let revision = Revision::new(&document_id, 0, 1, bytes, &self.user_id, md5);
  41. let record = RevisionRecord::new(revision);
  42. match disk_cache.create_revision_records(vec![record]) {
  43. Ok(_) => {}
  44. Err(err) => {
  45. tracing::error!("[Document Migration]: Save revisions to disk failed {:?}", err);
  46. }
  47. }
  48. }
  49. Err(err) => {
  50. tracing::error!(
  51. "[Document Migration]: Migrate revisions to transaction failed {:?}",
  52. err
  53. );
  54. }
  55. },
  56. Err(e) => {
  57. tracing::error!("[Document migration]: Make delta from revisions failed: {:?}", e);
  58. }
  59. }
  60. }
  61. //
  62. KV::set_bool(&key, true);
  63. tracing::debug!("Run document v1 migration");
  64. Ok(())
  65. }
  66. }
  67. fn migration_flag_key(user_id: &str, version: &str) -> String {
  68. md5(format!("{}{}", user_id, version,))
  69. }