class CanvasSync::JobBatches::Batch

def self.with_batch(batch)

def self.with_batch(batch)
  batch = self.new(batch) if batch.is_a?(String)
  parent = Thread.current[CURRENT_BATCH_THREAD_KEY]
  Thread.current[CURRENT_BATCH_THREAD_KEY] = batch
  yield
ensure
  Thread.current[CURRENT_BATCH_THREAD_KEY] = parent
end

def self.without_batch(&blk)

Any Batches or Jobs created in the given block won't be assocaiated to the current batch
def self.without_batch(&blk)
  with_batch(nil, &blk)
end

def append_jobs(jids)

def append_jobs(jids)
  jids = jids.uniq
  return unless jids.size > 0
  redis do |r|
    tme = Time.now.utc.to_f
    added = r.zadd(@bidkey + "-jids", jids.map{|jid| [tme, jid] }, nx: true)
    r.multi do |r|
      r.hincrby(@bidkey, "pending", added)
      r.hincrby(@bidkey, "job_count", added)
      r.expire(@bidkey, BID_EXPIRE_TTL)
      r.expire(@bidkey + "-jids", BID_EXPIRE_TTL)
    end
  end
end

def assert_batch_is_open

def assert_batch_is_open
  unless defined?(@closed)
    @closed = redis.hget(@bidkey, 'success') == 'true'
  end
  raise "Cannot add jobs to Batch #{} bid - it has already entered the callback-stage" if @closed
end

def bid_hierarchy(bid, depth: 4, per_depth: 5, slice: nil)

def bid_hierarchy(bid, depth: 4, per_depth: 5, slice: nil)
  args = [bid, depth, per_depth]
  args << slice if slice
  redis do |r|
    BID_HIERARCHY.call(r, [], args)
  end
end

def cleanup_redis(bid)

def cleanup_redis(bid)
  logger.debug {"Cleaning redis of batch #{bid}"}
  redis do |r|
    r.zrem("batches", bid)
    r.zrem("BID-ROOT-bids", bid)
    r.unlink(
      "BID-#{bid}",
      "BID-#{bid}-callbacks-complete",
      "BID-#{bid}-callbacks-success",
      "BID-#{bid}-failed",
      "BID-#{bid}-batches-success",
      "BID-#{bid}-batches-complete",
      "BID-#{bid}-batches-failed",
      "BID-#{bid}-bids",
      "BID-#{bid}-jids",
      "BID-#{bid}-pending_callbacks",
    )
  end
end

def context

def context
  return @context if defined?(@context)
  if (@initialized || @existing)
    @context = ContextHash.new(bid)
  else
    @context = ContextHash.new(bid, {})
  end
end

def context=(value)

def context=(value)
  raise "context is read-only once the batch has been started" if (@initialized || @existing) # && !allow_context_changes
  raise "context must be a Hash" unless value.is_a?(Hash) || value.nil?
  return nil if value.nil? && @context.nil?
  value = {} if value.nil?
  value = value.local if value.is_a?(ContextHash)
  @context ||= ContextHash.new(bid, {})
  @context.set_local(value)
  # persist_bid_attr('context', JSON.unparse(@context.local))
end

def current

def current
  Thread.current[CURRENT_BATCH_THREAD_KEY]
end

def current_context

def current_context
  current&.context || {}
end

def delete_prematurely!(bid)

def delete_prematurely!(bid)
  child_bids = redis do |r|
    r.zrange("BID-#{bid}-bids", 0, -1)
  end
  child_bids.each do |cbid|
    delete_prematurely!(cbid)
  end
  cleanup_redis(bid)
end

def enqueue_callbacks(event, bid)

