# The Fetcher blocks on Redis, waiting for a message to process
# from the queues. It gets the message and hands it to the Manager
# to assign to a ready Processor.
class Sidekiq::Fetcher

  # Ugh. Say hello to a bloody hack.
  # Can't find a clean way to get the fetcher to just stop processing
  def self.done!
    @done = true
  end

  def self.done?
    @done
  end

  def self.reset # testing only
    @done = nil
  end

  def self.strategy
    Sidekiq.options[:fetch] || BasicFetch
  end

  # Fetching is straightforward: the Manager makes a fetch
  # request for each idle processor when Sidekiq starts and
  # then issues a new fetch request every time a Processor
  # finishes a message.
  #
  # Because we have to shut down cleanly, we can't block
  # forever and we can't loop forever. Instead we reschedule
  # a new fetch if the current fetch turned up nothing.
  def fetch
    watchdog('Fetcher#fetch died') do
      return if Sidekiq::Fetcher.done?

      begin
        work = @strategy.retrieve_work
        ::Sidekiq.logger.info("Redis is online, #{Time.now.to_f - @down.to_f} sec downtime") if @down
        @down = nil

        if work
          @mgr.async.assign(work)
        else
          after(0) { fetch }
        end
      rescue => ex
        handle_fetch_exception(ex)
      end
    end
  end

  def handle_fetch_exception(ex)
    if !@down
      logger.error("Error fetching message: #{ex}")
      ex.backtrace.each do |bt|
        logger.error(bt)
      end
    end
    @down ||= Time.now
    pause
    after(0) { fetch }
  rescue Task::TerminatedError
    # If redis is down when we try to shut down, all the fetch backlog
    # raises these errors. Haven't been able to figure out what I'm doing wrong.
  end

  def initialize(mgr, options)
    @down = nil
    @mgr = mgr
    @strategy = Fetcher.strategy.new(options)
  end

  def pause
    sleep(TIMEOUT)
  end
end