class Concurrent::ScheduledTask
def self.execute(intended_time, opts = {}, &block)
- Since: - 0.5.0
def self.execute(intended_time, opts = {}, &block) return ScheduledTask.new(intended_time, opts, &block).execute end
def add_observer(*args, &block)
def add_observer(*args, &block) if_state(:unscheduled, :pending, :in_progress) do observers.add_observer(*args, &block) end end
def cancel
def cancel if_state(:unscheduled, :pending) do @state = :cancelled event.set true end end
def cancelled?
def cancelled? state == :cancelled end
def execute
- Since: - 0.5.0
def execute if compare_and_set_state(:pending, :unscheduled) @schedule_time = TimerSet.calculate_schedule_time(@intended_time) Concurrent::timer(@schedule_time.to_f - Time.now.to_f, &method(:process_task)) self end end
def in_progress?
def in_progress? state == :in_progress end
def initialize(intended_time, opts = {}, &block)
def initialize(intended_time, opts = {}, &block) raise ArgumentError.new('no block given') unless block_given? TimerSet.calculate_schedule_time(intended_time) # raises exceptons super(NO_VALUE, opts) self.observers = CopyOnNotifyObserverSet.new @intended_time = intended_time @state = :unscheduled @task = block end
def process_task
def process_task if compare_and_set_state(:in_progress, :pending) success, val, reason = SafeTaskExecutor.new(@task).execute mutex.synchronize do set_state(success, val, reason) event.set end time = Time.now observers.notify_and_delete_observers{ [time, self.value, reason] } end end