class WolfCore::JobSchedulerDataSource
def at_expression(run_at)
def at_expression(run_at) time = case run_at when Time then run_at.utc when DateTime then run_at.to_time.utc when String then Time.parse(run_at).utc else raise ArgumentError, "run_at debe ser Time, DateTime o String ISO8601" end "at(#{time.strftime("%Y-%m-%dT%H:%M:%S")})" end
def build_options(config)
def build_options(config) { description: config[:description], group_name: config[:group_name] || default_group_name, timezone: config[:timezone] || default_timezone, client_token: config[:client_token] } end
def build_target(target_arn:, role_arn:, payload: nil)
def build_target(target_arn:, role_arn:, payload: nil) input = case payload when nil then nil when String then payload else payload.respond_to?(:to_json) ? payload.to_json : payload.to_s end target = { arn: target_arn, role_arn: role_arn } target[:input] = input if input target end
def cancel_schedule(job_id:, options: {})
def cancel_schedule(job_id:, options: {}) Result.try do response = @client.delete_schedule( name: job_id, group_name: options[:group_name] || default_group_name ) Result.success(data: { response: response }) end end
def create_schedule(name:, schedule_expression:, target:, options: {})
def create_schedule(name:, schedule_expression:, target:, options: {}) params = { name: name, group_name: options[:group_name], schedule_expression: schedule_expression, schedule_expression_timezone: options[:timezone], flexible_time_window: { mode: "OFF" }, target: target, description: options[:description], state: "ENABLED", client_token: options[:client_token] } @client.create_schedule(**params) end
def expression_from_schedule!(schedule)
def expression_from_schedule!(schedule) expr = schedule[:cron_expression] || schedule[:cron] || schedule[:rate_expression] || schedule[:rate] raise ArgumentError, "Se requiere schedule[:cron] o schedule[:rate]" unless expr expr end
def get_schedule(job_id:, options: {})
def get_schedule(job_id:, options: {}) Result.try do response = @client.get_schedule( name: job_id, group_name: options[:group_name] || default_group_name ) Result.success(data: { schedule: response }) end end
def initialize(region: "us-east-1", client: nil, default_group_name: "default", default_timezone: "UTC")
def initialize(region: "us-east-1", client: nil, default_group_name: "default", default_timezone: "UTC") @client = client || Aws::Scheduler::Client.new(region: region) @default_group_name = default_group_name @default_timezone = default_timezone end
def normalize_target(target)
def normalize_target(target) arn = target[:arn] || target[:target_arn] role = target[:role_arn] payload = target[:payload] raise ArgumentError, "target[:arn] o target[:target_arn] es requerido" unless arn raise ArgumentError, "target[:role_arn] es requerido" unless role build_target(target_arn: arn, role_arn: role, payload: payload) end
def schedule_at(job_id:, run_at:, target:, config: {})
def schedule_at(job_id:, run_at:, target:, config: {}) Result.try do job_id = shorten_job_id(job_id) if config[:can_shorten_job_id] expr = at_expression(run_at) normalized_target = normalize_target(target) response = schedule_with_expression(job_id: job_id, expr: expr, target: normalized_target, config: config) Result.success(data: { response: response }) end end
def schedule_recurring(job_id:, schedule:, target:, config: {})
def schedule_recurring(job_id:, schedule:, target:, config: {}) Result.try do job_id = shorten_job_id(job_id) if config[:can_shorten_job_id] expr = expression_from_schedule!(schedule) normalized_target = normalize_target(target) response = schedule_with_expression(job_id: job_id, expr: expr, target: normalized_target, config: config) Result.success(data: { response: response }) end end
def schedule_with_expression(job_id:, expr:, target:, config: {})
def schedule_with_expression(job_id:, expr:, target:, config: {}) options = build_options(config) create_schedule(name: job_id, schedule_expression: expr, target: target, options: options) end
def shorten_job_id(job_id)
def shorten_job_id(job_id) return job_id if job_id.length <= 64 hash = Digest::SHA1.hexdigest(job_id)[0, 8] prefix_length = 64 - 1 - hash.length # reserve space for '-' and hash prefix = job_id[0, prefix_length] "#{prefix}-#{hash}" end