class Aws::S3::MultipartFileUploader
# @api private
# Aborts the multipart upload identified by +upload_id+, then raises a
# MultipartUploadError summarizing the per-part failures. If the abort call
# itself fails, that failure is appended to the error list in the raised error.
#
# @param upload_id [String] id of the multipart upload to abort
# @param options [Hash] request options; :bucket and :key are read here
# @param errors [Array<StandardError>] the part-upload failures
# @raise [MultipartUploadError] always
def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id
  )
  summary = "multipart upload failed: #{errors.map(&:message).join('; ')}"
  raise MultipartUploadError.new(summary, errors)
rescue MultipartUploadError
  # Already the error we want callers to see; re-raise untouched.
  raise
rescue => abort_error
  # The abort itself failed; surface both the abort failure and the
  # original part failures.
  summary = "failed to abort multipart upload: #{abort_error.message}. "\
            "Multipart upload failed: #{errors.map(&:message).join('; ')}"
  raise MultipartUploadError.new(summary, errors + [abort_error])
end
# @param key [Symbol] a request option name
# @return [Boolean] whether +key+ names a checksum-related request option
#   (membership in CHECKSUM_KEYS).
def checksum_key?(key)
  CHECKSUM_KEYS.member?(key)
end
# Builds the option hash for complete_multipart_upload from the caller's
# options, keeping only the keys listed in COMPLETE_OPTIONS.
#
# @param options [Hash] user-supplied upload options
# @return [Hash] options to pass to complete_multipart_upload; includes
#   checksum_type: 'FULL_OBJECT' when any checksum option was supplied
def complete_opts(options)
  base = {}
  base[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys)
  COMPLETE_OPTIONS.each_with_object(base) do |key, opts|
    opts[key] = options[key] if options.key?(key)
  end
end
# Completes the multipart upload with the given parts.
#
# @param upload_id [String] id of the multipart upload to complete
# @param parts [Array<Hash>] completed part descriptors (etag, part_number, …)
# @param options [Hash] user-supplied upload options
# @return [Seahorse::Client::Response] the CompleteMultipartUpload response
def complete_upload(upload_id, parts, options)
  params = complete_opts(options).merge(
    upload_id: upload_id,
    multipart_upload: { parts: parts }
  )
  @client.complete_multipart_upload(**params)
end
# Chooses a part size for a file of +source_size+ bytes: large enough to
# stay within MAX_PARTS parts, but never below MIN_PART_SIZE.
#
# @param source_size [Integer] total file size in bytes
# @return [Integer] the part size in bytes
def compute_default_part_size(source_size)
  size_for_max_parts = (source_size.to_f / MAX_PARTS).ceil
  [size_for_max_parts, MIN_PART_SIZE].max.to_i
end
# Splits +source+ into upload_part request hashes, one per part, each with
# a FilePart body covering its byte range.
#
# @param upload_id [String] id of the multipart upload
# @param source [String, Pathname, File, Tempfile] the file being uploaded
# @param options [Hash] user-supplied upload options
# @return [Array<Hash>] upload_part request options, part_number starting at 1
def compute_parts(upload_id, source, options)
  file_size = File.size(source)
  chunk_size = compute_default_part_size(file_size)
  parts = []
  # One part per chunk_size-aligned offset; the final part may be shorter.
  (0...file_size).step(chunk_size).with_index(1) do |offset, part_number|
    parts << upload_part_opts(options).merge(
      upload_id: upload_id,
      part_number: part_number,
      body: FilePart.new(
        source: source,
        offset: offset,
        size: part_size(file_size, chunk_size, offset)
      )
    )
  end
  parts
end
# Builds the option hash for create_multipart_upload from the caller's
# options, keeping only the keys listed in CREATE_OPTIONS and always
# requesting the default checksum algorithm.
#
# @param options [Hash] user-supplied upload options
# @return [Hash] options to pass to create_multipart_upload; includes
#   checksum_type: 'FULL_OBJECT' when any checksum option was supplied
def create_opts(options)
  base = { checksum_algorithm: Aws::Plugins::ChecksumAlgorithm::DEFAULT_CHECKSUM }
  base[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys)
  CREATE_OPTIONS.each_with_object(base) do |key, opts|
    opts[key] = options[key] if options.key?(key)
  end
end
# @param keys [Array<Symbol>] option names supplied by the caller
# @return [Boolean] whether any of +keys+ is a checksum-related option
def has_checksum_key?(keys)
  keys.any? { |candidate| checksum_key?(candidate) }
