lib/sidekiq-scheduler/utils.rb
require 'set'

module SidekiqScheduler
  # Stateless helpers used to normalize job configuration, enqueue jobs and
  # keep track of job execution times.
  module Utils
    # Keys of a job configuration that are scheduling metadata and are removed
    # by {sanitize_job_config} before the job is pushed.
    RUFUS_METADATA_KEYS = %w[description at cron every in interval enabled].freeze

    # Stringify keys belonging to a hash.
    #
    # Also stringifies nested keys and keys of hashes found inside arrays and
    # sets. Note that a Set is returned as an Array.
    #
    # @param [Object] object
    #
    # @return [Object] a new structure with stringified keys, or the object
    #   itself when it is not a Hash, Array or Set
    def self.stringify_keys(object)
      deep_transform_keys(object, &:to_s)
    end

    # Symbolize keys belonging to a hash.
    #
    # Also symbolizes nested keys and keys of hashes found inside arrays and
    # sets. Note that a Set is returned as an Array.
    #
    # @param [Object] object
    #
    # @return [Object] a new structure with symbolized keys, or the object
    #   itself when it is not a Hash, Array or Set
    def self.symbolize_keys(object)
      deep_transform_keys(object, &:to_sym)
    end

    # Recursively rebuilds hashes, arrays and sets, replacing every hash key
    # with the result of the given block.
    #
    # @param [Object] object
    #
    # @yieldparam [Object] key an original hash key
    # @yieldreturn [Object] the key to use instead
    #
    # @return [Object]
    def self.deep_transform_keys(object, &block)
      case object
      when Hash
        object.each_with_object({}) do |(key, value), result|
          result[block.call(key)] = deep_transform_keys(value, &block)
        end
      when Array, Set
        # `map` yields an Array, so sets are intentionally flattened to arrays.
        object.map { |item| deep_transform_keys(item, &block) }
      else
        object
      end
    end
    private_class_method :deep_transform_keys

    # Constantize a given string.
    #
    # @param [String] klass The string to constantize
    #
    # @return [Class, Object] the constant named by klass; klass itself when it
    #   is not a String or when no such constant can be resolved
    def self.try_to_constantize(klass)
      klass.is_a?(String) ? Object.const_get(klass) : klass
    rescue NameError
      klass
    end

    # Initializes active_job using the passed parameters.
    #
    # @param [Class] klass The class to initialize
    # @param [Array, Hash] args The parameters passed to the klass initializer
    # @param [Boolean] keyword_argument When truthy and args is a Hash, the
    #   hash is passed as keyword arguments (with symbolized keys) instead of
    #   as a single positional argument
    #
    # @return [Object] instance of the class klass
    def self.initialize_active_job(klass, args, keyword_argument = false)
      if args.is_a?(Array)
        klass.new(*args)
      elsif args.is_a?(Hash) && keyword_argument
        klass.new(**symbolize_keys(args))
      else
        klass.new(args)
      end
    end

    # Returns true if the enqueuing needs to be done for an ActiveJob
    # class, false otherwise.
    #
    # @param [Class] klass the class to check is descendant from ActiveJob
    #
    # @return [Boolean]
    def self.active_job_enqueue?(klass)
      # `defined?` yields nil when ActiveJob is not loaded; return a strict
      # false in that case so the predicate never leaks nil.
      return false unless klass.is_a?(Class) && defined?(ActiveJob::Enqueuing)

      klass.included_modules.include?(ActiveJob::Enqueuing)
    end

    # Enqueues the job using the Sidekiq client.
    #
    # @param [Hash] config The job configuration
    def self.enqueue_with_sidekiq(config)
      Sidekiq::Client.push(sanitize_job_config(config))
    end

    # Enqueues the job using ActiveJob.
    #
    # @param [Hash] config The job configuration
    def self.enqueue_with_active_job(config)
      # Only pass the queue option along when one was configured.
      options = { queue: config['queue'] }.compact

      job = initialize_active_job(config['class'], config['args'], config['keyword_argument'])
      job.enqueue(options)
    end

    # Removes the hash values associated to the rufus metadata keys.
    #
    # @param [Hash] config The job configuration
    #
    # @return [Hash] the sanitized job config
    def self.sanitize_job_config(config)
      config.reject { |key, _| RUFUS_METADATA_KEYS.include?(key) }
    end

    # Creates a new instance of rufus scheduler.
    #
    # The returned scheduler gets an `on_post_trigger` singleton method that
    # records the last and next execution times of the triggered job, using
    # the job's first tag as the job name.
    #
    # @param [Hash] options Options passed to Rufus::Scheduler.new
    #
    # @return [Rufus::Scheduler] the scheduler instance
    def self.new_rufus_scheduler(options = {})
      Rufus::Scheduler.new(options).tap do |scheduler|
        scheduler.define_singleton_method(:on_post_trigger) do |job, triggered_time|
          if (job_name = job.tags[0])
            SidekiqScheduler::Utils.update_job_last_time(job_name, triggered_time)
            SidekiqScheduler::Utils.update_job_next_time(job_name, job.next_time)
          end
        end
      end
    end

    # Pushes the job's next execution time, or removes the stored value when
    # there is no next execution.
    #
    # @param [String] name The job's name
    # @param [Time] next_time The job's next execution time
    def self.update_job_next_time(name, next_time)
      if next_time
        SidekiqScheduler::RedisManager.set_job_next_time(name, next_time)
      else
        SidekiqScheduler::RedisManager.remove_job_next_time(name)
      end
    end

    # Pushes the job's last execution time. Does nothing when last_time is nil.
    #
    # @param [String] name The job's name
    # @param [Time] last_time The job's last execution time
    def self.update_job_last_time(name, last_time)
      SidekiqScheduler::RedisManager.set_job_last_time(name, last_time) if last_time
    end

    # Try to figure out when the cron job was supposed to run.
    #
    # Rufus calls the scheduler block with the current time and not the time the block was scheduled to run.
    # This means that under certain conditions a job could get scheduled multiple times, because `time.to_i` is
    # used to key the job in redis. If one server is under load and Rufus tries to run the job 1 second later
    # than the other server, then the job will be queued twice.
    # This method essentially makes a best guess at when this job was supposed to run and returns that.
    #
    # @param [Fugit::Cron] cron
    # @param [Time] time
    #
    # @return [Time]
    def self.calc_cron_run_time(cron, time)
      time = time.floor # remove sub seconds to prevent rounding errors.
      return time if cron.match?(time) # If the time is a perfect match then return it.

      next_t = cron.next_time(time).to_t
      previous_t = cron.previous_time(time).to_t

      # The `time` var is some point between `previous_t` and `next_t`.
      # Figure out how far off we are from each side in seconds.
      next_diff = next_t - time
      previous_diff = time - previous_t

      if next_diff == previous_diff
        # `time` is exactly halfway between `previous_t` and `next_t`. Keep `time` only when that distance
        # equals `cron.rough_frequency`; otherwise round down to the previous run time.
        cron.rough_frequency == next_diff ? time : previous_t
      elsif next_diff > previous_diff
        # We are closer to the previous run time so return that.
        previous_t
      else
        # We are closer to the next run time so return that.
        next_t
      end
    end
  end
end