class Concurrent::ScheduledTask

@see Concurrent.timer
@!macro monotonic_clock_warning
#>> The task completed at 2013-11-07 12:26:09 -0500 with value ‘What does the fox say?’
sleep(3)
# wait for it…
task.pending? #=> true
task.execute
task.add_observer(observer)
task = Concurrent::ScheduledTask.new(2){ ‘What does the fox say?’ }
}.new
end
puts “The task completed at #{time} with value ‘#{value}’”
def update(time, value, reason)
observer = Class.new{
@example Task execution with observation
task.reason #=> #<StandardError: Call me maybe?>
task.value #=> nil
task.rejected? #=> true
task.fulfilled? #=> false
task.pending? #=> false
task.unscheduled? #=> false
sleep(3)
# wait for it…
task.pending? #=> true
task = Concurrent::ScheduledTask.execute(2){ raise StandardError.new(‘Call me maybe?’) }
@example Failed task execution
task.state #=> :pending
task = Concurrent::ScheduledTask.execute(2){ ‘What do you get when you multiply 6 by 9?’ }
task.state #=> :pending
task = Concurrent::ScheduledTask.new(2){ ‘What does the fox say?’ }.execute
@example One line creation and execution
task.value #=> ‘What does the fox say?’
task.rejected? #=> false
task.fulfilled? #=> true
task.pending? #=> false
task.unscheduled? #=> false
sleep(3)
# wait for it…
task.state #=> :pending
task.execute
task.state #=> :unscheduled
task = Concurrent::ScheduledTask.new(2){ ‘What does the fox say?’ }
@example Successful task execution
price.value #=> 63.65
price.fulfilled? #=> true
price.state #=> :fulfilled
price.value #=> 63.65 (after blocking if necessary)
sleep(1) # do other stuff
price.value(0) #=> nil (does not block)
price.pending? #=> true
price.state #=> :pending
price = Concurrent::Future.execute{ Ticker.new.get_year_end_closing(‘TWTR’, 2013, api_key) }
# Future
abort(error_message) unless api_key
api_key = ENV[‘ALPHAVANTAGE_KEY’]

end
end
p e
rescue => e
year_end
year_end = data.first
end
data << row.to_f if row.include?(year.to_s)
CSV.parse(csv, headers: true) do |row|
end
return :rate_limit_exceeded
if csv.include?(‘call frequency’)
csv = URI.parse(uri).read
data = []
uri = “www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol=#{symbol}&apikey=#{api_key}&datatype=csv”
def get_year_end_closing(symbol, year, api_key)
class Ticker
require ‘open-uri’
require ‘csv’
require ‘concurrent/scheduled_task’
@example Basic usage
@!macro copy_options
behaves identically to [Future](Observable) with regard to these modules.
module from the Ruby standard library. With one exception `ScheduledTask`
module and the [Observable](https://ruby-doc.org/stdlib-2.0/libdoc/observer/rdoc/Observable.html)
asynchronously. `ScheduledTask` mixes in both the [Obligation](Obligation)
The result of a `ScheduledTask` can be obtained either synchronously or
**Obligation and Observation**
A cancelled `ScheduledTask` cannot be restarted. It is immutable.
method returns a boolean indicating the success of the cancellation attempt.
other state, including `:processing`, cannot be cancelled. The `#cancel`
A `:pending` task can be cancelled using the `#cancel` method. A task in any
Cancellation
because it has implications for task cancellation.
state of the object will be `:processing`. This additional state is necessary
has one additional state, however. While the task (block) is being executed the
“future” behavior. This includes the expected lifecycle states. `ScheduledTask`
`ScheduledTask` mixes in the [Obligation](Obligation) module thus giving it
States
If no block is given an `ArgumentError` will be raised.
The final constructor argument is a block representing the task to be performed.
the only supported options are those recognized by the [Dereferenceable](Dereferenceable) module.
The constructor can also be given zero or more processing options. Currently
time of task execution is set when the `execute` method is called.
less than zero will result in an exception. The actual schedule
representing a number of seconds in the future. Any other value or a numeric
with the `delay` argument. The delay is a numeric (floating point or integer)
The intended schedule time of task execution is set on object construction
It is a more feature-rich variant of {Concurrent.timer}.
[ScheduledExecutorService](https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ScheduledExecutorService.html).
implementation is loosely based on Java’s
whereas a `ScheduledTask` is set to execute after a specified delay. This
important difference: A `Future` is set to execute as soon as possible
`ScheduledTask` is a close relative of `Concurrent::Future` but with one

def self.execute(delay, opts = {}, &task)

Raises:
  • (ArgumentError) - if no block is given

Returns:
  • (ScheduledTask) - the newly created `ScheduledTask` in the `:pending` state

Parameters:
  • delay (Float) -- the number of seconds to wait for before executing the task
# Create a new `ScheduledTask` and immediately call `#execute` on it.
#
# @param delay [Float] the number of seconds to wait for before executing the task
# @param opts [Hash] options passed through unchanged to `new`
# @yield the task to be performed
#
# @return [ScheduledTask] the value returned by `#execute` on the new task
def self.execute(delay, opts = {}, &task)
  scheduled = new(delay, opts, &task)
  scheduled.execute
end

def <=>(other)

@!visibility private

Comparator which orders by schedule time.
# Comparator which orders by schedule time.
#
# @param other [#schedule_time] the object to compare against
# @return the result of `<=>` applied to the two schedule times
#
# @!visibility private
def <=>(other)
  own_time = schedule_time
  own_time <=> other.schedule_time
end

def cancel

Returns:
  • (Boolean) - true if successfully cancelled else false
# Cancel this task and prevent it from executing.
#
# On success the returned value is whatever the owning timer set's
# `remove_task` returns.
#
# @return [Boolean] true if successfully cancelled else false
def cancel
  # Nothing to do when the state transition to :cancelled is refused.
  return false unless compare_and_set_state(:cancelled, :pending, :unscheduled)

  complete(false, nil, CancelledOperationError.new)
  # To avoid deadlocks this call must occur outside of #synchronize
  # Changing the state above should prevent redundant calls
  @parent.send(:remove_task, self)
end

def cancelled?

Returns:
  • (Boolean) - true if the task is in the given state else false
# Has the task been cancelled?
#
# The state check runs inside `synchronize`.
#
# @return [Boolean] true if the task is in the given state else false
def cancelled?
  synchronize do
    ns_check_state?(:cancelled)
  end
end

def execute

Returns:
  • (ScheduledTask) - a reference to `self`
# Schedule the task using the stored delay.
#
# Scheduling only happens when `compare_and_set_state(:pending, :unscheduled)`
# succeeds; otherwise the call does nothing except return `self`.
#
# @return [ScheduledTask] a reference to `self`
def execute
  became_pending = compare_and_set_state(:pending, :unscheduled)
  synchronize { ns_schedule(@delay) } if became_pending
  self
end

def initial_delay

Returns:
  • (Float) - the initial delay.
# The `delay` value currently stored on the task, read inside `synchronize`.
#
# @return [Float] the initial delay.
def initial_delay
  synchronize do
    @delay
  end
end

def initialize(delay, opts = {}, &task)

Raises:
  • (ArgumentError) - When given a negative delay
  • (ArgumentError) - When no block is given

Options Hash: (**opts)
  • :args (object, Array) -- zero or more arguments to be passed the task

Other tags:
    Yield: - the task to be performed

Parameters:
  • delay (Float) -- the number of seconds to wait for before executing the task
# Create a task that is to be executed after the given delay.
# The new task starts in the `:unscheduled` state.
#
# @param delay [Float] the number of seconds to wait for before executing the task
# @param opts [Hash] the options used to configure the task
# @option opts [object, Array] :args zero or more arguments to be passed the task
# @option opts [Object] :timer_set the timer set that will own the task;
#   defaults to `Concurrent.global_timer_set`
#
# @yield the task to be performed
#
# @raise [ArgumentError] When no block is given
# @raise [ArgumentError] When the delay is negative
def initialize(delay, opts = {}, &task)
  raise ArgumentError.new('no block given') unless block_given?

  seconds = delay.to_f
  # A delay of exactly zero passes this check, so the message says "or equal to".
  raise ArgumentError.new('seconds must be greater than or equal to zero') if seconds < 0.0

  super(NULL, opts, &nil)
  synchronize do
    ns_set_state(:unscheduled)
    @parent = opts.fetch(:timer_set, Concurrent.global_timer_set)
    @args = get_arguments_from(opts)
    @delay = seconds
    @task = task
    @time = nil # stays nil until the task is scheduled
    @executor = Options.executor_from_options(opts) || Concurrent.global_io_executor
    self.observers = Collection::CopyOnNotifyObserverSet.new
  end
end

def ns_reschedule(delay)

Returns:
  • (Boolean) - true if successfully rescheduled else false

Parameters:
  • delay (Float) -- the number of seconds to wait for before executing the task
# Remove the task from its timer set and schedule it again with the
# given delay. Only attempted while the task is `:pending`.
#
# The `ns_` prefix suggests the caller is expected to already be inside
# `synchronize` (the public callers in this class do so) — TODO confirm.
#
# @param delay [Float] the number of seconds to wait for before executing the task
# @return [Boolean] true if successfully rescheduled else false
def ns_reschedule(delay)
  if ns_check_state?(:pending)
    # Schedule again only when the removal reports success.
    @parent.send(:remove_task, self) && ns_schedule(delay)
  else
    false
  end
end

def ns_schedule(delay)

Returns:
  • (Boolean) - true if successfully scheduled else false

Parameters:
  • delay (Float) -- the number of seconds to wait for before executing the task
# Record the delay, compute the schedule time from the monotonic clock,
# and post the task to its timer set.
#
# @param delay [Float] the number of seconds to wait for before executing the task
# @return the value returned by the timer set's `post_task`
def ns_schedule(delay)
  @delay = delay
  started_at = Concurrent.monotonic_time
  @time = started_at + delay
  @parent.send(:post_task, self)
end

def process_task

@!visibility private

Execute the task.
# Execute the task.
#
# @return the result of `safe_execute` called with the stored task and arguments
#
# @!visibility private
def process_task
  task, args = @task, @args
  safe_execute(task, args)
end

def processing?

Returns:
  • (Boolean) - true if the task is in the given state else false
# In the task execution in progress?
#
# The state check runs inside `synchronize`.
#
# @return [Boolean] true if the task is in the given state else false
def processing?
  synchronize do
    ns_check_state?(:processing)
  end
end

def reschedule(delay)

Raises:
  • (ArgumentError) - When given a negative delay

Returns:
  • (Boolean) - true if successfully rescheduled else false

Parameters:
  • delay (Float) -- the number of seconds to wait for before executing the task
# Reschedule the task using the given delay. The delay is converted
# with `to_f` before it is validated and passed on.
#
# @param delay [Float] the number of seconds to wait for before executing the task
#
# @return [Boolean] true if successfully rescheduled else false
#
# @raise [ArgumentError] When the delay is negative
def reschedule(delay)
  seconds = delay.to_f
  # A delay of exactly zero passes this check, so the message says "or equal to".
  raise ArgumentError.new('seconds must be greater than or equal to zero') if seconds < 0.0

  synchronize { ns_reschedule(seconds) }
end

def reset

Returns:
  • (Boolean) - true if successfully rescheduled else false
# Reschedule the task using the delay already stored on it.
#
# @return [Boolean] true if successfully rescheduled else false
def reset
  synchronize do
    ns_reschedule(@delay)
  end
end

def schedule_time

Returns:
  • (Float) - the schedule time or nil if `unscheduled`
# The stored schedule time of the task, read inside `synchronize`.
#
# @return [Float] the schedule time or nil if `unscheduled`
def schedule_time
  synchronize do
    @time
  end
end