class Concurrent::TimerSet

@!macro monotonic_clock_warning
@see Concurrent::ScheduledTask
Each task is represented as a ‘ScheduledTask`.
time. Tasks are run on the global thread pool or on the supplied executor.
monitors the set and schedules each task for execution at the appropriate
Executes a collection of tasks, each after a given delay. A master task

def initialize(opts = {})

Options Hash: (**opts)
  • :executor (Executor) -- when set use the given `Executor` instance.

Parameters:
  • opts (Hash) -- the options used to specify the executor on which to perform actions
def initialize(opts = {})
  super(opts)
end

def kill

not running.
will be accepted. Has no additional effect if the thread pool is
complete but enqueued tasks will be dismissed and no new tasks
Begin an immediate shutdown. In-progress tasks will be allowed to
def kill
  shutdown
end

def ns_initialize(opts)

Parameters:
  • opts (Hash) -- the options to create the object with.
def ns_initialize(opts)
  @queue              = Collection::NonConcurrentPriorityQueue.new(order: :min)
  @task_executor      = Options.executor_from_options(opts) || Concurrent.global_io_executor
  @timer_executor     = SingleThreadExecutor.new
  @condition          = Event.new
  @ruby_pid           = $$ # detects if Ruby has forked
end

def ns_post_task(task)

@!visibility private
def ns_post_task(task)
  return false unless ns_running?
  ns_reset_if_forked
  if (task.initial_delay) <= 0.01
    task.executor.post { task.process_task }
  else
    @queue.push(task)
    # only post the process method when the queue is empty
    @timer_executor.post(&method(:process_tasks)) if @queue.length == 1
    @condition.set
  end
  true
end

def ns_reset_if_forked

def ns_reset_if_forked
  if $$ != @ruby_pid
    @queue.clear
    @condition.reset
    @ruby_pid = $$
  end
end

def ns_shutdown_execution

@!visibility private

`ExecutorService` callback called during shutdown.
def ns_shutdown_execution
  ns_reset_if_forked
  @queue.clear
  @timer_executor.kill
  stopped_event.set
end

def post(delay, *args, &task)

Raises:
  • (ArgumentError) - if no block is given.
  • (ArgumentError) - if the intended execution time is not in the future.

Returns:
  • (Concurrent::ScheduledTask, false) - IVar representing the task if the post

Other tags:
    Yield: - the task to be performed.

Parameters:
  • args (Array) -- the arguments passed to the task on execution.
  • delay (Float) -- the number of seconds to wait for before executing the task.
  • def post(delay, *args, &task)
      raise ArgumentError.new('no block given') unless block_given?
      return false unless running?
      opts = { executor:  @task_executor,
               args:      args,
               timer_set: self }
      task = ScheduledTask.execute(delay, opts, &task) # may raise exception
      task.unscheduled? ? false : task
    end

    def post_task(task)

    Other tags:
      Note: - This is intended as a callback method from ScheduledTask
    def post_task(task)
      synchronize { ns_post_task(task) }
    end

    def process_tasks

    @!visibility private

    for up to 60 seconds waiting for the next scheduled task.
    garbage collection can occur. If there are no ready tasks it will sleep
    scheduled time. If no tasks remain the thread will exit gracefully so that
    Run a loop and execute tasks in the scheduled order and at the approximate
    def process_tasks
      loop do
        task = synchronize { @condition.reset; @queue.peek }
        break unless task
        now  = Concurrent.monotonic_time
        diff = task.schedule_time - now
        if diff <= 0
          # We need to remove the task from the queue before passing
          # it to the executor, to avoid race conditions where we pass
          # the peek'ed task to the executor and then pop a different
          # one that's been added in the meantime.
          #
          # Note that there's no race condition between the peek and
          # this pop - this pop could retrieve a different task from
          # the peek, but that task would be due to fire now anyway
          # (because @queue is a priority queue, and this thread is
          # the only reader, so whatever timer is at the head of the
          # queue now must have the same pop time, or a closer one, as
          # when we peeked).
          task = synchronize { @queue.pop }
          begin
            task.executor.post { task.process_task }
          rescue RejectedExecutionError
            # ignore and continue
          end
        else
          @condition.wait([diff, 60].min)
        end
      end
    end

    def remove_task(task)

    Other tags:
      Note: - This is intended as a callback method from `ScheduledTask`
    def remove_task(task)
      synchronize { @queue.delete(task) }
    end