def enqueue_callbacks(event, bid)
  batch_key = "BID-#{bid}"
  callback_key = "#{batch_key}-callbacks-#{event}"
  callbacks, queue, parent_bid, callback_params = redis do |r|
    return unless r.exists?(batch_key)
    return if r.hget(batch_key, 'keep_open') == 'true'
    r.multi do |r|
      r.smembers(callback_key)
      r.hget(batch_key, "callback_queue")
      r.hget(batch_key, "parent_bid")
      r.hget(batch_key, "callback_params")
    end
  end
  queue ||= "default"
  parent_bid = !parent_bid || parent_bid.empty? ? nil : parent_bid    # Basically parent_bid.blank?
  callback_params = JSON.parse(callback_params) if callback_params.present?
  callback_args = callbacks.reduce([]) do |memo, jcb|
    cb = JSON.load(jcb)
    memo << [cb['callback'], event.to_s, cb['opts'], bid, parent_bid]
  end
  opts = {"bid" => bid, "event" => event}
  should_schedule_batch = callback_args.present? && !callback_params.present?
  already_processed = redis do |r|
    SCHEDULE_CALLBACK.call(r, [batch_key], [event.to_s, should_schedule_batch.to_s, BID_EXPIRE_TTL])
  end
  return if already_processed == 'true'
  if should_schedule_batch
    logger.debug {"Enqueue callback bid: #{bid} event: #{event} args: #{callback_args.inspect}"}
    with_batch(parent_bid) do
      cb_batch = self.new
      cb_batch.callback_params = {
        for_bid: bid,
        event: event,
      }
      opts['callback_bid'] = cb_batch.bid
      logger.debug {"Adding callback batch: #{cb_batch.bid} for batch: #{bid}"}
      cb_batch.jobs do
        push_callbacks(callback_args, queue)
      end
    end
  end
  if callback_params.present?
    opts['origin'] = callback_params
  end
  logger.debug {"Run batch finalizer bid: #{bid} event: #{event} args: #{callback_args.inspect}"}
  finalizer = Batch::Callback::Finalize.new
  status = Status.new bid
  finalizer.dispatch(status, opts)
end

def flush_pending_attrs

def flush_pending_attrs
  super
  redis.zadd("batches", created_at, bid)
end

def increment_job_queue(jid)

def increment_job_queue(jid)
  assert_batch_is_open
  append_jobs([jid])
end

def initialize(existing_bid = nil)

def initialize(existing_bid = nil)
  @bid = existing_bid || SecureRandom.urlsafe_base64(10)
  @existing = !(!existing_bid || existing_bid.empty?)  # Basically existing_bid.present?
  @initialized = false
  @bidkey = "BID-" + @bid.to_s
  self.created_at = Time.now.utc.to_f unless @existing
end

def invalidate_all

def invalidate_all
  redis.setex("invalidated-bid-#{bid}", BID_EXPIRE_TTL, 1)
end

def jobs

def jobs
  raise NoBlockGivenError unless block_given?
  if !@existing && !@initialized
    parent_bid = Thread.current[CURRENT_BATCH_THREAD_KEY]&.bid
    redis.multi do |r|
      r.hset(@bidkey, "parent_bid", parent_bid.to_s) if parent_bid
      r.expire(@bidkey, BID_EXPIRE_TTL)
      if parent_bid
        r.hincrby("BID-#{parent_bid}", "children", 1)
        r.expire("BID-#{parent_bid}", BID_EXPIRE_TTL)
        r.zadd("BID-#{parent_bid}-bids", created_at, bid)
      else
        r.zadd("BID-ROOT-bids", created_at, bid)
      end
    end
    flush_pending_attrs
    @context&.save!
    @initialized = true
  else
    assert_batch_is_open
  end
  begin
    parent = Thread.current[CURRENT_BATCH_THREAD_KEY]
    Thread.current[CURRENT_BATCH_THREAD_KEY] = self
    yield
  ensure
    Thread.current[CURRENT_BATCH_THREAD_KEY] = parent
  end
  nil
end

def keep_open!

def keep_open!
  if block_given?
    begin
      keep_open!
      yield
    ensure
      let_close!
    end
  else
    redis.hset(@bidkey, 'keep_open', "true")
  end
end

def let_close!

def let_close!
  _, failed, pending, children, complete, success = redis.multi do |r|
    r.hset(@bidkey, 'keep_open', "false")
    r.scard("BID-#{bid}-failed")
    r.hincrby("BID-#{bid}", "pending", 0)
    r.hincrby("BID-#{bid}", "children", 0)
    r.scard("BID-#{bid}-batches-complete")
    r.scard("BID-#{bid}-batches-success")
  end
  all_success = pending.to_i.zero? && children == success
  # if complete or successfull call complete callback (the complete callback may then call successful)
  if (pending.to_i == failed.to_i && children == complete) || all_success
    self.class.enqueue_callbacks(:complete, bid)
    self.class.enqueue_callbacks(:success, bid) if all_success
  end
