class Aws::Sigv4::Signer
and ‘#session_token`.
returning another object that responds to `#access_key_id`, `#secret_access_key`,
A credential provider is any object that responds to `#credentials`
* `Aws::ECSCredentials`
* `Aws::AssumeRoleCredentials`
* `Aws::InstanceProfileCredentials`
* `Aws::SharedCredentials`
* `Aws::Credentials`
Other AWS SDK for Ruby classes that can be provided via `:credentials_provider`:
)
credentials_provider: Aws::InstanceProfileCredentials.new
region: ’us-east-1’,
service: ‘s3’,
signer = Aws::Sigv4::Signer.new(
classes:
If you are using the AWS SDK for Ruby, you can use any of the credential
You can also provide refreshing credentials via the ‘:credentials_provider`.
)
secret_access_key: ’secret’
access_key_id: ‘akid’,
# static credentials
region: ‘us-east-1’,
service: ‘s3’,
signer = Aws::Sigv4::Signer.new(
with static credentials:
The signer requires credentials. You can configure the signer
## Credentials
signature will be invalid.
It is important to have the correct service and region name, or the
ec2.us-west-1.amazonaws.com => us-west-1
the service name.
The region is normally the second portion of the endpoint, following
ec2.us-west-1.amazonaws.com => ec2
example:
The service name is normally the endpoint prefix to an AWS service. For
To use the signer, you need to specify the service, region, and credentials.
## Configuration
expires in 15 minutes.
By default, the body of this request is not signed and the request
* {#presign_url} - Computes a presigned request with an expiration.
the hash of headers that should be applied to the request.
* {#sign_request} - Computes a signature of the given request, returning
provides two methods for generating signatures:
Utility class for creating AWS signature version 4 signature. This class
def asymmetric_signature(creds, string_to_sign)
def asymmetric_signature(creds, string_to_sign) ec, _ = Aws::Sigv4::AsymmetricCredentials.derive_asymmetric_key( creds.access_key_id, creds.secret_access_key ) sts_digest = OpenSSL::Digest::SHA256.digest(string_to_sign) s = ec.dsa_sign_asn1(sts_digest) Digest.hexencode(s) end
def canonical_header_value(value)
def canonical_header_value(value) value.gsub(/\s+/, ' ').strip end
def canonical_headers(headers)
def canonical_headers(headers) headers = headers.inject([]) do |hdrs, (k,v)| if @unsigned_headers.include?(k) hdrs else hdrs << [k,v] end end headers = headers.sort_by(&:first) headers.map{|k,v| "#{k}:#{canonical_header_value(v.to_s)}" }.join("\n") end
def canonical_request(http_method, url, headers, content_sha256)
def canonical_request(http_method, url, headers, content_sha256) [ http_method, path(url), normalized_querystring(url.query || ''), canonical_headers(headers) + "\n", signed_headers(headers), content_sha256, ].join("\n") end
def credential(credentials, date)
def credential(credentials, date) "#{credentials.access_key_id}/#{credential_scope(date)}" end
def credential_scope(date)
def credential_scope(date) [ date, (@region unless @signing_algorithm == :sigv4a), @service, 'aws4_request' ].compact.join('/') end
def credentials_set?(credentials)
and may just be credential like Client response objects
Credentials may not implement the Credentials interface
Returns true if credentials are set (not nil or empty)
def credentials_set?(credentials) !credentials.access_key_id.nil? && !credentials.access_key_id.empty? && !credentials.secret_access_key.nil? && !credentials.secret_access_key.empty? end
def downcase_headers(headers)
def downcase_headers(headers) (headers || {}).to_hash.inject({}) do |hash, (key, value)| hash[key.downcase] = value hash end end
def event_signature(secret_access_key, date, string_to_sign)
string is handled at #sign_event instead. (Will be used
converting signature from binary string to hex-encoded
Note:
'bytes' type)
hex-encoded string. (Since ':chunk-signature' requires
returned signature is a binary string instread of
Comparing to original signature v4 algorithm,
def event_signature(secret_access_key, date, string_to_sign) k_date = hmac("AWS4" + secret_access_key, date) k_region = hmac(k_date, @region) k_service = hmac(k_region, @service) k_credentials = hmac(k_service, 'aws4_request') hmac(k_credentials, string_to_sign) end
def event_string_to_sign(datetime, headers, payload, prior_signature, encoder)
payload used is already eventstream encoded (event without signature),
While headers need to be encoded under eventstream format,
Note:
they will be used for computing digest in #event_string_to_sign
instead, an event contains headers and payload two parts, and
there is no canonical_request concept for an eventstream event,
Compared to original #string_to_sign at signature v4 algorithm
def event_string_to_sign(datetime, headers, payload, prior_signature, encoder) encoded_headers = encoder.encode_headers( Aws::EventStream::Message.new(headers: headers, payload: payload) ) [ "AWS4-HMAC-SHA256-PAYLOAD", datetime, credential_scope(datetime[0,8]), prior_signature, sha256_hexdigest(encoded_headers), sha256_hexdigest(payload) ].join("\n") end
def extract_credentials_provider(options)
def extract_credentials_provider(options) if options[:credentials_provider] options[:credentials_provider] elsif options.key?(:credentials) || options.key?(:access_key_id) StaticCredentialsProvider.new(options) else raise Errors::MissingCredentialsError end end
def extract_expires_in(options)
def extract_expires_in(options) case options[:expires_in] when nil then 900 when Integer then options[:expires_in] else msg = "expected :expires_in to be a number of seconds" raise ArgumentError, msg end end
def extract_http_method(request)
def extract_http_method(request) if request[:http_method] request[:http_method].upcase else msg = "missing required option :http_method" raise ArgumentError, msg end end
def extract_region(options)
def extract_region(options) if options[:region] options[:region] else raise Errors::MissingRegionError end end
def extract_service(options)
def extract_service(options) if options[:service] options[:service] else msg = "missing required option :service" raise ArgumentError, msg end end
def extract_url(request)
def extract_url(request) if request[:url] URI.parse(request[:url].to_s) else msg = "missing required option :url" raise ArgumentError, msg end end
def fetch_credentials
def fetch_credentials credentials = @credentials_provider.credentials if credentials_set?(credentials) expiration = nil if @credentials_provider.respond_to?(:expiration) expiration = @credentials_provider.expiration end [credentials, expiration] else raise Errors::MissingCredentialsError, 'unable to sign request without credentials set' end end
def hexhmac(key, value)
def hexhmac(key, value) OpenSSL::HMAC.hexdigest(OpenSSL::Digest.new('sha256'), key, value) end
def hmac(key, value)
def hmac(key, value) OpenSSL::HMAC.digest(OpenSSL::Digest.new('sha256'), key, value) end
def host(uri)
def host(uri) # Handles known and unknown URI schemes; default_port nil when unknown. if uri.default_port == uri.port uri.host else "#{uri.host}:#{uri.port}" end end
def initialize(options = {})
(**options)
-
:normalize_path
(Boolean
) -- When `true`, the -
:omit_session_token
(Boolean
) -- -
:signing_algorithm
(Symbol
) -- The -
:apply_checksum_header
(Boolean
) -- When `true`, -
:uri_escape_path
(Boolean
) -- When `true`, -
:unsigned_headers
(Array
) -- A list of
Parameters:
-
:credentials_provider
(#credentials
) -- An object that responds -
:region
(String
) -- The region name, e.g. 'us-east-1'. When signing -
:service
(String
) -- The service signing name, e.g. 's3'. -
:credentials
(Credentials
) -- Any object that responds to the following -
:region
(String
) -- The region name, e.g. 'us-east-1'. When signing -
:service
(String
) -- The service signing name, e.g. 's3'. -
:session_token
(String
) -- (nil) -
:secret_access_key
(String
) -- -
:access_key_id
(String
) -- -
:region
(String
) -- The region name, e.g. 'us-east-1'. When signing -
:service
(String
) -- The service signing name, e.g. 's3'.
Overloads:
-
initialize(service:, region:, credentials_provider:, **options)
-
initialize(service:, region:, credentials:, **options)
-
initialize(service:, region:, access_key_id:, secret_access_key:, session_token:nil, **options)
def initialize(options = {}) @service = extract_service(options) @region = extract_region(options) @credentials_provider = extract_credentials_provider(options) @unsigned_headers = Set.new((options.fetch(:unsigned_headers, [])).map(&:downcase)) @unsigned_headers << 'authorization' @unsigned_headers << 'x-amzn-trace-id' @unsigned_headers << 'expect' @uri_escape_path = options.fetch(:uri_escape_path, true) @apply_checksum_header = options.fetch(:apply_checksum_header, true) @signing_algorithm = options.fetch(:signing_algorithm, :sigv4) @normalize_path = options.fetch(:normalize_path, true) @omit_session_token = options.fetch(:omit_session_token, false) end
def normalize_path(uri)
- Api: - private
def normalize_path(uri) normalized_path = Pathname.new(uri.path).cleanpath.to_s # Pathname is probably not correct to use. Empty paths will # resolve to "." and should be disregarded normalized_path = '' if normalized_path == '.' # Ensure trailing slashes are correctly preserved if uri.path.end_with?('/') && !normalized_path.end_with?('/') normalized_path << '/' end uri.path = normalized_path end
def normalized_querystring(querystring)
def normalized_querystring(querystring) params = querystring.split('&') params = params.map { |p| p.match(/=/) ? p : p + '=' } # From: https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html # Sort the parameter names by character code point in ascending order. # Parameters with duplicate names should be sorted by value. # # Default sort <=> in JRuby will swap members # occasionally when <=> is 0 (considered still sorted), but this # causes our normalized query string to not match the sent querystring. # When names match, we then sort by their values. When values also # match then we sort by their original order params.each.with_index.sort do |a, b| a, a_offset = a b, b_offset = b a_name, a_value = a.split('=') b_name, b_value = b.split('=') if a_name == b_name if a_value == b_value a_offset <=> b_offset else a_value <=> b_value end else a_name <=> b_name end end.map(&:first).join('&') end
def path(url)
def path(url) path = url.path path = '/' if path == '' if @uri_escape_path uri_escape_path(path) else path end end
def presign_url(options)
-
(HTTPS::URI, HTTP::URI)
-
Options Hash:
(**options)
-
:time
(Time
) -- Time of the signature. -
:body_digest
(optional, String
) -- -
:body
(optional, String, IO
) -- -
:expires_in
(Integer
) -- -
:headers
(Hash
) -- Headers that should -
:url
(required, String, URI::HTTP, URI::HTTPS
) -- -
:http_method
(required, String
) -- The HTTP request method,
def presign_url(options) creds, expiration = fetch_credentials http_method = extract_http_method(options) url = extract_url(options) Signer.normalize_path(url) if @normalize_path headers = downcase_headers(options[:headers]) headers['host'] ||= host(url) datetime = headers['x-amz-date'] datetime ||= (options[:time] || Time.now).utc.strftime("%Y%m%dT%H%M%SZ") date = datetime[0,8] content_sha256 = headers['x-amz-content-sha256'] content_sha256 ||= options[:body_digest] content_sha256 ||= sha256_hexdigest(options[:body] || '') algorithm = sts_algorithm params = {} params['X-Amz-Algorithm'] = algorithm params['X-Amz-Credential'] = credential(creds, date) params['X-Amz-Date'] = datetime params['X-Amz-Expires'] = presigned_url_expiration(options, expiration, Time.strptime(datetime, "%Y%m%dT%H%M%S%Z")).to_s if creds.session_token if @signing_algorithm == 'sigv4-s3express'.to_sym params['X-Amz-S3session-Token'] = creds.session_token else params['X-Amz-Security-Token'] = creds.session_token end end params['X-Amz-SignedHeaders'] = signed_headers(headers) if @signing_algorithm == :sigv4a && @region params['X-Amz-Region-Set'] = @region end params = params.map do |key, value| "#{uri_escape(key)}=#{uri_escape(value)}" end.join('&') if url.query url.query += '&' + params else url.query = params end creq = canonical_request(http_method, url, headers, content_sha256) sts = string_to_sign(datetime, creq, algorithm) signature = if @signing_algorithm == :sigv4a asymmetric_signature(creds, sts) else signature(creds.secret_access_key, date, sts) end url.query += '&X-Amz-Signature=' + signature url end
def presigned_url_expiration(options, expiration, datetime)
def presigned_url_expiration(options, expiration, datetime) expires_in = extract_expires_in(options) return expires_in unless expiration expiration_seconds = (expiration - datetime).to_i # In the static stability case, credentials may expire in the past # but still be valid. For those cases, use the user configured # expires_in and ingore expiration. if expiration_seconds <= 0 expires_in else [expires_in, expiration_seconds].min end end
def sha256_hexdigest(value)
-
(String
-)
Parameters:
-
value
(File, Tempfile, IO#read, String
) --
def sha256_hexdigest(value) if (File === value || Tempfile === value) && !value.path.nil? && File.exist?(value.path) OpenSSL::Digest::SHA256.file(value).hexdigest elsif value.respond_to?(:read) sha256 = OpenSSL::Digest::SHA256.new loop do chunk = value.read(1024 * 1024) # 1MB break unless chunk sha256.update(chunk) end value.rewind sha256.hexdigest else OpenSSL::Digest::SHA256.hexdigest(value) end end
def sign_event(prior_signature, payload, encoder)
signature value (a binary string) used at ':chunk-signature' needs to converted to
V4 algorithm). Thus, when returning signature value used for next event siging, the
needs to be a binary string instead of a hex-encoded string (like original signature
Since ':chunk-signature' header value has bytes type, the signature value provided
Note:
The initial prior_signature should be using the signature computed at initial request
)
encoder
payload_1, # binary string (eventstream encoded event 1)
signature_0,
headers_1, signature_1 = signer.sign_event(
)
encoder, # Aws::EventStreamEncoder
payload_0, # binary string (eventstream encoded event 0)
prior_signature, # hex-encoded string
headers_0, signature_0 = signer.sign_event(
To sign events
which is serialized based on input and protocol
Payload of the sigv4 signed event message contains eventstream encoded message
* millisecond since epoch, 'timestamp' type
* ':date'
* computed signature of the event, binary string, 'bytes' type
* ':chunk-signature'
Headers of a sigv4 signed event message only contains 2 headers
used for next event signing.
Signs a event and returns signature headers and prior signature
def sign_event(prior_signature, payload, encoder) creds, _ = fetch_credentials time = Time.now headers = {} datetime = time.utc.strftime("%Y%m%dT%H%M%SZ") date = datetime[0,8] headers[':date'] = Aws::EventStream::HeaderValue.new(value: time.to_i * 1000, type: 'timestamp') sts = event_string_to_sign(datetime, headers, payload, prior_signature, encoder) sig = event_signature(creds.secret_access_key, date, sts) headers[':chunk-signature'] = Aws::EventStream::HeaderValue.new(value: sig, type: 'bytes') # Returning signed headers and signature value in hex-encoded string [headers, sig.unpack('H*').first] end
def sign_request(request)
-
(Signature)
- Return an instance of {Signature} that has
Options Hash:
(**request)
-
:body
(optional, String, IO
) -- The HTTP request body. -
:headers
(optional, Hash
) -- A hash of headers -
:url
(required, String, URI::HTTP, URI::HTTPS
) -- -
:http_method
(required, String
) -- One of
Parameters:
-
request
(Hash
) --
def sign_request(request) creds, _ = fetch_credentials http_method = extract_http_method(request) url = extract_url(request) Signer.normalize_path(url) if @normalize_path headers = downcase_headers(request[:headers]) datetime = headers['x-amz-date'] datetime ||= Time.now.utc.strftime("%Y%m%dT%H%M%SZ") date = datetime[0,8] content_sha256 = headers['x-amz-content-sha256'] content_sha256 ||= sha256_hexdigest(request[:body] || '') sigv4_headers = {} sigv4_headers['host'] = headers['host'] || host(url) sigv4_headers['x-amz-date'] = datetime if creds.session_token && !@omit_session_token if @signing_algorithm == 'sigv4-s3express'.to_sym sigv4_headers['x-amz-s3session-token'] = creds.session_token else sigv4_headers['x-amz-security-token'] = creds.session_token end end sigv4_headers['x-amz-content-sha256'] ||= content_sha256 if @apply_checksum_header if @signing_algorithm == :sigv4a && @region && !@region.empty? sigv4_headers['x-amz-region-set'] = @region end headers = headers.merge(sigv4_headers) # merge so we do not modify given headers hash algorithm = sts_algorithm # compute signature parts creq = canonical_request(http_method, url, headers, content_sha256) sts = string_to_sign(datetime, creq, algorithm) sig = if @signing_algorithm == :sigv4a asymmetric_signature(creds, sts) else signature(creds.secret_access_key, date, sts) end algorithm = sts_algorithm # apply signature sigv4_headers['authorization'] = [ "#{algorithm} Credential=#{credential(creds, date)}", "SignedHeaders=#{signed_headers(headers)}", "Signature=#{sig}", ].join(', ') # skip signing the session token, but include it in the headers if creds.session_token && @omit_session_token sigv4_headers['x-amz-security-token'] = creds.session_token end # Returning the signature components. Signature.new( headers: sigv4_headers, string_to_sign: sts, canonical_request: creq, content_sha256: content_sha256, signature: sig ) end
def signature(secret_access_key, date, string_to_sign)
def signature(secret_access_key, date, string_to_sign) k_date = hmac("AWS4" + secret_access_key, date) k_region = hmac(k_date, @region) k_service = hmac(k_region, @service) k_credentials = hmac(k_service, 'aws4_request') hexhmac(k_credentials, string_to_sign) end
def signed_headers(headers)
def signed_headers(headers) headers.inject([]) do |signed_headers, (header, _)| if @unsigned_headers.include?(header) signed_headers else signed_headers << header end end.sort.join(';') end
def string_to_sign(datetime, canonical_request, algorithm)
def string_to_sign(datetime, canonical_request, algorithm) [ algorithm, datetime, credential_scope(datetime[0,8]), sha256_hexdigest(canonical_request), ].join("\n") end
def sts_algorithm
def sts_algorithm @signing_algorithm == :sigv4a ? 'AWS4-ECDSA-P256-SHA256' : 'AWS4-HMAC-SHA256' end
def uri_escape(string)
def uri_escape(string) self.class.uri_escape(string) end
def uri_escape(string)
- Api: - private
def uri_escape(string) if string.nil? nil else CGI.escape(string.encode('UTF-8')).gsub('+', '%20').gsub('%7E', '~') end end
def uri_escape_path(string)
def uri_escape_path(string) self.class.uri_escape_path(string) end
def uri_escape_path(path)
- Api: - private
def uri_escape_path(path) path.gsub(/[^\/]+/) { |part| uri_escape(part) } end
def use_crt?
Kept for backwards compatability
def use_crt? false end