class Concurrent::TimerSet

def process_tasks

@!visibility private

for up to 60 seconds waiting for the next scheduled task.
garbage collection can occur. If there are no ready tasks it will sleep
scheduled time. If no tasks remain the thread will exit gracefully so that
Run a loop and execute tasks in the scheduled order and at the approximate
def process_tasks
  loop do
    task = synchronize { @condition.reset; @queue.peek }
    break unless task
    now  = Concurrent.monotonic_time
    diff = task.schedule_time - now
    if diff <= 0
      # We need to remove the task from the queue before passing
      # it to the executor, to avoid race conditions where we pass
      # the peek'ed task to the executor and then pop a different
      # one that's been added in the meantime.
      #
      # Note that there's no race condition between the peek and
      # this pop - this pop could retrieve a different task from
      # the peek, but that task would be due to fire now anyway
      # (because @queue is a priority queue, and this thread is
      # the only reader, so whatever timer is at the head of the
      # queue now must have the same pop time, or a closer one, as
      # when we peeked).
      task = synchronize { @queue.pop }
      begin
        task.executor.post { task.process_task }
      rescue RejectedExecutionError
        # ignore and continue
      end
    else
      @condition.wait([diff, 60].min)
    end
  end
end