# frozen_string_literal: true
require 'fugit'
require 'cronex'
require 'globalid'
require 'sidekiq/cron/support'
module Sidekiq
module Cron
class Job
# How long we would like to store information about previous enqueues.
REMEMBER_THRESHOLD = 24 * 60 * 60
# Time format for enqueued jobs.
LAST_ENQUEUE_TIME_FORMAT = '%Y-%m-%d %H:%M:%S %z'
# Use serialize/deserialize key of GlobalID.
GLOBALID_KEY = "_sc_globalid"
attr_accessor :name, :namespace, :cron, :description, :klass, :message
attr_reader :last_enqueue_time, :fetch_missing_args, :source, :args
def initialize input_args = {}
args = Hash[input_args.map{ |k, v| [k.to_s, v] }]
@fetch_missing_args = args.delete('fetch_missing_args')
@fetch_missing_args = true if @fetch_missing_args.nil?
@name = args["name"]
@cron = args["cron"]
@description = args["description"] if args["description"]
@source = args["source"] == "schedule" ? "schedule" : "dynamic"
default_namespace = Sidekiq::Cron.configuration.default_namespace
@namespace = args["namespace"] || default_namespace
if Sidekiq::Cron::Namespace.available_namespaces_provided? && !Sidekiq::Cron::Namespace.all.include?(@namespace)
Sidekiq.logger.warn { "Cron Jobs - unexpected namespace #{@namespace} encountered. Assigning to default namespace." }
@namespace = default_namespace
end
# Get class from klass or class.
@klass = args["klass"] || args["class"]
# Set status of job.
@status = args['status'] || status_from_redis
# Set last enqueue time - from args or from existing job.
if args['last_enqueue_time'] && !args['last_enqueue_time'].empty?
@last_enqueue_time = parse_enqueue_time(args['last_enqueue_time'])
else
@last_enqueue_time = last_enqueue_time_from_redis
end
# Get right arguments for job.
@symbolize_args = args["symbolize_args"] == true || ("#{args["symbolize_args"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@args = parse_args(args["args"])
@date_as_argument = args["date_as_argument"] == true || ("#{args["date_as_argument"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@active_job = args["active_job"] == true || ("#{args["active_job"]}" =~ (/^(true|t|yes|y|1)$/i)) == 0 || false
@active_job_queue_name_prefix = args["queue_name_prefix"]
@active_job_queue_name_delimiter = args["queue_name_delimiter"]
# symbolize_args is only used when active_job is true
Sidekiq.logger.warn { "Cron Jobs - 'symbolize_args' is gonna be ignored, as it is only used when 'active_job' is true" } if @symbolize_args && !@active_job
if args["message"]
@message = args["message"]
message_data = Sidekiq.load_json(@message) || {}
@queue = message_data['queue'] || "default"
@retry = message_data['retry']
elsif @klass
message_data = {
"class" => @klass.to_s,
"args" => @args,
}
# Get right data for message,
# only if message wasn't specified before.
klass_data = get_job_options(@klass, @args)
message_data = klass_data.merge(message_data)
# Override queue and retry if set in config,
# only if message is hash - can be string (dumped JSON).
if args['queue']
@queue = message_data['queue'] = args['queue']
else
@queue = message_data['queue'] || "default"
end
if args['retry'] != nil
@retry = message_data['retry'] = args['retry']
else
@retry = message_data['retry']
end
@message = message_data
end
@queue_name_with_prefix = queue_name_with_prefix
end
# Crucial part of whole enqueuing job.
def should_enqueue? time
return false unless status == "enabled"
return false if past_scheduled_time?(time)
return false if enqueued_after?(time)
enqueue = Sidekiq.redis do |conn|
conn.zadd(job_enqueued_key, formatted_enqueue_time(time), formatted_last_time(time))
end
enqueue == true || enqueue == 1
end
# Remove previous information about run times,
# this will clear Redis and make sure that Redis will not overflow with memory.
def remove_previous_enqueues time
Sidekiq.redis do |conn|
conn.zremrangebyscore(job_enqueued_key, 0, "(#{(time.to_f - REMEMBER_THRESHOLD).to_s}")
end
end
# Test if job should be enqueued.
def test_and_enqueue_for_time! time
if should_enqueue?(time)
enqueue!
remove_previous_enqueues(time)
end
end
# Enqueue cron job to queue.
def enqueue! time = Time.now.utc
@last_enqueue_time = time
klass_const = Sidekiq::Cron::Support.safe_constantize(@klass.to_s)
jid =
if klass_const
if is_active_job?(klass_const)
enqueue_active_job(klass_const).try :provider_job_id
else
enqueue_sidekiq_worker(klass_const)
end
else
if @active_job
Sidekiq::Client.push(active_job_message)
else
Sidekiq::Client.push(sidekiq_worker_message)
end
end
save_last_enqueue_time
add_jid_history jid
Sidekiq.logger.debug { "enqueued #{@name}: #{@message}" }
end
def is_active_job?(klass = nil)
@active_job || defined?(::ActiveJob::Base) && begin
klass ||= Sidekiq::Cron::Support.safe_constantize(@klass.to_s)
klass ? klass < ::ActiveJob::Base : false
end
end
def date_as_argument?
!!@date_as_argument
end
def enqueue_args
args = date_as_argument? ? @args + [Time.now.to_f] : @args
deserialize_argument(args)
end
def enqueue_active_job(klass_const)
klass_const.set(queue: @queue).perform_later(*enqueue_args)
end
def enqueue_sidekiq_worker(klass_const)
klass_const.set(queue: queue_name_with_prefix, retry: @retry).perform_async(*enqueue_args)
end
# Sidekiq worker message.
def sidekiq_worker_message
message = @message.is_a?(String) ? Sidekiq.load_json(@message) : @message
message["args"] = enqueue_args
message
end
def queue_name_with_prefix
return @queue unless is_active_job?
if !"#{@active_job_queue_name_delimiter}".empty?
queue_name_delimiter = @active_job_queue_name_delimiter
elsif defined?(::ActiveJob::Base) && defined?(::ActiveJob::Base.queue_name_delimiter) && !::ActiveJob::Base.queue_name_delimiter.empty?
queue_name_delimiter = ::ActiveJob::Base.queue_name_delimiter
else
queue_name_delimiter = '_'
end
if !"#{@active_job_queue_name_prefix}".empty?
queue_name = "#{@active_job_queue_name_prefix}#{queue_name_delimiter}#{@queue}"
elsif defined?(::ActiveJob::Base) && defined?(::ActiveJob::Base.queue_name_prefix) && !"#{::ActiveJob::Base.queue_name_prefix}".empty?
queue_name = "#{::ActiveJob::Base.queue_name_prefix}#{queue_name_delimiter}#{@queue}"
else
queue_name = @queue
end
queue_name
end
# Active Job has different structure how it is loading data from Sidekiq
# queue, it creates a wrapper around job.
def active_job_message
{
'class' => 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper',
'wrapped' => @klass,
'queue' => @queue_name_with_prefix,
'description' => @description,
'args' => [{
'job_class' => @klass,
'job_id' => SecureRandom.uuid,
'queue_name' => @queue_name_with_prefix,
'arguments' => enqueue_args
}]
}
end
# Load cron jobs from Hash.
# Input structure should look like:
# {
# 'name_of_job' => {
# 'namespace' => 'MyNamespace',
# 'class' => 'MyClass',
# 'cron' => '1 * * * *',
# 'args' => '(OPTIONAL) [Array or Hash]',
# 'description' => '(OPTIONAL) Description of job'
# },
# 'My super iber cool job' => {
# 'class' => 'SecondClass',
# 'cron' => '*/5 * * * *'
# }
# }
#
def self.load_from_hash(hash, options = {})
array = hash.map do |key, job|
job['name'] = key
job
end
load_from_array(array, options)
end
# Like #load_from_hash.
# If exists old jobs in Redis but removed from args, destroy old jobs.
def self.load_from_hash!(hash, options = {})
destroy_removed_jobs(hash.keys)
load_from_hash(hash, options)
end
# Load cron jobs from Array.
# Input structure should look like:
# [
# {
# 'namespace' => 'MyNamespace',
# 'name' => 'name_of_job',
# 'class' => 'MyClass',
# 'cron' => '1 * * * *',
# 'args' => '(OPTIONAL) [Array or Hash]',
# 'description' => '(OPTIONAL) Description of job'
# },
# {
# 'name' => 'Cool Job for Second Class',
# 'class' => 'SecondClass',
# 'cron' => '*/5 * * * *'
# }
# ]
#
def self.load_from_array(array, options = {})
errors = {}
array.each do |job_data|
job = new(job_data.merge(options))
errors[job.name] = job.errors unless job.save
end
errors
end
# Like #load_from_array.
# If exists old jobs in Redis but removed from args, destroy old jobs.
def self.load_from_array!(array, options = {})
job_names = array.map { |job| job["name"] || job[:name] }
destroy_removed_jobs(job_names)
load_from_array(array, options)
end
# Get all cron jobs.
def self.all(namespace = Sidekiq::Cron.configuration.default_namespace)
job_hashes = nil
Sidekiq.redis do |conn|
job_keys = job_keys_from_namespace(namespace)
job_hashes = conn.pipelined do |pipeline|
job_keys.each do |job_key|
pipeline.hgetall(job_key)
end
end
end
job_hashes.compact.reject(&:empty?).collect do |h|
# No need to fetch missing args from Redis since we just got this hash from there
Sidekiq::Cron::Job.new(h.merge(fetch_missing_args: false))
end
end
def self.count(namespace = Sidekiq::Cron.configuration.default_namespace)
if namespace == '*'
Namespace.all_with_count.reduce(0) do |memo, namespace_count|
memo + namespace_count[:count]
end
else
Sidekiq.redis { |conn| conn.scard(jobs_key(namespace)) }
end
end
def self.find(name, namespace = Sidekiq::Cron.configuration.default_namespace)
# If name is hash try to get name from it.
name = name[:name] || name['name'] if name.is_a?(Hash)
return unless exists? name, namespace
output = nil
Sidekiq.redis do |conn|
if exists? name, namespace
output = Job.new conn.hgetall(redis_key(name, namespace))
end
end
output if output && output.valid?
end
# Create new instance of cron job.
def self.create hash
new(hash).save
end
# Destroy job by name.
def self.destroy(name, namespace = Sidekiq::Cron.configuration.default_namespace)
# If name is hash try to get name from it.
name = name[:name] || name['name'] if name.is_a?(Hash)
if (job = find(name, namespace))
job.destroy
else
false
end
end
def status
@status
end
def disable!
@status = "disabled"
save
end
def enable!
@status = "enabled"
save
end
def enabled?
@status == "enabled"
end
def disabled?
!enabled?
end
def pretty_message
JSON.pretty_generate Sidekiq.load_json(message)
rescue JSON::ParserError
message
end
def human_cron
Cronex::ExpressionDescriptor.new(cron).description
rescue
cron
end
def status_from_redis
out = "enabled"
if fetch_missing_args
Sidekiq.redis do |conn|
status = conn.hget redis_key, "status"
out = status if status
end
end
out
end
def last_enqueue_time_from_redis
out = nil
if fetch_missing_args
Sidekiq.redis do |conn|
out = parse_enqueue_time(conn.hget(redis_key, "last_enqueue_time")) rescue nil
end
end
out
end
def jid_history_from_redis
out =
Sidekiq.redis do |conn|
conn.lrange(jid_history_key, 0, -1) rescue nil
end
out && out.map do |jid_history_raw|
Sidekiq.load_json jid_history_raw
end
end
# Export job data to hash.
def to_hash
{
name: @name,
namespace: @namespace,
klass: @klass.to_s,
cron: @cron,
description: @description,
source: @source,
args: @args.is_a?(String) ? @args : Sidekiq.dump_json(@args || []),
date_as_argument: date_as_argument? ? "1" : "0",
message: @message.is_a?(String) ? @message : Sidekiq.dump_json(@message || {}),
status: @status,
active_job: @active_job ? "1" : "0",
queue_name_prefix: @active_job_queue_name_prefix,
queue_name_delimiter: @active_job_queue_name_delimiter,
retry: @retry.nil? || @retry.is_a?(Numeric) ? @retry : @retry.to_s,
last_enqueue_time: serialized_last_enqueue_time,
symbolize_args: symbolize_args? ? "1" : "0",
}
end
def errors
@errors ||= []
end
def valid?
# Clear previous errors.
@errors = []
errors << "'name' must be set" if @name.nil? || @name.size == 0
errors << "'namespace' must be set" if @namespace.nil? || @namespace.size == 0
errors << "'namespace' cannot be '*'" if @namespace == "*"
if @cron.nil? || @cron.size == 0
errors << "'cron' must be set"
else
begin
@parsed_cron = do_parse_cron(@cron)
rescue => e
errors << "'cron' -> #{@cron.inspect} -> #{e.class}: #{e.message}"
end
end
errors << "'klass' (or class) must be set" unless klass_valid
errors.empty?
end
def klass_valid
case @klass
when Class
true
when String
@klass.size > 0
else
end
end
def save
# If job is invalid, return false.
return false unless valid?
Sidekiq.redis do |conn|
# Add to set of all jobs
conn.sadd self.class.jobs_key(@namespace), [redis_key]
# Add information for this job!
conn.hset redis_key, to_hash.transform_values! { |v| v || '' }.flatten
# Add information about last time! - don't enqueue right after scheduler poller starts!
time = Time.now.utc
exists = conn.exists(job_enqueued_key)
unless exists == true || exists == 1
conn.zadd(job_enqueued_key, time.to_f.to_s, formatted_last_time(time).to_s)
Sidekiq.logger.info { "Cron Jobs - added job with name #{@name} in the namespace #{@namespace}" }
end
end
true
end
def save_last_enqueue_time
Sidekiq.redis do |conn|
# Update last enqueue time.
conn.hset redis_key, 'last_enqueue_time', serialized_last_enqueue_time
end
end
def add_jid_history(jid)
jid_history = {
jid: jid,
enqueued: @last_enqueue_time
}
@history_size ||= Sidekiq::Cron.configuration.cron_history_size.to_i - 1
Sidekiq.redis do |conn|
conn.lpush jid_history_key,
Sidekiq.dump_json(jid_history)
# Keep only last 10 entries in a fifo manner.
conn.ltrim jid_history_key, 0, @history_size
end
end
def destroy
Sidekiq.redis do |conn|
# Delete from set.
conn.srem self.class.jobs_key(@namespace), [redis_key]
# Delete ran timestamps.
conn.del job_enqueued_key
# Delete jid_history.
conn.del jid_history_key
# Delete main job.
conn.del redis_key
end
Sidekiq.logger.info { "Cron Jobs - deleted job with name #{@name} from namespace #{@namespace}" }
end
# Remove all job from cron.
def self.destroy_all!
all.each do |job|
job.destroy
end
Sidekiq.logger.info { "Cron Jobs - deleted all jobs" }
end
# Remove "removed jobs" between current jobs and new jobs
def self.destroy_removed_jobs new_job_names
current_jobs = Sidekiq::Cron::Job.all("*").filter_map { |j| j if j.source == "schedule" }
current_job_names = current_jobs.map(&:name)
removed_job_names = current_job_names - new_job_names
removed_job_names.each do |j|
job_to_destroy = current_jobs.detect { |job| job.name == j }
Sidekiq::Cron::Job.destroy(
job_to_destroy.name,
job_to_destroy.namespace
)
end
removed_job_names
end
# Parse cron specification '* * * * *' and returns
# time when last run should be performed
def last_time now = Time.now.utc
parsed_cron.previous_time(now.utc).utc
end
def formatted_enqueue_time now = Time.now.utc
last_time(now).getutc.to_f.to_s
end
def formatted_last_time now = Time.now.utc
last_time(now).getutc.iso8601
end
def self.exists?(name, namespace = Sidekiq::Cron.configuration.default_namespace)
out = Sidekiq.redis do |conn|
conn.exists(redis_key(name, namespace))
end
[true, 1].include?(out)
end
def exists?
self.class.exists? @name, @namespace
end
def sort_name
"#{status == "enabled" ? 0 : 1}_#{name}".downcase
end
def args=(args)
@args = parse_args(args)
end
def cron_expression_string
parsed_cron.to_cron_s
end
private
def parsed_cron
@parsed_cron ||= do_parse_cron(@cron)
end
def do_parse_cron(cron)
case Sidekiq::Cron.configuration.natural_cron_parsing_mode
when :single
Fugit.do_parse_cronish(cron)
when :strict
Fugit.parse_cron(cron) || # Ex. '11 1 * * 1'
Fugit.parse_nat(cron, :multi => :fail) || # Ex. 'every Monday at 01:11'
fail(ArgumentError.new("invalid cron string #{cron.inspect}"))
else
mode = Sidekiq::Cron.configuration.natural_cron_parsing_mode
raise ArgumentError, "Unknown natural cron parsing mode: #{mode.inspect}"
end
end
def enqueued_after?(time)
@last_enqueue_time && @last_enqueue_time.to_i >= last_time(time).to_i
end
# Try parsing inbound args into an array.
# Args from Redis will be encoded JSON,
# try to load JSON, then failover to string array.
def parse_args(args)
case args
when GlobalID::Identification
[convert_to_global_id_hash(args)]
when String
begin
parsed_args = Sidekiq.load_json(args)
symbolize_args? ? symbolize_args(parsed_args) : parsed_args
rescue JSON::ParserError
[*args]
end
when Hash
args = serialize_argument(args)
symbolize_args? ? [symbolize_args(args)] : [args]
when Array
args = serialize_argument(args)
symbolize_args? ? symbolize_args(args) : args
else
[*args]
end
end
def symbolize_args?
@symbolize_args
end
def symbolize_args(input)
if input.is_a?(Array)
input.map do |arg|
if arg.respond_to?(:symbolize_keys)
arg.symbolize_keys
else
arg
end
end
elsif input.is_a?(Hash) && input.respond_to?(:symbolize_keys)
input.symbolize_keys
else
input
end
end
def parse_enqueue_time(timestamp)
DateTime.strptime(timestamp, LAST_ENQUEUE_TIME_FORMAT).to_time.utc
rescue ArgumentError
DateTime.parse(timestamp).to_time.utc
end
def past_scheduled_time?(current_time)
last_cron_time = parsed_cron.previous_time(current_time).utc
period = Sidekiq::Cron.configuration.reschedule_grace_period
current_time.to_i - last_cron_time.to_i > period
end
def self.default_if_blank(namespace)
if namespace.nil? || namespace == ''
Sidekiq::Cron.configuration.default_namespace
else
namespace
end
end
def self.job_keys_from_namespace(namespace = Sidekiq::Cron.configuration.default_namespace)
Sidekiq.redis do |conn|
if namespace == '*'
namespaces = Sidekiq::Cron::Namespace.all.map { jobs_key(_1) }
namespaces.flat_map { |name| conn.smembers(name) }
else
conn.smembers(jobs_key(namespace))
end
end
end
def self.migrate_old_jobs_if_needed!
Sidekiq.redis do |conn|
old_job_keys = conn.smembers('cron_jobs')
old_job_keys.each do |old_job|
old_job_hash = conn.hgetall(old_job)
old_job_hash[:namespace] = Sidekiq::Cron.configuration.default_namespace
create(old_job_hash)
conn.srem('cron_jobs', old_job)
end
end
end
# Redis key for set of all cron jobs
def self.jobs_key(namespace = Sidekiq::Cron.configuration.default_namespace)
"cron_jobs:#{default_if_blank(namespace)}"
end
# Redis key for storing one cron job
def self.redis_key(name, namespace = Sidekiq::Cron.configuration.default_namespace)
"cron_job:#{default_if_blank(namespace)}:#{name}"
end
# Redis key for storing one cron job
def redis_key
self.class.redis_key @name, @namespace
end
# Redis key for storing one cron job run times
# (when poller added job to queue)
def self.job_enqueued_key(name, namespace = Sidekiq::Cron.configuration.default_namespace)
"cron_job:#{default_if_blank(namespace)}:#{name}:enqueued"
end
def self.jid_history_key(name, namespace = Sidekiq::Cron.configuration.default_namespace)
"cron_job:#{default_if_blank(namespace)}:#{name}:jid_history"
end
# Redis key for storing one cron job run times
# (when poller added job to queue)
def job_enqueued_key
self.class.job_enqueued_key @name, @namespace
end
def jid_history_key
self.class.jid_history_key @name, @namespace
end
def serialized_last_enqueue_time
@last_enqueue_time&.strftime(LAST_ENQUEUE_TIME_FORMAT)
end
def convert_to_global_id_hash(argument)
{ GLOBALID_KEY => argument.to_global_id.to_s }
rescue URI::GID::MissingModelIdError
raise "Unable to serialize #{argument.class} " \
"without an id. (Maybe you forgot to call save?)"
end
def deserialize_argument(argument)
case argument
when String
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
else
argument.transform_values { |v| deserialize_argument(v) }
end
else
argument
end
end
def serialized_global_id?(hash)
hash.size == 1 && hash.include?(GLOBALID_KEY)
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
def serialize_argument(argument)
case argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when Hash
argument.each_with_object({}) do |(key, value), hash|
hash[key] = serialize_argument(value)
end
else
argument
end
end
def get_job_options(klass, args)
klass = klass.is_a?(Class) ? klass : Sidekiq::Cron::Support.safe_constantize(klass)
if klass.nil?
# Unknown class
{"queue"=>"default"}
elsif is_active_job?(klass)
job = klass.new(args)
{"queue"=>job.queue_name}
else
klass.get_sidekiq_options
end
end
end
end
end