require 'rufus/scheduler'
require 'thwait'
require 'sidekiq/util'
require 'sidekiq-scheduler/manager'

module Sidekiq
  # Loads the schedule held in Sidekiq.schedule into a Rufus::Scheduler
  # instance and, whenever rufus triggers a job, pushes it either through
  # ActiveJob or through Sidekiq::Client.
  class Scheduler
    extend Sidekiq::Util

    # Expiry (in seconds) set on the Redis sorted set that records pushed job
    # instances; also the age beyond which recorded instances are pruned.
    REGISTERED_JOBS_THRESHOLD_IN_SECONDS = 24 * 60 * 60

    # Config keys that only describe the schedule; they are stripped from the
    # job config before it is enqueued (see #sanitize_job_config).
    RUFUS_METADATA_KEYS = %w(description at cron every in interval).freeze

    # We expect rufus jobs to have #params
    Rufus::Scheduler::Job.module_eval do
      alias_method :params, :opts
    end

    class << self
      # Set to enable or disable the scheduler.
      attr_accessor :enabled

      # Set to update the schedule in runtime in a given time period.
      attr_accessor :dynamic

      # Set to schedule jobs only when will be pushed to queues listened by sidekiq
      attr_accessor :listened_queues_only

      # The Rufus::Scheduler jobs that are scheduled, keyed by schedule name.
      #
      # Lazily initialised so that calling this (or #unschedule_job) before the
      # first #load_schedule! does not raise NameError on an unset class
      # variable.
      #
      # @return [Hash]
      def scheduled_jobs
        @@scheduled_jobs ||= {}
      end

      # Logs a header followed by one line per job known to the rufus scheduler.
      #
      # NOTE(review): the block destructures each element as a (key, job) pair
      # and calls #t / #last on the job. Confirm this matches what
      # Rufus::Scheduler#all_jobs yields for the rufus version in use.
      def print_schedule
        return unless rufus_scheduler

        logger.info "Scheduling Info\tLast Run"
        rufus_scheduler.all_jobs.each do |_, v|
          logger.info "#{v.t}\t#{v.last}\t"
        end
      end

      # Pulls the schedule from Sidekiq.schedule and loads it into the
      # rufus scheduler instance. Does nothing except log when the scheduler
      # is disabled.
      def load_schedule!
        return logger.info('SidekiqScheduler is disabled') unless enabled

        logger.info 'Loading Schedule'

        # Load schedule from redis for the first time if dynamic, then keep
        # polling for changes every five seconds.
        if dynamic
          Sidekiq.reload_schedule!
          rufus_scheduler.every('5s') { update_schedule }
        end

        logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

        @@scheduled_jobs = {}

        Sidekiq.schedule.each do |name, config|
          if !listened_queues_only || enabled_queue?(config['queue'])
            load_schedule_job(name, config)
          else
            logger.info { "Ignoring #{name}, job's queue is not enabled." }
          end
        end

        # Everything was just loaded, so pending change notifications are moot.
        Sidekiq.redis { |r| r.del(:schedules_changed) }

        logger.info 'Schedules Loaded'
      end

      # Normalises an interval value from the schedule config.
      #
      # * A non-array value is returned untouched.
      # * A two-element array whose last element is a Hash is returned as a new
      #   [interval, options] pair with the option keys symbolized.
      # * Any other array collapses to its first element.
      #
      # The input is never mutated; the caller's config keeps its original keys.
      #
      # @param [Object] value The raw interval value taken from the config
      #
      # @return [Object] the interval, or an [interval, options] array
      def optionizate_interval_value(value)
        return value unless value.is_a?(::Array)
        return value.first unless value.size == 2 && value.last.is_a?(::Hash)

        options = value.last.each_with_object({}) do |(key, option), symbolized|
          # Keys that cannot be symbolized are kept as they are.
          symbolized[key.respond_to?(:to_sym) ? key.to_sym : key] = option
        end

        [value.first, options]
      end

      # Loads a job schedule into the Rufus::Scheduler and stores it in @@scheduled_jobs
      #
      # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
      # required for the jobs to be scheduled. If rails_env is missing, the
      # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
      # to.
      #
      # @param [String] name The job's name
      # @param [Hash] config Job's config hash
      def load_schedule_job(name, config)
        return unless config['rails_env'].nil? || rails_env_matches?(config)

        logger.info "Scheduling #{name} #{config}"

        interval_types = %w{cron every at in interval}

        # Only the first interval type with a non-empty value is used.
        interval_type = interval_types.find do |type|
          interval = config[type]
          !interval.nil? && !interval.empty?
        end

        unless interval_type
          logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
          return
        end

        args = optionizate_interval_value(config[interval_type])
        rufus_job = new_job(name, interval_type, config, args)

        scheduled_jobs[name] = rufus_job
        update_job_next_time(name, rufus_job.next_time)
      end

      # Pushes the job into Sidekiq if not already pushed for the given time
      #
      # @param [String] job_name The job's name
      # @param [Time] time The time when the job got cleared for triggering
      # @param [Hash] config Job's config hash
      def idempotent_job_enqueue(job_name, time, config)
        unless register_job_instance(job_name, time)
          logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
          return
        end

        logger.info "queueing #{config['class']} (#{job_name})"

        handle_errors { enqueue_job(config) }

        remove_elder_job_instances(job_name)
      end

      # Pushes job's next time execution
      #
      # Stores the value in the Redis hash at #next_times_key, or removes the
      # job's entry when there is no next time.
      #
      # @param [String] name The job's name
      # @param [Time] next_time The job's next time execution
      def update_job_next_time(name, next_time)
        Sidekiq.redis do |r|
          if next_time
            r.hset(next_times_key, name, next_time)
          else
            r.hdel(next_times_key, name)
          end
        end
      end

      # Returns true if the given schedule config hash matches the current
      # ENV['RAILS_ENV']. The config value may be a comma separated list;
      # whitespace inside it is ignored.
      def rails_env_matches?(config)
        config['rails_env'] &&
          ENV['RAILS_ENV'] &&
          config['rails_env'].gsub(/\s/, '').split(',').include?(ENV['RAILS_ENV'])
      end

      # Runs the given block, logging (instead of propagating) any
      # StandardError it raises.
      def handle_errors
        yield
      rescue StandardError => e
        logger.info "#{e.class.name}: #{e.message}"
      end

      # Enqueue a job based on a config hash
      #
      # Works on a shallow copy of the config, so top-level keys of the
      # caller's hash are not reassigned.
      #
      # @param [Hash] job_config Job's config hash
      def enqueue_job(job_config)
        config = prepare_arguments(job_config.dup)

        if active_job_enqueue?(config['class'])
          enque_with_active_job(config)
        else
          enque_with_sidekiq(config)
        end
      end

      # Options handed to Rufus::Scheduler.new; defaults to an empty hash.
      def rufus_scheduler_options
        @rufus_scheduler_options ||= {}
      end

      def rufus_scheduler_options=(options)
        @rufus_scheduler_options = options
      end

      # The memoized Rufus::Scheduler instance, created on first use.
      def rufus_scheduler
        @rufus_scheduler ||= new_rufus_scheduler
      end

      # Stops old rufus scheduler and creates a new one. Returns the new
      # rufus scheduler
      def clear_schedule!
        rufus_scheduler.stop
        @rufus_scheduler = nil
        @@scheduled_jobs = {}
        rufus_scheduler
      end

      # Clears the current schedule and loads it again, unless the scheduler
      # is disabled.
      def reload_schedule!
        return logger.info('SidekiqScheduler is disabled') unless enabled

        logger.info 'Reloading Schedule'
        clear_schedule!
        load_schedule!
      end

      # Applies pending schedule changes. Each name popped from the
      # :schedules_changed Redis set is unscheduled and, if it is still present
      # in Sidekiq.schedule, scheduled again with its current config.
      def update_schedule
        return unless Sidekiq.redis { |r| r.scard(:schedules_changed) } > 0

        logger.info 'Updating schedule'
        Sidekiq.reload_schedule!

        while (schedule_name = Sidekiq.redis { |r| r.spop(:schedules_changed) })
          unschedule_job(schedule_name)
          next unless Sidekiq.schedule.key?(schedule_name)

          load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name])
        end

        logger.info 'Schedules Loaded'
      end

      # Unschedules the rufus job registered under the given name, if any, and
      # forgets it.
      #
      # @param [String] name The job's name
      def unschedule_job(name)
        job = scheduled_jobs[name]
        return unless job

        logger.debug "Removing schedule #{name}"
        job.unschedule
        scheduled_jobs.delete(name)
      end

      def enque_with_active_job(config)
        initialize_active_job(config['class'], config['args']).enqueue(config)
      end

      def enque_with_sidekiq(config)
        Sidekiq::Client.push(sanitize_job_config(config))
      end

      # Instantiates the job class, splatting args when they are an Array and
      # passing them as a single argument otherwise.
      def initialize_active_job(klass, args)
        args.is_a?(Array) ? klass.new(*args) : klass.new(args)
      end

      # Returns true if the enqueuing needs to be done for an ActiveJob
      # class false otherwise.
      #
      # @param [Class] klass the class to check is descendant from ActiveJob
      #
      # @return [Boolean]
      def active_job_enqueue?(klass)
        klass.is_a?(Class) &&
          defined?(ActiveJob::Enqueuing) &&
          klass.included_modules.include?(ActiveJob::Enqueuing)
      end

      # Convert the given arguments in the format expected to be enqueued.
      #
      # @param [Hash] config the options to be converted
      # @option config [String] class the job class
      # @option config [Hash/Array] args the arguments to be passed to the job
      #   class
      #
      # @return [Hash]
      def prepare_arguments(config)
        config['class'] = try_to_constantize(config['class'])

        if config['args'].is_a?(Hash)
          # symbolize_keys! is only available when the hash responds to it.
          config['args'].symbolize_keys! if config['args'].respond_to?(:symbolize_keys!)
        else
          config['args'] = Array(config['args'])
        end

        config
      end

      # Turns a class name into the class when possible; on NameError (or when
      # klass is not a String) the value is returned unchanged.
      def try_to_constantize(klass)
        klass.is_a?(String) ? klass.constantize : klass
      rescue NameError
        klass
      end

      # Returns true if a job's queue is being listened on by sidekiq
      #
      # An empty queue list counts as listening on every queue.
      #
      # @param [String] job_queue Job's queue name
      #
      # @return [Boolean]
      def enabled_queue?(job_queue)
        queues = Sidekiq.options[:queues]
        queues.empty? || queues.include?(job_queue)
      end

      # Registers a queued job instance
      #
      # Adds time.to_i (as both score and member) to the job's sorted set and
      # refreshes the set's expiry, in one pipeline. The reply of the ZADD is
      # what gets returned.
      #
      # @param [String] job_name The job's name
      # @param [Time] time Time at which the job was cleared by the scheduler
      #
      # @return [Boolean] true if the job was registered, false when otherwise
      def register_job_instance(job_name, time)
        key = pushed_job_key(job_name)

        registered, _ = Sidekiq.redis do |r|
          r.pipelined do
            r.zadd(key, time.to_i, time.to_i)
            r.expire(key, REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
          end
        end

        registered
      end

      # Removes recorded instances of the job whose score is older than
      # REGISTERED_JOBS_THRESHOLD_IN_SECONDS relative to now.
      #
      # @param [String] job_name The job's name
      def remove_elder_job_instances(job_name)
        Sidekiq.redis do |r|
          r.zremrangebyscore(pushed_job_key(job_name), 0, Time.now.to_i - REGISTERED_JOBS_THRESHOLD_IN_SECONDS)
        end
      end

      # Returns the key of the Redis sorted set used to store job enqueues
      #
      # @param [String] job_name The name of the job
      #
      # @return [String]
      def pushed_job_key(job_name)
        "sidekiq-scheduler:pushed:#{job_name}"
      end

      # Returns the key of the Redis hash for job's execution times hash
      #
      # @return [String]
      def next_times_key
        'sidekiq-scheduler:next_times'
      end

      private

      # Builds a Rufus::Scheduler whose on_post_trigger hook stores each job's
      # next execution time; the job's first tag is its schedule name.
      def new_rufus_scheduler
        Rufus::Scheduler.new(rufus_scheduler_options).tap do |scheduler|
          scheduler.define_singleton_method(:on_post_trigger) do |job, _triggered_time|
            Sidekiq::Scheduler.update_job_next_time(job.tags[0], job.next_time)
          end
        end
      end

      # Registers a rufus job of the given interval type which, when
      # triggered, enqueues the job once for that trigger time.
      def new_job(name, interval_type, config, args)
        opts = { :job => true, :tags => [name] }

        rufus_scheduler.send(interval_type, *args, opts) do |_job, time|
          idempotent_job_enqueue(name, time, sanitize_job_config(config))
        end
      end

      # Returns a copy of the config without the schedule-only keys.
      def sanitize_job_config(config)
        config.reject { |k, _| RUFUS_METADATA_KEYS.include?(k) }
      end
    end
  end
end