class CanvasSync::JobBatches::Batch
def self.with_batch(batch)
# Runs the given block with +batch+ installed as the current batch of the
# calling thread, then puts back whatever was current before.
#
# @param batch [Object, String, nil] the batch to make current; a String is
#   treated as a bid and wrapped via +new+; nil means "no current batch"
# @yield the code to run while +batch+ is current
# @return [Object] the value returned by the block
def self.with_batch(batch)
  # Remember the previous batch *before* anything that can raise, so the
  # +ensure+ below always restores the real value instead of nil.
  parent = Thread.current[CURRENT_BATCH_THREAD_KEY]
  batch = new(batch) if batch.is_a?(String)
  Thread.current[CURRENT_BATCH_THREAD_KEY] = batch
  yield
ensure
  Thread.current[CURRENT_BATCH_THREAD_KEY] = parent
end
def self.without_batch(&blk)
# Runs the given block with no current batch on the calling thread.
# Delegates to +with_batch+ with a nil batch.
#
# @yield the code to run outside of any batch
# @return [Object] whatever +with_batch+ returns
def self.without_batch(&block)
  with_batch(nil, &block)
end
def append_jobs(jids)
# Registers job ids with this batch.
#
# Duplicate ids in +jids+ are collapsed first. The ids are added to the
# "<bidkey>-jids" sorted set with NX, and the count returned by that ZADD is
# what gets added to the "pending" and "job_count" hash fields.
#
# @param jids [Array] job ids to register
# @return [Object, nil] nil when +jids+ is empty, otherwise the result of the
#   +redis+ block
def append_jobs(jids)
  unique_jids = jids.uniq
  return if unique_jids.empty?

  redis do |conn|
    enqueued_at = Time.now.utc.to_f
    jids_key = @bidkey + "-jids"
    scored_members = unique_jids.map { |jid| [enqueued_at, jid] }
    added = conn.zadd(jids_key, scored_members, nx: true)

    conn.multi do |txn|
      txn.hincrby(@bidkey, "pending", added)
      txn.hincrby(@bidkey, "job_count", added)
      # Refresh the TTL on both keys touched above.
      txn.expire(@bidkey, BID_EXPIRE_TTL)
      txn.expire(jids_key, BID_EXPIRE_TTL)
    end
  end
end
def assert_batch_is_open
# Raises if this batch can no longer accept jobs.
#
# The "success" field of the batch hash is read once and the answer is cached
# in @closed, so later calls on the same object do not query Redis again.
#
# @raise [RuntimeError] when the stored "success" field equals 'true'
# @return [nil] when the batch is still open
def assert_batch_is_open
  unless defined?(@closed)
    @closed = redis.hget(@bidkey, 'success') == 'true'
  end
  # The original message used an empty interpolation and never named the batch.
  raise "Cannot add jobs to Batch #{bid} - it has already entered the callback-stage" if @closed
end
def bid_hierarchy(bid, depth: 4, per_depth: 5, slice: nil)
# Invokes BID_HIERARCHY against a Redis connection for the given batch id.
#
# @param bid [Object] batch id passed as the first script argument
# @param depth [Object] second script argument (defaults to 4)
# @param per_depth [Object] third script argument (defaults to 5)
# @param slice [Object, nil] appended as a fourth argument only when truthy
# @return [Object] whatever BID_HIERARCHY.call returns
def bid_hierarchy(bid, depth: 4, per_depth: 5, slice: nil)
  script_args = slice ? [bid, depth, per_depth, slice] : [bid, depth, per_depth]

  redis { |conn| BID_HIERARCHY.call(conn, [], script_args) }
