class Aws::S3::MultipartFileUploader

@api private

def abort_upload(upload_id, options, errors)

def abort_upload(upload_id, options, errors)
  @client.abort_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id
  )
  msg = "multipart upload failed: #{errors.map(&:message).join("; ")}"
  raise MultipartUploadError.new(msg, errors)
rescue MultipartUploadError => error
  raise error
rescue => error
  msg = "failed to abort multipart upload: #{error.message}"
  raise MultipartUploadError.new(msg, errors + [error])
end

def complete_upload(upload_id, parts, options)

def complete_upload(upload_id, parts, options)
  @client.complete_multipart_upload(
    bucket: options[:bucket],
    key: options[:key],
    upload_id: upload_id,
    multipart_upload: { parts: parts }
  )
end

def compute_default_part_size(source_size)

def compute_default_part_size(source_size)
  [(source_size.to_f / MAX_PARTS).ceil, MIN_PART_SIZE].max.to_i
end

def compute_parts(upload_id, source, options)

def compute_parts(upload_id, source, options)
  size = File.size(source)
  default_part_size = compute_default_part_size(size)
  offset = 0
  part_number = 1
  parts = []
  while offset < size
    parts << upload_part_opts(options).merge(
      upload_id: upload_id,
      part_number: part_number,
      body: FilePart.new(
        source: source,
        offset: offset,
        size: part_size(size, default_part_size, offset)
      )
    )
    part_number += 1
    offset += default_part_size
  end
  parts
end

def create_opts(options)

def create_opts(options)
  CREATE_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end

def initialize(options = {})

Options Hash: (**options)
  • :thread_count (Integer) --
  • :client (Client) --
def initialize(options = {})
  @client = options[:client] || Client.new
  @thread_count = options[:thread_count] || THREAD_COUNT
end

def initiate_upload(options)

def initiate_upload(options)
  @client.create_multipart_upload(create_opts(options)).upload_id
end

def part_size(total_size, part_size, offset)

def part_size(total_size, part_size, offset)
  if offset + part_size > total_size
    total_size - offset
  else
    part_size
  end
end

def upload(source, options = {})

Returns:
  • (void) -

Options Hash: (**options)
  • :key (required, String) -- The key for the object.
  • :bucket (required, String) -- The bucket to upload to.

Parameters:
  • source (String, Pathname, File, Tempfile) -- The file to upload.
def upload(source, options = {})
  if File.size(source) < MIN_PART_SIZE
    raise ArgumentError, FILE_TOO_SMALL
  else
    upload_id = initiate_upload(options)
    parts = upload_parts(upload_id, source, options)
    complete_upload(upload_id, parts, options)
  end
end

def upload_in_threads(pending, completed)

def upload_in_threads(pending, completed)
  threads = []
  @thread_count.times do
    thread = Thread.new do
      begin
        while part = pending.shift
          resp = @client.upload_part(part)
          part[:body].close
          completed.push(etag: resp.etag, part_number: part[:part_number])
        end
        nil
      rescue => error
        # keep other threads from uploading other parts
        pending.clear!
        error
      end
    end
    thread.abort_on_exception = true
    threads << thread
  end
  threads.map(&:value).compact
end

def upload_part_opts(options)

def upload_part_opts(options)
  UPLOAD_PART_OPTIONS.inject({}) do |hash, key|
    hash[key] = options[key] if options.key?(key)
    hash
  end
end

def upload_parts(upload_id, source, options)

def upload_parts(upload_id, source, options)
  pending = PartList.new(compute_parts(upload_id, source, options))
  completed = PartList.new
  errors = upload_in_threads(pending, completed)
  if errors.empty?
    completed.to_a.sort_by { |part| part[:part_number] }
  else
    abort_upload(upload_id, options, errors)
  end
end