class Aws::Sigv4::Signer


and `#session_token`.
returning another object that responds to `#access_key_id`, `#secret_access_key`,
A credential provider is any object that responds to `#credentials`
* `Aws::ECSCredentials`
* `Aws::AssumeRoleCredentials`
* `Aws::InstanceProfileCredentials`
* `Aws::SharedCredentials`
* `Aws::Credentials`
Other AWS SDK for Ruby classes that can be provided via `:credentials_provider`:
)
credentials_provider: Aws::InstanceProfileCredentials.new
region: 'us-east-1',
service: 's3',
signer = Aws::Sigv4::Signer.new(
classes:
If you are using the AWS SDK for Ruby, you can use any of the credential
You can also provide refreshing credentials via the `:credentials_provider`.
)
secret_access_key: 'secret'
access_key_id: 'akid',
# static credentials
region: 'us-east-1',
service: 's3',
signer = Aws::Sigv4::Signer.new(
with static credentials:
The signer requires credentials. You can configure the signer
## Credentials
signature will be invalid.
It is important to have the correct service and region name, or the
ec2.us-west-1.amazonaws.com => us-west-1
the service name.
The region is normally the second portion of the endpoint, following
ec2.us-west-1.amazonaws.com => ec2
example:
The service name is normally the endpoint prefix to an AWS service. For
To use the signer, you need to specify the service, region, and credentials.
## Configuration
expires in 15 minutes.
By default, the body of this request is not signed and the request
* {#presign_url} - Computes a presigned request with an expiration.
the hash of headers that should be applied to the request.
* {#sign_request} - Computes a signature of the given request, returning
provides two methods for generating signatures:
Utility class for creating AWS signature version 4 signature. This class

def asymmetric_signature(creds, string_to_sign)

# Computes the asymmetric (SigV4a) signature of a string to sign.
#
# An EC key is derived from the access key id and secret access key, the
# SHA-256 digest of the string to sign is signed with it, and the result is
# hex-encoded.
#
# @param creds [#access_key_id, #secret_access_key]
# @param string_to_sign [String]
# @return [String] hex-encoded signature
def asymmetric_signature(creds, string_to_sign)
  # Only the first value returned by derive_asymmetric_key is used here.
  signing_key, _unused = Aws::Sigv4::AsymmetricCredentials.derive_asymmetric_key(
    creds.access_key_id, creds.secret_access_key
  )
  digest = OpenSSL::Digest::SHA256.digest(string_to_sign)
  Digest.hexencode(signing_key.dsa_sign_asn1(digest))
end

def canonical_header_value(value)

# Normalizes a header value for the canonical request: each run of
# whitespace becomes a single space, then the result is stripped.
#
# @param value [String]
# @return [String]
def canonical_header_value(value)
  collapsed = value.gsub(/\s+/, ' ')
  collapsed.strip
end

def canonical_headers(headers)

# Builds the canonical headers section: headers listed in
# `@unsigned_headers` are dropped, the rest are sorted by name and rendered
# as "name:value" lines joined by newlines.
#
# @param headers [Hash] header name => value (names are compared as given)
# @return [String]
def canonical_headers(headers)
  signable = headers.reject { |name, _value| @unsigned_headers.include?(name) }
  lines = signable.sort_by { |name, _value| name }.map { |name, value|
    "#{name}:#{canonical_header_value(value.to_s)}"
  }
  lines.join("\n")
end

def canonical_request(http_method, url, headers, content_sha256)

# Assembles the canonical request: HTTP method, path, normalized query
# string, canonical headers (followed by a newline), signed header names and
# the payload hash, joined by newlines.
#
# @param http_method [String]
# @param url [#path, #query]
# @param headers [Hash]
# @param content_sha256 [String]
# @return [String]
def canonical_request(http_method, url, headers, content_sha256)
  canonical_path = path(url)
  canonical_query = normalized_querystring(url.query || '')
  header_block = canonical_headers(headers) + "\n"
  header_names = signed_headers(headers)
  [
    http_method, canonical_path, canonical_query,
    header_block, header_names, content_sha256
  ].join("\n")
end

def credential(credentials, date)

# Builds the credential string "<access key id>/<credential scope>".
#
# @param credentials [#access_key_id]
# @param date [String] date portion used for the credential scope
# @return [String]
def credential(credentials, date)
  key_id = credentials.access_key_id
  "#{key_id}/#{credential_scope(date)}"
end

def credential_scope(date)

# Builds the credential scope "date/region/service/aws4_request".
# The region segment is left out when the signing algorithm is :sigv4a,
# and nil segments are dropped.
#
# @param date [String]
# @return [String]
def credential_scope(date)
  segments = [date]
  segments << @region unless @signing_algorithm == :sigv4a
  segments << @service << 'aws4_request'
  segments.compact.join('/')
end

def credentials_set?(credentials)

(eg those returned by sts#assume_role)
and may just be credential like Client response objects
Credentials may not implement the Credentials interface
Returns true if credentials are set (not nil or empty)
# Returns true when both the access key id and the secret access key are
# present, i.e. neither nil nor empty. Only these two readers are used, so
# any credential-like object works.
#
# @param credentials [#access_key_id, #secret_access_key]
# @return [Boolean]
def credentials_set?(credentials)
  key_id = credentials.access_key_id
  return false if key_id.nil? || key_id.empty?

  secret = credentials.secret_access_key
  !(secret.nil? || secret.empty?)
end

def downcase_headers(headers)

# Returns a new hash with every header name downcased. A nil argument is
# treated as an empty hash; the given object is not modified.
#
# @param headers [#to_hash, nil]
# @return [Hash]
def downcase_headers(headers)
  source = (headers || {}).to_hash
  source.each_with_object({}) do |(name, value), lowered|
    lowered[name.downcase] = value
  end
end

def event_signature(secret_access_key, date, string_to_sign)

as next prior signature for event signing)
string is handled at #sign_event instead. (Will be used
converting signature from binary string to hex-encoded
Note:

'bytes' type)
hex-encoded string. (Since ':chunk-signature' requires
returned signature is a binary string instead of
Comparing to original signature v4 algorithm,
# Computes the signature for an event. The signing key is derived by chaining
# `hmac` over the date, region, service and 'aws4_request', starting from
# "AWS4" + secret access key. The final value comes from `hmac` rather than
# `hexhmac`, so it is whatever `hmac` returns (not hex-encoded here).
#
# @param secret_access_key [String]
# @param date [String]
# @param string_to_sign [String]
# @return [String]
def event_signature(secret_access_key, date, string_to_sign)
  seed = "AWS4" + secret_access_key
  signing_key = [date, @region, @service, 'aws4_request'].inject(seed) do |key, part|
    hmac(key, part)
  end
  hmac(signing_key, string_to_sign)
end

def event_string_to_sign(datetime, headers, payload, prior_signature, encoder)

thus no extra encoding is needed.
payload used is already eventstream encoded (event without signature),
While headers need to be encoded under eventstream format,
Note:

they will be used for computing digest in #event_string_to_sign
instead, an event contains headers and payload two parts, and
there is no canonical_request concept for an eventstream event,
Compared to original #string_to_sign at signature v4 algorithm
# Builds the string to sign for an event stream event.
#
# The headers are encoded through the given encoder before hashing; the
# payload is hashed as given.
#
# @param datetime [String] timestamp; its first 8 characters form the scope date
# @param headers [Hash] event headers
# @param payload [String] event payload
# @param prior_signature [String] signature of the previous event/request
# @param encoder [#encode_headers]
# @return [String]
def event_string_to_sign(datetime, headers, payload, prior_signature, encoder)
  message = Aws::EventStream::Message.new(headers: headers, payload: payload)
  encoded_headers = encoder.encode_headers(message)
  scope = credential_scope(datetime[0,8])
  headers_digest = sha256_hexdigest(encoded_headers)
  payload_digest = sha256_hexdigest(payload)
  [
    "AWS4-HMAC-SHA256-PAYLOAD",
    datetime,
    scope,
    prior_signature,
    headers_digest,
    payload_digest
  ].join("\n")
end

def extract_credentials_provider(options)

# Picks the credentials provider from the constructor options.
#
# An explicit :credentials_provider wins. Otherwise, when :credentials or
# :access_key_id is given, the options are wrapped in a
# StaticCredentialsProvider.
#
# @param options [Hash]
# @return [Object] the credentials provider
# @raise [Errors::MissingCredentialsError] when no credential option is given
def extract_credentials_provider(options)
  provider = options[:credentials_provider]
  return provider if provider

  unless options.key?(:credentials) || options.key?(:access_key_id)
    raise Errors::MissingCredentialsError
  end

  StaticCredentialsProvider.new(options)
end

def extract_expires_in(options)

# Reads :expires_in from the options, defaulting to 900 seconds when it is
# nil or absent.
#
# @param options [Hash]
# @return [Integer] number of seconds
# @raise [ArgumentError] when :expires_in is neither nil nor an Integer
def extract_expires_in(options)
  expires_in = options[:expires_in]
  return 900 if expires_in.nil?
  return expires_in if expires_in.is_a?(Integer)

  raise ArgumentError, "expected :expires_in to be a number of seconds"
end

def extract_http_method(request)

# Reads the required :http_method entry and returns it upcased.
#
# @param request [Hash]
# @return [String]
# @raise [ArgumentError] when :http_method is missing
def extract_http_method(request)
  http_method = request[:http_method]
  raise ArgumentError, "missing required option :http_method" unless http_method

  http_method.upcase
end

def extract_region(options)

# Reads the required :region option.
#
# @param options [Hash]
# @return [Object] the value stored under :region
# @raise [Errors::MissingRegionError] when :region is nil or absent
def extract_region(options)
  region = options[:region]
  raise Errors::MissingRegionError unless region

  region
end

def extract_service(options)

# Reads the required :service option.
#
# @param options [Hash]
# @return [Object] the value stored under :service
# @raise [ArgumentError] when :service is nil or absent
def extract_service(options)
  service = options[:service]
  raise ArgumentError, "missing required option :service" unless service

  service
end

def extract_url(request)

# Reads the required :url entry and parses its string form into a URI.
# Because the value is stringified first, a URI passed in is re-parsed into
# a separate object.
#
# @param request [Hash]
# @return [URI::Generic]
# @raise [ArgumentError] when :url is missing
def extract_url(request)
  url = request[:url]
  raise ArgumentError, "missing required option :url" unless url

  URI.parse(url.to_s)
end

def fetch_credentials

# Fetches credentials from the configured provider.
#
# @return [Array] a two-element array: the credentials, and the provider's
#   expiration when the provider responds to #expiration (nil otherwise)
# @raise [Errors::MissingCredentialsError] when the credentials are not set
def fetch_credentials
  provider = @credentials_provider
  credentials = provider.credentials
  unless credentials_set?(credentials)
    raise Errors::MissingCredentialsError,
          'unable to sign request without credentials set'
  end

  expiration = provider.respond_to?(:expiration) ? provider.expiration : nil
  [credentials, expiration]
end

def hexhmac(key, value)

# HMAC-SHA256 of `value` under `key`, returned as a hex string.
#
# @param key [String]
# @param value [String]
# @return [String]
def hexhmac(key, value)
  sha256 = OpenSSL::Digest::SHA256.new
  OpenSSL::HMAC.hexdigest(sha256, key, value)
end

def hmac(key, value)

# HMAC-SHA256 of `value` under `key`, returned as a raw (binary) string.
#
# @param key [String]
# @param value [String]
# @return [String]
def hmac(key, value)
  sha256 = OpenSSL::Digest::SHA256.new
  OpenSSL::HMAC.digest(sha256, key, value)
end

def host(uri)

# Returns the host value for the URI: just the host when the port equals the
# scheme's default port, otherwise "host:port".
#
# @param uri [#host, #port, #default_port]
# @return [String]
def host(uri)
  # Also covers unknown schemes, where default_port is nil.
  return uri.host if uri.port == uri.default_port

  "#{uri.host}:#{uri.port}"
end

def initialize(options = {})

Options Hash: (**options)
  • :normalize_path (Boolean) -- When `true`, the
  • :omit_session_token (Boolean) --
  • :signing_algorithm (Symbol) -- The
  • :apply_checksum_header (Boolean) -- When `true`,
  • :uri_escape_path (Boolean) -- When `true`,
  • :unsigned_headers (Array) -- A list of

Parameters:
  • :credentials_provider (#credentials) -- An object that responds
  • :region (String) -- The region name, e.g. 'us-east-1'. When signing
  • :service (String) -- The service signing name, e.g. 's3'.
  • :credentials (Credentials) -- Any object that responds to the following
  • :region (String) -- The region name, e.g. 'us-east-1'. When signing
  • :service (String) -- The service signing name, e.g. 's3'.
  • :session_token (String) -- (nil)
  • :secret_access_key (String) --
  • :access_key_id (String) --
  • :region (String) -- The region name, e.g. 'us-east-1'. When signing
  • :service (String) -- The service signing name, e.g. 's3'.

Overloads:
  • initialize(service:, region:, credentials_provider:, **options)
  • initialize(service:, region:, credentials:, **options)
  • initialize(service:, region:, access_key_id:, secret_access_key:, session_token:nil, **options)
# Configures the signer from an options hash.
#
# :service, :region and a credentials source are resolved first, in that
# order, so a missing one raises before anything else is set up.
#
# @param options [Hash]
# @option options [String] :service (required)
# @option options [String] :region (required)
# @option options [Array] :unsigned_headers ([]) extra header names to leave
#   unsigned; they are downcased, and 'authorization', 'x-amzn-trace-id' and
#   'expect' are always added
# @option options [Boolean] :uri_escape_path (true)
# @option options [Boolean] :apply_checksum_header (true)
# @option options [Symbol] :signing_algorithm (:sigv4)
# @option options [Boolean] :normalize_path (true)
# @option options [Boolean] :omit_session_token (false)
def initialize(options = {})
  @service = extract_service(options)
  @region = extract_region(options)
  @credentials_provider = extract_credentials_provider(options)

  caller_unsigned = options.fetch(:unsigned_headers, []).map(&:downcase)
  always_unsigned = ['authorization', 'x-amzn-trace-id', 'expect']
  @unsigned_headers = Set.new(caller_unsigned + always_unsigned)

  @uri_escape_path = options.fetch(:uri_escape_path, true)
  @apply_checksum_header = options.fetch(:apply_checksum_header, true)
  @signing_algorithm = options.fetch(:signing_algorithm, :sigv4)
  @normalize_path = options.fetch(:normalize_path, true)
  @omit_session_token = options.fetch(:omit_session_token, false)
end

def normalize_path(uri)

Other tags:
    Api: - private
# Normalizes the path of the given URI in place (removing "." / ".." segments
# and duplicate slashes via Pathname#cleanpath) while keeping a trailing
# slash if the original path had one.
#
# @api private
# @param uri [#path, #path=] object whose path is rewritten
# @return [String] the normalized path that was assigned
def normalize_path(uri)
  original_path = uri.path
  normalized_path = Pathname.new(original_path).cleanpath.to_s
  # Pathname is probably not correct to use. Empty paths will
  # resolve to "." and should be disregarded
  normalized_path = '' if normalized_path == '.'
  # Ensure trailing slashes are correctly preserved. Build a new string
  # rather than appending in place: normalized_path may be the '' literal
  # above, which cannot be mutated when string literals are frozen.
  if original_path.end_with?('/') && !normalized_path.end_with?('/')
    normalized_path += '/'
  end
  uri.path = normalized_path
end

def normalized_querystring(querystring)

# Produces the canonical query string: parameters without "=" get one
# appended, then parameters are sorted by name, then by value, then by their
# original position.
#
# @param querystring [String] raw query string (without the leading "?")
# @return [String]
def normalized_querystring(querystring)
  params = querystring.split('&').map do |param|
    param.include?('=') ? param : "#{param}="
  end
  # From: https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
  # Sort the parameter names by character code point in ascending order.
  # Parameters with duplicate names should be sorted by value.
  #
  # The original offset is the final tie-breaker so identical parameters keep
  # their order on every Ruby implementation (a plain sort may swap equal
  # members, e.g. on JRuby, and then the result no longer matches the sent
  # query string).
  ordered = params.each_with_index.sort_by do |param, offset|
    # Limit the split to 2 so the value keeps any further "=" characters and
    # an empty name or value comes back as "" rather than nil; nil would make
    # the comparison fail for inputs such as "a=&a=1".
    name, value = param.split('=', 2)
    [name, value, offset]
  end
  ordered.map(&:first).join('&')
end

def path(url)

# Returns the path used in the canonical request. An empty path becomes "/",
# and the path is escaped only when `@uri_escape_path` is truthy.
#
# @param url [#path]
# @return [String]
def path(url)
  raw_path = url.path
  raw_path = '/' if raw_path == ''
  @uri_escape_path ? uri_escape_path(raw_path) : raw_path
end

def presign_url(options)

Returns:
  • (HTTPS::URI, HTTP::URI) -

Options Hash: (**options)
  • :time (Time) -- Time of the signature.
  • :body_digest (optional, String) --
  • :body (optional, String, IO) --
  • :expires_in (Integer) --
  • :headers (Hash) -- Headers that should
  • :url (required, String, URI::HTTP, URI::HTTPS) --
  • :http_method (required, String) -- The HTTP request method,
# Builds a presigned URL: the signing parameters and the computed signature
# are appended to the URL's query string.
#
# @param options [Hash]
# @option options [String] :http_method (required) HTTP request method
# @option options [String, URI::HTTP, URI::HTTPS] :url (required)
# @option options [Hash] :headers headers to sign; 'host' is filled in from
#   the URL when absent, and 'x-amz-date' / 'x-amz-content-sha256' are used
#   as the date and payload hash when present
# @option options [Integer] :expires_in lifetime in seconds (900 by default),
#   capped by the credential expiration when one is known
# @option options [String, IO] :body body to hash when no digest is supplied
# @option options [String] :body_digest precomputed payload hash
# @option options [Time] :time time of the signature (defaults to now)
# @return [URI::HTTP, URI::HTTPS] the URL with the signing query parameters
def presign_url(options)
  creds, expiration = fetch_credentials

  http_method = extract_http_method(options)
  url = extract_url(options)
  Signer.normalize_path(url) if @normalize_path

  headers = downcase_headers(options[:headers])
  headers['host'] ||= host(url)

  datetime = headers['x-amz-date']
  datetime ||= (options[:time] || Time.now).utc.strftime("%Y%m%dT%H%M%SZ")
  date = datetime[0,8]

  # Payload hash precedence: header, then :body_digest, then hashing :body.
  content_sha256 = headers['x-amz-content-sha256']
  content_sha256 ||= options[:body_digest]
  content_sha256 ||= sha256_hexdigest(options[:body] || '')

  algorithm = sts_algorithm

  # Insertion order here is the order the parameters are appended to the URL.
  params = {}
  params['X-Amz-Algorithm'] = algorithm
  params['X-Amz-Credential'] = credential(creds, date)
  params['X-Amz-Date'] = datetime
  params['X-Amz-Expires'] = presigned_url_expiration(options, expiration, Time.strptime(datetime, "%Y%m%dT%H%M%S%Z")).to_s
  if creds.session_token
    if @signing_algorithm == 'sigv4-s3express'.to_sym
      params['X-Amz-S3session-Token'] = creds.session_token
    else
      params['X-Amz-Security-Token'] = creds.session_token
    end
  end
  params['X-Amz-SignedHeaders'] = signed_headers(headers)

  if @signing_algorithm == :sigv4a && @region
    params['X-Amz-Region-Set'] = @region
  end

  params = params.map do |key, value|
    "#{uri_escape(key)}=#{uri_escape(value)}"
  end.join('&')

  # A URL ending in a bare "?" has an empty (non-nil) query. Treat it like a
  # missing query so the result does not start with "&", which would add an
  # empty parameter to both the URL and the canonical query string.
  if url.query.nil? || url.query.empty?
    url.query = params
  else
    url.query += '&' + params
  end

  creq = canonical_request(http_method, url, headers, content_sha256)
  sts = string_to_sign(datetime, creq, algorithm)

  signature =
    if @signing_algorithm == :sigv4a
      asymmetric_signature(creds, sts)
    else
      signature(creds.secret_access_key, date, sts)
    end

  # The signature itself is not part of what was signed; append it last.
  url.query += '&X-Amz-Signature=' + signature
  url
end

def presigned_url_expiration(options, expiration, datetime)

# Determines the X-Amz-Expires value for a presigned URL.
#
# @param options [Hash] options carrying the optional :expires_in
# @param expiration [Time, nil] credential expiration, if known
# @param datetime [Time] time of the signature
# @return [Integer] the configured :expires_in, capped by the seconds left
#   until the credentials expire
def presigned_url_expiration(options, expiration, datetime)
  expires_in = extract_expires_in(options)
  return expires_in unless expiration

  seconds_left = (expiration - datetime).to_i
  # In the static stability case, credentials may expire in the past
  # but still be valid. For those cases, use the user configured
  # expires_in and ignore expiration.
  return expires_in unless seconds_left > 0

  [expires_in, seconds_left].min
end

def sha256_hexdigest(value)

Returns:
  • (String) -

Parameters:
  • value (File, Tempfile, IO#read, String) --
# Computes the hex-encoded SHA-256 digest of a value.
#
# * File/Tempfile whose path exists on disk: digested from the file.
# * Any other object responding to #read: read in 1MB chunks from its
#   current position, then rewound.
# * Anything else: digested directly as a string.
#
# @param value [File, Tempfile, #read, String]
# @return [String]
def sha256_hexdigest(value)
  on_disk = (File === value || Tempfile === value) &&
            !value.path.nil? && File.exist?(value.path)
  return OpenSSL::Digest::SHA256.file(value).hexdigest if on_disk
  return OpenSSL::Digest::SHA256.hexdigest(value) unless value.respond_to?(:read)

  digest = OpenSSL::Digest::SHA256.new
  chunk_size = 1024 * 1024 # 1MB
  while (chunk = value.read(chunk_size))
    digest.update(chunk)
  end
  value.rewind
  digest.hexdigest
end

def sign_event(prior_signature, payload, encoder)

hex-encoded string using #unpack
signature value (a binary string) used at ':chunk-signature' needs to be converted to
V4 algorithm). Thus, when returning signature value used for next event signing, the
needs to be a binary string instead of a hex-encoded string (like original signature
Since ':chunk-signature' header value has bytes type, the signature value provided

Note:

The initial prior_signature should be using the signature computed at initial request

)
encoder
payload_1, # binary string (eventstream encoded event 1)
signature_0,
headers_1, signature_1 = signer.sign_event(

)
encoder, # Aws::EventStreamEncoder
payload_0, # binary string (eventstream encoded event 0)
prior_signature, # hex-encoded string
headers_0, signature_0 = signer.sign_event(

To sign events

which is serialized based on input and protocol
Payload of the sigv4 signed event message contains eventstream encoded message

* millisecond since epoch, 'timestamp' type
* ':date'
* computed signature of the event, binary string, 'bytes' type
* ':chunk-signature'
Headers of a sigv4 signed event message only contains 2 headers

used for next event signing.
Signs an event and returns signature headers and prior signature
# Signs an event stream event.
#
# @param prior_signature [String] signature of the previous event (or of the
#   initial request)
# @param payload [String] event payload
# @param encoder [#encode_headers] passed through to #event_string_to_sign
# @return [Array] two elements: the signature headers (':date' and
#   ':chunk-signature'), and the signature as a hex-encoded string for use as
#   the next prior signature
def sign_event(prior_signature, payload, encoder)
  creds, _expiration = fetch_credentials
  now = Time.now
  datetime = now.utc.strftime("%Y%m%dT%H%M%SZ")

  # ':date' has to be in place before signing; it is part of the signed headers.
  headers = {
    ':date' => Aws::EventStream::HeaderValue.new(value: now.to_i * 1000, type: 'timestamp')
  }

  sts = event_string_to_sign(datetime, headers, payload, prior_signature, encoder)
  signature_bytes = event_signature(creds.secret_access_key, datetime[0,8], sts)
  headers[':chunk-signature'] = Aws::EventStream::HeaderValue.new(value: signature_bytes, type: 'bytes')

  # The header carries the signature as returned by #event_signature;
  # the caller gets it hex-encoded.
  [headers, signature_bytes.unpack('H*').first]
end

def sign_request(request)

Returns:
  • (Signature) - Return an instance of {Signature} that has

Options Hash: (**request)
  • :body (optional, String, IO) -- The HTTP request body.
  • :headers (optional, Hash) -- A hash of headers
  • :url (required, String, URI::HTTP, URI::HTTPS) --
  • :http_method (required, String) -- One of

Parameters:
  • request (Hash) --
# Computes the signature for a request and returns the headers that should
# be applied to it.
#
# @param request [Hash]
# @option request [String] :http_method (required)
# @option request [String, URI::HTTP, URI::HTTPS] :url (required)
# @option request [Hash] :headers (optional) request headers; 'host',
#   'x-amz-date' and 'x-amz-content-sha256' are reused when present
# @option request [String, IO] :body (optional) hashed when no
#   'x-amz-content-sha256' header is given
# @return [Signature] carries the headers to apply plus the string to sign,
#   canonical request, payload hash and signature
def sign_request(request)
  creds, _expiration = fetch_credentials

  http_method = extract_http_method(request)
  url = extract_url(request)
  Signer.normalize_path(url) if @normalize_path
  headers = downcase_headers(request[:headers])

  datetime = headers['x-amz-date'] || Time.now.utc.strftime("%Y%m%dT%H%M%SZ")
  date = datetime[0,8]
  content_sha256 = headers['x-amz-content-sha256'] || sha256_hexdigest(request[:body] || '')

  session_token = creds.session_token

  sigv4_headers = {
    'host' => headers['host'] || host(url),
    'x-amz-date' => datetime
  }
  if session_token && !@omit_session_token
    token_header =
      if @signing_algorithm == 'sigv4-s3express'.to_sym
        'x-amz-s3session-token'
      else
        'x-amz-security-token'
      end
    sigv4_headers[token_header] = session_token
  end
  sigv4_headers['x-amz-content-sha256'] = content_sha256 if @apply_checksum_header
  if @signing_algorithm == :sigv4a && @region && !@region.empty?
    sigv4_headers['x-amz-region-set'] = @region
  end

  # merge so we do not modify given headers hash
  headers = headers.merge(sigv4_headers)

  # compute signature parts
  algorithm = sts_algorithm
  creq = canonical_request(http_method, url, headers, content_sha256)
  sts = string_to_sign(datetime, creq, algorithm)
  sig =
    if @signing_algorithm == :sigv4a
      asymmetric_signature(creds, sts)
    else
      signature(creds.secret_access_key, date, sts)
    end

  # apply signature
  authorization_parts = [
    "#{algorithm} Credential=#{credential(creds, date)}",
    "SignedHeaders=#{signed_headers(headers)}",
    "Signature=#{sig}",
  ]
  sigv4_headers['authorization'] = authorization_parts.join(', ')

  # skip signing the session token, but include it in the headers
  if session_token && @omit_session_token
    sigv4_headers['x-amz-security-token'] = session_token
  end

  # Returning the signature components.
  Signature.new(
    headers: sigv4_headers,
    string_to_sign: sts,
    canonical_request: creq,
    content_sha256: content_sha256,
    signature: sig
  )
end

def signature(secret_access_key, date, string_to_sign)

# Computes the hex-encoded signature. The signing key is derived by chaining
# `hmac` over the date, region, service and 'aws4_request', starting from
# "AWS4" + secret access key; the string to sign is then signed with
# `hexhmac`.
#
# @param secret_access_key [String]
# @param date [String]
# @param string_to_sign [String]
# @return [String]
def signature(secret_access_key, date, string_to_sign)
  seed = "AWS4" + secret_access_key
  signing_key = [date, @region, @service, 'aws4_request'].inject(seed) do |key, part|
    hmac(key, part)
  end
  hexhmac(signing_key, string_to_sign)
end

def signed_headers(headers)

# Returns the signed header names: every header name not listed in
# `@unsigned_headers`, sorted and joined with ";".
#
# @param headers [Hash] header name => value
# @return [String]
def signed_headers(headers)
  names = headers.map { |name, _value| name }
  signable = names.reject { |name| @unsigned_headers.include?(name) }
  signable.sort.join(';')
end

def string_to_sign(datetime, canonical_request, algorithm)

# Builds the string to sign: algorithm, timestamp, credential scope and the
# SHA-256 hex digest of the canonical request, joined by newlines.
#
# @param datetime [String] timestamp; its first 8 characters form the scope date
# @param canonical_request [String]
# @param algorithm [String]
# @return [String]
def string_to_sign(datetime, canonical_request, algorithm)
  scope = credential_scope(datetime[0,8])
  request_digest = sha256_hexdigest(canonical_request)
  [algorithm, datetime, scope, request_digest].join("\n")
end

def sts_algorithm

# Returns the algorithm name used in the string to sign:
# 'AWS4-ECDSA-P256-SHA256' for :sigv4a, 'AWS4-HMAC-SHA256' for anything else.
#
# @return [String]
def sts_algorithm
  return 'AWS4-ECDSA-P256-SHA256' if @signing_algorithm == :sigv4a

  'AWS4-HMAC-SHA256'
end

def uri_escape(string)

# Instance-level wrapper that forwards to the `uri_escape` method of this
# object's class.
#
# @param string [String, nil]
# @return [String, nil]
def uri_escape(string)
  owner = self.class
  owner.uri_escape(string)
end

def uri_escape(string)

Other tags:
    Api: - private
# Percent-encodes a string. The string is encoded to UTF-8 and passed
# through CGI.escape; then "+" becomes "%20" and "%7E" becomes "~".
#
# @api private
# @param string [String, nil]
# @return [String, nil] nil when given nil
def uri_escape(string)
  return nil if string.nil?

  escaped = CGI.escape(string.encode('UTF-8'))
  escaped.gsub('+', '%20').gsub('%7E', '~')
end

def uri_escape_path(string)

# Instance-level wrapper that forwards to the `uri_escape_path` method of
# this object's class.
#
# @param string [String]
# @return [String]
def uri_escape_path(string)
  owner = self.class
  owner.uri_escape_path(string)
end

def uri_escape_path(path)

Other tags:
    Api: - private
# Escapes each segment of a path with `uri_escape`, leaving the "/"
# separators untouched.
#
# @api private
# @param path [String]
# @return [String]
def uri_escape_path(path)
  path.gsub(/[^\/]+/) do |segment|
    uri_escape(segment)
  end
end

def use_crt?

Always return false since we are not using crt signing functionality
Kept for backwards compatibility
# Always false: CRT signing is not used. Kept so existing callers of this
# predicate keep working.
#
# @return [Boolean] false
def use_crt?
  false
end