end
def cleanup_redis(bid)
# Removes the Redis bookkeeping for a batch.
#
# The bid is removed from the "batches" and "BID-ROOT-bids" sorted sets, and
# the batch hash plus its per-batch keys are unlinked. Compared to the
# previous list this also covers "BID-<bid>-dead" and
# "BID-<bid>-callbacks-death", which other methods of this class use but which
# were never cleaned up here (unlinking a missing key is harmless).
#
# @param bid [String] id of the batch to clean up
# @return [Object] the result of the +redis+ block
def cleanup_redis(bid)
  logger.debug {"Cleaning redis of batch #{bid}"}

  batch_key = "BID-#{bid}"
  suffixes = %w[
    callbacks-complete
    callbacks-success
    callbacks-death
    failed
    dead
    batches-success
    batches-complete
    batches-failed
    bids
    jids
    pending_callbacks
  ]

  redis do |r|
    r.zrem("batches", bid)
    r.zrem("BID-ROOT-bids", bid)
    r.unlink(batch_key, *suffixes.map { |suffix| "#{batch_key}-#{suffix}" })
  end
end
def context
# Returns the memoized context object for this batch, building it on first
# access.
#
# When the batch is already initialized or wraps an existing bid, the
# ContextHash is built from the bid alone; otherwise it is built with an
# explicit empty Hash as second argument.
#
# @return [ContextHash]
def context
  return @context if defined?(@context)

  started = @initialized || @existing
  @context = started ? ContextHash.new(bid) : ContextHash.new(bid, {})
end
def context=(value)
# Replaces the local context of a batch that has not been started yet.
#
# @param value [Hash, nil] new context; nil clears it (and is a no-op when no
#   context object exists yet); a ContextHash contributes its +local+ value
# @raise [RuntimeError] when the batch is initialized or wraps an existing
#   bid, or when +value+ is neither a Hash nor nil
# @return [Object, nil] the result of +set_local+, or nil for the no-op case
def context=(value)
  if @initialized || @existing # && !allow_context_changes
    raise "context is read-only once the batch has been started"
  end
  raise "context must be a Hash" unless value.nil? || value.is_a?(Hash)
  return nil if value.nil? && @context.nil?

  new_local = value.nil? ? {} : value
  new_local = new_local.local if new_local.is_a?(ContextHash)

  @context ||= ContextHash.new(bid, {})
  @context.set_local(new_local)
  # persist_bid_attr('context', JSON.unparse(@context.local))
end
def current
# Returns whatever is stored under CURRENT_BATCH_THREAD_KEY for the calling
# thread (nil when nothing has been stored).
#
# @return [Object, nil]
def current
  running_thread = Thread.current
  running_thread[CURRENT_BATCH_THREAD_KEY]
end
def current_context
# Returns the context of the current batch, or an empty Hash when there is no
# current batch or its context is nil/false.
#
# @return [Object] the current batch's context, or {}
def current_context
  batch = current
  return {} if batch.nil?

  batch.context || {}
end
def delete_prematurely!(bid)
# Recursively removes a batch and all of its descendants from Redis.
#
# Child bids are read from the "BID-<bid>-bids" sorted set; every child is
# deleted (depth first) before +cleanup_redis+ runs for +bid+ itself.
#
# @param bid [String] id of the batch to delete
# @return [Object] the result of +cleanup_redis+ for +bid+
def delete_prematurely!(bid)
  children = redis { |conn| conn.zrange("BID-#{bid}-bids", 0, -1) }
  children.each { |child_bid| delete_prematurely!(child_bid) }

  cleanup_redis(bid)
