migration.rs 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. use crate::services::delta_migration::DeltaRevisionMigration;
  2. use crate::services::rev_sqlite::{DeltaRevisionSql, SQLiteDocumentRevisionPersistence};
  3. use crate::DocumentDatabase;
  4. use bytes::Bytes;
  5. use flowy_database::kv::KV;
  6. use flowy_error::FlowyResult;
  7. use flowy_http_model::revision::Revision;
  8. use flowy_http_model::util::md5;
  9. use flowy_revision::disk::{RevisionDiskCache, SyncRecord};
  10. use flowy_sync::util::make_operations_from_revisions;
  11. use std::sync::Arc;
  12. const V1_MIGRATION: &str = "DOCUMENT_V1_MIGRATION";
  13. pub(crate) struct DocumentMigration {
  14. user_id: String,
  15. database: Arc<dyn DocumentDatabase>,
  16. }
  17. impl DocumentMigration {
  18. pub fn new(user_id: &str, database: Arc<dyn DocumentDatabase>) -> Self {
  19. let user_id = user_id.to_owned();
  20. Self { user_id, database }
  21. }
  22. pub fn run_v1_migration(&self) -> FlowyResult<()> {
  23. let key = migration_flag_key(&self.user_id, V1_MIGRATION);
  24. if KV::get_bool(&key) {
  25. return Ok(());
  26. }
  27. let pool = self.database.db_pool()?;
  28. let conn = &*pool.get()?;
  29. let disk_cache = SQLiteDocumentRevisionPersistence::new(&self.user_id, pool);
  30. let documents = DeltaRevisionSql::read_all_documents(&self.user_id, conn)?;
  31. tracing::debug!("[Document Migration]: try migrate {} documents", documents.len());
  32. for revisions in documents {
  33. if revisions.is_empty() {
  34. continue;
  35. }
  36. let document_id = revisions.first().unwrap().object_id.clone();
  37. match make_operations_from_revisions(revisions) {
  38. Ok(delta) => match DeltaRevisionMigration::run(delta) {
  39. Ok(transaction) => {
  40. let bytes = Bytes::from(transaction.to_bytes()?);
  41. let md5 = format!("{:x}", md5::compute(&bytes));
  42. let revision = Revision::new(&document_id, 0, 1, bytes, md5);
  43. let record = SyncRecord::new(revision);
  44. match disk_cache.create_revision_records(vec![record]) {
  45. Ok(_) => {}
  46. Err(err) => {
  47. tracing::error!("[Document Migration]: Save revisions to disk failed {:?}", err);
  48. }
  49. }
  50. }
  51. Err(err) => {
  52. tracing::error!(
  53. "[Document Migration]: Migrate revisions to transaction failed {:?}",
  54. err
  55. );
  56. }
  57. },
  58. Err(e) => {
  59. tracing::error!("[Document migration]: Make delta from revisions failed: {:?}", e);
  60. }
  61. }
  62. }
  63. //
  64. KV::set_bool(&key, true);
  65. tracing::debug!("Run document v1 migration");
  66. Ok(())
  67. }
  68. }
  69. fn migration_flag_key(user_id: &str, version: &str) -> String {
  70. md5(format!("{}{}", user_id, version,))
  71. }