class CanvasSync::JobBatches::Pool
# Builds a Pool handle for an already-created pool.
#
# @param pid [Object] identifier of the existing pool; must be present
# @return [Pool] instance constructed with the given pid
# @raise [RuntimeError] when no pid is supplied
def self.from_pid(pid)
  return new(pid) if pid.present?

  raise "PID must be given"
end
# Class-level callback entry point: looks up the pool named by
# options['pool_id'] and forwards the check-in to that instance.
#
# @param status [Object] passed through untouched to the instance method
# @param options [Hash] must contain the string key 'pool_id'
def self.job_checked_in(status, options)
  pool = from_pid(options['pool_id'])
  pool.job_checked_in(status, options)
end
# Delegates Redis access to Batch.redis, forwarding the optional block.
#
# @return [Object] whatever Batch.redis returns
def self.redis(&blk)
  Batch.redis(&blk)
end
# Shovel-operator shorthand for #add_job.
#
# @param job_desc [Hash] description of the job to add to the pool
# @return [Object] the result of #add_job (not self, so calls do not chain)
def <<(job_desc)
  add_job(job_desc)
end
# Reads the pool's "active_count" hash field.
#
# The value is fetched by issuing HINCRBY with an increment of 0, so the
# reply is the current counter value.
#
# @return [Object] the reply of the HINCRBY call
def active_count
  pool_key = redis_key
  redis.hincrby(pool_key, "active_count", 0)
end
# Adds a single job to the pool by wrapping it in a one-element list and
# delegating to #add_jobs.
#
# @param job_desc [Hash] description of the job to add
# @return [Object] the result of #add_jobs
def add_job(job_desc)
  single_job_list = [job_desc]
  add_jobs(single_job_list)
end
# Queues several jobs in the pool, then tops up the running allotment.
#
# Each job gets its own wrapper Batch whose callback reports back to
# `<this class>.job_checked_in` with this pool's id.
#
# @param job_descs [Enumerable<Hash>] job descriptions; each needs a :job entry
# @return [Object] the result of #refill_allotment
def add_jobs(job_descs)
  job_descs.each do |raw_desc|
    wrapper_batch = Batch.new
    wrapper_batch.description = "Pool Job Wrapper (PID: #{pid})"

    # With on_failed_job == :wait the slot is released only on success;
    # any other setting releases it on completion.
    event_name = on_failed_job == :wait ? :success : :complete
    wrapper_batch.on(event_name, "#{self.class}.job_checked_in", pool_id: pid)

    # Empty jobs block -- presumably registers the wrapper batch before its
    # bid is referenced below; confirm against Batch#jobs.
    wrapper_batch.jobs {}

    queued_desc = raw_desc.symbolize_keys
    queued_desc[:job] = queued_desc[:job].to_s
    queued_desc[:pool_wrapper_batch] = wrapper_batch.bid

    push_job_to_pool(queued_desc)
  end

  refill_allotment
end
# Removes every Redis trace of this pool: its entry in the "pools" sorted
# set, its attribute hash, and its queued-jobs key.
def cleanup_redis
  Batch.logger.debug { "Cleaning redis of pool #{pid}" }

  redis do |conn|
    conn.zrem("pools", pid)

    pool_key = redis_key.to_s
    conn.unlink(pool_key, "#{redis_key}-jobs")
  end
end
# Runs the inherited flush (defined outside this view), then records the
# pool in the "pools" sorted set, scored by its creation time.
def flush_pending_attrs
  super

  pool_index = "pools"
  redis.zadd(pool_index, created_at, pid)
end
# Creates a handle to an existing pool, or a brand-new pool.
#
# @param pooolid [Object, nil] id of an existing pool; when given, the
#   keyword arguments are not used
# @param kwargs [Hash] settings forwarded to #initialize_new for a new pool
def initialize(pooolid = nil, **kwargs)
  if pooolid
    # Attach to an existing pool: no attributes are written.
    @existing = true
    @pid = pooolid
    return
  end

  # New pool: generate a random id and set up its attributes.
  @pid = SecureRandom.urlsafe_base64(10)
  initialize_new(**kwargs)
end
# Assigns the attributes of a freshly created pool and flushes them.
#
# @param concurrency [Object, nil] limit later read by #refill_allotment
# @param order [Symbol] one of :fifo, :lifo, :random, :priority
# @param clean_when_empty [Boolean] whether the pool's Redis keys are
#   removed once it becomes idle
# @param on_failed_job [Symbol] :wait makes wrappers check in on success
#   only; any other value checks in on completion (see #add_jobs)
# @param description [String, nil] free-form label
def initialize_new(
  concurrency: nil,
  order: :fifo,
  clean_when_empty: true,
  on_failed_job: :wait,
  description: nil
)
  # Float UTC timestamp; also used as the score in the "pools" sorted set.
  self.created_at = Time.now.utc.to_f

  self.description = description
  self.order = order
  self.concurrency = concurrency
  self.clean_when_empty = clean_when_empty
  self.on_failed_job = on_failed_job

  flush_pending_attrs
end
# Instance-level check-in: releases one active slot, refills the allotment,
# and cleans the pool up when it has become idle.
#
# Returns early (nil) when the pool's Redis key no longer exists.
#
# @param status [Object] unused here
# @param options [Hash] unused here
def job_checked_in(status, options)
  counts = redis do |conn|
    return unless conn.exists?(redis_key)

    # Make sure this is loaded outside of the pipeline
    self.order

    redis.multi do |txn|
      txn.hincrby(redis_key, "active_count", -1)
      self.pending_count(txn)
    end
  end
  remaining_active, remaining_pending = counts

  newly_started = refill_allotment

  # Nothing running, nothing just started, nothing queued => pool is idle.
  return unless remaining_active == 0 && newly_started == 0 && remaining_pending == 0
  return unless clean_when_empty && redis.hget(redis_key, 'keep_open') != 'true'

  cleanup_redis
