class Google::Cloud::Env::LazyValue


outside of Google::Cloud::Env.
it to be available to other libraries. Currently it should not be used
We keep this private for now so we can move it in the future if we need
{LazyValue.expiring_value} or {LazyValue.raise_expiring_error}.
the next access following expiration, by calling
expire after a specified number of seconds, forcing a recomputation on
process. However, a computation can also cause its result (or error) to
retries have been exhausted) is maintained for the lifetime of the Ruby
By default, a computation’s memoized value (or final error after
failure before an access will retry the computation.
configurable, as is the retry “interval”, i.e. the time since the last
previous computation failed. The maximum number of retries is
configured so subsequent accesses will retry the computation if the
memoized and reraised on subsequent accesses. A LazyValue can also be
If a computation fails with an exception, that exception will also be
off their own computation.
complete, and will use that computation’s result rather than kicking
requesting the value will wait until the existing computation is
thread is already in the middle of a computation, any new threads
At most one thread will be allowed to run the computation; if another
value. Subsequent requests will return the cached value.
it will call a given block to compute its value, and will cache that
A lazy value box with thread-safe memoization. The first time accessed
@private
#

def await *extra_args, transient_errors: nil, max_tries: 1, max_time: nil, delay_epsilon: 0.0001

Raises:
  • (Exception) - if a fatal error happened, or retries have been exhausted

Returns:
  • (Object) - the value

Parameters:
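  • delay_epsilon (Numeric) -- An extra delay in seconds to ensure that the pending retry delay has fully elapsed before the next attempt (default 0.0001)
  • max_time (Numeric, nil) -- The maximum time in seconds this will keep retrying before re-raising the error, or nil for no time limit
  • max_tries (Integer, nil) -- The maximum number of times this will attempt to get the value before re-raising the error, or nil for no limit (default 1)
  • transient_errors (Array) -- An array of exception classes that are treated as retriable; defaults to [StandardError]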
  • extra_args (Array) -- extra arguments to pass to the block
# Gets the value, retrying when the computation fails with a transient
# error.
#
# Calls {#get}. If that raises one of the transient error classes, this
# method re-raises immediately when the value has reached a final state
# (success or failure), when the try limit is used up, or when the next
# attempt would start after the time limit. Otherwise it sleeps until
# the pending retry delay has elapsed and calls {#get} again.
#
# @param extra_args [Array] extra arguments passed through to {#get}
# @param transient_errors [Array<Class>, Class, nil] exception classes
#     that trigger a retry. A single class is wrapped in an array; nil
#     means `[StandardError]`.
# @param max_tries [Integer, nil] maximum number of calls to {#get}.
#     If nil, the number of tries is not limited.
# @param max_time [Numeric, nil] time limit in seconds, measured on the
#     monotonic clock from the start of this call. If nil, there is no
#     time limit.
# @param delay_epsilon [Numeric] seconds added to the end of the pending
#     retry delay when deciding how long to sleep.
# @return [Object] the value
# @raise [Exception] the most recent error, if it is not transient or
#     no further retry is allowed
def await *extra_args, transient_errors: nil, max_tries: 1, max_time: nil, delay_epsilon: 0.0001
  transient_errors ||= [StandardError]
  transient_errors = Array transient_errors
  # Absolute monotonic deadline; stays nil when no max_time was given.
  expiry_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + max_time if max_time
  begin
    get(*extra_args)
  rescue *transient_errors
    # A snapshot of the state. It is possible that another thread has
    # changed this state since we received the error. This is okay
    # because our specification for this method is conservative:
    # whatever we return will have been correct at some point.
    state = internal_state
    # Don't retry unless we're in a state where retries can happen.
    raise if [:failed, :success].include? state[0]
    if max_tries
      # Handle retry countdown
      # (max_tries is a local, so the decrement persists across `retry`.)
      max_tries -= 1
      raise unless max_tries.positive?
    end
    # Determine the next delay
    delay = determine_await_retry_delay state, expiry_time, delay_epsilon
    # nil means we've exceeded the max time
    raise if delay.nil?
    sleep delay if delay.positive?
    retry
  end
end

def cached_value

Other tags:
    Private: -
# Returns the memoized outcome: the stored value when no error is
# recorded, otherwise raises the stored error.
#
# @private
def cached_value
  return @value unless @error
  raise @error
end

def determine_await_retry_delay state, expiry_time, delay_epsilon

Other tags:
    Private: -
# Computes how long #await should sleep before its next attempt.
#
# @param state [Array] a snapshot as returned by #internal_state; only
#     elements 0 (the state symbol) and 2 (the expiry time) are read
# @param expiry_time [Numeric, nil] monotonic deadline, or nil for none
# @param delay_epsilon [Numeric] seconds added after a pending retry delay
# @return [Numeric, nil] seconds to wait (0 when the target time has
#     already passed), or nil if the next attempt would start after
#     the deadline
#
# @private
def determine_await_retry_delay state, expiry_time, delay_epsilon
  now = Process.clock_gettime Process::CLOCK_MONOTONIC
  phase = state[0]
  retry_at = state[2]
  # In the pending state with a retry delay, aim just past the end of
  # that delay; in every other case aim for "right now".
  target = phase == :pending && retry_at ? retry_at + delay_epsilon : now
  # The deadline would pass before the next attempt could start.
  return nil if expiry_time && target > expiry_time
  # The target is already behind us, so there is nothing to wait for.
  return 0 if target < now
  target - now
end

def determine_expiry lifetime

Other tags:
    Private: -
# Converts a relative lifetime into an absolute monotonic-clock time.
#
# @param lifetime [Numeric, nil] seconds from now, or nil
# @return [Numeric, nil] the current CLOCK_MONOTONIC time plus lifetime,
#     or nil when no lifetime was given
#
# @private
def determine_expiry lifetime
  return nil unless lifetime
  Process.clock_gettime(Process::CLOCK_MONOTONIC) + lifetime
end

def do_expire

Other tags:
    Private: -
# Resets the retry manager and clears the cached value, error and
# expiration time. Returns nil.
#
# @private
def do_expire
  @retries.reset!
  @expires_at = nil
  @error = nil
  @value = nil
end

def enter_backfill

Other tags:
    Private: -
# Starts a backfill phase when at least one thread is counted in
# @backfill_count, by installing a fresh condition variable in
# @backfill_notify. Does nothing (and returns nil) otherwise.
#
# @private
def enter_backfill
  @backfill_notify = Thread::ConditionVariable.new if @backfill_count.positive?
end

def enter_compute cur_time

Other tags:
    Private: -
# Marks the current thread as the one running the computation.
#
# Stores cur_time in @expires_at, installs a new condition variable in
# @compute_notify, records the current thread in @computing_thread, and
# clears any previous value and error. Returns nil.
#
# @param cur_time [Numeric] the monotonic time at which computing starts
#
# @private
def enter_compute cur_time
  @expires_at = cur_time
  @compute_notify = Thread::ConditionVariable.new
  @computing_thread = Thread.current
  @error = nil
  @value = nil
end

def expire!

Returns:
  • (true, false) - whether the cache was expired
# Forces the cached result to expire so the next access recomputes it.
#
# Waits for any backfill to complete first. Has no effect unless the
# retry manager reports that the computation is finished.
#
# @return [true, false] whether the cache was expired
def expire!
  @mutex.synchronize do
    wait_backfill
    if @retries.finished?
      do_expire
      true
    else
      false
    end
  end
end

def expiring_value lifetime, value

Parameters:
  • value (Object) -- the computation result
  • lifetime (Numeric) -- timeout in seconds
# Wraps a computation result so that it expires after a given lifetime.
#
# @param lifetime [Numeric, nil] timeout in seconds; when nil (or false)
#     the value is returned unwrapped
# @param value [Object] the computation result
# @return [Object] an ExpiringValue built from lifetime and value, or
#     the bare value when no lifetime is given
def expiring_value lifetime, value
  lifetime ? ExpiringValue.new(lifetime, value) : value
end

def get *extra_args

Raises:
  • (Exception) - if an error happened while computing the value

Returns:
  • (Object) - the value
# Returns the value, computing it first if necessary.
#
# Under the mutex this waits out any backfill, applies a due
# expiration, and then acts on the current state:
#
# * finished: returns the cached value or raises the cached error.
# * computing: waits for the computing thread, then returns or raises
#   its result.
# * pending: raises the previous error if the retry delay has not yet
#   elapsed; otherwise marks this thread as computing.
#
# Only in the last case does control leave the synchronized block, and
# the computation itself then runs outside the mutex.
#
# @param extra_args [Array] extra arguments to pass to the compute block
# @return [Object] the value
# @raise [Exception] if an error happened while computing the value
def get *extra_args
  @mutex.synchronize do
    # Wait for any backfill to complete, and handle expiration first
    # because it might change the state.
    wait_backfill
    do_expire if should_expire?
    # Main state handling
    if @retries.finished?
      # finished state: return value or error
      return cached_value
    elsif !@compute_notify.nil?
      # computing state: wait for the computing thread to finish then
      # return its result
      wait_compute
      return cached_value
    else
      # pending state
      cur_time = Process.clock_gettime Process::CLOCK_MONOTONIC
      # waiting for the next retry: return current error
      raise @error if @expires_at && cur_time < @expires_at
      # no delay: compute in the current thread
      enter_compute cur_time
      # and continue below
    end
  end
  # Gets here if we just transitioned from pending to compute
  perform_compute extra_args
end

def handle_failure error

Other tags:
    Private: -
# Records a failed computation attempt and re-raises the error.
#
# If the error is an ExpiringError, it is unwrapped to its cause and its
# lifetime is converted into an expiration time. State is updated only
# when the calling thread is still the registered computing thread; in
# every case the (unwrapped) error is raised to the caller.
#
# The visible caller (perform_compute) invokes this while holding @mutex.
#
# @param error [Exception] the error raised by the compute block
# @raise [Exception] always; the unwrapped error
#
# @private
def handle_failure error
  expires_at = nil
  if error.is_a? ExpiringError
    expires_at = determine_expiry error.lifetime
    error = error.cause
  end
  # Skip the state update if this thread is no longer the registered
  # computing thread.
  if Thread.current.equal? @computing_thread
    # While computing, @expires_at holds the time the computation began
    # (see enter_compute), which is passed on as the attempt's start time.
    retry_delay = @retries.next start_time: @expires_at
    @value = nil
    @error = error
    @expires_at =
      if retry_delay.nil?
        # No more retries; use the expiration for the error
        expires_at
      elsif retry_delay.positive?
        determine_expiry retry_delay
      end
    # (A non-positive delay leaves @expires_at nil.)
    enter_backfill
    leave_compute
  end
  raise error
end

def handle_success value

Other tags:
    Private: -
# Records a successful computation and returns the value.
#
# If the result is an ExpiringValue, it is unwrapped and its lifetime is
# converted into an expiration time. State is updated only when the
# calling thread is still the registered computing thread; the
# (unwrapped) value is returned either way.
#
# The visible caller (perform_compute) invokes this while holding @mutex.
#
# @param value [Object] the result returned by the compute block
# @return [Object] the unwrapped value
#
# @private
def handle_success value
  expires_at = nil
  if value.is_a? ExpiringValue
    expires_at = determine_expiry value.lifetime
    value = value.value
  end
  # Skip the state update if this thread is no longer the registered
  # computing thread.
  if Thread.current.equal? @computing_thread
    @retries.finish!
    @error = nil
    @value = value
    @expires_at = expires_at
    # Set up backfill (if any threads are waiting) before waking them.
    enter_backfill
    leave_compute
  end
  value
end

def initialize retries: nil, &block

Parameters:
  • block (Proc) -- A block that can be called to attempt to compute the value
  • retries (Retries) -- A retry manager. The default is a retry manager created with Retries.new
# Creates a LazyValue.
#
# @param retries [Retries, nil] a retry manager; when nil, a new
#     `Retries` instance is created
# @param block [Proc] the block called to attempt to compute the value
# @raise [ArgumentError] if no block is given
def initialize retries: nil, &block
  @retries = retries || Retries.new
  @compute_handler = block
  raise ArgumentError, "missing compute handler block" unless block
  # Internally implemented by a state machine, protected by a mutex that
  # ensures state transitions are consistent. The states themselves are
  # implicit in the values of the various instance variables. The
  # following are the major states:
  #
  # 1. **Pending** The value is not known and needs to be computed.
  #     @retries.finished? is false.
  #     @value is nil.
  #     @error is nil if no previous attempt has yet been made to
  #         compute the value, or set to the error that resulted from
  #         the most recent attempt.
  #     @expires_at is set to the monotonic time of the end of the
  #         current retry delay, or nil if the next computation attempt
  #         should happen immediately at the next access.
  #     @computing_thread is nil.
  #     @compute_notify is nil.
  #     @backfill_notify is set if currently backfilling, otherwise nil.
  #     From this state, calling #get will start computation (first
  #     waiting on @backfill_notify if present). Calling #expire! will
  #     have no effect.
  #
  # 2. **Computing** One thread has initiated computation. All other
  #     threads will be blocked (waiting on @compute_notify) until the
  #     computing thread finishes.
  #     @retries.finished? is false.
  #     @value and @error are nil.
  #     @expires_at is set to the monotonic time when computing started.
  #     @computing_thread is set to the thread that is computing.
  #     @compute_notify is set.
  #     @backfill_notify is nil.
  #     From this state, calling #get will cause the thread to wait
  #     (on @compute_notify) for the computing thread to complete.
  #     Calling #expire! will have no effect.
  #     When the computing thread finishes, it will transition either
  #     to Finished if the computation was successful or failed with
  #     no more retries, or back to Pending if computation failed with
  #     at least one retry remaining. It might also set @backfill_notify
  #     if other threads are waiting for completion.
  #
  # 3. **Finished** Computation has succeeded, or has failed and no
  #     more retries remain.
  #     @retries.finished? is true.
  #     either @value or @error is set, and the other is nil, depending
  #         on whether the final state is success or failure. (If both
  #         are nil, it is considered a @value of nil.)
  #     @expires_at is set to the monotonic time of expiration, or nil
  #         if there is no expiration.
  #     @computing_thread is nil.
  #     @compute_notify is nil.
  #     @backfill_notify is set if currently backfilling, otherwise nil.
  #     From this state, calling #get will either return the result or
  #     raise the error. If the current time exceeds @expires_at,
  #     however, it will block on @backfill_notify (if present), and
  #     then transition to Pending first, and proceed from there.
  #     Calling #expire! will block on @backfill_notify (if present)
  #     and then transition to Pending.
  #
  # @backfill_notify can be set in the Pending or Finished states. This
  # happens when threads that had been waiting on the previous
  # computation are still clearing out and returning their results.
  # Backfill must complete before the next computation attempt can be
  # started from the Pending state, or before an expiration can take
  # place from the Finished state. This prevents an "overlap" situation
  # where a thread that had been waiting for a previous computation,
  # isn't able to return the new result before some other thread starts
  # a new computation or expires the value. Note that it is okay for
  # #set! to be called during backfill; the threads still backfilling
  # will simply return the new value.
  #
  # Note: One might ask if it would be simpler to extend the mutex
  # across the entire computation, having it protect the computation
  # itself, instead of the current approach of having explicit compute
  # and backfill states with notifications and having the mutex protect
  # only the state transition. However, this would not have been able
  # to satisfy the requirement that we be able to detect whether a
  # thread asked for the value during another thread's computation,
  # and thus should "share" in that computation's result even if it's
  # a failure (rather than kicking off a retry). Additionally, we
  # consider it dangerous to have the computation block run inside a
  # mutex, because arbitrary code can run there which might result in
  # deadlocks.
  @mutex = Thread::Mutex.new
  # The evaluated, cached value, which could be nil.
  @value = nil
  # The last error encountered
  @error = nil
  # If non-nil, this is the CLOCK_MONOTONIC time when the current state
  # expires. If the state is finished, this is the time the current
  # value or error expires (while nil means it never expires). If the
  # state is pending, this is the time the wait period before the next
  # retry expires (and nil means there is no delay.) If the state is
  # computing, this is the time when computing started.
  @expires_at = nil
  # Set to a condition variable during computation. Broadcasts when the
  # computation is complete. Any threads wanting to get the value
  # during computation must wait on this first.
  @compute_notify = nil
  # Set to a condition variable during backfill. Broadcasts when the
  # last backfill thread is complete. Any threads wanting to expire the
  # cache or start a new computation during backfill must wait on this
  # first.
  @backfill_notify = nil
  # The number of threads waiting on backfill. Used to determine
  # whether to activate backfill_notify when a computation completes.
  @backfill_count = 0
  # The thread running the current computation. This is tested against
  # new requests to protect against deadlocks where a thread tries to
  # re-enter from its own computation. This is also tested when a
  # computation completes, to ensure that the computation is still
  # relevant (i.e. if #set! interrupts a computation, this is reset to
  # nil).
  @computing_thread = nil
end

def internal_state

Returns:
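  • (Array) - a three-element array [state, result, expires_at], where state is one of :pending, :computing, :success, or :failed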
# Returns a snapshot of the current state, taken under the mutex.
#
# The snapshot is `[state, result, expires_at]` where state is:
#
# * `:success` - finished without an error; result is the value
# * `:failed` - finished with an error; result is the error
# * `:pending` - not finished and no computation in flight; result is
#   the most recent error, if any
# * `:computing` - a computation is in flight; result is nil
#
# @return [Array] the three-element snapshot
def internal_state
  @mutex.synchronize do
    label, payload =
      if !@retries.finished?
        @compute_notify.nil? ? [:pending, @error] : [:computing, nil]
      elsif @error
        [:failed, @error]
      else
        [:success, @value]
      end
    [label, payload, @expires_at]
  end
end

def leave_backfill

Other tags:
    Private: -
# Ends the backfill phase once the last counted thread has left.
#
# Does nothing while @backfill_count is non-zero. Otherwise wakes any
# threads blocked on @backfill_notify and clears it. Returns nil.
#
# @private
def leave_backfill
  return unless @backfill_count.zero?
  # @backfill_notify is only installed by enter_backfill when a
  # computation completes. A waiter that is interrupted while still
  # waiting on the computation (for example by Thread#raise or a timeout)
  # reaches this point before that happens, so the notifier can still be
  # nil. Broadcasting on nil would raise NoMethodError from an ensure
  # clause and mask the interrupting exception.
  @backfill_notify&.broadcast
  @backfill_notify = nil
end

def leave_compute

Other tags:
    Private: -
# Ends the computing phase: clears the registered computing thread,
# wakes every thread blocked on @compute_notify, and clears that
# condition variable. Returns nil.
#
# @private
def leave_compute
  @computing_thread = nil
  notifier = @compute_notify
  notifier.broadcast
  @compute_notify = nil
end

def perform_compute extra_args

Other tags:
    Private: -
# Runs the compute block and records its outcome.
#
# The block itself is called without holding @mutex; the mutex is taken
# only to record the result via handle_success or handle_failure.
#
# @param extra_args [Array] arguments splatted into the compute block
# @return [Object] the value returned by handle_success
# @raise [Exception] whatever handle_failure raises for a failed attempt
#
# @private
def perform_compute extra_args
  value = @compute_handler.call(*extra_args)
  @mutex.synchronize do
    handle_success value
  end
# Every exception class is rescued (not just StandardError) so that any
# failure is passed to handle_failure, which re-raises it.
rescue Exception => e # rubocop:disable Lint/RescueException
  @mutex.synchronize do
    handle_failure e
  end
end

def raise_expiring_error lifetime, error, *args

Parameters:
  • args (Array) -- any arguments to pass to an error constructor
  • error (String, Exception, Class) -- the error to raise
  • lifetime (Numeric) -- timeout in seconds
# Raises an error that, when it comes out of a computation, expires
# after the given lifetime.
#
# The original error is delivered as the `cause` of an ExpiringError
# that carries the lifetime; handle_failure reads both back.
#
# @param lifetime [Numeric, nil] timeout in seconds; when nil the error
#     is raised as is, with no ExpiringError wrapper
# @param error [String, Exception, Class] the error to raise. An
#     Exception subclass is instantiated with args; any other
#     non-Exception object becomes the message of a RuntimeError.
# @param args [Array] any arguments to pass to an error constructor
# @raise [ExpiringError] wrapping the error, when a lifetime is given
# @raise [Exception] the error itself, when no lifetime is given
def raise_expiring_error lifetime, error, *args
  raise error unless lifetime
  # The error is the exception currently being handled, so raising now
  # makes Ruby record it as the new exception's cause.
  # NOTE(review): $ERROR_INFO is the English-library alias for $!; this
  # assumes `require "English"` happens elsewhere -- confirm.
  raise ExpiringError, lifetime if error.equal? $ERROR_INFO
  if error.is_a?(Class) && error.ancestors.include?(Exception)
    error = error.new(*args)
  elsif !error.is_a? Exception
    error = RuntimeError.new error.to_s
  end
  # Raise and immediately rescue the error so that it becomes the
  # current exception, and therefore the cause of the ExpiringError.
  begin
    raise error
  rescue error.class
    raise ExpiringError, lifetime
  end
end

def set! value, lifetime: nil

Returns:
  • (Object) - the value

Parameters:
  • lifetime (Numeric) -- the lifetime until expiration in seconds, or nil (the default) for no expiration
  • value (Object) -- the value to set
# Sets the cached value explicitly and immediately, marking the
# computation as finished.
#
# If a computation is in progress when this is called, that computation
# is interrupted from the point of view of this object: threads waiting
# on it are woken and receive the value set here, and the computing
# thread's own result is no longer recorded.
#
# @param value [Object] the value to set
# @param lifetime [Numeric, nil] the lifetime until expiration in
#     seconds, or nil (the default) for no expiration
# @return [Object] the value
def set! value, lifetime: nil
  @mutex.synchronize do
    @value = value
    @expires_at = determine_expiry lifetime
    @error = nil
    @retries.finish!
    # Only a computation in flight has a condition variable to broadcast
    # on. leave_compute dereferences @compute_notify, so it must not be
    # called when there is none.
    unless @compute_notify.nil?
      enter_backfill
      leave_compute
    end
    value
  end
end

def should_expire?

Other tags:
    Private: -
# Tells whether the finished result has passed its expiration time.
#
# Truthy only when the retry manager reports finished, an expiration
# time is set, and the monotonic clock has reached it.
#
# @private
def should_expire?
  finished = @retries.finished?
  return finished unless finished
  deadline = @expires_at
  return deadline unless deadline
  Process.clock_gettime(Process::CLOCK_MONOTONIC) >= deadline
end

def wait_backfill

Other tags:
    Private: -
# Blocks until no backfill is in progress, by waiting on
# @backfill_notify (releasing @mutex while waiting) for as long as one
# is set. Returns nil.
#
# @private
def wait_backfill
  while @backfill_notify
    @backfill_notify.wait @mutex
  end
end

def wait_compute

Other tags:
    Private: -
# Blocks the calling thread until the in-flight computation signals
# @compute_notify.
#
# The visible caller (#get) invokes this while holding @mutex, which is
# released for the duration of the wait.
#
# @raise [ThreadError] if the calling thread is the one running the
#     computation, since waiting on itself would never finish
#
# @private
def wait_compute
  if Thread.current.equal? @computing_thread
    raise ThreadError, "deadlock: tried to call LazyValue#get from its own computation"
  end
  # Count this thread as a waiter; enter_backfill starts a backfill phase
  # only when this count is positive.
  @backfill_count += 1
  begin
    @compute_notify.wait @mutex
  ensure
    # Runs even if the wait is interrupted: uncount this thread and, if it
    # was the last one, end the backfill phase.
    @backfill_count -= 1
    leave_backfill
  end
end