class Aws::Plugins::RetryErrors::Handler
def add_retry_headers(context)
def add_retry_headers(context) request_pairs = { 'attempt' => context.retries, 'max' => context.config.max_attempts } if (ttl = compute_request_ttl(context)) request_pairs['ttl'] = ttl end # create the request header formatted_header = request_pairs.map { |k, v| "#{k}=#{v}" }.join('; ') context.http_request.headers['amz-sdk-request'] = formatted_header end
def call(context)
def call(context) context.metadata[:retries] ||= {} config = context.config get_send_token(config) add_retry_headers(context) response = @handler.call(context) error_inspector = Retries::ErrorInspector.new( response.error, response.context.http_response.status_code ) request_bookkeeping(context, response, error_inspector) if error_inspector.endpoint_discovery?(context) key = config.endpoint_cache.extract_key(context) config.endpoint_cache.delete(key) end # Clock correction needs to be updated from the response even when # the request is not retryable but should only be updated # in the case of clock skew errors if error_inspector.clock_skew?(context) config.clock_skew.update_clock_correction(context) end # Estimated skew needs to be updated on every request config.clock_skew.update_estimated_skew(context) return response unless retryable?(context, response, error_inspector) return response if context.retries >= config.max_attempts - 1 context.metadata[:retries][:capacity_amount] = config.retry_quota.checkout_capacity(error_inspector) return response unless context.metadata[:retries][:capacity_amount] > 0 delay = exponential_backoff(context.retries) Kernel.sleep(delay) retry_request(context, error_inspector) end
def compute_request_ttl(context)
def compute_request_ttl(context) return if context.operation.async endpoint = context.http_request.endpoint estimated_skew = context.config.clock_skew.estimated_skew(endpoint) if context.config.respond_to?(:http_read_timeout) read_timeout = context.config.http_read_timeout end if estimated_skew && read_timeout (Time.now.utc + read_timeout + estimated_skew) .strftime('%Y%m%dT%H%M%SZ') end end
def exponential_backoff(retries)
def exponential_backoff(retries) # for a transient error, use backoff [Kernel.rand * 2**retries, MAX_BACKOFF].min end
def get_send_token(config)
def get_send_token(config) # either fail fast or block until a token becomes available # must be configurable # need a maximum rate at which we can send requests (max_send_rate) # is unset until a throttle is seen if config.retry_mode == 'adaptive' config.client_rate_limiter.token_bucket_acquire( 1, config.adaptive_retry_wait_to_fill ) end end
def refresh_credentials?(context, error)
def refresh_credentials?(context, error) error.expired_credentials? && context.config.credentials.respond_to?(:refresh!) end
def request_bookkeeping(context, response, error_inspector)
maxsendrate is updated if on adaptive mode and based on response
def request_bookkeeping(context, response, error_inspector) config = context.config if response.successful? config.retry_quota.release( context.metadata[:retries][:capacity_amount] ) end if config.retry_mode == 'adaptive' is_throttling_error = error_inspector.throttling_error? config.client_rate_limiter.update_sending_rate(is_throttling_error) end end
def retry_request(context, error)
def retry_request(context, error) context.retries += 1 context.config.credentials.refresh! if refresh_credentials?(context, error) context.http_request.body.rewind context.http_response.reset call(context) end
def retryable?(context, response, error_inspector)
def retryable?(context, response, error_inspector) return false if response.successful? error_inspector.retryable?(context) && context.http_response.body.respond_to?(:truncate) end