class Sidekiq::Scheduler
def active_job_enqueue?(klass)
-
(Boolean)
-
Parameters:
-
klass
(Class
) -- the class to check is decendant from ActiveJob
def active_job_enqueue?(klass) klass.is_a?(Class) && defined?(ActiveJob::Enqueuing) && klass.included_modules.include?(ActiveJob::Enqueuing) end
def clear_schedule!
Stops old rufus scheduler and creates a new one. Returns the new
def clear_schedule! rufus_scheduler.stop @rufus_scheduler = nil @@scheduled_jobs = {} rufus_scheduler end
def enabled_queue?(job_queue)
-
(Boolean)
-
Parameters:
-
job_queue
(String
) -- Job's queue name
def enabled_queue?(job_queue) queues = Sidekiq.options[:queues] queues.empty? || queues.include?(job_queue) end
def enque_with_active_job(config)
def enque_with_active_job(config) initialize_active_job(config['class'], config['args']).enqueue(config) end
def enque_with_sidekiq(config)
def enque_with_sidekiq(config) Sidekiq::Client.push(sanitize_job_config(config)) end
def enqueue_job(job_config)
def enqueue_job(job_config) config = prepare_arguments(job_config.dup) if active_job_enqueue?(config['class']) enque_with_active_job(config) else enque_with_sidekiq(config) end end
def handle_errors
def handle_errors begin yield rescue StandardError => e logger.info "#{e.class.name}: #{e.message}" end end
def idempotent_job_enqueue(job_name, time, config)
-
config
(Hash
) -- Job's config hash -
time
(Time
) -- The time when the job got cleared for triggering -
job_name
(String
) -- The job's name
def idempotent_job_enqueue(job_name, time, config) registered = register_job_instance(job_name, time) if registered logger.info "queueing #{config['class']} (#{job_name})" handle_errors { enqueue_job(config) } remove_elder_job_instances(job_name) else logger.debug { "Ignoring #{job_name} job as it has been already enqueued" } end end
def initialize_active_job(klass, args)
def initialize_active_job(klass, args) if args.is_a?(Array) klass.new(*args) else klass.new(args) end end
def load_schedule!
Pulls the schedule from Sidekiq.schedule and loads it into the
def load_schedule! if enabled logger.info 'Loading Schedule' # Load schedule from redis for the first time if dynamic if dynamic Sidekiq.reload_schedule! rufus_scheduler.every('5s') do update_schedule end end logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty? @@scheduled_jobs = {} Sidekiq.schedule.each do |name, config| if !listened_queues_only || enabled_queue?(config['queue']) load_schedule_job(name, config) else logger.info { "Ignoring #{name}, job's queue is not enabled." } end end Sidekiq.redis { |r| r.del(:schedules_changed) } logger.info 'Schedules Loaded' else logger.info 'SidekiqScheduler is disabled' end end
def load_schedule_job(name, config)
def load_schedule_job(name, config) # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as # required for the jobs to be scheduled. If rails_env is missing, the # job should be scheduled regardless of what ENV['RAILS_ENV'] is set # to. if config['rails_env'].nil? || rails_env_matches?(config) logger.info "Scheduling #{name} #{config}" interval_defined = false interval_types = %w{cron every at in interval} interval_types.each do |interval_type| config_interval_type = config[interval_type] if !config_interval_type.nil? && config_interval_type.length > 0 args = optionizate_interval_value(config_interval_type) rufus_job = new_job(name, interval_type, config, args) @@scheduled_jobs[name] = rufus_job update_job_next_time(name, rufus_job.next_time) interval_defined = true break end end unless interval_defined logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping" end end end
def new_job(name, interval_type, config, args)
def new_job(name, interval_type, config, args) opts = { :job => true, :tags => [name] } rufus_scheduler.send(interval_type, *args, opts) do |job, time| idempotent_job_enqueue(name, time, sanitize_job_config(config)) end end
def new_rufus_scheduler
def new_rufus_scheduler Rufus::Scheduler.new(rufus_scheduler_options).tap do |scheduler| scheduler.define_singleton_method(:on_post_trigger) do |job, triggered_time| Sidekiq::Scheduler.update_job_next_time(job.tags[0], job.next_time) end end end
def next_times_key
Returns the key of the Redis hash for job's execution times hash
def next_times_key 'sidekiq-scheduler:next_times' end
def optionizate_interval_value(value)
def optionizate_interval_value(value) args = value if args.is_a?(::Array) return args.first if args.size > 2 || !args.last.is_a?(::Hash) # symbolize keys of hash for options args[1] = args[1].inject({}) do |m, i| key, value = i m[(key.to_sym rescue key) || key] = value m end end args end
def prepare_arguments(config)
-
(Hash)
-
Options Hash:
(**config)
-
args
(Hash/Array
) -- the arguments to be passed to the job -
class
(String
) -- the job class
Parameters:
-
config
(Hash
) -- the options to be converted
def prepare_arguments(config) config['class'] = try_to_constantize(config['class']) if config['args'].is_a?(Hash) config['args'].symbolize_keys! if config['args'].respond_to?(:symbolize_keys!) else config['args'] = Array(config['args']) end config end
def print_schedule
def print_schedule if rufus_scheduler logger.info "Scheduling Info\tLast Run" scheduler_jobs = rufus_scheduler.all_jobs scheduler_jobs.each do |_, v| logger.info "#{v.t}\t#{v.last}\t" end end end
def pushed_job_key(job_name)
-
(String)
-
Parameters:
-
job_name
(String
) -- The name of the job
def pushed_job_key(job_name) "sidekiq-scheduler:pushed:#{job_name}" end
def rails_env_matches?(config)
Returns true if the given schedule config hash matches the current
def rails_env_matches?(config) config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/, '').split(',').include?(ENV['RAILS_ENV']) end
def register_job_instance(job_name, time)
-
(Boolean)
- true if the job was registered, false when otherwise
Parameters:
-
time
(Time
) -- Time at which the job was cleared by the scheduler -
job_name
(String
) -- The job's name
def register_job_instance(job_name, time) pushed_job_key = pushed_job_key(job_name) registered, _ = Sidekiq.redis do |r| r.pipelined do r.zadd(pushed_job_key, time.to_i, time.to_i) r.expire(pushed_job_key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS) end end registered end
def reload_schedule!
def reload_schedule! if enabled logger.info 'Reloading Schedule' clear_schedule! load_schedule! else logger.info 'SidekiqScheduler is disabled' end end
def remove_elder_job_instances(job_name)
def remove_elder_job_instances(job_name) Sidekiq.redis do |r| r.zremrangebyscore(pushed_job_key(job_name), 0, Time.now.to_i - REGISTERED_JOBS_THRESHOLD_IN_SECONDS) end end
def rufus_scheduler
def rufus_scheduler @rufus_scheduler ||= new_rufus_scheduler end
def rufus_scheduler_options
def rufus_scheduler_options @rufus_scheduler_options ||= {} end
def rufus_scheduler_options=(options)
def rufus_scheduler_options=(options) @rufus_scheduler_options = options end
def sanitize_job_config(config)
def sanitize_job_config(config) config.reject { |k, _| RUFUS_METADATA_KEYS.include?(k) } end
def scheduled_jobs
def scheduled_jobs @@scheduled_jobs end
def try_to_constantize(klass)
def try_to_constantize(klass) klass.is_a?(String) ? klass.constantize : klass rescue NameError klass end
def unschedule_job(name)
def unschedule_job(name) if scheduled_jobs[name] logger.debug "Removing schedule #{name}" scheduled_jobs[name].unschedule scheduled_jobs.delete(name) end end
def update_job_next_time(name, next_time)
-
next_time
(Time
) -- The job's next time execution -
name
(String
) -- The job's name
def update_job_next_time(name, next_time) Sidekiq.redis do |r| next_time ? r.hset(next_times_key, name, next_time) : r.hdel(next_times_key, name) end end
def update_schedule
def update_schedule if Sidekiq.redis { |r| r.scard(:schedules_changed) } > 0 logger.info 'Updating schedule' Sidekiq.reload_schedule! while schedule_name = Sidekiq.redis { |r| r.spop(:schedules_changed) } if Sidekiq.schedule.keys.include?(schedule_name) unschedule_job(schedule_name) load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name]) else unschedule_job(schedule_name) end end logger.info 'Schedules Loaded' end end