class ReeMigrator::ApplyMigrations

def call(connection, migrations_yml_path, schema_migrations_path, data_migrations_path)

# Runs the migrations listed in migrations.yml, in the order they are listed.
#
# Every entry of the file must be a Hash with exactly one key, SCHEMA or DATA,
# whose value is the migration file name. Each entry is handed to
# run_migration, which returns nil for migrations already recorded as applied.
#
# @param connection forwarded to indexed_migrations and run_migration
# @param migrations_yml_path [String] path of the migrations.yml file to read
# @param schema_migrations_path [String] directory globbed with RUBY_EXT for
#   schema migration files
# @param data_migrations_path [String] directory globbed with RUBY_EXT for
#   data migration files
# @return [Array<String>] the non-nil paths returned by run_migration, in yml
#   order; [] when is_blank reports the parsed file as blank
# @raise [InvalidMigrationYmlErr] when the parsed file is not a list, or an
#   entry is not a Hash holding exactly one of the SCHEMA / DATA keys
def call(connection, migrations_yml_path, schema_migrations_path, data_migrations_path)
  logger.info("Parsing migrations.yml from #{migrations_yml_path}")
  # NOTE(review): YAML.load is only safe-by-default on Psych >= 4; this assumes
  # migrations.yml is a trusted, project-owned file — confirm.
  migrations = YAML.load(File.read(migrations_yml_path))
  return [] if is_blank(migrations)

  applied_schema_migrations = indexed_migrations(connection, SCHEMA)
  applied_data_migrations = indexed_migrations(connection, DATA)
  schema_migrations = Dir.glob(File.join(schema_migrations_path, RUBY_EXT))
  data_migrations = Dir.glob(File.join(data_migrations_path, RUBY_EXT))

  invalid_yml_message =
    "Invalid migrations.yml. Example of valid format:\n- schema: SCHEMA_MIGRATION_FILE_NAME.rb\n- data: DATA_MIGRATION_FILE_NAME.rb"

  # A scalar at the top level would otherwise surface as NoMethodError.
  raise InvalidMigrationYmlErr.new(invalid_yml_message) unless migrations.is_a?(Array)

  migrations.filter_map do |migration|
    # Exactly one recognised key per entry: an entry carrying both keys would
    # silently lose its data migration, and an empty entry would be ignored.
    valid_entry = migration.is_a?(Hash) &&
      migration.size == 1 &&
      [SCHEMA, DATA].include?(migration.keys.first)
    raise InvalidMigrationYmlErr.new(invalid_yml_message) unless valid_entry

    if migration.key?(SCHEMA)
      run_migration(
        connection,
        :schema,
        migration[SCHEMA],
        applied_schema_migrations,
        schema_migrations,
        schema_migrations_path
      )
    else
      run_migration(
        connection,
        :data,
        migration[DATA],
        applied_data_migrations,
        data_migrations,
        data_migrations_path
      )
    end
  end
end

def indexed_migrations(connection, type)

# Fetches the recorded migrations of one type and indexes them by file name.
#
# @param connection object whose +[:migrations]+ returns something that
#   responds to select / where / all
# @param type value the +type+ column is filtered by
# @return the result of index_by over the fetched rows, keyed by each row's
#   +:filename+
def indexed_migrations(connection, type)
  dataset = connection[:migrations]
  rows = dataset.select(:filename).where(type: type).all

  index_by(rows) { |row| row[:filename] }
end

def run_migration(connection, migration_type, migration_name, applied_migrations, migrations, migrations_path)

# Applies a single migration unless it is already recorded as applied.
#
# @param connection forwarded unchanged to apply_migration
# @param migration_type [Symbol] kind of migration; forwarded to
#   apply_migration and used in the not-found message
# @param migration_name [String] migration file name to look up
# @param applied_migrations [#include?] collection asked whether
#   migration_name has already been applied
# @param migrations [Array<String>] candidate migration file paths
# @param migrations_path [String] only used in the not-found message
# @return [String, nil] the path handed to apply_migration, or nil when the
#   migration was already applied
# @raise [MigrationNotFoundErr] when no candidate path matches migration_name
def run_migration(connection, migration_type, migration_name, applied_migrations, migrations, migrations_path)
  return nil if applied_migrations.include?(migration_name)

  # Prefer the file whose basename is exactly the requested name, so that
  # "1_a.rb" is not satisfied by an earlier "01_a.rb"; fall back to the
  # substring match for names that are listed only partially.
  migration_path =
    migrations.find { |path| File.basename(path) == migration_name } ||
    migrations.find { |path| path.include?(migration_name) }

  unless migration_path
    # Report the actual migration type rather than always saying "schema".
    raise MigrationNotFoundErr.new(
      "#{migration_type} migration file for #{migration_name} not found in #{migrations_path}"
    )
  end

  apply_migration(connection, migration_path, migration_type)
  migration_path
end