class GoodJob::Scheduler
# Schedules one unit of work on the thread pool.
#
# Builds a Concurrent::Future bound to +@pool+ that calls +@performer.next+
# inside the Rails executor, registers this scheduler as an observer
# (+task_observer+), and starts it.
#
# @return [Object, false] the value of +Concurrent::Future#execute+, or
#   +false+ when the pool is no longer running and nothing was scheduled.
def create_thread
  # +task_observer+ re-invokes this method after every job that produced
  # output, which can happen after +shutdown+ has stopped the pool. A stopped
  # pool does not accept new work, so skip scheduling instead of posting to it.
  return false unless @pool.running?

  future = Concurrent::Future.new(args: [@performer], executor: @pool) do |performer|
    output = nil
    # The block form is used so the performer's result can be captured and
    # returned as the future's value.
    Rails.application.executor.wrap { output = performer.next }
    output
  end
  future.add_observer(self, :task_observer)
  future.execute
end
# Intentionally does nothing and returns +nil+.
#
# The timer is already started at the end of +initialize+, so there is no
# separate start step to perform here.
# NOTE(review): presumably kept as a public hook for callers — confirm.
#
# @return [nil]
def execute
  nil
end
# Builds the scheduler: a thread pool that runs jobs and a timer that
# periodically tops the pool up with work.
#
# @param performer [#next] object whose +next+ method is invoked for each
#   unit of work.
# @param timer_options [Hash] overrides merged onto DEFAULT_TIMER_OPTIONS and
#   passed to Concurrent::TimerTask.
# @param pool_options [Hash] overrides merged onto DEFAULT_POOL_OPTIONS and
#   passed to Concurrent::ThreadPoolExecutor.
# @raise [ArgumentError] if +performer+ does not respond to +next+.
def initialize(performer, timer_options: {}, pool_options: {})
  unless performer.respond_to?(:next)
    raise ArgumentError, "Performer argument must implement #next"
  end

  @performer = performer

  pool_settings = DEFAULT_POOL_OPTIONS.merge(pool_options)
  @pool = Concurrent::ThreadPoolExecutor.new(pool_settings)

  timer_settings = DEFAULT_TIMER_OPTIONS.merge(timer_options)
  @timer = Concurrent::TimerTask.new(timer_settings) do
    # On each tick, schedule at most one new task, and only while the pool
    # has not reached its maximum size.
    spare_capacity = @pool.max_length - @pool.length
    create_thread if spare_capacity.positive?
  end

  @timer.add_observer(self, :timer_observer)
  # The timer starts immediately; constructing a scheduler begins polling.
  @timer.execute
end
# Stops the scheduler: marks it as shut down, then stops the timer followed
# by the thread pool.
#
# Emits "scheduler_start_shutdown.good_job" before any work is done and wraps
# the actual teardown in a "scheduler_shutdown.good_job" notification.
#
# @param wait [Boolean] when truthy, block until each executor that was
#   running has terminated.
# @return [Object] whatever the "scheduler_shutdown.good_job" instrumentation
#   call returns.
def shutdown(wait: true)
  @_shutdown = true

  ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
  ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
    # Executors that are not running are left untouched.
    halt = lambda do |executor|
      next unless executor.running?

      executor.shutdown
      executor.wait_for_termination if wait
    end

    # Timer first, so no new tasks are scheduled while the pool winds down.
    halt.call(@timer)
    halt.call(@pool)
  end
end
# Reports whether +shutdown+ has been called on this scheduler.
#
# +@_shutdown+ is only assigned inside +shutdown+, so it is unset (+nil+) on a
# fresh scheduler; the value is coerced so callers always get a real boolean.
#
# @return [Boolean] +true+ once +shutdown+ has been invoked, +false+ otherwise.
def shutdown?
  @_shutdown ? true : false
end
# Observer callback registered on each job future in +create_thread+.
#
# Publishes a "finished_job_task.good_job" notification and, when the task
# produced a truthy result, immediately schedules another task.
#
# @param time [Object] completion time supplied by the observed future.
# @param output [Object, nil] the task's result; +nil+/+false+ stops the
#   re-scheduling chain.
# @param thread_error [Object, nil] the error reported for the task, if any.
# @return [Object, nil] the result of +create_thread+, or +nil+ when no new
#   task was scheduled.
def task_observer(time, output, thread_error)
  payload = { result: output, error: thread_error, time: time }
  ActiveSupport::Notifications.instrument("finished_job_task.good_job", payload)

  return unless output

  create_thread
end
# Observer callback registered on the timer in +initialize+.
#
# Publishes a "finished_timer_task.good_job" notification describing the
# timer tick; it performs no other work.
#
# @param time [Object] completion time supplied by the timer.
# @param executed_task [Object, nil] the value produced by the timer block.
# @param thread_error [Object, nil] the error reported for the tick, if any.
# @return [Object] whatever the instrumentation call returns.
def timer_observer(time, executed_task, thread_error)
  payload = { result: executed_task, error: thread_error, time: time }
  ActiveSupport::Notifications.instrument("finished_timer_task.good_job", payload)
end