lib/aws/rails/sqs_active_job/executor.rb
# frozen_string_literal: true require 'concurrent' module Aws module Rails module SqsActiveJob # CLI runner for polling for SQS ActiveJobs class Executor DEFAULTS = { min_threads: 0, max_threads: Integer(Concurrent.available_processor_count || Concurrent.processor_count), auto_terminate: true, idletime: 60, # 1 minute fallback_policy: :abort # Concurrent::RejectedExecutionError must be handled }.freeze def initialize(options = {}) @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options)) @retry_standard_errors = options[:retry_standard_errors] @logger = options[:logger] || ActiveSupport::Logger.new($stdout) @task_complete = Concurrent::Event.new end def execute(message) post_task(message) rescue Concurrent::RejectedExecutionError # no capacity, wait for a task to complete @task_complete.reset @task_complete.wait retry end def shutdown(timeout = nil) @executor.shutdown clean_shutdown = @executor.wait_for_termination(timeout) if clean_shutdown @logger.info 'Clean shutdown complete. All executing jobs finished.' else @logger.info "Timeout (#{timeout}) exceeded. Some jobs may not have " \ 'finished cleanly. Unfinished jobs will not be removed from ' \ 'the queue and can be ru-run once their visibility timeout ' \ 'passes.' end end private def post_task(message) @executor.post(message) do |message| job = JobRunner.new(message) @logger.info("Running job: #{job.id}[#{job.class_name}]") job.run message.delete rescue Aws::Json::ParseError => e @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}." rescue StandardError => e job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job' @logger.info "Error processing job #{job_msg}: #{e}" @logger.debug e.backtrace.join("\n") if @retry_standard_errors && !job.exception_executions? @logger.info( 'retry_standard_errors is enabled and job has not ' \ "been retried by Rails. Leaving #{job_msg} in the queue." ) else message.delete end ensure @task_complete.set end end end end end end