class Sidekiq::Fetcher

to assign to a ready Processor.
from the queues. It gets the message and hands it to the Manager
The Fetcher blocks on Redis, waiting for a message to process
#

def self.done!

its mailbox when shutdown starts.
Can't find a clean way to get the fetcher to just stop processing
Ugh. Say hello to a bloody hack.
def self.done!
  @done = true
end

def self.done?

def self.done?
  @done
end

def self.reset # testing only

testing only
def self.reset # testing only
  @done = nil
end

def self.strategy

def self.strategy
  Sidekiq.options[:fetch] || BasicFetch
end

def fetch

a new fetch if the current fetch turned up nothing.
forever and we can't loop forever. Instead we reschedule
Because we have to shut down cleanly, we can't block

finishes a message.
then issues a new fetch request every time a Processor
request for each idle processor when Sidekiq starts and
Fetching is straightforward: the Manager makes a fetch
def fetch
  watchdog('Fetcher#fetch died') do
    return if Sidekiq::Fetcher.done?
    begin
      work = @strategy.retrieve_work
      ::Sidekiq.logger.info("Redis is online, #{Time.now.to_f - @down.to_f} sec downtime") if @down
      @down = nil
      if work
        @mgr.async.assign(work)
      else
        after(0) { fetch }
      end
    rescue => ex
      handle_fetch_exception(ex)
    end
  end
end

def handle_fetch_exception(ex)

def handle_fetch_exception(ex)
  if !@down
    logger.error("Error fetching message: #{ex}")
    ex.backtrace.each do |bt|
      logger.error(bt)
    end
  end
  @down ||= Time.now
  pause
  after(0) { fetch }
rescue Task::TerminatedError
  # If redis is down when we try to shut down, all the fetch backlog
  # raises these errors.  Haven't been able to figure out what I'm doing wrong.
end

def initialize(mgr, options)

def initialize(mgr, options)
  @down = nil
  @mgr = mgr
  @strategy = Fetcher.strategy.new(options)
end

def pause

def pause
  sleep(TIMEOUT)
end