class Aws::Rails::Middleware::ElasticBeanstalkSQSD

Middleware to handle requests from the SQS Daemon present on Elastic Beanstalk worker environments.

def _execute_job(job, job_name)

# Hands a job payload to Active Job for execution.
#
# @param job the serialized job data passed straight to ActiveJob::Base.execute
# @param job_name name of the job, used only in the log messages
# @return whatever ActiveJob::Base.execute returns
# @raise [NameError] re-raised after two error lines are written to the logger
def _execute_job(job, job_name)
  ::ActiveJob::Base.execute(job)
rescue NameError => error
  @logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.")
  @logger.error("Error: #{error}")
  # A bare raise re-raises the exception currently being handled.
  raise
end

def _execute_periodic_task(job_name)

# Resolves the named class, instantiates it and runs it immediately.
#
# @param job_name [String] class name to resolve with `constantize`
# @return whatever `perform_now` returns
# @raise [NameError] re-raised after two error lines are written to the logger
def _execute_periodic_task(job_name)
  job_name.constantize.new.perform_now
rescue NameError => error
  hint = "Periodic task #{job_name} could not resolve to an Active Job class " \
         '- check the cron name spelling and set the path as / in cron.yaml.'
  @logger.error(hint)
  @logger.error("Error: #{error}.")
  # A bare raise re-raises the exception currently being handled.
  raise
end

def app_runs_in_docker_container?

# Reports whether either Docker detection check (cgroup v1 or cgroup v2) succeeds.
#
# The outcome is computed once and cached, including a falsy outcome. A plain
# `||=` would re-run both checks on every call whenever the answer is "no".
#
# @return the cached result of the two checks; truthy when one of them succeeds
def app_runs_in_docker_container?
  return @app_runs_in_docker_container if defined?(@app_runs_in_docker_container)

  @app_runs_in_docker_container = in_docker_container_with_cgroup1? || in_docker_container_with_cgroup2?
end

def build_default_docker_ips

rubocop:disable Metrics/AbcSize
# Builds the list of addresses accepted as the Docker host.
#
# The list always starts with 172.17.0.1 (presumably Docker's default bridge
# gateway - confirm). When /proc/net/route exists, the gateway of every
# default-route entry found there is appended.
#
# @return [Array<String>] dotted-quad IP address strings
def build_default_docker_ips
  default_gw_ips = ['172.17.0.1']
  return default_gw_ips unless File.exist?('/proc/net/route')

  # File.foreach closes the file when iteration ends; File.open(...).each_line
  # left the handle open until it was garbage collected.
  File.foreach('/proc/net/route') do |line|
    fields = line.strip.split
    # Skips the header and any malformed row: only 11-column rows are parsed.
    next if fields.size != 11
    # Destination == 0.0.0.0 and Flags & RTF_GATEWAY != 0
    next unless fields[1] == '00000000' && fields[3].hex.anybits?(0x2)

    # The gateway column is a hex word. pack('L') writes it in native byte
    # order and new_ntoh reads those four bytes as a network-order address.
    default_gw_ips << IPAddr.new_ntoh([fields[2].hex].pack('L')).to_s
  end
  default_gw_ips
end

def call(env)

# Rack entry point.
#
# Requests that do not come from the SQS Daemon are forwarded to the wrapped
# app untouched. Daemon requests are executed only when they originate from
# localhost or from the Docker host; anything else gets a 403, since the
# user agent alone could be forged.
#
# @param env [Hash] the Rack environment
# @return the downstream app's response, or the response of the job / task / 403 helper
def call(env)
  request = ::ActionDispatch::Request.new(env)
  return @app.call(env) unless from_sqs_daemon?(request)

  @logger.debug('aws-sdk-rails middleware detected call from Elastic Beanstalk SQS Daemon.')
  if request.local? || sent_from_docker_host?(request)
    # Trusted origin: dispatch on the kind of daemon request.
    return execute_periodic_task(request) if periodic_task?(request)

    execute_job(request)
  else
    @logger.warn('SQSD request detected from untrusted address; returning 403 forbidden.')
    forbidden_response
  end
end

def default_docker_ips

# Returns the Docker host address list, building it on first use and caching it.
#
# @return the value produced by build_default_docker_ips
def default_docker_ips
  return @default_docker_ips if @default_docker_ips

  @default_docker_ips = build_default_docker_ips
end

def execute_job(request)

# Decodes the job JSON carried in the request body and executes it.
#
# @param request the incoming request; its body holds the JSON job message
# @return [Array] a 200 plain-text Rack response on success, or the result of
#   internal_error_response when a NameError is raised along the way
def execute_job(request)
  # Jobs queued from the SQS adapter contain the JSON message in the request body.
  body = request.body
  # Only StringIO-like bodies respond to #string. For any other body the
  # NoMethodError (a NameError subclass) would be swallowed by the rescue below
  # and turned into an unexplained 500, so fall back to #read.
  payload = body.respond_to?(:string) ? body.string : body.read
  job = ::ActiveSupport::JSON.decode(payload)
  job_name = job['job_class']
  @logger.debug("Executing job: #{job_name}")
  _execute_job(job, job_name)
  [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]]
