lib/rubocop/cop/custom/ip_ranges.rb
# frozen_string_literal: true

require 'ipaddr'
require 'json'
require 'net/http'
require 'uri'

require 'jsonpath'
require 'nokogiri'
require 'rubocop'

module RuboCop
  module Cop
    module Custom
      # Checks that a literal list of IP ranges passed to `ip_ranges` matches
      # the list published at a remote URL, and autocorrects the literal when
      # it does not.
      #
      # The remote source is configured through `@fetch:<param> <arg>`
      # comments that the parser associates with the `ip_ranges` call
      # (presumably the comment lines directly above it - confirm against the
      # parser's comment association rules):
      #
      # * `@fetch:url`      - mandatory, the document to download.
      # * `@fetch:selector` - optional CSS selector; the body is parsed as
      #   HTML and the text content of every matching node is one entry.
      # * `@fetch:jsonpath` - optional JSONPath expression; the body is
      #   parsed as JSON and every match is one entry.
      #
      # With neither `selector` nor `jsonpath`, every line of the body is one
      # entry.
      #
      # @example
      #   # @fetch:url https://example.com/ips.txt
      #   ip_ranges %w[
      #     192.0.2.0/24
      #   ]
      class IpRanges < Base
        extend AutoCorrector

        # Offense message; `%<url>s` is filled from the `url` fetch parameter.
        MSG = 'Outdated list of IP ranges compared to %<url>s'

        # Matches a `# @fetch:<param> <arg>` comment and captures both parts.
        REGEXP = /^\s*#\s*@fetch:(?<param>[a-z0-9_]+)\s+(?<arg>.*)?/.freeze

        def_node_matcher :on_ip_ranges, <<~PATTERN
          (send nil? :ip_ranges $(array str+))
        PATTERN

        # Compares the literal list given to `ip_ranges` with the remote list
        # and registers an autocorrectable offense when they differ.
        #
        # @param node [RuboCop::AST::SendNode] the method call being inspected
        # @return [void]
        def on_send(node)
          on_ip_ranges(node) do |value|
            params = fetch_params(node)
            return unless mandatory_params?(params)

            existing_ips = normalise_list(read_node_ips(value))

            # fetch_ips returns nil when the download failed (a global
            # offense has already been recorded in that case), so bail out
            # before trying to sort the result.
            fetched_ips = fetch_ips(**params)
            return unless fetched_ips

            new_ips = normalise_list(fetched_ips)
            return if existing_ips == new_ips

            register_offense(value, new_ips, **params)
          end
        end

        private

        # Downloads the remote document and extracts the list of entries.
        #
        # @param url [String] document to download
        # @param selector [String, nil] CSS selector; takes precedence over
        #   `jsonpath` when both are given
        # @param jsonpath [String, nil] JSONPath expression
        # @return [Array, nil] extracted entries, or nil when the download
        #   failed
        def fetch_ips(url:, selector: nil, jsonpath: nil)
          body = get_url(url)
          return unless body

          if selector
            parse_html(body, selector)
          elsif jsonpath
            parse_json(body, jsonpath)
          else
            parse_text(body)
          end
        end

        # Performs a GET request and returns the response body.
        #
        # Any response other than 200 OK is reported as a global offense.
        #
        # @param url [String]
        # @return [String, nil] the body, or nil when the response was not
        #   200 OK
        def get_url(url)
          response = Net::HTTP.get_response(URI(url))
          return response.body if response.is_a?(Net::HTTPOK)

          add_global_offense "Could not fetch IPs from #{url} , HTTP status code #{response.code}"
          nil
        end

        # Extracts the text content of every node matching a CSS selector.
        #
        # @param body [String] HTML document
        # @param selector [String] CSS selector
        # @return [Array<String>]
        def parse_html(body, selector)
          Nokogiri::HTML(body).css(selector).map(&:content)
        end

        # Extracts every value matching a JSONPath expression.
        #
        # @param body [String] JSON document
        # @param jsonpath [String] JSONPath expression
        # @return [Array] matched values (expected to be strings)
        def parse_json(body, jsonpath)
          JsonPath.new(jsonpath).on(JSON.parse(body))
        end

        # Treats every line of a plain-text document as one entry.
        #
        # @param body [String]
        # @return [Array<String>] lines without their trailing newline
        def parse_text(body)
          body.lines.map(&:chomp)
        end

        # Reads the string values out of the array literal node.
        #
        # @param value [RuboCop::AST::ArrayNode]
        # @return [Array<String>]
        def read_node_ips(value)
          value.child_nodes.map(&:value)
        end

        # Sorts entries by numeric address so two lists can be compared
        # regardless of their original order.
        #
        # IPAddr objects of different address families are not comparable
        # with each other (`<=>` returns nil), so the family is used as the
        # leading sort key; this lets lists mixing IPv4 and IPv6 be sorted
        # instead of raising. The original string is the final key so that
        # entries with the same address sort deterministically.
        #
        # @param ips [Array<String>]
        # @return [Array<String>] the same entries, sorted
        # @raise [IPAddr::Error] when an entry is not a valid address
        def normalise_list(ips)
          ips.sort_by do |ip|
            address = IPAddr.new(ip)
            [address.family, address.to_i, ip]
          end
        end

        # Registers the offense on the array literal and replaces it with the
        # up-to-date list when autocorrecting.
        #
        # @param node [RuboCop::AST::ArrayNode] the outdated array literal
        # @param new_ips [Array<String>] the up-to-date, sorted list
        # @param params [Hash{Symbol => String}] fetch parameters; `:url` is
        #   interpolated into the message
        # @return [void]
        def register_offense(node, new_ips, **params)
          message = format(MSG, params)
          add_offense node, message: message do |corrector|
            corrector.replace node, node_replacement(new_ips)
          end
        end

        # @param params [Hash{Symbol => String}]
        # @return [Boolean] whether the mandatory `url` parameter is present
        def mandatory_params?(params)
          params.key?(:url)
        end

        # Collects the `@fetch:` parameters from the comments associated with
        # the node. When a parameter is repeated, the last occurrence wins.
        #
        # @param node [RuboCop::AST::Node]
        # @return [Hash{Symbol => String}]
        def fetch_params(node)
          comments = processed_source.ast_with_comments[node]
          comments.each_with_object({}) do |comment, params|
            match = comment.text.match(REGEXP)
            next unless match

            # Strip stray trailing whitespace so it does not end up inside
            # the URL, selector or JSONPath expression.
            params[match[:param].to_sym] = match[:arg]&.strip
          end
        end

        # Builds the source of a `%w[]` literal with one entry per line.
        #
        # @param new_ips [Array<String>]
        # @return [String]
        def node_replacement(new_ips)
          contents = new_ips.join("\n")
          "%w[\n#{contents}\n]"
        end
      end
    end
  end
end