class Aws::Rails::SqsActiveJob::Poller
def poll
def poll queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(@options[:queue]) @logger.info "Polling on: #{@options[:queue]} => #{queue_url}" client = Aws::Rails::SqsActiveJob.config.client @poller = Aws::SQS::QueuePoller.new(queue_url, client: client) poller_options = { skip_delete: true, max_number_of_messages: @options[:max_messages], visibility_timeout: @options[:visibility_timeout] } # Limit max_number_of_messages for FIFO queues to 1 # this ensures jobs with the same message_group_id are processed # in order # Jobs with different message_group_id will be processed in # parallel and may be out of order. if Aws::Rails::SqsActiveJob.fifo?(queue_url) poller_options[:max_number_of_messages] = 1 end single_message = poller_options[:max_number_of_messages] == 1 @poller.poll(poller_options) do |msgs| msgs = [msgs] if single_message @logger.info "Processing batch of #{msgs.length} messages" msgs.each do |msg| @executor.execute(Aws::SQS::Message.new( queue_url: queue_url, receipt_handle: msg.receipt_handle, data: msg, client: client )) end end end