class ActiveJob::QueueAdapters::AsyncAdapter::Scheduler
# :nodoc:
# Runs +job+ as soon as the current executor allows.
#
# The job is passed to +executor.post+ as the task argument, together with a
# block that calls +perform+ on it.
#
# job::        object responding to +perform+.
# queue_name:: required keyword; accepted but not used by this method.
#
# Returns whatever +executor.post+ returns.
def enqueue(job, queue_name:)
  executor.post(job, &:perform)
end
# Runs +job+ at (or after) +timestamp+.
#
# The delay is +timestamp+ minus +Time.current.to_f+, so +timestamp+ is
# presumably seconds since the epoch (confirm against callers).
#
# * Positive delay: the job is handed to Concurrent::ScheduledTask.execute
#   with that delay, the current executor, and a block calling +perform+.
# * Zero or negative delay: the job is enqueued immediately via #enqueue.
#
# job::        object responding to +perform+.
# timestamp::  numeric time at which the job should run.
# queue_name:: required keyword; only forwarded to #enqueue on the
#              immediate path.
def enqueue_at(job, timestamp, queue_name:)
  delay = timestamp - Time.current.to_f

  if delay > 0
    Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
  else
    enqueue(job, queue_name: queue_name)
  end
end
# Selects the executor used to run jobs: +@immediate_executor+ when
# +immediate+ is truthy, otherwise +@async_executor+.
def executor
  immediate ? @immediate_executor : @async_executor
end
# Builds both executors and starts in non-immediate mode.
#
# options:: keyword options merged over DEFAULT_EXECUTOR_OPTIONS (keys given
#           here win) and passed to Concurrent::ThreadPoolExecutor.new.
def initialize(**options)
  self.immediate = false
  @immediate_executor = Concurrent::ImmediateExecutor.new
  @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
end
# Shuts down the async executor.
#
# +shutdown+ is always called first; +wait_for_termination+ is then called
# only when +wait+ is truthy. The immediate executor is not touched.
#
# wait:: whether to call +wait_for_termination+ after shutting down
#        (default: true).
#
# Returns the result of +wait_for_termination+ when waiting, otherwise nil.
def shutdown(wait: true)
  @async_executor.shutdown
  @async_executor.wait_for_termination if wait
end