class Concurrent::TimerSet

time. Tasks are run on the global task pool or on the supplied executor.
monitors the set and schedules each task for execution at the appropriate
Executes a collection of tasks at the specified times. A master thread

def self.calculate_schedule_time(intended_time, now = Time.now)

Raises:
  • (ArgumentError) - if the intended execution time is not in the future

Returns:
  • (Fixnum) - the intended time as seconds/millis from Epoch

Parameters:
  • now (Time) -- (Time.now) the time from which to calculate an interval
  • intended_time (Object) -- the time (as a `Time` object or an integer)
def self.calculate_schedule_time(intended_time, now = Time.now)
  if intended_time.is_a?(Time)
    raise ArgumentError.new('schedule time must be in the future') if intended_time <= now
    intended_time
  else
    raise ArgumentError.new('seconds must be greater than zero') if intended_time.to_f < 0.0
    now + intended_time
  end
end

def initialize(opts = {})

Options Hash: (**opts)
  • :executor (object) -- when provided will run all operations on
  • :operation (Boolean) -- when `true` will execute the future on the global

Parameters:
  • opts (Hash) -- the options controlling how the future will be processed
def initialize(opts = {})
  @queue = PriorityQueue.new(order: :min)
  @task_executor = OptionsParser::get_executor_from(opts)
  @timer_executor = SingleThreadExecutor.new
  @condition = Condition.new
  init_executor
end

def post(intended_time, *args, &task)

Raises:
  • (ArgumentError) - if no block is given
  • (ArgumentError) - if the intended execution time is not in the future

Returns:
  • (Boolean) - true if the message is post, false after shutdown

Other tags:
    Yield: - the task to be performed

Parameters:
  • intended_time (Object) -- the time to schedule the task for execution
def post(intended_time, *args, &task)
  time = TimerSet.calculate_schedule_time(intended_time).to_f
  raise ArgumentError.new('no block given') unless block_given?
  mutex.synchronize do
    return false unless running?
    if (time - Time.now.to_f) <= 0.01
      @task_executor.post(*args, &task)
    else
      @queue.push(Task.new(time, args, task))
      @timer_executor.post(&method(:process_tasks))
    end
    true
  end
end

def process_tasks

@!visibility private

for up to 60 seconds waiting for the next scheduled task.
garbage collection can occur. If there are no ready tasks it will sleep
scheduled time. If no tasks remain the thread will exit gracefully so that
Run a loop and execute tasks in the scheduled order and at the approximate
def process_tasks
  loop do
    break if @queue.empty?
    task = @queue.peek
    interval = task.time - Time.now.to_f
    if interval <= 0
      @task_executor.post(*task.args, &task.op)
      @queue.pop
    else
      mutex.synchronize do
        @condition.wait(mutex, [interval, 60].min)
      end
    end
  end
end

def shutdown_execution

@!visibility private
def shutdown_execution
  @queue.clear
  @timer_executor.kill
  stopped_event.set
end