lib/aws-sdk-s3/plugins/streaming_retry.rb
# frozen_string_literal: true

require 'forwardable'

module Aws
  module S3
    module Plugins
      # A wrapper around BlockIO that adds no-ops for truncate and rewind
      # @api private
      class RetryableBlockIO
        extend Forwardable
        def_delegators :@block_io, :write, :read, :size

        def initialize(block_io)
          @block_io = block_io
        end

        def truncate(_integer); end

        def rewind; end
      end

      # A wrapper around ManagedFile that adds no-ops for truncate and rewind
      # @api private
      class RetryableManagedFile
        extend Forwardable
        def_delegators :@file, :write, :read, :size, :open?, :close

        def initialize(managed_file)
          @file = managed_file
        end

        def truncate(_integer); end

        def rewind; end
      end

      class NonRetryableStreamingError < StandardError
        def initialize(error)
          super('Unable to retry request - retry could result in '\
                'processing duplicated chunks.')
          set_backtrace(error.backtrace)
          @original_error = error
        end

        attr_reader :original_error
      end

      # This handler works with the ResponseTarget plugin to provide smart
      # retries of S3 streaming operations that support the range parameter
      # (currently only: get_object). When a 200 OK with a TruncatedBodyError
      # is received this handler will add a range header that excludes the
      # data that has already been processed (written to file or sent to
      # the target Proc).
      # It is important to not write data to the custom target in the case of
      # a non-success response. We do not want to write an XML error
      # message to someone's file or pass it to a user's Proc.
      # @api private
      class StreamingRetry < Seahorse::Client::Plugin
        class Handler < Seahorse::Client::Handler
          def call(context)
            target = context.params[:response_target] ||
                     context[:response_target]

            # retry is only supported when range is NOT set on the
            # initial request
            if supported_target?(target) && !context.params[:range]
              add_event_listeners(context, target)
            end
            @handler.call(context)
          end

          private

          def add_event_listeners(context, target)
            context.http_response.on_headers(200..299) do
              case context.http_response.body
              when Seahorse::Client::BlockIO then
                context.http_response.body =
                  RetryableBlockIO.new(context.http_response.body)
              when Seahorse::Client::ManagedFile then
                context.http_response.body =
                  RetryableManagedFile.new(context.http_response.body)
              end
            end

            context.http_response.on_headers(400..599) do
              context.http_response.body =
                StringIO.new # something to write the error to
            end

            context.http_response.on_success(200..299) do
              body = context.http_response.body
              if body.is_a?(RetryableManagedFile) && body.open?
                body.close
              end
            end

            context.http_response.on_error do |error|
              if retryable_body?(context)
                if truncated_body?(error)
                  context.http_request.headers[:range] =
                    "bytes=#{context.http_response.body.size}-"
                else
                  case context.http_response.body
                  when RetryableManagedFile
                    # call rewind on the underlying file
                    context.http_response.body
                           .instance_variable_get(:@file).rewind
                  else
                    raise NonRetryableStreamingError, error
                  end
                end
              end
            end
          end

          def truncated_body?(error)
            error.is_a?(Seahorse::Client::NetworkingError) &&
              error.original_error.is_a?(
                Seahorse::Client::NetHttp::Handler::TruncatedBodyError
              )
          end

          def retryable_body?(context)
            context.http_response.body.is_a?(RetryableBlockIO) ||
              context.http_response.body.is_a?(RetryableManagedFile)
          end

          def supported_target?(target)
            case target
            when Proc, String, Pathname then true
            else false
            end
          end
        end

        handler(Handler, step: :sign, operations: [:get_object], priority: 10)
      end
    end
  end
end
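
# ---------------------------------------------------------------------------
# Usage sketch (not part of the plugin, kept as comments so the file stays
# valid Ruby): the retry behaviour above only engages when get_object streams
# to a Proc, String, or Pathname target and no :range is set on the initial
# request. Bucket, key, file path, region, and the process(chunk) consumer
# below are hypothetical placeholders.
#
#   require 'aws-sdk-s3'
#
#   s3 = Aws::S3::Client.new(region: 'us-east-1')
#
#   # String/Pathname target: the body is written to disk through a
#   # ManagedFile, so a truncated download can be resumed by re-requesting
#   # only the missing byte range.
#   s3.get_object(bucket: 'example-bucket', key: 'large-object',
#                 response_target: '/tmp/large-object')
#
#   # Block target: chunks are streamed to the block through a BlockIO.
#   # A truncated body is retried with a range header that skips the bytes
#   # already yielded; other mid-stream errors raise
#   # NonRetryableStreamingError, since yielded chunks cannot be rewound.
#   s3.get_object(bucket: 'example-bucket', key: 'large-object') do |chunk|
#     process(chunk) # hypothetical consumer
#   end
#
#   # Passing :range on the initial call opts out of this retry handling.
# ---------------------------------------------------------------------------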