end
# Marks the pool as "keep open" so that an idle pool is not cleaned up.
#
# Without a block, only sets the flag. With a block, sets the flag, yields,
# and always calls #let_close! afterwards (even if the block raises).
#
# @yield optional work to perform while the pool is held open
# @return [Object] the HSET reply (no block) or the block's value
def keep_open!
  unless block_given?
    # Store the flag as the string 'true': that is the exact value
    # #job_checked_in compares against, and Redis clients that reject
    # boolean command arguments would raise on a bare `true`.
    return redis.hset(redis_key, 'keep_open', 'true')
  end

  begin
    keep_open!
    yield
  ensure
    let_close!
  end
end
# Clears the "keep open" flag and cleans the pool up if it is already idle.
#
# The flag write and the active-count read are issued in one MULTI block.
# Cleanup happens only when the active count is zero, nothing is pending,
# and clean_when_empty is set.
#
# @return [Object, nil] the result of #cleanup_redis, or nil when skipped
def let_close!
  _, current_active = redis.multi do |txn|
    # Written as the string 'false' so it mirrors the 'true' string that
    # #job_checked_in tests for, and so Redis clients that reject boolean
    # command arguments do not raise.
    txn.hset(redis_key, 'keep_open', 'false')
    # HINCRBY by 0 reads the counter inside the same transaction.
    txn.hincrby(redis_key, "active_count", 0)
  end

  cleanup_redis if current_active == 0 && pending_count == 0 && clean_when_empty
end
# Counts the jobs still queued for this pool.
#
# The Redis command depends on the pool's order: LLEN for :fifo/:lifo,
# SCARD for :random, ZCARD for :priority. A missing order is treated as
# 'fifo'; any other order yields nil.
#
# @param r [Object] Redis-like receiver for the command (defaults to `redis`)
# @return [Object, nil] the reply of the issued command
def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  ordering = (order || 'fifo').to_sym

  case ordering
  when :fifo, :lifo then r.llen(jobs_key)
  when :random then r.scard(jobs_key)
  when :priority then r.zcard(jobs_key)
  end
end
# Removes and returns the next queued job description, or nil when the
# pool's queue is empty.
#
# The pop command depends on the pool's order: LPOP (:fifo), RPOP (:lifo),
# SPOP (:random) or ZPOPMAX (:priority). A missing order is treated as
# 'fifo', matching #pending_count.
#
# @return [Hash, nil] the deserialized job description with symbol keys
def pop_job_from_pool
  jobs_key = "#{redis_key}-jobs"
  ordering = (order || 'fifo').to_sym

  job_json =
    case ordering
    when :fifo then redis.lpop(jobs_key)
    when :lifo then redis.rpop(jobs_key)
    when :random then redis.spop(jobs_key)
    when :priority
      # ZPOPMAX replies with a [member, score] pair (nil when empty), not a
      # bare string; only the member holds the serialized job.
      Array(redis.zpopmax(jobs_key)).first
    end

  return nil unless job_json.present?

  ::ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0]&.symbolize_keys
end
# Serializes a job description and stores it under this pool's jobs key.
#
# The storage command depends on the pool's order: RPUSH for :fifo/:lifo,
# SADD for :random, ZADD (scored by job_desc[:priority], default 0) for
# :priority. The jobs key's TTL is refreshed in the same MULTI block.
#
# @param job_desc [Hash] job description; a :_pool_random_key_ entry is
#   added to it in place
# @return [Object] the reply of the MULTI block
def push_job_to_pool(job_desc)
  jobs_key = "#{redis_key}-jobs"

  # This allows duplicate jobs when a Redis Set is used
  job_desc[:_pool_random_key_] = SecureRandom.urlsafe_base64(10)

  serialized_args = ::ActiveJob::Arguments.serialize([job_desc])
  payload = JSON.generate(serialized_args)
  ordering = self.order

  redis.multi do |txn|
    case ordering.to_sym
    when :fifo, :lifo then txn.rpush(jobs_key, payload)
    when :random then txn.sadd(jobs_key, payload)
    when :priority then txn.zadd(jobs_key, job_desc[:priority] || 0, payload)
    end

    txn.expire(jobs_key, Batch::BID_EXPIRE_TTL)
  end
end
# Redis key of this pool's attribute hash: "POOLID-" followed by the pid.
#
# @return [String]
def redis_key
  "POOLID-" + pid.to_s
end
# Starts queued jobs until the pool's concurrency limit is reached or the
# queue runs dry, then refreshes the TTL of both pool keys.
#
# @return [Integer] number of jobs enqueued by this call
def refill_allotment
  enqueued = 0
  max_active = concurrency.to_i

  redis do |conn|
    loop do
      # HINCR_MAX is defined outside this view -- presumably a script that
      # bumps "active_count" only while it is below the limit and replies
      # with the prior value; confirm against its definition.
      observed = HINCR_MAX.call(conn, [redis_key], ["active_count", max_active]).to_i
      break unless observed < max_active

      next_job = pop_job_from_pool
      unless next_job.present?
        # A slot was claimed but nothing was queued: give the slot back.
        conn.hincrby(redis_key, "active_count", -1)
        break
      end

      # Enqueue inside the wrapper batch created for this job in #add_jobs.
      Batch.new(next_job[:pool_wrapper_batch]).jobs do
        ChainBuilder.enqueue_job(next_job)
      end
      enqueued += 1
    end

    conn.expire(redis_key, Batch::BID_EXPIRE_TTL)
    conn.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL)
  end

  enqueued
end