end

def logger

def logger
  defined?(::Sidekiq) ? ::Sidekiq.logger : Rails.logger
end

def on(event, callback, options = {})

def on(event, callback, options = {})
  return unless Callback::VALID_CALLBACKS.include?(event.to_s)
  callback_key = "#{@bidkey}-callbacks-#{event}"
  redis.multi do |r|
    r.sadd(callback_key, JSON.unparse({
      callback: callback,
      opts: options
    }))
    r.expire(callback_key, BID_EXPIRE_TTL)
  end
end

def parent

def parent
  if parent_bid
    Batch.new(parent_bid)
  end
end

def parent_bid

def parent_bid
  redis.hget(@bidkey, "parent_bid")
end

def process_dead_job(bid, jid)

If this is called for a job, process_failed_job was also called
Dead jobs are a Sidekiq feature.
def process_dead_job(bid, jid)
  _, dead_count = redis do |r|
    return unless r.exists?("BID-#{bid}")
    r.multi do |r|
      r.sadd("BID-#{bid}-dead", jid)
      r.scard("BID-#{bid}-dead")
      r.expire("BID-#{bid}-dead", BID_EXPIRE_TTL)
    end
  end
  enqueue_callbacks(:death, bid)
end

def process_failed_job(bid, jid)

def process_failed_job(bid, jid)
  _, pending, failed, children, complete, parent_bid = redis do |r|
    return unless r.exists?("BID-#{bid}")
    r.multi do |r|
      r.sadd("BID-#{bid}-failed", jid)
      r.hincrby("BID-#{bid}", "pending", 0)
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-batches-complete")
      r.hget("BID-#{bid}", "parent_bid")
      r.expire("BID-#{bid}-failed", BID_EXPIRE_TTL)
    end
  end
  if pending.to_i == failed.to_i && children == complete
    enqueue_callbacks(:complete, bid)
  end
end

def process_successful_job(bid, jid)

def process_successful_job(bid, jid)
  _, failed, pending, children, complete, success, parent_bid, keep_open = redis do |r|
    return unless r.exists?("BID-#{bid}")
    r.multi do |r|
      r.srem("BID-#{bid}-failed", jid)
      r.scard("BID-#{bid}-failed")
      r.hincrby("BID-#{bid}", "pending", -1)
      r.hincrby("BID-#{bid}", "children", 0)
      r.scard("BID-#{bid}-batches-complete")
      r.scard("BID-#{bid}-batches-success")
      r.hget("BID-#{bid}", "parent_bid")
      r.hget("BID-#{bid}", "keep_open")
      r.hincrby("BID-#{bid}", "successful-jobs", 1)
      r.zrem("BID-#{bid}-jids", jid)
      r.expire("BID-#{bid}", BID_EXPIRE_TTL)
    end
  end
  all_success = pending.to_i.zero? && children == success
  # if complete or successfull call complete callback (the complete callback may then call successful)
  if (pending.to_i == failed.to_i && children == complete) || all_success
    enqueue_callbacks(:complete, bid)
    enqueue_callbacks(:success, bid) if all_success
  end
end

def push_callbacks(args, queue)

def push_callbacks(args, queue)
  Batch::Callback::worker_class.enqueue_all(args, queue)
end

def redis(&blk)

def redis(&blk)
  return RedisProxy.new unless block_given?
  if Thread.current[:job_batches_redis]
    yield Thread.current[:job_batches_redis]
  elsif defined?(::Sidekiq)
    ::Sidekiq.redis do |r|
      Thread.current[:job_batches_redis] = r
      yield r
    ensure
      Thread.current[:job_batches_redis] = nil
    end
  else
    # TODO
  end
end

def redis_key

def redis_key
  @bidkey
end

def save_context_changes

def save_context_changes
  @context&.save!
end

def valid?(batch = self)

def valid?(batch = self)
  valid = !redis.exists?("invalidated-bid-#{batch.bid}")
  batch.parent ? valid && valid?(batch.parent) : valid
end