class GoodJob::Scheduler

def create_thread

# Schedules one unit of work on the thread pool.
#
# Builds a Concurrent::Future bound to @pool that calls +next+ on the
# performer inside the Rails executor wrapper, registers this scheduler as
# the future's observer (callback: #task_observer), and starts it.
#
# @return [Object] whatever +Concurrent::Future#execute+ returns
def create_thread
  job_future = Concurrent::Future.new(args: [@performer], executor: @pool) do |worker|
    # Capture the performer's result explicitly instead of relying on the
    # return value of +wrap+.
    result = nil
    Rails.application.executor.wrap do
      result = worker.next
    end
    result
  end

  job_future.add_observer(self, :task_observer)
  job_future.execute
end

def execute

# No-op: the body is empty, so calling this does nothing and returns nil.
#
# NOTE(review): the timer is already started in #initialize via
# `@timer.execute`, so this method presumably exists only to satisfy an
# expected interface — confirm against callers before removing or filling in.
def execute
end

def initialize(performer, timer_options: {}, pool_options: {})

# Sets up the scheduler: a thread pool for running work and a timer task
# that periodically tops the pool up, then starts the timer.
#
# @param performer [#next] object whose +next+ method performs one unit of work
# @param timer_options [Hash] overrides merged over DEFAULT_TIMER_OPTIONS
# @param pool_options [Hash] overrides merged over DEFAULT_POOL_OPTIONS
# @raise [ArgumentError] if +performer+ does not respond to +next+
def initialize(performer, timer_options: {}, pool_options: {})
  unless performer.respond_to?(:next)
    raise ArgumentError, "Performer argument must implement #next"
  end

  @performer = performer

  pool_settings = DEFAULT_POOL_OPTIONS.merge(pool_options)
  @pool = Concurrent::ThreadPoolExecutor.new(pool_settings)

  timer_settings = DEFAULT_TIMER_OPTIONS.merge(timer_options)
  @timer = Concurrent::TimerTask.new(timer_settings) do
    # Only schedule more work while the pool is below its maximum size.
    create_thread if @pool.max_length > @pool.length
  end

  @timer.add_observer(self, :timer_observer)
  @timer.execute
end

def shutdown(wait: true)

# Marks the scheduler as shut down and stops the timer, then the pool.
#
# Emits "scheduler_start_shutdown.good_job" before doing any work, and wraps
# the actual stop sequence in a "scheduler_shutdown.good_job" event.
#
# @param wait [Boolean] when truthy, wait for each running component to
#   terminate after asking it to shut down
# @return [Object] the value returned by the instrumented block
def shutdown(wait: true)
  @_shutdown = true

  # Stops one component, but only if it is still running.
  halt = lambda do |component|
    next unless component.running?

    component.shutdown
    component.wait_for_termination if wait
  end

  ActiveSupport::Notifications.instrument("scheduler_start_shutdown.good_job", { wait: wait })
  ActiveSupport::Notifications.instrument("scheduler_shutdown.good_job", { wait: wait }) do
    # The timer goes first so it stops scheduling new work before the pool stops.
    halt.call(@timer)
    halt.call(@pool)
  end
end

def shutdown?

# Reports whether #shutdown has been called on this scheduler.
#
# The @_shutdown flag is only assigned inside #shutdown, so before that call
# it is unset (nil). The value is coerced so the predicate always answers
# with a real boolean rather than leaking nil to callers.
#
# @return [Boolean] true once #shutdown has been called, false otherwise
def shutdown?
  @_shutdown ? true : false
end

def task_observer(time, output, thread_error)

# Observer callback registered on each job future (see the
# `add_observer(self, :task_observer)` call in #create_thread).
#
# Publishes a "finished_job_task.good_job" event and, when the task produced
# a truthy output, immediately schedules another task.
#
# @param time [Object] timestamp supplied by the observed future
# @param output [Object, nil] value the task returned
# @param thread_error [Object, nil] error reported for the task, if any
# @return [Object, nil] result of #create_thread, or nil when +output+ is falsy
def task_observer(time, output, thread_error)
  payload = { result: output, error: thread_error, time: time }
  ActiveSupport::Notifications.instrument("finished_job_task.good_job", payload)

  return unless output

  create_thread
end

def timer_observer(time, executed_task, thread_error)

# Observer callback registered on the timer task (see the
# `add_observer(self, :timer_observer)` call in #initialize).
#
# Publishes a "finished_timer_task.good_job" event describing the run.
#
# @param time [Object] timestamp supplied by the observed timer
# @param executed_task [Object, nil] value the timer block returned
# @param thread_error [Object, nil] error reported for the run, if any
# @return [Object] whatever +ActiveSupport::Notifications.instrument+ returns
def timer_observer(time, executed_task, thread_error)
  payload = { result: executed_task, error: thread_error, time: time }
  ActiveSupport::Notifications.instrument("finished_timer_task.good_job", payload)
end