lib/aws-sdk-core/plugins/retries/client_rate_limiter.rb
# frozen_string_literal: true

module Aws
  module Plugins
    module Retries
      # @api private
      # Used only in 'adaptive' retry mode.
      #
      # Client-side rate limiter made of two cooperating parts:
      #
      # * a token bucket (@fill_rate, @max_capacity, @current_capacity) that
      #   callers draw from through {#token_bucket_acquire}, and
      # * a rate calculation, driven by {#update_sending_rate}, that scales
      #   the rate down by BETA after a throttling error and grows it along
      #   a cubic curve otherwise.
      #
      # State is updated while holding @mutex. The only unsynchronized access
      # is the @enabled check at the top of {#token_bucket_acquire}.
      class ClientRateLimiter
        # Lower bound for the token bucket's maximum capacity.
        MIN_CAPACITY = 1
        # Lower bound for the token bucket's fill rate (tokens per second).
        MIN_FILL_RATE = 0.5
        # Weight given to the newest sample when smoothing the measured rate.
        SMOOTH = 0.8
        # How much to scale back after a throttling response
        BETA = 0.7
        # Controls how aggressively we scale up after being throttled
        SCALE_CONSTANT = 0.4

        def initialize
          now = Aws::Util.monotonic_seconds

          @mutex = Mutex.new

          # Token bucket state. The rate and capacity stay nil until the
          # first call to #update_sending_rate assigns them.
          @fill_rate = nil
          @max_capacity = nil
          @current_capacity = 0
          @last_timestamp = nil
          @enabled = false

          # Measured request rate state. The starting bucket is floored so
          # that it sits on a boundary of the half-second grid used by
          # #update_measured_rate. Seeding it with the raw timestamp allowed
          # the first measurement to be divided by an arbitrarily small
          # interval, which wildly inflated @measured_tx_rate.
          @measured_tx_rate = 0
          @last_tx_rate_bucket = now.floor
          @request_count = 0

          # Cubic rate calculation state.
          @last_max_rate = 0
          @last_throttle_time = now
          @time_window = 0
          @calculated_rate = nil
        end

        # Withdraws +amount+ tokens from the bucket.
        #
        # Does nothing until the bucket has been enabled, which happens the
        # first time #update_sending_rate is told about a throttling error.
        #
        # @param amount [Numeric] number of tokens to withdraw.
        # @param wait_to_fill [Boolean] when true (the default), sleep until
        #   enough tokens are available; when false, raise instead.
        # @return [Numeric, nil] nil while the limiter is disabled, otherwise
        #   the capacity left after the withdrawal.
        # @raise [Aws::Errors::RetryCapacityNotAvailableError] when there is
        #   not enough capacity and +wait_to_fill+ is false.
        def token_bucket_acquire(amount, wait_to_fill = true)
          # Client side throttling is not enabled until we see a
          # throttling error
          return unless @enabled

          @mutex.synchronize do
            token_bucket_refill

            # Next see if we have enough capacity for the requested amount
            while @current_capacity < amount
              raise Aws::Errors::RetryCapacityNotAvailableError unless wait_to_fill

              # Mutex#sleep releases the lock while sleeping and takes it
              # again on wake-up. It may wake early, hence the re-check in
              # the loop condition after refilling.
              @mutex.sleep((amount - @current_capacity) / @fill_rate)
              token_bucket_refill
            end

            @current_capacity -= amount
          end
        end

        # Records one request and recomputes the token bucket's rate.
        #
        # @param is_throttling_error [Boolean] whether the request that just
        #   completed was throttled. A truthy value lowers the rate and
        #   enables the token bucket; otherwise the rate follows the cubic
        #   growth curve.
        def update_sending_rate(is_throttling_error)
          @mutex.synchronize do
            update_measured_rate

            if is_throttling_error
              rate_to_use = if @enabled
                              [@measured_tx_rate, @fill_rate].min
                            else
                              @measured_tx_rate
                            end

              # The fill_rate is from the token bucket
              @last_max_rate = rate_to_use
              calculate_time_window
              @last_throttle_time = Aws::Util.monotonic_seconds
              @calculated_rate = cubic_throttle(rate_to_use)
              enable_token_bucket
            else
              calculate_time_window
              @calculated_rate = cubic_success(Aws::Util.monotonic_seconds)
            end

            # Never allow more than twice the rate actually being sent.
            new_rate = [@calculated_rate, 2 * @measured_tx_rate].min
            token_bucket_update_rate(new_rate)
          end
        end

        private

        # Adds the tokens accrued since the previous refill, capped at
        # @max_capacity. The first call only records the timestamp.
        def token_bucket_refill
          timestamp = Aws::Util.monotonic_seconds
          unless @last_timestamp
            @last_timestamp = timestamp
            return
          end

          fill_amount = (timestamp - @last_timestamp) * @fill_rate
          @current_capacity = [
            @max_capacity, @current_capacity + fill_amount
          ].min

          @last_timestamp = timestamp
        end

        # Applies a new rate to the bucket, clamped to MIN_FILL_RATE and
        # MIN_CAPACITY.
        def token_bucket_update_rate(new_rps)
          # Refill based on our current rate before we update to the
          # new fill rate
          token_bucket_refill
          @fill_rate = [new_rps, MIN_FILL_RATE].max
          @max_capacity = [new_rps, MIN_CAPACITY].max
          # When we scale down we can't have a current capacity that exceeds our
          # max_capacity.
          @current_capacity = [@current_capacity, @max_capacity].min
        end

        def enable_token_bucket
          @enabled = true
        end

        # Counts the current request and, once the clock has moved into a
        # later half-second bucket, folds the observed rate into
        # @measured_tx_rate using exponential smoothing.
        def update_measured_rate
          t = Aws::Util.monotonic_seconds
          # Round down to the nearest half second.
          time_bucket = (t * 2).floor / 2.0
          @request_count += 1
          return unless time_bucket > @last_tx_rate_bucket

          current_rate = @request_count / (time_bucket - @last_tx_rate_bucket)
          @measured_tx_rate = (current_rate * SMOOTH) +
                              (@measured_tx_rate * (1 - SMOOTH))
          @request_count = 0
          @last_tx_rate_bucket = time_bucket
        end

        # Stores in @time_window the offset used by #cubic_success. The value
        # depends only on @last_max_rate.
        def calculate_time_window
          @time_window = ((@last_max_rate * (1 - BETA)) / SCALE_CONSTANT)**(1.0 / 3)
        end

        # Rate on the cubic curve at +timestamp+, measured from the last
        # throttle. The curve passes through @last_max_rate when the elapsed
        # time equals @time_window.
        def cubic_success(timestamp)
          dt = timestamp - @last_throttle_time
          (SCALE_CONSTANT * ((dt - @time_window)**3)) + @last_max_rate
        end

        # Rate to fall back to after a throttling error.
        def cubic_throttle(rate_to_use)
          rate_to_use * BETA
        end
      end
    end
  end
end