end
# @param options [Hash]
# @option options [Client] :client the S3 client to use; a new Client is
#   constructed when none is given
# @option options [Integer] :thread_count number of threads used to upload
#   parts concurrently (defaults to THREAD_COUNT)
def initialize(options = {})
  @client = options[:client] || Client.new
  @thread_count = options[:thread_count] || THREAD_COUNT
end
# Starts a multipart upload.
#
# @param options [Hash] user-supplied upload options
# @return [String] the upload id of the newly created multipart upload
def initiate_upload(options)
  response = @client.create_multipart_upload(create_opts(options))
  response.upload_id
end
# Size in bytes of the part starting at +offset+: the default +part_size+
# for every part except the last, which is clamped to the bytes remaining.
#
# @param total_size [Integer] total file size in bytes
# @param part_size [Integer] the default (full) part size
# @param offset [Integer] byte offset where this part begins
# @return [Integer] the size of this part
def part_size(total_size, part_size, offset)
  remaining = total_size - offset
  [part_size, remaining].min
end
def upload(source, options = {})
-
(Seahorse::Client::Response)
- - the CompleteMultipartUploadResponse
Options Hash:
(**options)
-
:progress_callback
(Proc
) -- -
:key
(required, String
) -- The key for the object. -
:bucket
(required, String
) -- The bucket to upload to.
Parameters:
-
source
(String, Pathname, File, Tempfile
) -- The file to upload.
def upload(source, options = {}) if File.size(source) < MIN_PART_SIZE raise ArgumentError, FILE_TOO_SMALL else upload_id = initiate_upload(options) parts = upload_parts(upload_id, source, options) complete_upload(upload_id, parts, options) end end
# Uploads the parts in +pending+ using a pool of worker threads, pushing a
# descriptor for each successful part onto +completed+.
#
# @param pending [PartList] thread-safe queue of upload_part request hashes
# @param completed [PartList] collects { etag:, part_number:, checksum_* } hashes
# @param options [Hash] may contain :progress_callback and :thread_count
# @return [Array<StandardError>] the errors raised by worker threads
#   (empty when every part succeeded)
def upload_in_threads(pending, completed, options)
  threads = []
  # Wrap the user callback so per-part byte progress can be reported.
  if (callback = options[:progress_callback])
    progress = MultipartProgress.new(pending, callback)
  end
  options.fetch(:thread_count, @thread_count).times do
    thread = Thread.new do
      begin
        # Workers pull parts off the shared queue until it is drained.
        while part = pending.shift
          if progress
            part[:on_chunk_sent] = proc do |_chunk, bytes, _total|
              progress.call(part[:part_number], bytes)
            end
          end
          resp = @client.upload_part(part)
          part[:body].close
          completed_part = { etag: resp.etag, part_number: part[:part_number] }
          # Copy the response checksum for whichever algorithm was used
          # (e.g. :checksum_crc32) into the completed-part descriptor.
          algorithm = resp.context.params[:checksum_algorithm]
          k = "checksum_#{algorithm.downcase}".to_sym
          completed_part[k] = resp.send(k)
          completed.push(completed_part)
        end
        # nil thread value marks success; compacted away by the caller.
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    threads << thread
  end
  # Thread#value joins each worker; non-nil values are the captured errors.
  threads.map(&:value).compact
end
# Builds the per-part base option hash from the caller's options, keeping
# only the keys listed in UPLOAD_PART_OPTIONS.
#
# @param options [Hash] user-supplied upload options
# @return [Hash] options to merge into each upload_part request
def upload_part_opts(options)
  UPLOAD_PART_OPTIONS.each_with_object({}) do |key, opts|
    # don't pass through checksum calculations
    opts[key] = options[key] if options.key?(key) && !checksum_key?(key)
  end
end
# Uploads every part of +source+ concurrently. On any failure the whole
# upload is aborted (which raises); on success the completed parts are
# returned in part-number order.
#
# @param upload_id [String] id of the multipart upload
# @param source [String, Pathname, File, Tempfile] the file being uploaded
# @param options [Hash] user-supplied upload options
# @return [Array<Hash>] completed part descriptors sorted by :part_number
# @raise [MultipartUploadError] when any part fails (via abort_upload)
def upload_parts(upload_id, source, options)
  pending = PartList.new(compute_parts(upload_id, source, options))
  completed = PartList.new
  errors = upload_in_threads(pending, completed, options)
  # abort_upload always raises, so this never returns on the error path.
  return abort_upload(upload_id, options, errors) unless errors.empty?
  completed.to_a.sort_by { |part| part[:part_number] }
end