class Sidekiq::Processor

chain and then calls Sidekiq::Worker#perform.
processes it. It instantiates the worker, runs the middleware
The Processor receives a message from the Manager and actually
#

def self.default_middleware

def self.default_middleware
  Middleware::Chain.new do |m|
    m.add Middleware::Server::Logging
    m.add Middleware::Server::RetryJobs
    m.add Middleware::Server::ActiveRecord
    m.add Middleware::Server::Timeout
  end
end

def cloned(ary)

been mutated by the worker.
the message fails, what is pushed back onto Redis hasn't
Clone the arguments passed to the worker so that if
def cloned(ary)
  ary.map do |val|
    SINGLETON_CLASSES.include?(val.class) ? val : val.clone
  end
end

def initialize(boss)

def initialize(boss)
  @boss = boss
end

def inspect

See http://github.com/tarcieri/celluloid/issues/22
def inspect
  "#<Processor #{to_s}>"
end

def process(msgstr, queue)

def process(msgstr, queue)
  defer do
    begin
      msg = Sidekiq.load_json(msgstr)
      klass  = msg['class'].constantize
      worker = klass.new
      stats(worker, msg, queue) do
        Sidekiq.server_middleware.invoke(worker, msg, queue) do
          worker.perform(*cloned(msg['args']))
        end
      end
      rescue Exception => ex
        handle_exception(ex, msg || { :message => msgstr })
      raise
    end
  end
  @boss.processor_done!(current_actor)
end

def stats(worker, msg, queue)

def stats(worker, msg, queue)
  redis do |conn|
    conn.multi do
      conn.sadd('workers', self)
      conn.setex("worker:#{self}:started", EXPIRY, Time.now.to_s)
      hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
      conn.setex("worker:#{self}", EXPIRY, Sidekiq.dump_json(hash))
    end
  end
  dying = false
  begin
    yield
  rescue Exception
    dying = true
    redis do |conn|
      conn.multi do
        conn.incrby("stat:failed", 1)
      end
    end
    raise
  ensure
    redis do |conn|
      conn.multi do
        conn.srem("workers", self)
        conn.del("worker:#{self}")
        conn.del("worker:#{self}:started")
        conn.incrby("stat:processed", 1)
      end
    end
  end
end

def to_s

def to_s
  @str ||= "#{hostname}:#{process_id}-#{Thread.current.object_id}:default"
end