class Sidekiq::Processor
##
# The Processor receives a message from the Manager and actually
# processes it.  It instantiates the worker, runs the middleware
# chain and then calls Sidekiq::Worker#perform.
#
# Builds the middleware chain used when the server has not been given
# a custom one.
#
# Logging and RetryJobs are always added, in that order. The
# ActiveRecord middleware is required and appended only when
# ::ActiveRecord::Base is defined at the time this method runs.
#
# Returns the new Middleware::Chain.
def self.default_middleware
  Middleware::Chain.new do |chain|
    chain.add(Middleware::Server::Logging)
    chain.add(Middleware::Server::RetryJobs)

    # Lazy-load the ActiveRecord integration so it is only pulled in
    # for applications that actually use ActiveRecord.
    next unless defined?(::ActiveRecord::Base)

    require 'sidekiq/middleware/server/active_record'
    chain.add(Sidekiq::Middleware::Server::ActiveRecord)
  end
end
# Deep clone the arguments passed to the worker so that, if the
# message fails, what is pushed back onto Redis has not been mutated
# by the worker.
#
# ary - the job arguments; must be serializable with Marshal.
#
# Returns a deep copy that shares no mutable objects with +ary+.
def cloned(ary)
  serialized = Marshal.dump(ary)
  Marshal.load(serialized)
end
# Creates a processor bound to its supervisor.
#
# boss - the object this processor reports back to; stored in @boss
#        and messaged by #process.
def initialize(boss)
  @boss = boss
end
def inspect
def inspect "<Processor##{object_id.to_s(16)}>" end
# Runs a single unit of work fetched from a queue.
#
# The raw message is decoded, the worker class named in it is
# instantiated, and the worker's #perform is invoked with a deep copy
# of the arguments, wrapped in stats bookkeeping and the server
# middleware chain.
#
# work - responds to #message (raw JSON string), #queue_name and
#        #acknowledge.
#
# Any exception other than Sidekiq::Shutdown is passed to
# handle_exception and then re-raised; in that case the boss is not
# told the processor is done.
def process(work)
  raw = work.message
  queue_name = work.queue_name

  # Hand the boss the thread this job is running on.
  @boss.async.real_thread(proxy_id, Thread.current)

  should_ack = true
  begin
    job = Sidekiq.load_json(raw)
    worker = job['class'].constantize.new
    worker.jid = job['jid']

    stats(worker, job, queue_name) do
      Sidekiq.server_middleware.invoke(worker, job, queue_name) do
        worker.perform(*cloned(job['args']))
      end
    end
  rescue Sidekiq::Shutdown
    # Had to force kill this job because it didn't finish
    # within the timeout.  Don't acknowledge the work since
    # we didn't properly finish it.
    should_ack = false
  rescue Exception => error
    # +job+ is still nil when the payload could not be decoded, so
    # fall back to reporting the raw message string.
    handle_exception(error, job || { :message => raw })
    raise
  ensure
    work.acknowledge if should_ack
  end

  @boss.async.processor_done(current_actor)
end
# Runs the given block, retrying it when it raises a StandardError.
#
# The block is attempted at most max_retries + 1 times, with a one
# second pause before each retry. Once the retries are used up the
# last error is handed to handle_exception instead of being raised.
#
# max_retries - how many additional attempts are allowed (default: 2).
#
# Returns the block's value on success, or whatever handle_exception
# returns when every attempt failed.
def retry_and_suppress_exceptions(max_retries = 2)
  attempts = 0
  begin
    yield
  rescue => error
    attempts += 1
    if attempts > max_retries
      return handle_exception(error, { :message => "Exhausted #{max_retries} retries"})
    end

    Sidekiq.logger.debug {"Suppressing and retrying error: #{error.inspect}"}
    sleep(1)
    retry
  end
end
# Wraps the execution of a job with Redis bookkeeping.
#
# Before yielding, an entry describing the job (queue, payload, start
# time) is written to this process's "<identity>:workers" hash. After
# the block finishes, successfully or not, that entry is removed and
# the global and per-day "processed" counters are incremented. When
# the block raises, the global and per-day "failed" counters are
# incremented first and the exception is re-raised.
#
# worker - the worker instance (not used by this method).
# msg    - the decoded job payload.
# queue  - the name of the queue the job came from.
#
# Returns the value of the block.
def stats(worker, msg, queue)
  # Do not conflate errors from the job with errors caused by updating
  # stats so calling code can react appropriately
  retry_and_suppress_exceptions do
    snapshot = Sidekiq.dump_json({ :queue => queue, :payload => msg, :run_at => Time.now.to_i })
    Sidekiq.redis do |redis|
      redis.multi do
        redis.hmset("#{identity}:workers", thread_identity, snapshot)
        redis.expire("#{identity}:workers", 4 * 60 * 60)
      end
    end
  end

  begin
    yield
  rescue Exception
    retry_and_suppress_exceptions do
      Sidekiq.redis do |redis|
        failed_today = "stat:failed:#{Time.now.utc.to_date}"
        replies = redis.multi do
          redis.incrby("stat:failed", 1)
          redis.incrby(failed_today, 1)
        end
        # A reply of 1 means the daily key was just created, so this is
        # the moment to give it a TTL.
        redis.expire(failed_today, STATS_TIMEOUT) if replies.last == 1
      end
    end
    raise
  ensure
    retry_and_suppress_exceptions do
      Sidekiq.redis do |redis|
        processed_today = "stat:processed:#{Time.now.utc.to_date}"
        replies = redis.multi do
          redis.hdel("#{identity}:workers", thread_identity)
          redis.incrby("stat:processed", 1)
          redis.incrby(processed_today, 1)
        end
        # Same first-increment rule as for the daily failed counter.
        redis.expire(processed_today, STATS_TIMEOUT) if replies.last == 1
      end
    end
  end
end
# Base-36 rendering of the object_id of the thread that first called
# this method. The value is memoized in @str, so later calls return
# the same string even if they come from a different thread.
def thread_identity
  return @str if @str

  @str = Thread.current.object_id.to_s(36)
end