lib/aws/rails/sqs_active_job/executor.rb



# frozen_string_literal: true

require 'concurrent'

module Aws
  module Rails
    module SqsActiveJob
      # CLI runner for polling for SQS ActiveJobs
      class Executor

        DEFAULTS = {
           min_threads:     0,
           max_threads:     Concurrent.processor_count,
           auto_terminate:  true,
           idletime:        60, # 1 minute
           fallback_policy: :caller_runs # slow down the producer thread
        }.freeze

        def initialize(options = {})
          @executor = Concurrent::ThreadPoolExecutor.new(DEFAULTS.merge(options))
          @logger = options[:logger] || ActiveSupport::Logger.new(STDOUT)
        end

        # TODO: Consider catching the exception and sleeping instead of using :caller_runs
        def execute(message)
          @executor.post(message) do |message|
            begin
              job = JobRunner.new(message)
              @logger.info("Running job: #{job.id}[#{job.class_name}]")
              job.run
              message.delete
            rescue Aws::Json::ParseError => e
              @logger.error "Unable to parse message body: #{message.data.body}. Error: #{e}."
            rescue StandardError => e
              # message will not be deleted and will be retried
              job_msg = job ? "#{job.id}[#{job.class_name}]" : 'unknown job'
              @logger.info "Error processing job #{job_msg}: #{e}"
              @logger.debug e.backtrace.join("\n")
            end
          end
        end

        def shutdown(timeout=nil)
          @executor.shutdown
          clean_shutdown = @executor.wait_for_termination(timeout)
          if clean_shutdown
            @logger.info 'Clean shutdown complete.  All executing jobs finished.'
          else
            @logger.info "Timeout (#{timeout}) exceeded.  Some jobs may not have"\
              " finished cleanly.  Unfinished jobs will not be removed from"\
              " the queue and can be ru-run once their visibility timeout"\
              " passes."
          end
        end
      end
    end
  end
end