migration.rs 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. use std::sync::Arc;
  2. use chrono::NaiveDateTime;
  3. use diesel::{RunQueryDsl, SqliteConnection};
  4. use collab_integrate::RocksCollabDB;
  5. use flowy_error::FlowyResult;
  6. use flowy_sqlite::schema::user_data_migration_records;
  7. use flowy_sqlite::ConnectionPool;
  8. use crate::services::entities::Session;
  9. pub struct UserLocalDataMigration {
  10. session: Session,
  11. collab_db: Arc<RocksCollabDB>,
  12. sqlite_pool: Arc<ConnectionPool>,
  13. }
  14. impl UserLocalDataMigration {
  15. pub fn new(
  16. session: Session,
  17. collab_db: Arc<RocksCollabDB>,
  18. sqlite_pool: Arc<ConnectionPool>,
  19. ) -> Self {
  20. Self {
  21. session,
  22. collab_db,
  23. sqlite_pool,
  24. }
  25. }
  26. /// Executes a series of migrations.
  27. ///
  28. /// This function applies each migration in the `migrations` vector that hasn't already been executed.
  29. /// It retrieves the current migration records from the database, and for each migration in the `migrations` vector,
  30. /// checks whether it has already been run. If it hasn't, the function runs the migration and adds it to the list of applied migrations.
  31. ///
  32. /// The function does not apply a migration if its name is already in the list of applied migrations.
  33. /// If a migration name is duplicated, the function logs an error message and continues with the next migration.
  34. ///
  35. /// # Arguments
  36. ///
  37. /// * `migrations` - A vector of boxed dynamic `UserDataMigration` objects representing the migrations to be applied.
  38. ///
  39. pub fn run(self, migrations: Vec<Box<dyn UserDataMigration>>) -> FlowyResult<Vec<String>> {
  40. let mut applied_migrations = vec![];
  41. let conn = self.sqlite_pool.get()?;
  42. let record = get_all_records(&conn)?;
  43. let mut duplicated_names = vec![];
  44. for migration in migrations {
  45. if !record
  46. .iter()
  47. .any(|record| record.migration_name == migration.name())
  48. {
  49. let migration_name = migration.name().to_string();
  50. if !duplicated_names.contains(&migration_name) {
  51. migration.run(&self.session, &self.collab_db)?;
  52. applied_migrations.push(migration.name().to_string());
  53. save_record(&conn, &migration_name);
  54. duplicated_names.push(migration_name);
  55. } else {
  56. tracing::error!("Duplicated migration name: {}", migration_name);
  57. }
  58. }
  59. }
  60. Ok(applied_migrations)
  61. }
  62. }
  63. pub trait UserDataMigration {
  64. /// Migration with the same name will be skipped
  65. fn name(&self) -> &str;
  66. fn run(&self, user: &Session, collab_db: &Arc<RocksCollabDB>) -> FlowyResult<()>;
  67. }
  68. fn save_record(conn: &SqliteConnection, migration_name: &str) {
  69. let new_record = NewUserDataMigrationRecord {
  70. migration_name: migration_name.to_string(),
  71. };
  72. diesel::insert_into(user_data_migration_records::table)
  73. .values(&new_record)
  74. .execute(conn)
  75. .expect("Error inserting new migration record");
  76. }
  77. fn get_all_records(conn: &SqliteConnection) -> FlowyResult<Vec<UserDataMigrationRecord>> {
  78. Ok(
  79. user_data_migration_records::table
  80. .load::<UserDataMigrationRecord>(conn)
  81. .unwrap_or_default(),
  82. )
  83. }
  84. #[derive(Clone, Default, Queryable, Identifiable)]
  85. #[table_name = "user_data_migration_records"]
  86. pub struct UserDataMigrationRecord {
  87. pub id: i32,
  88. pub migration_name: String,
  89. pub executed_at: NaiveDateTime,
  90. }
  91. #[derive(Insertable)]
  92. #[table_name = "user_data_migration_records"]
  93. pub struct NewUserDataMigrationRecord {
  94. pub migration_name: String,
  95. }