end
def enqueue_callbacks(event, bid)
# Schedules the callbacks registered for +event+ on batch +bid+ and then runs
# the batch finalizer.
#
# Outline, as visible in this method:
# 1. Bail out if the batch hash does not exist or its "keep_open" field is
#    'true'.
# 2. Read the registered callbacks, callback queue, parent bid and callback
#    params inside one MULTI.
# 3. Ask SCHEDULE_CALLBACK whether this event was already processed; stop if
#    it answers 'true'.
# 4. When there are callbacks and no callback params, create a new batch
#    (under the parent batch, if any) and push the callback jobs inside it.
# 5. Dispatch Batch::Callback::Finalize with a Status for +bid+.
#
# @param event [Symbol, String] event name used to build the callbacks key
# @param bid [String] id of the batch whose callbacks should fire
# @return [Object, nil] nil on an early exit, otherwise the finalizer's
#   +dispatch+ result
def enqueue_callbacks(event, bid)
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event}"

  # The destructuring below relies on the MULTI results coming back in the
  # order the commands are issued.
  callbacks, queue, parent_bid, callback_params = redis do |conn|
    return unless conn.exists?(batch_key)
    return if conn.hget(batch_key, 'keep_open') == 'true'

    conn.multi do |txn|
      txn.smembers(callback_key)
      txn.hget(batch_key, "callback_queue")
      txn.hget(batch_key, "parent_bid")
      txn.hget(batch_key, "callback_params")
    end
  end

  queue ||= "default"
  parent_bid = nil if !parent_bid || parent_bid.empty? # Basically parent_bid.blank?
  callback_params = JSON.parse(callback_params) if callback_params.present?

  # One argument tuple per registered callback.
  callback_args = callbacks.map do |serialized|
    cb = JSON.load(serialized)
    [cb['callback'], event.to_s, cb['opts'], bid, parent_bid]
  end

  opts = {"bid" => bid, "event" => event}

  should_schedule_batch = callback_args.present? && !callback_params.present?
  already_processed = redis do |conn|
    SCHEDULE_CALLBACK.call(conn, [batch_key], [event.to_s, should_schedule_batch.to_s, BID_EXPIRE_TTL])
  end
  return if already_processed == 'true'

  if should_schedule_batch
    logger.debug {"Enqueue callback bid: #{bid} event: #{event} args: #{callback_args.inspect}"}

    with_batch(parent_bid) do
      cb_batch = self.new
      cb_batch.callback_params = {
        for_bid: bid,
        event: event,
      }
      opts['callback_bid'] = cb_batch.bid

      logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
      cb_batch.jobs { push_callbacks(callback_args, queue) }
    end
  end

  opts['origin'] = callback_params if callback_params.present?

  logger.debug {"Run batch finalizer bid: #{bid} event: #{event} args: #{callback_args.inspect}"}
  Batch::Callback::Finalize.new.dispatch(Status.new(bid), opts)
end
def flush_pending_attrs
# Runs the inherited +flush_pending_attrs+ and then records this batch in the
# "batches" sorted set, scored by +created_at+.
#
# @return [Object] the result of the ZADD call
def flush_pending_attrs
  super

  connection = redis
  connection.zadd("batches", created_at, bid)
end
def increment_job_queue(jid)
# Adds a single job id to this batch after checking that the batch is open.
#
# @param jid [Object] the job id to register
# @return [Object] whatever +append_jobs+ returns
# @raise whatever +assert_batch_is_open+ raises for a closed batch
def increment_job_queue(jid)
  assert_batch_is_open

  single_job = [jid]
  append_jobs(single_job)
end
def initialize(existing_bid = nil)
# Builds a batch handle.
#
# With a non-blank +existing_bid+ the object refers to that existing batch.
# With nil or an empty bid a new random bid is generated and +created_at+ is
# stamped. Previously an empty string was kept as the bid, giving a batch
# with bid "" and key "BID-" that was nevertheless treated as new.
#
# @param existing_bid [String, nil] id of an existing batch, or nil/"" to
#   create a new one
def initialize(existing_bid = nil)
  @existing = !(!existing_bid || existing_bid.empty?) # Basically existing_bid.present?
  @bid = @existing ? existing_bid : SecureRandom.urlsafe_base64(10)
  @initialized = false
  @bidkey = "BID-" + @bid.to_s
  self.created_at = Time.now.utc.to_f unless @existing
end
def invalidate_all
# Writes the "invalidated-bid-<bid>" marker key for this batch with a TTL of
# BID_EXPIRE_TTL and the value 1.
#
# @return [Object] the result of the SETEX call
def invalidate_all
  marker_key = "invalidated-bid-#{bid}"
  redis.setex(marker_key, BID_EXPIRE_TTL, 1)
