class Aws::EndpointProvider
@api private
def default_endpoint(partition, service, region)
def default_endpoint(partition, service, region) hostname_template = partition["defaults"]["hostname"] hostname_template. sub('{region}', region). sub('{service}', service). sub('{dnsSuffix}', partition["dnsSuffix"]) end
def default_partition
def default_partition @rules['partitions'].find { |p| p["partition"] == "aws" } || @rules['partitions'].first end
def default_provider
def default_provider @default_provider ||= EndpointProvider.new(Partitions.defaults) end
def dns_suffix_for(region)
def dns_suffix_for(region) partition = get_partition(region) partition['dnsSuffix'] end
def dns_suffix_for(region)
def dns_suffix_for(region) default_provider.dns_suffix_for(region) end
def endpoint_for(region, service)
def endpoint_for(region, service) partition = get_partition(region) endpoint = default_endpoint(partition, service, region) service_cfg = partition.fetch("services", {}).fetch(service, {}) # Check for service-level default endpoint. endpoint = service_cfg.fetch("defaults", {}).fetch("hostname", endpoint) # Check for global endpoint. if service_cfg["isRegionalized"] == false region = service_cfg.fetch("partitionEndpoint", region) end # Check for service/region level endpoint. endpoint = service_cfg.fetch("endpoints", {}). fetch(region, {}).fetch("hostname", endpoint) endpoint end
def get_partition(region)
def get_partition(region) partition_containing_region(region) || partition_matching_region(region) || default_partition end
def initialize(rules)
def initialize(rules) @rules = rules end
def partition_containing_region(region)
def partition_containing_region(region) @rules['partitions'].find do |p| p['regions'].key?(region) end end
def partition_matching_region(region)
def partition_matching_region(region) @rules['partitions'].find do |p| region.match(p["regionRegex"]) || p['services'].values.find { |svc| svc['endpoints'].key?(region) if svc.key? 'endpoints'} end end
def resolve(region, service)
def resolve(region, service) "https://" + endpoint_for(region, service) end
def resolve(region, service)
def resolve(region, service) default_provider.resolve(region, service) end
def signing_region(region, service)
def signing_region(region, service) get_partition(region). fetch("services", {}). fetch(service, {}). fetch("endpoints", {}). fetch(region, {}). fetch("credentialScope", {}). fetch("region", region) end
def signing_region(region, service)
def signing_region(region, service) default_provider.signing_region(region, service) end