class InspecPlugins::Compliance::API
everything will be stored in local Configuration store
API Implementation does not hold any state by itself,
def self.authenticate_login_using_version_api(url, api_token, insecure)
def self.authenticate_login_using_version_api(url, api_token, insecure) uri = URI.parse("#{url}/version") req = Net::HTTP::Get.new(uri.path) req["api-token"] = api_token response = InspecPlugins::Compliance::HTTP.send_request(uri, req, insecure) if response.code == "200" msg = "Successfully Logged In" success = true else success = false msg = "Failed to authenticate to #{url} \n\Response code: #{response.code}\nBody: #{response.body}" end [success, msg] end
def self.exist?(config, profile)
def self.exist?(config, profile) _msg, profiles = InspecPlugins::Compliance::API.profiles(config, profile) !profiles.empty? end
def self.get_headers(config)
def self.get_headers(config) token = get_token(config) headers = { "chef-delivery-enterprise" => config["automate"]["ent"] } if config["automate"]["token_type"] == "dctoken" headers["x-data-collector-token"] = token else headers["chef-delivery-user"] = config["user"] headers["chef-delivery-token"] = token end headers end
def self.get_token(config)
def self.get_token(config) return config["token"] unless config["refresh_token"] _success, _msg, token = get_token_via_refresh_token(config["server"], config["refresh_token"], config["insecure"]) token end
def self.get_token_via_password(url, username, password, insecure)
def self.get_token_via_password(url, username, password, insecure) uri = URI.parse("#{url}/login") req = Net::HTTP::Post.new(uri.path) req.body = { userid: username, password: password }.to_json access_token = nil response = InspecPlugins::Compliance::HTTP.send_request(uri, req, insecure) data = response.body if response.code == "200" access_token = data msg = "Successfully fetched an API access token valid for 12 hours" success = true else success = false msg = "Failed to authenticate to #{url} \n\ sponse code: #{response.code}\n Body: #{response.body}" end [success, msg, access_token] end
def self.get_token_via_refresh_token(url, refresh_token, insecure)
def self.get_token_via_refresh_token(url, refresh_token, insecure) uri = URI.parse("#{url}/login") req = Net::HTTP::Post.new(uri.path) req.body = { token: refresh_token }.to_json access_token = nil response = InspecPlugins::Compliance::HTTP.send_request(uri, req, insecure) data = response.body if response.code == "200" begin tokendata = JSON.parse(data) access_token = tokendata["access_token"] msg = "Successfully fetched API access token" success = true rescue JSON::ParserError => e success = false msg = e.message end else success = false msg = "Failed to authenticate to #{url} \n\ sponse code: #{response.code}\n Body: #{response.body}" end [success, msg, access_token] end
def self.is_automate2_server?(config)
def self.is_automate2_server?(config) config["server_type"] == "automate2" end
def self.profile_split(profile)
def self.profile_split(profile) owner, id = profile.split("/") id, version = id.split("#") [owner, id, version] end
def self.profiles(config, profile_filter = nil) # rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/MethodLength
the username of the account is used that is logged in
the user is either specified in the options hash or by default
return all compliance profiles available for the user
def self.profiles(config, profile_filter = nil) # rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/MethodLength owner = config["owner"] || config["user"] if is_automate2_server?(config) url = "#{config["server"]}/compliance/profiles/search" else raise ServerConfigurationMissing end headers = get_headers(config) if profile_filter _owner, id, ver = profile_split(profile_filter) else id, ver = nil end body = { owner: owner, name: id }.to_json response = InspecPlugins::Compliance::HTTP.post_with_headers(url, headers, body, config["insecure"]) data = response.body response_code = response.code case response_code when "200" msg = "success" profiles = JSON.parse(data) # iterate over profiles mapped_profiles = [] profiles["profiles"].each do |p| mapped_profiles << p end # filter by name and version if they were specified in profile_filter mapped_profiles.select! do |p| (!ver || p["version"] == ver) && (!id || p["name"] == id) end [msg, mapped_profiles] when "401" msg = "401 Unauthorized. Please check your token." [msg, []] else msg = "An unexpected error occurred (HTTP #{response_code}): #{response.message}" [msg, []] end end
def self.sanitize_profile_name(profile)
def self.sanitize_profile_name(profile) if URI(profile).scheme == "compliance" uri = URI(profile) else uri = URI("compliance://#{profile}") end uri.to_s.sub(%r{^compliance:\/\/}, "") end
def self.server_version_from_config(config)
def self.server_version_from_config(config) # Automate versions 0.8.x and later will have a "version" key in the config # that looks like: "version":{"api":"compliance","version":"0.8.24"} return nil unless config.key?("version") return nil unless config["version"].is_a?(Hash) config["version"]["version"] end
def self.target_url(config, profile)
def self.target_url(config, profile) "#{config["server"]}/compliance/profiles/tar" end
def self.upload(config, owner, profile_name, archive_path)
def self.upload(config, owner, profile_name, archive_path) url = "#{config["server"]}/compliance/profiles?owner=#{owner}" headers = get_headers(config) res = InspecPlugins::Compliance::HTTP.post_multipart_file(url, headers, archive_path, config["insecure"]) [res.is_a?(Net::HTTPSuccess), res.body] end
def self.version(config)
NB this method does not use Compliance::Configuration to allow for using
return the server api version
def self.version(config) url = config["server"] insecure = config["insecure"] raise ServerConfigurationMissing if url.nil? headers = get_headers(config) response = InspecPlugins::Compliance::HTTP.get(url + "/version", headers, insecure) return {} if response.code == "404" data = response.body return {} if data.nil? || data.empty? parsed = JSON.parse(data) return {} unless parsed.key?("build_timestamp") && !parsed["build_timestamp"].empty? parsed end