end
def jobs
# Runs the given block with this batch as the calling thread's current batch.
#
# On the first call for a batch that is neither existing nor initialized, the
# batch is registered in Redis first: it is linked to the enclosing batch (the
# thread's current batch, if any) or to "BID-ROOT-bids", pending attributes
# are flushed and the context (if one was set) is saved. On any other call the
# batch must still be open.
#
# @yield the code to run inside this batch
# @raise [NoBlockGivenError] when called without a block
# @return [nil]
def jobs
  raise NoBlockGivenError unless block_given?

  if @existing || @initialized
    assert_batch_is_open
  else
    enclosing_bid = Thread.current[CURRENT_BATCH_THREAD_KEY]&.bid

    redis.multi do |txn|
      txn.hset(@bidkey, "parent_bid", enclosing_bid.to_s) if enclosing_bid
      txn.expire(@bidkey, BID_EXPIRE_TTL)

      if enclosing_bid
        parent_key = "BID-#{enclosing_bid}"
        txn.hincrby(parent_key, "children", 1)
        txn.expire(parent_key, BID_EXPIRE_TTL)
        txn.zadd("#{parent_key}-bids", created_at, bid)
      else
        txn.zadd("BID-ROOT-bids", created_at, bid)
      end
    end

    flush_pending_attrs
    @context&.save!

    @initialized = true
  end

  # Swap this batch in for the duration of the block, then restore.
  previous = Thread.current[CURRENT_BATCH_THREAD_KEY]
  begin
    Thread.current[CURRENT_BATCH_THREAD_KEY] = self
    yield
  ensure
    Thread.current[CURRENT_BATCH_THREAD_KEY] = previous
  end

  nil
end
def keep_open!
# Marks the batch as kept open by setting its "keep_open" field to "true".
#
# With a block, the flag is set, the block runs, and +let_close!+ is always
# called afterwards (even if the block raises).
#
# @yield optional code to run while the batch is kept open
# @return [Object] the HSET result without a block, the block's value with one
def keep_open!
  return redis.hset(@bidkey, 'keep_open', "true") unless block_given?

  begin
    keep_open!
    yield
  ensure
    let_close!
  end
end
def let_close!
# Clears the "keep_open" flag and fires the callbacks the batch is now
# eligible for.
#
# Within one MULTI the flag is set to "false" and the failed-job count,
# pending count, child count and completed/successful child-batch counts are
# read. :complete is enqueued when every pending job has failed and all
# children are complete, or when everything succeeded; :success is enqueued
# in addition when nothing is pending and all children succeeded.
#
# @return [Object, nil]
def let_close!
  batch_key = "BID-#{bid}"

  _flag, failed, pending, children, complete, success = redis.multi do |txn|
    txn.hset(@bidkey, 'keep_open', "false")
    txn.scard("#{batch_key}-failed")
    # HINCRBY by 0 reads the counter.
    txn.hincrby(batch_key, "pending", 0)
    txn.hincrby(batch_key, "children", 0)
    txn.scard("#{batch_key}-batches-complete")
    txn.scard("#{batch_key}-batches-success")
  end

  all_success = pending.to_i.zero? && children == success
  all_done = pending.to_i == failed.to_i && children == complete
  # if complete or successfull call complete callback (the complete callback may then call successful)
  return unless all_done || all_success

  self.class.enqueue_callbacks(:complete, bid)
  self.class.enqueue_callbacks(:success, bid) if all_success
end
def logger
# Picks the logger to use: Sidekiq's when the ::Sidekiq constant is defined,
# otherwise Rails.logger.
#
# @return [Object] a logger
def logger
  return ::Sidekiq.logger if defined?(::Sidekiq)

  Rails.logger
