module SidekiqScheduler::Utils

def self.active_job_enqueue?(klass)

Returns:
  • (Boolean) -

Parameters:
  • klass (Class) -- the class to check is descendant from ActiveJob
def self.active_job_enqueue?(klass)
  klass.is_a?(Class) && defined?(ActiveJob::Enqueuing) &&
    klass.included_modules.include?(ActiveJob::Enqueuing)
end

def self.calc_cron_run_time(cron, time)

Returns:
  • (Time) -

Parameters:
  • time (Time) --
  • cron (Fugit::Cron) --
def self.calc_cron_run_time(cron, time)
  time = time.floor # remove sub seconds to prevent rounding errors.
  return time if cron.match?(time) # If the time is a perfect match then return it.
  next_t = cron.next_time(time).to_t
  previous_t = cron.previous_time(time).to_t
  # The `time` var is some point between `previous_t` and `next_t`.
  # Figure out how far off we are from each side in seconds.
  next_diff = next_t - time
  previous_diff = time - previous_t
  if next_diff == previous_diff
    # In the event `time` is exactly between `previous_t` and `next_t` the diff will not be equal to
    # `cron.rough_frequency`. In that case we round down.
    cron.rough_frequency == next_diff ? time : previous_t
  elsif next_diff > previous_diff
    # We are closer to the previous run time so return that.
    previous_t
  else
    # We are closer to the next run time so return that.
    next_t
  end
end

def self.enqueue_with_active_job(config)

Parameters:
  • config (Hash) -- The job configuration
def self.enqueue_with_active_job(config)
  options = {
    queue: config['queue']
  }.keep_if { |_, v| !v.nil? }
  initialize_active_job(config['class'], config['args'], config['keyword_argument']).enqueue(options)
end

def self.enqueue_with_sidekiq(config)

Parameters:
  • config (Hash) -- The job configuration
def self.enqueue_with_sidekiq(config)
  Sidekiq::Client.push(sanitize_job_config(config))
end

def self.initialize_active_job(klass, args, keyword_argument = false)

Returns:
  • (Object) - instance of the class klass

Parameters:
  • args (Array, Hash) -- The parameters passed to the klass initializer
  • klass (Class) -- The class to initialize
def self.initialize_active_job(klass, args, keyword_argument = false)
  if args.is_a?(Array)
    klass.new(*args)
  elsif args.is_a?(Hash) && keyword_argument
    klass.new(**symbolize_keys(args))
  else
    klass.new(args)
  end
end

def self.new_rufus_scheduler(options = {})

Returns:
  • (Rufus::Scheduler) - the scheduler instance
def self.new_rufus_scheduler(options = {})
  Rufus::Scheduler.new(options).tap do |scheduler|
    scheduler.define_singleton_method(:on_post_trigger) do |job, triggered_time|
      if (job_name = job.tags[0])
        SidekiqScheduler::Utils.update_job_last_time(job_name, triggered_time)
        SidekiqScheduler::Utils.update_job_next_time(job_name, job.next_time)
      end
    end
  end
end

def self.sanitize_job_config(config)

Returns:
  • (Hash) - the sanitized job config

Parameters:
  • config (Hash) -- The job configuration
def self.sanitize_job_config(config)
  config.reject { |k, _| RUFUS_METADATA_KEYS.include?(k) }
end

def self.stringify_keys(object)

Returns:
  • (Object) -

Parameters:
  • object (Object) --
def self.stringify_keys(object)
  if object.is_a?(Hash)
    Hash[[*object.map { |k, v| [k.to_s, stringify_keys(v) ]} ]]
  elsif object.is_a?(Array) || object.is_a?(Set)
    object.map { |v| stringify_keys(v) }
  else
    object
  end
end

def self.symbolize_keys(object)

Returns:
  • (Object) -

Parameters:
  • object (Object) --
def self.symbolize_keys(object)
  if object.is_a?(Hash)
    Hash[[*object.map { |k, v| [k.to_sym, symbolize_keys(v) ]} ]]
  elsif object.is_a?(Array) || object.is_a?(Set)
    object.map { |v| symbolize_keys(v) }
  else
    object
  end
end

def self.try_to_constantize(klass)

Returns:
  • (Class) - the class corresponding to the klass param

Parameters:
  • klass (String) -- The string to constantize
def self.try_to_constantize(klass)
  klass.is_a?(String) ? Object.const_get(klass) : klass
rescue NameError
  klass
end

def self.update_job_last_time(name, last_time)

Parameters:
  • last_time (Time) -- The job's last execution time
  • name (String) -- The job's name
def self.update_job_last_time(name, last_time)
  SidekiqScheduler::RedisManager.set_job_last_time(name, last_time) if last_time
end

def self.update_job_next_time(name, next_time)

Parameters:
  • next_time (Time) -- The job's next time execution
  • name (String) -- The job's name
def self.update_job_next_time(name, next_time)
  if next_time
    SidekiqScheduler::RedisManager.set_job_next_time(name, next_time)
  else
    SidekiqScheduler::RedisManager.remove_job_next_time(name)
  end
end