class Sidekiq::Capsule
end
end
cap.queues = %w(single)
cap.concurrency = 1
config.capsule("single-threaded") do |cap|
Sidekiq.configure_server do |config|
the jobs with one thread, meaning the jobs will be processed serially.
This capsule will pull jobs from the “single” queue and process
Capsules in their initializer.
One “default” Capsule is started but the user may declare additional
process one or more queues with a given concurrency.
A Sidekiq::Capsule is the set of resources necessary to
def client_middleware
Avoid this if possible; add middleware globally so all capsules share it.
Allow the middleware to be different per-capsule.
# Returns the client middleware chain for this capsule.
#
# The chain is built on first use from the global configuration via
# `config.client_middleware.copy_for(self)` and memoized in @client_chain,
# so every later call returns the same object.
#
# When a block is given, the chain is yielded to it before being returned,
# which lets the caller customize middleware for this capsule only.
def client_middleware
  @client_chain ||= config.client_middleware.copy_for(self)
  yield @client_chain if block_given?
  @client_chain
end
def fetcher
# Returns the fetcher instance for this capsule, creating it on first use.
#
# The class comes from config[:fetch_class], falling back to
# Sidekiq::BasicFetch, and is instantiated with this capsule. If the new
# instance responds to `setup`, it is called once with config[:fetch_setup].
# The result is memoized in @fetcher.
def fetcher
  @fetcher ||= begin
    inst = (config[:fetch_class] || Sidekiq::BasicFetch).new(self)
    # `setup` is optional: only custom fetchers that define it get the hook.
    inst.setup(config[:fetch_setup]) if inst.respond_to?(:setup)
    inst
  end
end
def initialize(name, config)
# Builds a capsule with the given name and configuration.
#
# Defaults: a single "default" queue with weight 0, :strict mode, and the
# concurrency read from config[:concurrency].
#
# name   - identifier for this capsule
# config - configuration object; must respond to `[]`
def initialize(name, config)
  @name = name
  @config = config
  @queues = ["default"]
  @weights = {"default" => 0}
  @concurrency = config[:concurrency]
  @mode = :strict
end
def local_redis_pool
# Returns the Redis connection pool owned by this capsule, creating it on
# first use through config.new_redis_pool(@concurrency, name) and memoizing
# it in @redis.
def local_redis_pool
  # connection pool is lazy, it will not create connections unless you actually need them
  # so don't be skimpy!
  @redis ||= config.new_redis_pool(@concurrency, name)
end
def logger
# Returns the logger held by the configuration (delegates to config.logger).
def logger
  config.logger
end
def lookup(name)
# Looks up `name` through the configuration and returns whatever
# config.lookup(name) returns.
def lookup(name)
  config.lookup(name)
end
def queues=(val)
- :weighted - queues have arbitrary weight between 1 and N
- :random - all queues have weight 1
- :strict - all queues have 0 weight and are checked strictly in order
Sidekiq checks queues in three modes:
# Sets the queues this capsule processes and derives the polling mode.
#
# val - a single entry or a list of entries. A String entry is split on ","
#       into name and weight ("critical,3"); any other entry is destructured
#       as a [name, weight] pair. A missing weight counts as 0.
#
# Side effects:
# - @weights maps each queue name to its integer weight.
# - @queues lists each queue name max(weight, 1) times, in input order.
# - @mode is :strict when every weight is 0 (also for an empty list),
#   :random when every weight is 1, and :weighted otherwise.
def queues=(val)
  @weights = {}
  @queues = Array(val).each_with_object([]) do |qstr, memo|
    arr = qstr.is_a?(String) ? qstr.split(",") : qstr
    qname, weight = arr
    weight = weight.to_i
    @weights[qname] = weight
    # A weight of 0 still lists the queue once.
    [weight, 1].max.times { memo << qname }
  end

  weights = @weights.values
  @mode = if weights.all?(&:zero?)
    :strict
  elsif weights.all? { |w| w == 1 }
    :random
  else
    :weighted
  end
end
def redis
# Checks a connection out of redis_pool and yields it to the block,
# returning the block's value.
#
# If the block raises RedisClientAdapter::BaseError with a message containing
# READONLY, NOREPLICAS or UNBLOCKED, the connection is closed and the block
# is run again, at most once per call. Any other error, or a second failure,
# is re-raised to the caller.
#
# Raises ArgumentError when called without a block.
def redis
  raise ArgumentError, "requires a block" unless block_given?

  redis_pool.with do |conn|
    retryable = true
    begin
      yield conn
    rescue RedisClientAdapter::BaseError => ex
      # 2550 Failover can cause the server to become a replica, need
      # to disconnect and reopen the socket to get back to the primary.
      # 4495 Use the same logic if we have a "Not enough replicas" error from the primary
      # 4985 Use the same logic when a blocking command is force-unblocked
      # The same retry logic is also used in client.rb
      if retryable && ex.message.match?(/READONLY|NOREPLICAS|UNBLOCKED/)
        conn.close
        # Only one retry per call; a second failure propagates.
        retryable = false
        retry
      end
      raise
    end
  end
end
def redis_pool
# Returns the Redis pool to use for the current thread: the thread-local
# override stored under Thread.current[:sidekiq_redis_pool] when one is set,
# otherwise this capsule's own local_redis_pool.
def redis_pool
  Thread.current[:sidekiq_redis_pool] || local_redis_pool
end
def server_middleware
# Returns the server middleware chain for this capsule.
#
# The chain is built on first use from the global configuration via
# `config.server_middleware.copy_for(self)` and memoized in @server_chain,
# so every later call returns the same object.
#
# When a block is given, the chain is yielded to it before being returned,
# which lets the caller customize middleware for this capsule only.
def server_middleware
  @server_chain ||= config.server_middleware.copy_for(self)
  yield @server_chain if block_given?
  @server_chain
end
def stop
# Stops this capsule by calling bulk_requeue([]) on the fetcher, if there is
# one, and returns that call's result (nil when the fetcher is nil).
# NOTE(review): presumably the empty list lets the fetcher run its own
# shutdown/requeue housekeeping; confirm against the fetcher implementation.
def stop
  fetcher&.bulk_requeue([])
end