end
def on(event, callback, options = {})
# Registers a callback for an event on this batch.
#
# The callback and its options are stored as a JSON document in the
# "<bidkey>-callbacks-<event>" set, whose TTL is refreshed. Events not listed
# in Callback::VALID_CALLBACKS are ignored.
#
# @param event [Symbol, String] event name
# @param callback [Object] callback descriptor; must be JSON-serializable
# @param options [Hash] options stored alongside the callback
# @return [Object, nil] nil for an unknown event, otherwise the MULTI result
def on(event, callback, options = {})
  return unless Callback::VALID_CALLBACKS.include?(event.to_s)

  callback_key = "#{@bidkey}-callbacks-#{event}"
  # JSON.generate is the supported name for the legacy JSON.unparse alias and
  # produces the same document.
  payload = JSON.generate({ callback: callback, opts: options })

  redis.multi do |r|
    r.sadd(callback_key, payload)
    r.expire(callback_key, BID_EXPIRE_TTL)
  end
end
def parent
# Returns a Batch for this batch's parent, or nil when it has none.
#
# +parent_bid+ is read once (each read is a Redis lookup) instead of twice,
# which also rules out the value changing between the check and the use. A
# blank bid is treated as "no parent", matching how +enqueue_callbacks+
# normalizes the stored parent bid.
#
# @return [Batch, nil]
def parent
  pbid = parent_bid
  return nil if pbid.nil? || pbid.empty?

  Batch.new(pbid)
end
def parent_bid
# Reads the "parent_bid" field of this batch's hash from Redis.
#
# @return [Object, nil] the stored value, as returned by HGET
def parent_bid
  connection = redis
  connection.hget(@bidkey, "parent_bid")
end
def process_dead_job(bid, jid)
Dead jobs are a Sidekiq feature.
# Records a dead job for a batch and enqueues the batch's :death callbacks.
#
# Does nothing when the batch hash no longer exists. Otherwise the jid is
# added to the "BID-<bid>-dead" set (whose TTL is refreshed) before the
# callbacks are enqueued.
#
# @param bid [String] id of the batch the job belongs to
# @param jid [String] id of the dead job
# @return [Object, nil] nil when the batch is gone, otherwise the result of
#   +enqueue_callbacks+
def process_dead_job(bid, jid)
  dead_key = "BID-#{bid}-dead"

  redis do |conn|
    return unless conn.exists?("BID-#{bid}")

    conn.multi do |txn|
      txn.sadd(dead_key, jid)
      txn.scard(dead_key)
      txn.expire(dead_key, BID_EXPIRE_TTL)
    end
  end

  enqueue_callbacks(:death, bid)
end
def process_failed_job(bid, jid)
# Records a failed job for a batch and enqueues :complete callbacks once every
# pending job has failed and every child batch is complete.
#
# Does nothing when the batch hash no longer exists.
#
# @param bid [String] id of the batch the job belongs to
# @param jid [String] id of the failed job
# @return [Object, nil]
def process_failed_job(bid, jid)
  batch_key = "BID-#{bid}"
  failed_key = "#{batch_key}-failed"

  # Results are destructured in the order the commands are issued; the
  # parent bid and the EXPIRE reply are not used afterwards.
  _added, pending, failed, children, complete, _parent_bid = redis do |conn|
    return unless conn.exists?(batch_key)

    conn.multi do |txn|
      txn.sadd(failed_key, jid)
      txn.hincrby(batch_key, "pending", 0)
      txn.scard(failed_key)
      txn.hincrby(batch_key, "children", 0)
      txn.scard("#{batch_key}-batches-complete")
      txn.hget(batch_key, "parent_bid")
      txn.expire(failed_key, BID_EXPIRE_TTL)
    end
  end

  enqueue_callbacks(:complete, bid) if pending.to_i == failed.to_i && children == complete
