class Aws::S3::MultipartStreamUploader
@api private
def abort_upload(upload_id, options, errors)
def abort_upload(upload_id, options, errors) @client.abort_multipart_upload( bucket: options[:bucket], key: options[:key], upload_id: upload_id ) msg = "multipart upload failed: #{errors.map(&:message).join("; ")}" raise MultipartUploadError.new(msg, errors) rescue MultipartUploadError => error raise error rescue => error msg = "failed to abort multipart upload: #{error.message}" raise MultipartUploadError.new(msg, errors + [error]) end
def complete_opts(options)
def complete_opts(options) COMPLETE_UPLOAD_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
def complete_upload(upload_id, parts, options)
def complete_upload(upload_id, parts, options) @client.complete_multipart_upload( **complete_opts(options).merge( upload_id: upload_id, multipart_upload: { parts: parts } ) ) end
def create_opts(options)
def create_opts(options) CREATE_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
def initialize(options = {})
(**options)
-
:client
(Client
) --
def initialize(options = {}) @client = options[:client] || Client.new @tempfile = options[:tempfile] @part_size = options[:part_size] || PART_SIZE @thread_count = options[:thread_count] || THREAD_COUNT end
def initiate_upload(options)
def initiate_upload(options) @client.create_multipart_upload(create_opts(options)).upload_id end
def read_to_part_body(read_pipe)
def read_to_part_body(read_pipe) return if read_pipe.closed? temp_io = @tempfile ? Tempfile.new(TEMPFILE_PREIX) : StringIO.new(String.new) temp_io.binmode bytes_copied = IO.copy_stream(read_pipe, temp_io, @part_size) temp_io.rewind if bytes_copied == 0 if Tempfile === temp_io temp_io.close temp_io.unlink end nil else temp_io end end
def upload(options = {}, &block)
-
(Seahorse::Client::Response)
- - the CompleteMultipartUploadResponse
Options Hash:
(**options)
-
:key
(required, String
) -- -
:bucket
(required, String
) --
def upload(options = {}, &block) upload_id = initiate_upload(options) parts = upload_parts(upload_id, options, &block) complete_upload(upload_id, parts, options) end
def upload_in_threads(read_pipe, completed, options, thread_errors)
def upload_in_threads(read_pipe, completed, options, thread_errors) mutex = Mutex.new part_number = 0 @thread_count.times.map do thread = Thread.new do begin loop do body, thread_part_number = mutex.synchronize do [read_to_part_body(read_pipe), part_number += 1] end break unless (body || thread_part_number == 1) begin part = options.merge( body: body, part_number: thread_part_number, ) resp = @client.upload_part(part) completed_part = {etag: resp.etag, part_number: part[:part_number]} # get the requested checksum from the response if part[:checksum_algorithm] k = "checksum_#{part[:checksum_algorithm].downcase}".to_sym completed_part[k] = resp[k] end completed.push(completed_part) ensure if Tempfile === body body.close body.unlink elsif StringIO === body body.string.clear end end end nil rescue => error # keep other threads from uploading other parts mutex.synchronize do thread_errors.push(error) read_pipe.close_read unless read_pipe.closed? end error end end thread.abort_on_exception = true thread end end
def upload_part_opts(options)
def upload_part_opts(options) UPLOAD_PART_OPTIONS.inject({}) do |hash, key| hash[key] = options[key] if options.key?(key) hash end end
def upload_parts(upload_id, options, &block)
def upload_parts(upload_id, options, &block) completed = Queue.new thread_errors = [] errors = begin IO.pipe do |read_pipe, write_pipe| threads = upload_in_threads( read_pipe, completed, upload_part_opts(options).merge(upload_id: upload_id), thread_errors) begin block.call(write_pipe) ensure # Ensure the pipe is closed to avoid https://github.com/jruby/jruby/issues/6111 write_pipe.close end threads.map(&:value).compact end rescue => e thread_errors + [e] end if errors.empty? Array.new(completed.size) { completed.pop }.sort_by { |part| part[:part_number] } else abort_upload(upload_id, options, errors) end end