class SidekiqScheduler::Manager
# The delayed job router in the system. This manages the
# scheduled jobs, pushing their messages from Redis onto
# the work queues.
#
def clear_scheduled_work
# Wipes all delayed work from Redis.
#
# Reads every member of the 'delayed_queue_schedule' sorted set, deletes the
# matching "delayed:<member>" key for each one (skipped when the set is
# empty), and finally deletes the sorted set itself.
#
# Returns whatever the final redis.del call returns.
def clear_scheduled_work
  schedule_key = 'delayed_queue_schedule'
  timestamps = redis.zrange(schedule_key, 0, -1).to_a

  unless timestamps.empty?
    delayed_keys = timestamps.map { |stamp| "delayed:#{stamp}" }
    redis.del(*delayed_keys)
  end

  redis.del(schedule_key)
end
def find_next_timestamp
# Fetches at most one member of the 'delayed_queue_schedule' sorted set whose
# score lies between '-inf' and the current time (Time.now.to_i).
#
# Returns that member converted with to_i, or nil when Redis reports nothing.
def find_next_timestamp
  due = redis.zrangebyscore('delayed_queue_schedule', '-inf', Time.now.to_i, :limit => [0, 1])

  # The reply may come back wrapped in an Array; unwrap its first element.
  due = due.first if due.is_a?(Array)
  return nil if due.nil?

  due.to_i
end
def find_scheduled_work(timestamp)
# Drains the "delayed:<timestamp>" Redis list and hands every entry to
# Sidekiq::Client.
#
# Each popped message is decoded with MultiJson; its 'queue' entry is removed
# from the decoded item and used as the destination queue for
# Sidekiq::Client.push. Once the list yields nothing more, a debug line is
# logged and Sidekiq::Client.remove_scheduler_queue is called for the
# timestamp.
#
# timestamp - value identifying the delayed list to drain.
#
# Returns the result of Sidekiq::Client.remove_scheduler_queue.
def find_scheduled_work(timestamp)
  list_key = "delayed:#{timestamp}"

  while (payload = redis.lpop(list_key))
    job = MultiJson.decode(payload)
    target_queue = job.delete('queue')
    Sidekiq::Client.push(target_queue, job)
  end
  logger.debug("Finished processing queue for timestamp #{timestamp}")

  Sidekiq::Client.remove_scheduler_queue(timestamp)
end
def initialize(options={})
# Sets up the manager and logs a boot banner.
#
# options - Hash of settings (default: {}):
#           :scheduler  - stored as @enabled; #stopped? reports true while
#                         this value is falsy.
#           :resolution - stored as @resolution; falls back to 5 when absent
#                         or falsy. #schedule passes it to `after` as the
#                         delay before the next pass.
def initialize(options={})
  banner = "Booting sidekiq scheduler #{SidekiqScheduler::VERSION} with Redis at #{redis.client.location}"
  logger.info(banner)
  # Block form: the options are only inspected if the logger evaluates the block.
  logger.debug { options.inspect }

  @enabled = options[:scheduler]
  @resolution = options[:resolution] || 5
end
def reset
# Resets the scheduler state by delegating to #clear_scheduled_work.
#
# Returns whatever #clear_scheduled_work returns.
def reset
  clear_scheduled_work
end
def schedule(run_loop = false)
# Runs one scheduling pass inside the watchdog wrapper.
#
# Does nothing when the manager is stopped. Otherwise it keeps asking
# #find_next_timestamp for a due timestamp and processes each one with
# #find_scheduled_work until none is left, then logs a debug line.
#
# run_loop - when truthy, registers another call to #schedule via `after`,
#            delayed by @resolution, so Redis is polled again even if this
#            pass found nothing to do (default: false).
def schedule(run_loop = false)
  watchdog("Fatal error in sidekiq, scheduler loop died") do
    # `return` inside the block exits #schedule itself.
    return if stopped?

    # Dispatch phase: drain every timestamp that is currently due.
    while (due_timestamp = find_next_timestamp)
      find_scheduled_work(due_timestamp)
    end
    logger.debug('no scheduler queues to process')

    # Polling phase: re-arm the next pass after @resolution.
    if run_loop
      after(@resolution) { schedule(run_loop) }
    end
  end
end
def start
# Kicks off scheduling in looping mode by calling #schedule with
# run_loop set to true.
#
# Returns whatever #schedule returns.
def start
  schedule(true)
end
def stop
# Marks the manager as disabled by setting @enabled to false; #stopped?
# reports true afterwards and #schedule returns early.
#
# Returns false.
def stop
  @enabled = false
end
def stopped?
# Reports whether the manager is disabled.
#
# Returns true when @enabled is nil or false, false otherwise.
def stopped?
  @enabled ? false : true
end