require 'rufus/scheduler'
require 'json'
require 'sidekiq-scheduler/rufus_utils'
require 'sidekiq-scheduler/redis_manager'

module SidekiqScheduler
  # Loads the schedule found in Sidekiq.schedule into a Rufus::Scheduler and
  # enqueues the configured jobs when their triggers fire.
  #
  # Instance methods can also be called on the class itself; such calls are
  # forwarded to a lazily created shared instance (see .instance).
  class Scheduler
    # We expect rufus jobs to have #params
    Rufus::Scheduler::Job.module_eval do
      alias_method :params, :opts
    end

    # Set to enable or disable the scheduler.
    attr_accessor :enabled

    # Set to update the schedule in runtime in a given time period.
    attr_accessor :dynamic

    # Set to update the schedule in runtime dynamically per this period.
    attr_accessor :dynamic_every

    # Set to schedule jobs only when will be pushed to queues listened by sidekiq
    attr_accessor :listened_queues_only

    # Set custom options for rufus scheduler, like max_work_threads.
    attr_accessor :rufus_scheduler_options

    class << self
      # Returns the shared instance, creating it with default options on
      # first use.
      def instance
        @instance ||= new
      end

      # Replaces the shared instance.
      def instance=(value)
        @instance = value
      end

      # Forwards calls to instance methods of this class to the shared
      # instance, including the caller's block.
      def method_missing(method, *arguments, &block)
        if instance_methods.include?(method)
          instance.public_send(method, *arguments, &block)
        else
          super
        end
      end

      # Keeps respond_to? consistent with the forwarding done in
      # method_missing.
      def respond_to_missing?(method, include_private = false)
        instance_methods.include?(method) || super
      end
    end

    # @param options [Hash] initial values for the accessors above, keyed by
    #   symbol; :rufus_scheduler_options defaults to an empty Hash
    def initialize(options = {})
      self.enabled = options[:enabled]
      self.dynamic = options[:dynamic]
      self.dynamic_every = options[:dynamic_every]
      self.listened_queues_only = options[:listened_queues_only]
      self.rufus_scheduler_options = options[:rufus_scheduler_options] || {}
    end

    # the Rufus::Scheduler jobs that are scheduled
    def scheduled_jobs
      @scheduled_jobs
    end

    # Logs one line per job known to the rufus scheduler.
    def print_schedule
      if rufus_scheduler
        Sidekiq.logger.info "Scheduling Info\tLast Run"
        scheduler_jobs = rufus_scheduler.all_jobs
        scheduler_jobs.each_value do |v|
          Sidekiq.logger.info "#{v.t}\t#{v.last}\t"
        end
      end
    end

    # Pulls the schedule from Sidekiq.schedule and loads it into the
    # rufus scheduler instance
    def load_schedule!
      if enabled
        Sidekiq.logger.info 'Loading Schedule'

        # Load schedule from redis for the first time if dynamic
        if dynamic
          Sidekiq.reload_schedule!
          @current_changed_score = Time.now.to_f
          rufus_scheduler.every(dynamic_every) do
            update_schedule
          end
        end

        Sidekiq.logger.info 'Schedule empty! Set Sidekiq.schedule' if Sidekiq.schedule.empty?

        @scheduled_jobs = {}
        queues = sidekiq_queues

        Sidekiq.schedule.each do |name, config|
          if !listened_queues_only || enabled_queue?(config['queue'].to_s, queues)
            load_schedule_job(name, config)
          else
            Sidekiq.logger.info { "Ignoring #{name}, job's queue is not enabled." }
          end
        end

        Sidekiq.logger.info 'Schedules Loaded'
      else
        Sidekiq.logger.info 'SidekiqScheduler is disabled'
      end
    end

    # Loads a job schedule into the Rufus::Scheduler and stores it in @scheduled_jobs
    #
    # @param name [String] the schedule's name
    # @param config [Hash] the schedule's configuration
    def load_schedule_job(name, config)
      # If rails_env is set in the config, enforce ENV['RAILS_ENV'] as
      # required for the jobs to be scheduled. If rails_env is missing, the
      # job should be scheduled regardless of what ENV['RAILS_ENV'] is set
      # to.
      if config['rails_env'].nil? || rails_env_matches?(config)
        Sidekiq.logger.info "Scheduling #{name}#{config}"
        interval_defined = false
        interval_types = %w(cron every at in interval)

        # Only the first interval type present in the config is used.
        interval_types.each do |interval_type|
          config_interval_type = config[interval_type]

          if !config_interval_type.nil? && config_interval_type.length > 0
            schedule, options = SidekiqScheduler::RufusUtils.normalize_schedule_options(config_interval_type)

            rufus_job = new_job(name, interval_type, config, schedule, options)
            @scheduled_jobs[name] = rufus_job
            SidekiqScheduler::Utils.update_job_next_time(name, rufus_job.next_time)

            interval_defined = true
            break
          end
        end

        unless interval_defined
          Sidekiq.logger.info "no #{interval_types.join(' / ')} found for #{config['class']} (#{name}) - skipping"
        end
      end
    end

    # Pushes the job into Sidekiq if not already pushed for the given time
    #
    # @param [String] job_name The job's name
    # @param [Time] time The time when the job got cleared for triggering
    # @param [Hash] config Job's config hash
    def idempotent_job_enqueue(job_name, time, config)
      registered = SidekiqScheduler::RedisManager.register_job_instance(job_name, time)

      if registered
        Sidekiq.logger.info "queueing #{config['class']} (#{job_name})"

        handle_errors { enqueue_job(config, time) }

        SidekiqScheduler::RedisManager.remove_elder_job_instances(job_name)
      else
        Sidekiq.logger.debug { "Ignoring #{job_name} job as it has been already enqueued" }
      end
    end

    # Enqueue a job based on a config hash
    #
    # @param job_config [Hash] the job configuration
    # @param time [Time] time the job is enqueued
    def enqueue_job(job_config, time = Time.now)
      # Work on a (shallow) copy so keys removed or replaced below do not
      # leak into the caller's hash.
      config = prepare_arguments(job_config.dup)

      if config.delete('include_metadata')
        config['args'] = arguments_with_metadata(config['args'], scheduled_at: time.to_f)
      end

      if SidekiqScheduler::Utils.active_job_enqueue?(config['class'])
        SidekiqScheduler::Utils.enqueue_with_active_job(config)
      else
        SidekiqScheduler::Utils.enqueue_with_sidekiq(config)
      end
    end

    # Returns the memoized rufus scheduler, building it from
    # rufus_scheduler_options on first use.
    def rufus_scheduler
      @rufus_scheduler ||= SidekiqScheduler::Utils.new_rufus_scheduler(rufus_scheduler_options)
    end

    # Stops old rufus scheduler and creates a new one. Returns the new
    # rufus scheduler
    #
    # @param [Symbol] stop_option The option to be passed to Rufus::Scheduler#stop
    def clear_schedule!(stop_option = :wait)
      if @rufus_scheduler
        @rufus_scheduler.stop(stop_option)
        @rufus_scheduler = nil
      end

      # Reset the per-instance registry read by #scheduled_jobs; the jobs it
      # referenced belonged to the scheduler that was just stopped.
      @scheduled_jobs = {}

      rufus_scheduler
    end

    # Clears the current schedule and loads it again, when enabled.
    def reload_schedule!
      if enabled
        Sidekiq.logger.info 'Reloading Schedule'
        clear_schedule!
        load_schedule!
      else
        Sidekiq.logger.info 'SidekiqScheduler is disabled'
      end
    end

    # Applies the schedule changes reported by the RedisManager since the
    # previous check: every changed schedule is unscheduled, and re-loaded
    # when it is still present in Sidekiq.schedule.
    def update_schedule
      last_changed_score, @current_changed_score = @current_changed_score, Time.now.to_f
      schedule_changes = SidekiqScheduler::RedisManager.get_schedule_changes(last_changed_score, @current_changed_score)

      if schedule_changes.size > 0
        Sidekiq.logger.info 'Updating schedule'

        Sidekiq.reload_schedule!
        schedule_changes.each do |schedule_name|
          unschedule_job(schedule_name)

          if Sidekiq.schedule.key?(schedule_name)
            load_schedule_job(schedule_name, Sidekiq.schedule[schedule_name])
          end
        end

        Sidekiq.logger.info 'Schedule updated'
      end
    end

    # Tells whether a schedule is enabled. The stored state takes precedence
    # over the 'enabled' key of the schedule config, which defaults to true.
    #
    # @param name [String] the schedule's name
    # @return [Boolean, nil] nil when Sidekiq.schedule has no such schedule
    def job_enabled?(name)
      job = Sidekiq.schedule[name]
      schedule_state(name).fetch('enabled', job.fetch('enabled', true)) if job
    end

    # Flips the 'enabled' flag in the stored state of a schedule.
    #
    # @param name [String] the schedule's name
    def toggle_job_enabled(name)
      state = schedule_state(name)
      state['enabled'] = !job_enabled?(name)
      set_schedule_state(name, state)
    end

    private

    # Registers a rufus job that enqueues the schedule each time it
    # triggers, as long as the schedule is enabled at that moment.
    #
    # @param name [String] the schedule's name, also used as the job's tag
    # @param interval_type [String] name of the rufus scheduling method to call
    # @param config [Hash] the schedule's configuration
    # @param schedule the schedule value handed to rufus
    # @param options [Hash] extra options handed to rufus
    # @return the value returned by the rufus scheduling method
    def new_job(name, interval_type, config, schedule, options)
      options = options.merge(job: true, tags: [name])

      rufus_scheduler.public_send(interval_type, schedule, options) do |job, time|
        idempotent_job_enqueue(name, time, SidekiqScheduler::Utils.sanitize_job_config(config)) if job_enabled?(name)
      end
    end

    # Unschedules the job stored under the given name and forgets it; does
    # nothing when no job is stored under that name.
    #
    # @param name [String] the schedule's name
    def unschedule_job(name)
      if scheduled_jobs[name]
        Sidekiq.logger.debug "Removing schedule #{name}"
        scheduled_jobs[name].unschedule
        scheduled_jobs.delete(name)
      end
    end

    # Retrieves a schedule state
    #
    # @param name [String] with the schedule's name
    # @return [Hash] with the schedule's state
    def schedule_state(name)
      state = SidekiqScheduler::RedisManager.get_job_state(name)

      state ? JSON.parse(state) : {}
    end

    # Saves a schedule state
    #
    # @param name [String] with the schedule's name
    # @param state [Hash] with the schedule's state
    def set_schedule_state(name, state)
      SidekiqScheduler::RedisManager.set_job_state(name, state)
    end

    # Adds a Hash with schedule metadata as the last argument to call the worker.
    # The caller in this class passes the schedule time as a Float (Time#to_f,
    # i.e. seconds since epoch).
    #
    # @example with hash argument
    #   arguments_with_metadata({value: 1}, scheduled_at: Time.now.to_f)
    #   #=> [{value: 1}, {scheduled_at: <seconds since epoch>}]
    #
    # @param args [Array|Hash]
    # @param metadata [Hash]
    # @return [Array] arguments with added metadata
    def arguments_with_metadata(args, metadata)
      if args.is_a?(Array)
        [*args, metadata]
      else
        [args, metadata]
      end
    end

    # Returns the queue names found in Sidekiq.options[:queues], as Strings.
    def sidekiq_queues
      Sidekiq.options[:queues].map(&:to_s)
    end

    # Returns true if a job's queue is included in the array of queues
    #
    # If queues are empty, returns true.
    #
    # @param [String] job_queue Job's queue name
    # @param [Array<String>] queues
    #
    # @return [Boolean]
    def enabled_queue?(job_queue, queues)
      queues.empty? || queues.include?(job_queue)
    end

    # Convert the given arguments in the format expected to be enqueued.
    #
    # @param [Hash] config the options to be converted
    # @option config [String] class the job class
    # @option config [Hash/Array] args the arguments to be passed to the job
    #   class
    #
    # @return [Hash]
    def prepare_arguments(config)
      config['class'] = SidekiqScheduler::Utils.try_to_constantize(config['class'])

      if config['args'].is_a?(Hash)
        # symbolize_keys! is not part of core Ruby, hence the respond_to? guard.
        config['args'].symbolize_keys! if config['args'].respond_to?(:symbolize_keys!)
      else
        config['args'] = Array(config['args'])
      end

      config
    end

    # Returns true if the given schedule config hash matches the current ENV['RAILS_ENV']
    # @param [Hash] config The schedule job configuration
    #
    # @return [Boolean] true if the schedule config matches the current ENV['RAILS_ENV']
    def rails_env_matches?(config)
      config['rails_env'] && ENV['RAILS_ENV'] && config['rails_env'].gsub(/\s/, '').split(',').include?(ENV['RAILS_ENV'])
    end

    # Runs the given block, logging any StandardError it raises instead of
    # letting it propagate.
    def handle_errors
      yield
    rescue StandardError => e
      Sidekiq.logger.info "#{e.class.name}: #{e.message}"
    end
  end
end