class Sidekiq::Middleware::Server::RetryJobs

Automatically retry jobs that fail in Sidekiq.
Sidekiq's retry support assumes a typical development lifecycle:

  0. push some code changes with a bug in it
  1. bug causes job processing to fail, sidekiq's middleware captures
     the job and pushes it onto a retry queue
  2. sidekiq retries jobs in the retry queue multiple times with
     an exponential delay, the job continues to fail
  3. after a few days, a developer deploys a fix. the job is
     reprocessed successfully.
  4. once retries are exhausted, sidekiq will give up and move the
     job to the Dead Job Queue (aka morgue) where it must be dealt with
     manually in the Web UI.
  5. After 6 months on the DJQ, Sidekiq will discard the job.

A job looks like:

  { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => true }

The 'retry' option also accepts a number (in place of 'true'):

  { 'class' => 'HardWorker', 'args' => [1, 2, 'foo'], 'retry' => 5 }

The job will be retried this number of times before giving up. (If simply
'true', Sidekiq retries 25 times)

We'll add a bit more data to the job to support retries:

* 'queue' - the queue to use
* 'retry_count' - number of times we've retried so far.
* 'error_message' - the message from the exception
* 'error_class' - the exception class
* 'failed_at' - the first time it failed
* 'retried_at' - the last time it was retried
* 'backtrace' - the number of lines of error backtrace to store

We don't store the backtrace by default as that can add a lot of overhead
to the job and everyone is using an error service, right?

The default number of retry attempts is 25 which works out to about 3 weeks
of retries. You can pass a value for the max number of retry attempts when
adding the middleware using the options hash:

  Sidekiq.configure_server do |config|
    config.server_middleware do |chain|
      chain.add Middleware::Server::RetryJobs, :max_retries => 7
    end
  end

or limit the number of retries for a particular worker with:

  class MyWorker
    include Sidekiq::Worker
    sidekiq_options :retry => 10
  end
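Putting the bookkeeping fields above together: a job that has already failed
twice might carry roughly this payload (the values here are invented for
illustration):

  {
    'class' => 'HardWorker',
    'args' => [1, 2, 'foo'],
    'retry' => 5,
    'queue' => 'default',
    'error_message' => 'Connection refused - connect(2)',
    'error_class' => 'Errno::ECONNREFUSED',
    'failed_at' => 1420070400.0,   # first failure, epoch seconds
    'retried_at' => 1420070460.0,  # most recent retry
    'retry_count' => 1             # 0 on the first failure, then incremented
  }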
def call(worker, msg, queue)
  yield
rescue Sidekiq::Shutdown
  # ignore, will be pushed back onto queue during hard_shutdown
  raise
rescue Exception => e
  # ignore, will be pushed back onto queue during hard_shutdown
  raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

  raise e unless msg['retry']
  max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)

  msg['queue'] = if msg['retry_queue']
    msg['retry_queue']
  else
    queue
  end

  # App code can stuff all sorts of crazy binary data into the error message
  # that won't convert to JSON.
  m = e.message[0..10_000]
  if m.respond_to?(:scrub!)
    m.force_encoding("utf-8")
    m.scrub!
  end

  msg['error_message'] = m
  msg['error_class'] = e.class.name
  count = if msg['retry_count']
    msg['retried_at'] = Time.now.to_f
    msg['retry_count'] += 1
  else
    msg['failed_at'] = Time.now.to_f
    msg['retry_count'] = 0
  end

  if msg['backtrace'] == true
    msg['error_backtrace'] = e.backtrace
  elsif msg['backtrace'] == false
    # do nothing
  elsif msg['backtrace'].to_i != 0
    msg['error_backtrace'] = e.backtrace[0..msg['backtrace'].to_i]
  end

  if count < max_retry_attempts
    delay = delay_for(worker, count)
    logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
    retry_at = Time.now.to_f + delay
    payload = Sidekiq.dump_json(msg)
    Sidekiq.redis do |conn|
      conn.zadd('retry', retry_at.to_s, payload)
    end
  else
    # Goodbye dear message, you (re)tried your best I'm sure.
    retries_exhausted(worker, msg)
  end

  raise e
end
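When a retry is scheduled, the serialized job lands in the 'retry' sorted set,
scored by the time it should run again. A quick way to peek at it from a
console (a sketch that uses only the Sidekiq.redis helper seen above; the
Sidekiq::RetrySet API in sidekiq/api wraps the same data):

  Sidekiq.redis do |conn|
    conn.zrange('retry', 0, 4, :with_scores => true).each do |payload, retry_at|
      job = Sidekiq.load_json(payload)
      puts "#{job['class']} retry ##{job['retry_count']} due at #{Time.at(retry_at)}"
    end
  end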
def delay_for(worker, count)
  worker.sidekiq_retry_in_block? && retry_in(worker, count) || seconds_to_delay(count)
end
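The sidekiq_retry_in_block? check means a worker can replace the default
backoff: the block is handed the current retry count and should return a delay
in seconds. A sketch:

  class HardWorker
    include Sidekiq::Worker

    # retry after 10s, 20s, 30s, ... instead of the exponential default
    sidekiq_retry_in { |count| 10 * (count + 1) }

    def perform(*args)
      # ...
    end
  end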
def exception_caused_by_shutdown?(e)
  # In Ruby 2.1.0 only, check if exception is a result of shutdown.
  return false unless defined?(e.cause)

  e.cause.instance_of?(Sidekiq::Shutdown) ||
    exception_caused_by_shutdown?(e.cause)
end
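The walk down e.cause relies on Ruby (2.1+) recording the exception being
handled whenever a new exception is raised inside a rescue block. A minimal
illustration of that chaining:

  begin
    begin
      raise Sidekiq::Shutdown
    rescue Sidekiq::Shutdown
      raise 'wrapped by application code'   # RuntimeError whose #cause is the Shutdown
    end
  rescue => e
    e.cause.class   # => Sidekiq::Shutdown, so the job is not treated as a retryable failure
  end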
def initialize(options = {})
  @max_retries = options.fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
end
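The server middleware chain instantiates this class with whatever arguments
were passed to chain.add, so the overview's configuration boils down to
(sketch):

  RetryJobs.new                       # @max_retries = DEFAULT_MAX_RETRY_ATTEMPTS (25)
  RetryJobs.new(:max_retries => 7)    # @max_retries = 7, unless a job's 'retry' value overrides it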
def retries_exhausted(worker, msg)
  logger.debug { "Dropping message after hitting the retry maximum: #{msg}" }
  begin
    if worker.sidekiq_retries_exhausted_block?
      worker.sidekiq_retries_exhausted_block.call(msg)
    end
  rescue => e
    handle_exception(e, { :context => "Error calling retries_exhausted for #{worker.class}", :job => msg })
  end

  send_to_morgue(msg) unless msg['dead'] == false
end
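Workers can hook into this moment with sidekiq_retries_exhausted; the block
receives the job hash described in the overview. A sketch:

  class HardWorker
    include Sidekiq::Worker

    sidekiq_retries_exhausted do |msg|
      Sidekiq.logger.warn "Retries exhausted for #{msg['class']} #{msg['jid']}: #{msg['error_message']}"
    end

    def perform(*args)
      # ...
    end
  end

The final line also means a job whose payload carries 'dead' => false (for
example via sidekiq_options :dead => false) skips the morgue entirely.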
def retry_attempts_from(msg_retry, default)
  if msg_retry.is_a?(Fixnum)
    msg_retry
  else
    default
  end
end
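Worked out against the job examples in the overview (with the 25-attempt
default):

  retry_attempts_from(5, 25)     # => 5,  'retry' => 5 caps this job at 5 attempts
  retry_attempts_from(true, 25)  # => 25, 'retry' => true falls back to the default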
def retry_in(worker, count)
  begin
    worker.sidekiq_retry_in_block.call(count)
  rescue Exception => e
    handle_exception(e, { :context => "Failure scheduling retry using the defined `sidekiq_retry_in` in #{worker.class.name}, falling back to default" })
    nil
  end
end
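If the worker-supplied block raises, retry_in reports the error via
handle_exception and returns nil, so the || in delay_for falls back to
seconds_to_delay (the same happens if the block simply returns nil). For
example, with a deliberately broken block (hypothetical worker):

  class FlakyWorker
    include Sidekiq::Worker

    sidekiq_retry_in { |count| raise 'bad schedule' }   # every failure falls back to the default delay

    def perform
      # ...
    end
  end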
def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30) * (count + 1))
end
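Plugging a few counts into the formula (ignoring the random jitter term) shows
how quickly the delays grow:

  seconds_to_delay(0)    # ~15 seconds
  seconds_to_delay(5)    # ~640 seconds (about 11 minutes)
  seconds_to_delay(10)   # ~10,015 seconds (about 2.8 hours)
  seconds_to_delay(24)   # ~331,791 seconds (about 3.8 days)

Summed over 25 attempts the delays come to roughly 20 days, which is where the
'about 3 weeks' figure in the overview comes from.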
def send_to_morgue(msg)
  Sidekiq.logger.info { "Adding dead #{msg['class']} job #{msg['jid']}" }
  payload = Sidekiq.dump_json(msg)
  now = Time.now.to_f
  Sidekiq.redis do |conn|
    conn.multi do
      conn.zadd('dead', now, payload)
      conn.zremrangebyscore('dead', '-inf', now - DeadSet::TIMEOUT)
      conn.zremrangebyrank('dead', 0, -DeadSet::MAX_JOBS)
    end
  end
end
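Dead jobs live in the 'dead' sorted set, capped at DeadSet::MAX_JOBS entries
and trimmed once they are older than DeadSet::TIMEOUT (the six months mentioned
in the overview). They are normally handled from the morgue tab in the Web UI,
but can also be inspected from a console through the API wrapper (a sketch,
assuming sidekiq/api is loaded):

  require 'sidekiq/api'

  dead = Sidekiq::DeadSet.new
  dead.size                      # number of jobs waiting in the morgue
  dead.each do |job|
    puts "#{job.klass} #{job.jid} died with: #{job.item['error_message']}"
  end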