end
def process_successful_job(bid, jid)
# Records a successful job for a batch and enqueues the callbacks the batch
# becomes eligible for.
#
# Does nothing when the batch hash no longer exists. Otherwise, in one MULTI,
# the jid is removed from the failed set and the jids set, "pending" is
# decremented, "successful-jobs" is incremented and the counters needed for
# the decision below are read.
#
# @param bid [String] id of the batch the job belongs to
# @param jid [String] id of the successful job
# @return [Object, nil]
def process_successful_job(bid, jid)
  batch_key = "BID-#{bid}"

  # Only the first six results are used; the parent bid and keep_open flag
  # are read but not consulted here.
  _removed, failed, pending, children, complete, success, _parent_bid, _keep_open = redis do |conn|
    return unless conn.exists?(batch_key)

    conn.multi do |txn|
      txn.srem("#{batch_key}-failed", jid)
      txn.scard("#{batch_key}-failed")
      txn.hincrby(batch_key, "pending", -1)
      txn.hincrby(batch_key, "children", 0)
      txn.scard("#{batch_key}-batches-complete")
      txn.scard("#{batch_key}-batches-success")
      txn.hget(batch_key, "parent_bid")
      txn.hget(batch_key, "keep_open")
      txn.hincrby(batch_key, "successful-jobs", 1)
      txn.zrem("#{batch_key}-jids", jid)
      txn.expire(batch_key, BID_EXPIRE_TTL)
    end
  end

  all_success = pending.to_i.zero? && children == success
  all_done = pending.to_i == failed.to_i && children == complete
  # if complete or successfull call complete callback (the complete callback may then call successful)
  return unless all_done || all_success

  enqueue_callbacks(:complete, bid)
  enqueue_callbacks(:success, bid) if all_success
end
def push_callbacks(args, queue)
# Hands the callback argument tuples to the configured callback worker class.
#
# @param args [Array] argument tuples, one per callback
# @param queue [String] queue name passed through to the worker class
# @return [Object] whatever +enqueue_all+ returns
def push_callbacks(args, queue)
  worker = Batch::Callback.worker_class
  worker.enqueue_all(args, queue)
end
def redis(&blk)
# Gives access to a Redis connection.
#
# Without a block a new RedisProxy is returned. With a block, the connection
# already stored for this thread under :job_batches_redis is reused if there
# is one; otherwise a connection is taken from ::Sidekiq.redis, stored for the
# thread while the block runs, and cleared afterwards.
#
# @yield [connection] the Redis connection
# @return [Object, nil] the block's value; nil when ::Sidekiq is not defined
def redis(&blk)
  return RedisProxy.new unless block_given?

  checked_out = Thread.current[:job_batches_redis]
  return yield(checked_out) if checked_out

  # TODO: no connection source is implemented when ::Sidekiq is not defined.
  return unless defined?(::Sidekiq)

  ::Sidekiq.redis do |conn|
    Thread.current[:job_batches_redis] = conn
    yield conn
  ensure
    Thread.current[:job_batches_redis] = nil
  end
end
def redis_key
# Returns the Redis key of this batch's hash (the value of @bidkey).
#
# @return [String, nil]
def redis_key
  @bidkey
end
def save_context_changes
# Saves the batch context if one has been set; otherwise does nothing.
#
# @return [Object, nil] the result of +save!+, or nil when there is no context
def save_context_changes
  return if @context.nil?

  @context.save!
end
def valid?(batch = self)
# Tells whether a batch and all of its ancestors are still valid, i.e. none of
# them has an "invalidated-bid-<bid>" marker key in Redis.
#
# The parent is looked up at most once per level, and not at all once an
# invalidated batch is found; previously +batch.parent+ was evaluated twice
# per level and even when the answer was already known to be false.
#
# @param batch [Object] the batch to check; must respond to +bid+ and
#   +parent+ (defaults to self)
# @return [Boolean]
def valid?(batch = self)
  return false if redis.exists?("invalidated-bid-#{batch.bid}")

  parent_batch = batch.parent
  parent_batch ? valid?(parent_batch) : true
end