class Concurrent::TimerSet
Executes a collection of tasks at the specified times. A master thread
monitors the set and schedules each task for execution at the appropriate
time. Tasks are run on the global task pool or on the supplied executor.
def self.calculate_schedule_time(intended_time, now = Time.now)
-
(ArgumentError)
- if the intended execution time is not in the future
Returns:
-
(Time)
- the time at which the task is scheduled to run
Parameters:
-
now
(Time
) -- (Time.now) the time from which to calculate an interval -
intended_time
(Object
) -- the time (as a `Time` object or an integer)
# Resolve a requested schedule into an absolute execution time.
#
# @param [Object] intended_time either a `Time`, or a number of seconds
#   to add to `now`
# @param [Time] now (Time.now) the reference point for the calculation
#
# @return [Time] `intended_time` itself when it is a `Time`, otherwise
#   `now + intended_time`
#
# @raise [ArgumentError] if a `Time` is given that is not later than `now`
# @raise [ArgumentError] if a negative number of seconds is given
def self.calculate_schedule_time(intended_time, now = Time.now)
  # Relative form: a non-negative offset from `now` (zero is accepted).
  unless intended_time.is_a?(Time)
    raise ArgumentError, 'seconds must be greater than zero' if intended_time.to_f < 0.0

    return now + intended_time
  end

  # Absolute form: must lie strictly after `now`.
  raise ArgumentError, 'schedule time must be in the future' if intended_time <= now

  intended_time
end
def initialize(opts = {})
(**opts)
-
:executor
(object
) -- when provided will run all operations on the supplied executor -
:operation
(Boolean
) -- when `true` will execute the future on the global
Parameters:
-
opts
(Hash
) -- the options controlling how the future will be processed
# Build an empty timer set.
#
# @param [Hash] opts options used to select the executor that runs the
#   tasks; they are passed unchanged to `OptionsParser.get_executor_from`
def initialize(opts = {})
  # Pending tasks, kept in a min-ordered priority queue.
  @queue = PriorityQueue.new(order: :min)

  # Executor that runs the tasks themselves.
  @task_executor = OptionsParser.get_executor_from(opts)

  # Dedicated single-thread executor on which `process_tasks` is run.
  @timer_executor = SingleThreadExecutor.new

  # Condition the timer waits on between scheduled tasks.
  @condition = Condition.new

  init_executor
end
def post(intended_time, *args, &task)
-
(ArgumentError)
- if no block is given -
(ArgumentError)
- if the intended execution time is not in the future
Returns:
-
(Boolean)
- true if the task is posted, false after shutdown
Other tags:
- Yield: - the task to be performed
Parameters:
-
intended_time
(Object
) -- the time to schedule the task for execution
# Post a task to be executed at the specified time.
#
# @param [Object] intended_time the time to schedule the task for
#   execution; interpreted by `TimerSet.calculate_schedule_time`
# @param args arguments handed to the task when it is executed
#
# @yield the task to be performed
#
# @return [Boolean] true if the task is posted, false if the timer set is
#   not running
#
# @raise [ArgumentError] if the intended execution time is not in the future
# @raise [ArgumentError] if no block is given
def post(intended_time, *args, &task)
  time = TimerSet.calculate_schedule_time(intended_time).to_f
  raise ArgumentError, 'no block given' unless block_given?

  mutex.synchronize do
    return false unless running?

    if (time - Time.now.to_f) <= 0.01
      # Due within 10 ms: bypass the queue and hand it straight to the
      # task executor.
      @task_executor.post(*args, &task)
    else
      @queue.push(Task.new(time, args, task))
      @timer_executor.post(&method(:process_tasks))
      # process_tasks may already be waiting on @condition for a task that
      # is due later than this one. Wake it so it re-reads the head of the
      # queue; otherwise this task could run late by up to the remaining
      # wait time.
      @condition.signal
    end

    true
  end
end
def process_tasks
Run a loop and execute tasks in the scheduled order and at the approximate
scheduled time. If no tasks remain the thread will exit gracefully so that
garbage collection can occur. If there are no ready tasks it will sleep
for up to 60 seconds waiting for the next scheduled task.
# Run a loop that hands each queued task to the task executor once its
# scheduled time has been reached. The loop ends as soon as the queue is
# empty. When the next task is not yet due, the loop waits on @condition
# for the remaining interval, capped at 60 seconds, and then looks at the
# queue again.
def process_tasks
  loop do
    due = nil
    drained = false

    # All access to @queue happens under the mutex, which is the lock
    # `post` holds while pushing. Peeking and popping inside the same
    # critical section guarantees that the task removed is the task that
    # was inspected, so a task pushed in between can neither be dropped
    # nor cause the inspected task to be dispatched twice.
    mutex.synchronize do
      if @queue.empty?
        drained = true
      else
        interval = @queue.peek.time - Time.now.to_f
        if interval <= 0
          due = @queue.pop
        else
          @condition.wait(mutex, [interval, 60].min)
        end
      end
    end

    break if drained

    # Dispatch outside the lock so posting to the executor never blocks
    # threads that are scheduling new tasks.
    @task_executor.post(*due.args, &due.op) if due
  end
end
def shutdown_execution
# Stop the timer set: discard every task still waiting in the queue, kill
# the timer executor, and then set the stopped event.
def shutdown_execution
  # Pending tasks are dropped, not executed.
  @queue.clear

  @timer_executor.kill

  stopped_event.set
end