class ActiveJob::QueueAdapters::AsyncAdapter::Scheduler

:nodoc:

def enqueue(job, queue_name:)

# Hands +job+ to whichever executor is currently selected. The executor is
# given a block that calls +perform+ on the job it receives.
#
# +queue_name+ is part of the signature but is not consulted here.
def enqueue(job, queue_name:)
  executor.post(job) { |posted_job| posted_job.perform }
end

def enqueue_at(job, timestamp, queue_name:)

# Runs +job+ at +timestamp+, which is compared against
# <tt>Time.current.to_f</tt> (i.e. treated as seconds, as a number).
#
# When the timestamp is not in the future the job is handed straight to
# #enqueue; otherwise a Concurrent::ScheduledTask is created that calls
# +perform+ on the job after the remaining delay, on the current executor.
def enqueue_at(job, timestamp, queue_name:)
  seconds_until_due = timestamp - Time.current.to_f

  # Already due (or overdue): no point scheduling, run it like a normal job.
  return enqueue(job, queue_name: queue_name) unless seconds_until_due > 0

  Concurrent::ScheduledTask.execute(seconds_until_due, args: [job], executor: executor) do |scheduled_job|
    scheduled_job.perform
  end
end

def executor

# Picks the executor jobs are posted to: the immediate executor while
# +immediate+ is truthy, the asynchronous one otherwise.
def executor
  return @immediate_executor if immediate

  @async_executor
end

def initialize(**options)

# Builds both executors up front and starts with +immediate+ set to false.
#
# +options+ are merged over DEFAULT_EXECUTOR_OPTIONS and the result is passed
# to Concurrent::ThreadPoolExecutor.new (presumably a Hash merge, so
# caller-supplied keys win on collision -- confirm against the constant).
def initialize(**options)
  self.immediate = false
  @immediate_executor = Concurrent::ImmediateExecutor.new

  pool_settings = DEFAULT_EXECUTOR_OPTIONS.merge(options)
  @async_executor = Concurrent::ThreadPoolExecutor.new(pool_settings)
end

def shutdown(wait: true)

# Asks the asynchronous executor to shut down. With +wait+ truthy (the
# default) it then calls +wait_for_termination+ on that executor and returns
# its result; with +wait+ falsy it returns nil right after requesting shutdown.
#
# Only the asynchronous executor is touched here.
def shutdown(wait: true)
  pool = @async_executor
  pool.shutdown
  return unless wait

  pool.wait_for_termination
end