class Aws::S3::MultipartFileUploader
@api private
def abort_upload(upload_id, options, errors)
def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id
  )
  msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
  raise MultipartUploadError.new(msg, errors)
rescue MultipartUploadError => error
  raise error
rescue => error
  msg = "failed to abort multipart upload: #{error.message}"
  raise MultipartUploadError.new(msg, errors + [error])
end
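When any part fails, the upload is aborted and the individual exceptions are folded into a single MultipartUploadError. A hedged rescue sketch (the uploader variable, bucket, and key are illustrative; Aws::S3::MultipartUploadError is assumed to expose the collected exceptions via #errors):

begin
  uploader.upload('/tmp/archive.tar', bucket: 'my-bucket', key: 'archive.tar')
rescue Aws::S3::MultipartUploadError => e
  # e.message carries the aggregated summary built above;
  # e.errors holds the per-part (and abort) exceptions.
  e.errors.each { |part_error| warn(part_error.message) }
end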
def complete_upload(upload_id, parts, options)
def complete_upload(upload_id, parts, options)
  @client.complete_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id,
    multipart_upload: { parts: parts }
  )
end
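The parts argument is the array returned by upload_parts: one hash per uploaded part carrying the ETag S3 returned and its part number, sorted by part number. A sketch of the expected shape (ETag values are made up):

parts = [
  { etag: '"etag-for-part-1"', part_number: 1 },
  { etag: '"etag-for-part-2"', part_number: 2 }
]
# forwarded to S3 as multipart_upload: { parts: parts }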
def compute_default_part_size(source_size)
def compute_default_part_size(source_size)
  [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i
end
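As a worked example, assuming MAX_PARTS is 10,000 and MIN_PART_SIZE is 5 MiB (the usual S3 multipart limits; the real values are constants on this class):

# 100 MiB file: 100 MiB / 10,000 is well under 5 MiB, so the floor wins.
compute_default_part_size(100 * 1024 * 1024)        #=> 5_242_880

# 100 GiB file: ceil(107_374_182_400 / 10_000) keeps the upload within 10,000 parts.
compute_default_part_size(100 * 1024 * 1024 * 1024) #=> 10_737_419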
def compute_parts(upload_id, source, options)
def compute_parts(upload_id, source, options)
  size = File.size(source)
  default_part_size = compute_default_part_size(size)
  offset = 0
  part_number = 1
  parts = []
  while offset < size
    parts << upload_part_opts(options).merge(
      upload_id: upload_id,
      part_number: part_number,
      body: FilePart.new(
        source: source,
        offset: offset,
        size: part_size(size, default_part_size, offset)
      )
    )
    part_number += 1
    offset += default_part_size
  end
  parts
end
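For illustration (hypothetical sizes), a 12 MiB file with a 5 MiB default part size is sliced into three parts; the loop advances offset by the default size, and part_size trims the final slice so it never reads past the end of the file:

size              = 12 * 1024 * 1024
default_part_size = 5 * 1024 * 1024
offsets = (0...size).step(default_part_size).to_a
offsets.map { |offset| [default_part_size, size - offset].min }
#=> [5242880, 5242880, 2097152]  -- parts 1, 2, and 3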
def create_opts(options)
def create_opts(options)
  CREATE_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end
def initialize(options = {})
Options Hash (**options):
- :thread_count (Integer)
- :client (Client)
def initialize(options = {})
  @client = options[:client] || Client.new
  @thread_count = options[:thread_count] || THREAD_COUNT
end
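A minimal construction sketch. Both options are optional; per the code above, omitting them falls back to a freshly built Client and the class's THREAD_COUNT default (the region and thread count below are illustrative):

require 'aws-sdk-s3'

uploader = Aws::S3::MultipartFileUploader.new(
  client: Aws::S3::Client.new(region: 'us-east-1'),
  thread_count: 8
)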
def initiate_upload(options)
def initiate_upload(options)
  @client.create_multipart_upload(create_opts(options)).upload_id
end
def part_size(total_size, part_size, offset)
def part_size(total_size, part_size, offset)
  if offset + part_size > total_size
    total_size - offset
  else
    part_size
  end
end
def upload(source, options = {})
Returns:
- (void)

Options Hash (**options):
- :key (required, String) -- The key for the object.
- :bucket (required, String) -- The bucket to upload to.

Parameters:
- source (String, Pathname, File, Tempfile) -- The file to upload.
def upload(source, options = {})
  if File.size(source) < MIN_PART_SIZE
    raise ArgumentError, FILE_TOO_SMALL
  else
    upload_id = initiate_upload(options)
    parts = upload_parts(upload_id, source, options)
    complete_upload(upload_id, parts, options)
  end
end
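A hedged usage sketch; the class is marked @api private, so it is normally reached indirectly (for example via Object#upload_file), but a direct call looks like this (path, bucket, and key are illustrative). Files smaller than MIN_PART_SIZE raise ArgumentError with FILE_TOO_SMALL instead of starting a multipart upload:

uploader.upload('/path/to/large_file.bin',
  bucket: 'my-bucket',
  key: 'backups/large_file.bin'
)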
def upload_in_threads(pending, completed)
def upload_in_threads(pending, completed)
  threads = []
  @thread_count.times do
    thread = Thread.new do
      begin
        while part = pending.shift
          resp = @client.upload_part(part)
          part[:body].close
          completed.push(etag: resp.etag, part_number: part[:part_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    thread.abort_on_exception = true
    threads << thread
  end
  threads.map(&:value).compact
end
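PartList itself is not shown in this section; the threading above only needs it to behave like a thread-safe list with shift, push, clear!, and to_a. A minimal sketch under that assumption (not the SDK's actual implementation):

class PartList
  def initialize(parts = [])
    @parts = parts
    @mutex = Mutex.new
  end

  def push(part)
    @mutex.synchronize { @parts.push(part) }
  end

  def shift
    @mutex.synchronize { @parts.shift }
  end

  def clear!
    @mutex.synchronize { @parts.clear }
  end

  def to_a
    @mutex.synchronize { @parts.dup }
  end
end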
def upload_part_opts(options)
def upload_part_opts(options)
  UPLOAD_PART_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end
def upload_parts(upload_id, source, options)
def upload_parts(upload_id, source, options)
  pending = PartList.new(compute_parts(upload_id, source, options))
  completed = PartList.new
  errors = upload_in_threads(pending, completed)
  if errors.empty?
    completed.to_a.sort_by { |part| part[:part_number] }
  else
    abort_upload(upload_id, options, errors)
  end
end