class AwsS3Bucket
author: Matthew Dromazos
def bucket_acl
def bucket_acl catch_aws_errors do @bucket_acl ||= BackendFactory.create(inspec_runner).get_bucket_acl(bucket: bucket_name).grants end end
def bucket_policy
def bucket_policy @bucket_policy ||= fetch_bucket_policy end
def fetch_bucket_encryption_configuration
def fetch_bucket_encryption_configuration @has_default_encryption_enabled ||= catch_aws_errors do begin !BackendFactory.create(inspec_runner) .get_bucket_encryption(bucket: bucket_name) .server_side_encryption_configuration .nil? rescue Aws::S3::Errors::ServerSideEncryptionConfigurationNotFoundError false end end end
def fetch_bucket_policy
def fetch_bucket_policy backend = BackendFactory.create(inspec_runner) catch_aws_errors do begin # AWS SDK returns a StringIO, we have to read() raw_policy = backend.get_bucket_policy(bucket: bucket_name).policy return JSON.parse(raw_policy.read)['Statement'].map do |statement| lowercase_hash = {} statement.each_key { |k| lowercase_hash[k.downcase] = statement[k] } @bucket_policy = OpenStruct.new(lowercase_hash) end rescue Aws::S3::Errors::NoSuchBucketPolicy @bucket_policy = [] end end end
def fetch_from_api
def fetch_from_api backend = BackendFactory.create(inspec_runner) # Since there is no basic "get_bucket" API call, use the # region fetch as the existence check. begin @region = backend.get_bucket_location(bucket: bucket_name).location_constraint rescue Aws::S3::Errors::NoSuchBucket @exists = false return end @exists = true end
def has_access_logging_enabled?
def has_access_logging_enabled? return false unless @exists catch_aws_errors do @has_access_logging_enabled ||= !BackendFactory.create(inspec_runner).get_bucket_logging(bucket: bucket_name).logging_enabled.nil? end end
def has_default_encryption_enabled?
def has_default_encryption_enabled? return false unless @exists @has_default_encryption_enabled ||= fetch_bucket_encryption_configuration end
def public?
def public? # first line just for formatting false || \ bucket_acl.any? { |g| g.grantee.type == 'Group' && g.grantee.uri =~ /AllUsers/ } || \ bucket_acl.any? { |g| g.grantee.type == 'Group' && g.grantee.uri =~ /AuthenticatedUsers/ } || \ bucket_policy.any? { |s| s.effect == 'Allow' && s.principal == '*' } end
def to_s
def to_s "S3 Bucket #{@bucket_name}" end
def validate_params(raw_params)
def validate_params(raw_params) validated_params = check_resource_param_names( raw_params: raw_params, allowed_params: [:bucket_name], allowed_scalar_name: :bucket_name, allowed_scalar_type: String, ) if validated_params.empty? or !validated_params.key?(:bucket_name) raise ArgumentError, 'You must provide a bucket_name to aws_s3_bucket.' end validated_params end