class Aws::Rails::Middleware::ElasticBeanstalkSQSD
Middleware to handle requests from the SQS Daemon present on Elastic Beanstalk worker environments.
def _execute_job(job, job_name)
def _execute_job(job, job_name) ::ActiveJob::Base.execute(job) rescue NameError => e @logger.error("Job #{job_name} could not resolve to a class that inherits from Active Job.") @logger.error("Error: #{e}") raise e end
def _execute_periodic_task(job_name)
def _execute_periodic_task(job_name) job = job_name.constantize.new job.perform_now rescue NameError => e @logger.error("Periodic task #{job_name} could not resolve to an Active Job class " \ '- check the cron name spelling and set the path as / in cron.yaml.') @logger.error("Error: #{e}.") raise e end
def app_runs_in_docker_container?
def app_runs_in_docker_container? @app_runs_in_docker_container ||= in_docker_container_with_cgroup1? || in_docker_container_with_cgroup2? end
def build_default_docker_ips
def build_default_docker_ips default_gw_ips = ['172.17.0.1'] if File.exist?('/proc/net/route') File.open('/proc/net/route').each_line do |line| fields = line.strip.split next if fields.size != 11 # Destination == 0.0.0.0 and Flags & RTF_GATEWAY != 0 next unless fields[1] == '00000000' && fields[3].hex.anybits?(0x2) default_gw_ips << IPAddr.new_ntoh([fields[2].hex].pack('L')).to_s end end default_gw_ips end
def call(env)
def call(env) request = ::ActionDispatch::Request.new(env) # Pass through unless user agent is the SQS Daemon return @app.call(env) unless from_sqs_daemon?(request) @logger.debug('aws-sdk-rails middleware detected call from Elastic Beanstalk SQS Daemon.') # Only accept requests from this user agent if it is from localhost or a docker host in case of forgery. unless request.local? || sent_from_docker_host?(request) @logger.warn('SQSD request detected from untrusted address; returning 403 forbidden.') return forbidden_response end # Execute job or periodic task based on HTTP request context periodic_task?(request) ? execute_periodic_task(request) : execute_job(request) end
def default_docker_ips
def default_docker_ips @default_docker_ips ||= build_default_docker_ips end
def execute_job(request)
def execute_job(request) # Jobs queued from the SQS adapter contain the JSON message in the request body. job = ::ActiveSupport::JSON.decode(request.body.string) job_name = job['job_class'] @logger.debug("Executing job: #{job_name}") _execute_job(job, job_name) [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran job #{job_name}."]] rescue NameError internal_error_response end
def execute_periodic_task(request)
def execute_periodic_task(request) # The beanstalk worker SQS Daemon will add the 'X-Aws-Sqsd-Taskname' for periodic tasks set in cron.yaml. job_name = request.headers['X-Aws-Sqsd-Taskname'] @logger.debug("Creating and executing periodic task: #{job_name}") _execute_periodic_task(job_name) [200, { 'Content-Type' => 'text/plain' }, ["Successfully ran periodic task #{job_name}."]] rescue NameError internal_error_response end
def forbidden_response
def forbidden_response message = 'Request with aws-sqsd user agent was made from untrusted address.' [403, { 'Content-Type' => 'text/plain' }, [message]] end
def from_sqs_daemon?(request)
def from_sqs_daemon?(request) current_user_agent = request.headers['User-Agent'] !current_user_agent.nil? && current_user_agent.start_with?('aws-sqsd') end
def in_docker_container_with_cgroup1?
def in_docker_container_with_cgroup1? File.exist?('/proc/1/cgroup') && File.read('/proc/1/cgroup') =~ %r{/docker/} end
def in_docker_container_with_cgroup2?
def in_docker_container_with_cgroup2? File.exist?('/proc/self/mountinfo') && File.read('/proc/self/mountinfo') =~ %r{/docker/containers/} end
def initialize(app)
def initialize(app) @app = app @logger = ::Rails.logger end
def internal_error_response
def internal_error_response message = 'Failed to execute job - see Rails log for more details.' [500, { 'Content-Type' => 'text/plain' }, [message]] end
def ip_originates_from_docker_host?(request)
def ip_originates_from_docker_host?(request) default_docker_ips.include?(request.remote_ip) || default_docker_ips.include?(request.remote_addr) end
def periodic_task?(request)
The beanstalk worker SQS Daemon will add the custom 'X-Aws-Sqsd-Taskname' header
def periodic_task?(request) request.headers['X-Aws-Sqsd-Taskname'].present? && request.fullpath == '/' end
def sent_from_docker_host?(request)
def sent_from_docker_host?(request) app_runs_in_docker_container? && ip_originates_from_docker_host?(request) end