class Sidekiq::Manager


the shutdown process. The other tasks are performed by other threads.
Note that only the last task requires its own Thread since it has to monitor
5. stop: hard stop the Processors by deadline.
4. quiet: shutdown idle Processors.
3. processor_died: Handle job failure, throw away Processor, create new one.
1. start: Spin up Processors.
Tasks:
the lifecycle of the Processors.
The Manager is the central coordination point in Sidekiq, controlling
#

def hard_shutdown

def hard_shutdown
  # We've reached the timeout and we still have busy threads.
  # They must die but their jobs shall live on.
  cleanup = nil
  @plock.synchronize do
    cleanup = @workers.dup
  end
  if cleanup.size > 0
    jobs = cleanup.map { |p| p.job }.compact
    logger.warn { "Terminating #{cleanup.size} busy threads" }
    logger.debug { "Jobs still in progress #{jobs.inspect}" }
    # Re-enqueue unfinished jobs
    # NOTE: You may notice that we may push a job back to redis before
    # the thread is terminated. This is ok because Sidekiq's
    # contract says that jobs are run AT LEAST once. Process termination
    # is delayed until we're certain the jobs are back in Redis because
    # it is worse to lose a job than to run it twice.
    capsule.fetcher.bulk_requeue(jobs)
  end
  cleanup.each do |processor|
    processor.kill
  end
  # when this method returns, we immediately call `exit` which may not give
  # the remaining threads time to run `ensure` blocks, etc. We pause here up
  # to 3 seconds to give threads a minimal amount of time to run `ensure` blocks.
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3
  wait_for(deadline) { @workers.empty? }
end

def initialize(capsule)

def initialize(capsule)
  @config = @capsule = capsule
  @count = capsule.concurrency
  raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
  @done = false
  @workers = Set.new
  @plock = Mutex.new
  @count.times do
    @workers << Processor.new(@config, &method(:processor_result))
  end
end

def processor_result(processor, reason = nil)

def processor_result(processor, reason = nil)
  @plock.synchronize do
    @workers.delete(processor)
    unless @done
      p = Processor.new(@config, &method(:processor_result))
      @workers << p
      p.start
    end
  end
end

def quiet

def quiet
  return if @done
  @done = true
  logger.info { "Terminating quiet threads for #{capsule.name} capsule" }
  @workers.each(&:terminate)
end

def start

def start
  @workers.each(&:start)
end

def stop(deadline)

def stop(deadline)
  quiet
  # some of the shutdown events can be async,
  # we don't have any way to know when they're done but
  # give them a little time to take effect
  sleep PAUSE_TIME
  return if @workers.empty?
  logger.info { "Pausing to allow jobs to finish..." }
  wait_for(deadline) { @workers.empty? }
  return if @workers.empty?
  hard_shutdown
ensure
  capsule.stop
end

def stopped?

def stopped?
  @done
end

def wait_for(deadline, &condblock)

Wait for the orblock to be true or the deadline passed.
def wait_for(deadline, &condblock)
  remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  while remaining > PAUSE_TIME
    return if condblock.call
    sleep PAUSE_TIME
    remaining = deadline - ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
  end
end