rescue NameError
  internal_error_response
end

def execute_periodic_task(request)

# Runs the periodic task named by the 'X-Aws-Sqsd-Taskname' request header.
#
# @param request the incoming request carrying the task name header
# @return [Array] a 200 plain-text Rack response on success, or the result of
#   internal_error_response when a NameError is raised
def execute_periodic_task(request)
  # The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml.
  task_name = request.headers['X-Aws-Sqsd-Taskname']
  @logger.debug("Creating and executing periodic task: #{task_name}")
  _execute_periodic_task(task_name)
  success_body = "Successfully ran periodic task #{task_name}."
  [200, { 'Content-Type' => 'text/plain' }, [success_body]]
rescue NameError
  internal_error_response
end

def forbidden_response

# Builds the response returned for daemon-looking requests from an untrusted address.
#
# @return [Array] a Rack response triple: 403 status, plain-text header, one-element body
def forbidden_response
  headers = { 'Content-Type' => 'text/plain' }
  [403, headers, ['Request with aws-sqsd user agent was made from untrusted address.']]
end

def from_sqs_daemon?(request)

The beanstalk worker SQS Daemon sets a specific User-Agent header that begins with 'aws-sqsd'.
# Tells whether the request's User-Agent header starts with 'aws-sqsd'.
#
# @param request the incoming request whose headers are inspected
# @return [Boolean] false when the header is absent
def from_sqs_daemon?(request)
  user_agent = request.headers['User-Agent']
  return false if user_agent.nil?

  user_agent.start_with?('aws-sqsd')
end

def in_docker_container_with_cgroup1?

# Checks whether /proc/1/cgroup exists and its contents mention "/docker/".
#
# Uses match? so the predicate yields a strict boolean rather than the
# Integer/nil that =~ would return, and allocates no MatchData.
#
# @return [Boolean]
def in_docker_container_with_cgroup1?
  File.exist?('/proc/1/cgroup') && File.read('/proc/1/cgroup').match?(%r{/docker/})
end

def in_docker_container_with_cgroup2?

# Checks whether /proc/self/mountinfo exists and its contents mention
# "/docker/containers/".
#
# Uses match? so the predicate yields a strict boolean rather than the
# Integer/nil that =~ would return, and allocates no MatchData.
#
# @return [Boolean]
def in_docker_container_with_cgroup2?
  File.exist?('/proc/self/mountinfo') && File.read('/proc/self/mountinfo').match?(%r{/docker/containers/})
end

def initialize(app)

# Stores the wrapped application and picks up the Rails logger.
#
# @param app the next application in the stack; `call` forwards to it via @app.call
def initialize(app)
  @logger = ::Rails.logger
  @app = app
end

def internal_error_response

# Builds the response returned when a job or periodic task could not be executed.
#
# @return [Array] a Rack response triple: 500 status, plain-text header, one-element body
def internal_error_response
  headers = { 'Content-Type' => 'text/plain' }
  [500, headers, ['Failed to execute job - see Rails log for more details.']]
end

def ip_originates_from_docker_host?(request)

# Tells whether the request's remote_ip or remote_addr is one of the Docker host addresses.
#
# @param request the incoming request
# @return [Boolean]
def ip_originates_from_docker_host?(request)
  # remote_addr is consulted only when remote_ip is not a known address.
  return true if default_docker_ips.include?(request.remote_ip)

  default_docker_ips.include?(request.remote_addr)
end

def periodic_task?(request)

The beanstalk worker SQS Daemon will add the custom 'X-Aws-Sqsd-Taskname' header
for periodic tasks set in cron.yaml.
# Tells whether the request is a periodic-task invocation: the
# 'X-Aws-Sqsd-Taskname' header is present and the request path is exactly '/'.
#
# @param request the incoming request
# @return [Boolean]
def periodic_task?(request)
  return false unless request.headers['X-Aws-Sqsd-Taskname'].present?

  request.fullpath == '/'
end

def sent_from_docker_host?(request)

# Tells whether the app runs inside a Docker container and the request
# came from the Docker host.
#
# @param request the incoming request
# @return the falsy container-check result when not in Docker, otherwise the
#   result of ip_originates_from_docker_host?
def sent_from_docker_host?(request)
  in_container = app_runs_in_docker_container?
  # The address check is skipped entirely outside a container.
  return in_container unless in_container

  ip_originates_from_docker_host?(request)
end