class Aws::S3::MultipartFileUploader

@api private

def abort_upload(upload_id, options, errors)

# Aborts the multipart upload and then always raises to report the failure.
#
# @param upload_id identifier of the multipart upload to abort
# @param options [Hash] read for `:bucket` and `:key`
# @param errors [Array<#message>] errors collected while uploading parts
# @raise [MultipartUploadError] in every case; when the abort request itself
#   fails, that error is appended to +errors+ and named in the message
def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket], key: options[:key], upload_id: upload_id
  )
  part_failures = errors.map(&:message).join('; ')
  raise MultipartUploadError.new("multipart upload failed: #{part_failures}", errors)
rescue MultipartUploadError
  # Already the error we want callers to see; pass it through untouched.
  raise
rescue => abort_error
  abort_failure = "failed to abort multipart upload: #{abort_error.message}. "
  part_failures = errors.map(&:message).join('; ')
  raise MultipartUploadError.new(
    "#{abort_failure}Multipart upload failed: #{part_failures}",
    errors + [abort_error]
  )
end

def checksum_key?(key)

# Tells whether +key+ is listed in CHECKSUM_KEYS.
#
# @param key the option name to look up
# @return the result of `CHECKSUM_KEYS.include?(key)`
def checksum_key?(key)
  recognized = CHECKSUM_KEYS
  recognized.include?(key)
end

def complete_opts(options)

# Builds the parameter hash for the complete-multipart-upload request.
#
# Copies every key named in COMPLETE_OPTIONS that is present in +options+.
# When any key of +options+ is a checksum key, `:checksum_type` is first
# set to 'FULL_OBJECT'.
#
# @param options [Hash] the caller-supplied upload options
# @return [Hash] a new hash; +options+ is not modified
def complete_opts(options)
  seed = has_checksum_key?(options.keys) ? { checksum_type: 'FULL_OBJECT' } : {}
  COMPLETE_OPTIONS.each_with_object(seed) do |name, selected|
    selected[name] = options[name] if options.key?(name)
  end
end

def complete_upload(upload_id, parts, options)

# Sends the complete-multipart-upload request for the uploaded parts.
#
# @param upload_id identifier of the multipart upload
# @param parts [Array] the part descriptions, sent as `multipart_upload[:parts]`
# @param options [Hash] upload options, filtered through #complete_opts
# @return whatever `@client.complete_multipart_upload` returns
def complete_upload(upload_id, parts, options)
  request = complete_opts(options).merge(
    upload_id: upload_id,
    multipart_upload: { parts: parts }
  )
  @client.complete_multipart_upload(**request)
end

def compute_default_part_size(source_size)

# Picks the part size for a source of +source_size+ bytes.
#
# The size is the source split evenly across MAX_PARTS parts (rounded up),
# but never smaller than MIN_PART_SIZE.
#
# @param source_size [Numeric] total size of the source
# @return [Integer] the part size to use
def compute_default_part_size(source_size)
  even_split = (source_size.to_f / MAX_PARTS).ceil
  (even_split > MIN_PART_SIZE ? even_split : MIN_PART_SIZE).to_i
end

def compute_parts(upload_id, source, options)

# Splits +source+ into the list of upload-part requests.
#
# Parts are numbered from 1 and start at successive multiples of the default
# part size; the final part is shortened by #part_size so it ends at the end
# of the file.
#
# @param upload_id identifier of the multipart upload, copied into each part
# @param source the file to split, measured with `File.size`
# @param options [Hash] upload options, filtered through #upload_part_opts
# @return [Array<Hash>] one hash per part with `:upload_id`, `:part_number`
#   and a FilePart `:body`, merged over the filtered options
def compute_parts(upload_id, source, options)
  total_size = File.size(source)
  chunk_size = compute_default_part_size(total_size)
  (0...total_size).step(chunk_size).each_with_index.map do |start, index|
    shared = upload_part_opts(options)
    chunk = FilePart.new(
      source: source,
      offset: start,
      size: part_size(total_size, chunk_size, start)
    )
    shared.merge(upload_id: upload_id, part_number: index + 1, body: chunk)
  end
end

def create_opts(options)

# Builds the parameter hash for the create-multipart-upload request.
#
# Starts from the default checksum algorithm, sets `:checksum_type` to
# 'FULL_OBJECT' when any key of +options+ is a checksum key, then copies
# every key named in CREATE_OPTIONS that is present in +options+.
#
# @param options [Hash] the caller-supplied upload options
# @return [Hash] a new hash; +options+ is not modified
def create_opts(options)
  request = { checksum_algorithm: Aws::Plugins::ChecksumAlgorithm::DEFAULT_CHECKSUM }
  request[:checksum_type] = 'FULL_OBJECT' if has_checksum_key?(options.keys)
  CREATE_OPTIONS.each_with_object(request) do |name, selected|
    selected[name] = options[name] if options.key?(name)
  end
end

def has_checksum_key?(keys)

# Tells whether at least one of +keys+ is a checksum key.
#
# @param keys [Enumerable] option names to inspect
# @return [Boolean] true as soon as #checksum_key? accepts one of the keys
def has_checksum_key?(keys)
  keys.each { |candidate| return true if checksum_key?(candidate) }
  false
end

def initialize(options = {})

Options Hash: (**options)
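  • :thread_count (Integer) -- Number of threads used to upload parts; defaults to THREAD_COUNT.
  • :client (Client) -- Client used for the multipart upload requests; defaults to Client.new.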
# Stores the client and the default thread count for later uploads.
#
# @param options [Hash]
# @option options [Client] :client client to use; a new Client is built
#   when none is given
# @option options [Integer] :thread_count number of upload threads; falls
#   back to THREAD_COUNT when none is given
def initialize(options = {})
  given_client = options[:client]
  @client = given_client || Client.new
  given_threads = options[:thread_count]
  @thread_count = given_threads || THREAD_COUNT
end

def initiate_upload(options)

# Starts a multipart upload and returns its identifier.
#
# @param options [Hash] upload options, filtered through #create_opts
# @return the `upload_id` of the create-multipart-upload response
def initiate_upload(options)
  response = @client.create_multipart_upload(create_opts(options))
  response.upload_id
end

def part_size(total_size, part_size, offset)

# Returns the size of the part that starts at +offset+.
#
# @param total_size [Integer] size of the whole source
# @param part_size [Integer] the default part size
# @param offset [Integer] position where this part starts
# @return [Integer] +part_size+, or the bytes left after +offset+ when fewer
#   than +part_size+ remain
def part_size(total_size, part_size, offset)
  remaining = total_size - offset
  remaining < part_size ? remaining : part_size
end

def upload(source, options = {})

Returns:
  • (Seahorse::Client::Response) -- The CompleteMultipartUploadResponse.

Options Hash: (**options)
  • :progress_callback (Proc) -- Optional callback used to report upload progress as chunks of each part are sent.
  • :key (required, String) -- The key for the object.
  • :bucket (required, String) -- The bucket to upload to.

Parameters:
  • source (String, Pathname, File, Tempfile) -- The file to upload.
# Uploads +source+ as a multipart upload: initiate, upload parts, complete.
#
# @param source the file to upload, measured with `File.size`
# @param options [Hash] upload options passed to each step
# @return whatever #complete_upload returns
# @raise [ArgumentError] when the file is smaller than MIN_PART_SIZE
def upload(source, options = {})
  raise ArgumentError, FILE_TOO_SMALL if File.size(source) < MIN_PART_SIZE

  upload_id = initiate_upload(options)
  uploaded = upload_parts(upload_id, source, options)
  complete_upload(upload_id, uploaded, options)
end

def upload_in_threads(pending, completed, options)

# Uploads the pending parts from several threads and collects any failures.
#
# Starts `options[:thread_count]` worker threads (falling back to
# `@thread_count`). Each worker shifts parts off +pending+ until none are
# left, uploads each one with `@client.upload_part`, and pushes a hash with
# the part's `:etag`, `:part_number` and `:checksum_<algorithm>` onto
# +completed+. When `options[:progress_callback]` is given, each part gets an
# `:on_chunk_sent` proc that reports the bytes sent for that part.
#
# A worker that hits an error clears +pending+, stops, and yields that error
# as its thread value.
#
# @param pending [#shift, #clear!] parts waiting to be uploaded
# @param completed [#push] receives one hash per uploaded part
# @param options [Hash] read for `:progress_callback` and `:thread_count`
# @return [Array<StandardError>] errors rescued in the workers; empty when
#   every part was uploaded
def upload_in_threads(pending, completed, options)
  threads = []
  if (callback = options[:progress_callback])
    progress = MultipartProgress.new(pending, callback)
  end
  options.fetch(:thread_count, @thread_count).times do
    thread = Thread.new do
      begin
        while (part = pending.shift)
          if progress
            part[:on_chunk_sent] =
              proc do |_chunk, bytes, _total|
                progress.call(part[:part_number], bytes)
              end
          end
          begin
            resp = @client.upload_part(part)
          ensure
            # Close the body even when the upload raises, so a failed part
            # does not leave it open.
            part[:body].close
          end
          completed_part = {
            etag: resp.etag,
            part_number: part[:part_number]
          }
          algorithm = resp.context.params[:checksum_algorithm]
          k = "checksum_#{algorithm.downcase}".to_sym
          completed_part[k] = resp.send(k)
          completed.push(completed_part)
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    threads << thread
  end
  threads.map(&:value).compact
end

def upload_part_opts(options)

# Builds the options shared by every upload-part request.
#
# Copies each key named in UPLOAD_PART_OPTIONS that is present in +options+,
# except checksum keys.
#
# @param options [Hash] the caller-supplied upload options
# @return [Hash] a new hash; +options+ is not modified
def upload_part_opts(options)
  UPLOAD_PART_OPTIONS.each_with_object({}) do |name, selected|
    next unless options.key?(name)
    # don't pass through checksum calculations
    next if checksum_key?(name)

    selected[name] = options[name]
  end
end

def upload_parts(upload_id, source, options)

# Uploads every part of +source+ and returns the completed part list.
#
# @param upload_id identifier of the multipart upload
# @param source the file being uploaded
# @param options [Hash] upload options
# @return [Array<Hash>] the completed parts sorted by `:part_number`; when
#   any part failed, the result of #abort_upload is returned instead
def upload_parts(upload_id, source, options)
  queue = PartList.new(compute_parts(upload_id, source, options))
  finished = PartList.new
  failures = upload_in_threads(queue, finished, options)
  return abort_upload(upload_id, options, failures) unless failures.empty?

  finished.to_a.sort_by { |entry| entry[:part_number] }
end