class Concurrent::TimerSet
def process_tasks
Run a loop and execute tasks in the scheduled order and at the approximate
scheduled time. If no tasks remain, the thread will exit gracefully so that
garbage collection can occur. If there are no ready tasks, it will sleep
for up to 60 seconds waiting for the next scheduled task.
def process_tasks
  loop do
    task = synchronize { @condition.reset; @queue.peek }
    break unless task

    now = Concurrent.monotonic_time
    diff = task.schedule_time - now

    if diff <= 0
      # We need to remove the task from the queue before passing
      # it to the executor, to avoid race conditions where we pass
      # the peek'ed task to the executor and then pop a different
      # one that's been added in the meantime.
      #
      # Note that there's no race condition between the peek and
      # this pop - this pop could retrieve a different task from
      # the peek, but that task would be due to fire now anyway
      # (because @queue is a priority queue, and this thread is
      # the only reader, so whatever timer is at the head of the
      # queue now must have the same pop time, or a closer one, as
      # when we peeked).
      task = synchronize { @queue.pop }
      begin
        task.executor.post { task.process_task }
      rescue RejectedExecutionError
        # ignore and continue
      end
    else
      @condition.wait([diff, 60].min)
    end
  end
end
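
The loop above can be illustrated with a minimal, stdlib-only sketch. This is not the concurrent-ruby implementation: `TinyTimerLoop`, `Task`, and `MAX_WAIT` are hypothetical names, and several simplifying assumptions apply. A sorted Array stands in for the priority queue, a `Mutex` plus `ConditionVariable` stand in for the `@condition` event, and each block runs inline instead of being posted to an executor. It keeps the same shape: inspect the head of the queue, run it if it is due, otherwise sleep for the remaining time capped at 60 seconds, and return once the queue is empty.

```ruby
# Hypothetical illustration only; not part of concurrent-ruby.
class TinyTimerLoop
  MAX_WAIT = 60 # seconds; mirrors the cap used in process_tasks above

  Task = Struct.new(:schedule_time, :block)

  def initialize
    @mutex = Mutex.new
    @condition = ConditionVariable.new
    @queue = [] # kept sorted by schedule_time, earliest first
  end

  # Schedule the block to run roughly `delay` seconds from now.
  def post(delay, &block)
    task = Task.new(now + delay, block)
    @mutex.synchronize do
      @queue << task
      @queue.sort_by!(&:schedule_time)
      @condition.signal # wake the loop in case the new task is now the earliest
    end
    task
  end

  # Run due tasks in schedule order; return once the queue is empty.
  def process_tasks
    loop do
      due = @mutex.synchronize do
        head = @queue.first
        if head.nil?
          :empty
        elsif head.schedule_time - now <= 0
          @queue.shift # remove the task before running it
        else
          # Sleep until the head is due, a new task arrives, or the cap elapses.
          @condition.wait(@mutex, [head.schedule_time - now, MAX_WAIT].min)
          nil
        end
      end
      break if due == :empty
      due.block.call if due # assumption: run inline rather than on an executor
    end
  end

  private

  # Monotonic clock reading in seconds, unaffected by wall-clock changes.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

timers = TinyTimerLoop.new
timers.post(0.05) { puts "second" }
timers.post(0.01) { puts "first" }
timers.process_tasks # returns after both blocks have run
```

Because the sketch holds the mutex across both the check and the wait, it does not need the separate peek and pop steps that the original uses; the comment in the original explains why its lock-free gap between peek and pop is still safe.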