lib/datadog/statsd/emitter.rb



# frozen_string_literal: true

require "forwardable"
require "datadog/statsd"
require "ostruct"
require "active_support/core_ext/string/inflections"

# Load colored2 for error formatting if available
begin
  require "colored2"
rescue LoadError
  # colored2 not available, use plain text
end

# Load schema classes if available
begin
  require_relative "schema"
  require_relative "schema/namespace"
  require_relative "schema/errors"
rescue LoadError
  # Schema classes not available, validation will be skipped
end

module Datadog
  class Statsd
    # @description Wrapper around a shared Datadog::Statsd client with a simple interface for
    #     sending metrics to Datadog, including A/B-test tagging.
    #
    #     Instances carry a default metric name, tags, A/B-test tags and sample rate. Every
    #     unknown method (increment, gauge, histogram, ...) is forwarded to the shared client
    #     after those defaults are merged into the call. When constructed with a schema, each
    #     forwarded call is validated first. A failure is handled according to
    #     +validation_mode+: :strict warns and raises, :warn only warns, :drop skips the call,
    #     :off disables validation.
    #
    #     Class methods manage the shared client (connect/close) and process-wide global
    #     tags (configure).
    #
    #     @see Datadog::Statsd::Emitter.new for more details.
    class Emitter
      MUTEX = Mutex.new

      DEFAULT_HOST = "127.0.0.1"
      DEFAULT_PORT = 8125
      DEFAULT_NAMESPACE = nil
      DEFAULT_ARGUMENTS = { delay_serialization: true }.freeze
      DEFAULT_SAMPLE_RATE = 1.0
      DEFAULT_VALIDATION_MODE = :strict

      # Tag keys the emitter adds itself; they are exempt from the allowed-tags check.
      FRAMEWORK_TAGS = %i[emitter ab_test_name ab_test_group].freeze

      class << self
        # @return [Datadog::Statsd, nil] the shared client (set by .connect or assigned directly).
        attr_accessor :datadog_statsd

        # On first access, when nothing has been assigned yet, the client is taken from
        # Datadog::Statsd::Schema.configuration.statsd. After .close this returns nil until
        # .connect is called again.
        #
        # @return [Datadog::Statsd, NilClass] The Datadog Statsd client instance or nil if not
        #     currently connected.
        def statsd
          return @datadog_statsd if defined?(@datadog_statsd)

          @datadog_statsd = ::Datadog::Statsd::Schema.configuration.statsd
        end

        extend Forwardable
        # Class-level calls go straight to the shared client: no tag merging, no validation.
        def_delegators :datadog_statsd,
                       :increment,
                       :decrement,
                       :gauge,
                       :histogram,
                       :distribution,
                       :set,
                       :flush

        # @return [OpenStruct] process-wide tags, merged into the client's tags on .connect
        #     and into every instance's tags on construction.
        def global_tags
          @global_tags ||= OpenStruct.new
        end

        # Yields the global tags object so attributes can be assigned on it.
        #
        # @yieldparam tags [OpenStruct]
        def configure
          yield(global_tags)
        end

        # Creates the shared Datadog::Statsd client unless one is already connected.
        #
        # If a client already exists, it is returned immediately and the block is NOT yielded.
        # Otherwise a client is created and the block (if any) is yielded that client.
        # In that case the return value is the block's value, or nil without a block.
        #
        # @param host [String]
        # @param port [Integer]
        # @param tags [Hash, nil] client-level tags; global tags are merged over them.
        # @param sample_rate [Float]
        # @param namespace [String, nil]
        # @param opts [Hash] extra Datadog::Statsd options; :emitter is stripped.
        def connect(
          host: DEFAULT_HOST,
          port: DEFAULT_PORT,
          tags: {},
          sample_rate: DEFAULT_SAMPLE_RATE,
          namespace: DEFAULT_NAMESPACE,
          **opts
        )
          return @datadog_statsd if defined?(@datadog_statsd) && @datadog_statsd

          tags ||= {}
          tags = tags.merge(global_tags.to_h).map { |k, v| "#{k}:#{v}" }

          # :emitter is an Emitter concept, not an option Datadog::Statsd understands.
          opts = DEFAULT_ARGUMENTS.merge(opts.except(:emitter))

          MUTEX.synchronize do
            # Test truthiness, not defined?: after .close (or a nil configuration client)
            # the ivar exists but is nil, and we must still be able to connect again.
            @datadog_statsd ||=
              ::Datadog::Statsd.new(host, port, namespace:, tags:, sample_rate:, **opts)
          end

          yield(datadog_statsd) if block_given?
        end

        # Closes the shared client (best effort: errors from close are ignored) and forgets it.
        def close
          begin
            @datadog_statsd&.close
          rescue StandardError
            nil
          end
          @datadog_statsd = nil
        end
      end

      attr_reader :tags, :ab_test, :sample_rate, :metric, :schema, :validation_mode

      # @param emitter [Object, String, Symbol, Module, nil] identifies who emits metrics. It is
      #     stored as the :emitter tag, e.g. MyApp::Worker becomes "my_app.worker".
      #     Plain Object instances (and nil) add no tag.
      # @param metric [String, Symbol, nil] default metric name used when a call omits it.
      # @param tags [Hash, nil] default tags. The hash is copied, never mutated; global tags
      #     are merged over it.
      # @param ab_test [Hash, nil] { test_name => group } converted to ab_test_* tags.
      # @param sample_rate [Float, nil] default sample rate (1.0 when nil).
      # @param schema [Object, nil] schema used to validate metric calls.
      # @param validation_mode [Symbol] :strict, :warn, :drop or :off.
      # @raise [ArgumentError] when no emitter, metric, tags, ab_test, sample_rate or schema is given.
      def initialize(
        emitter = nil,
        metric: nil,
        tags: nil,
        ab_test: nil,
        sample_rate: nil,
        schema: nil,
        validation_mode: DEFAULT_VALIDATION_MODE
      )
        if emitter.nil? && metric.nil? && tags.nil? && ab_test.nil? && sample_rate.nil? && schema.nil?
          raise ArgumentError,
                "Datadog::Statsd::Emitter: use class methods if you are passing nothing to the constructor."
        end

        @sample_rate = sample_rate || 1.0
        @tags = build_instance_tags(tags)
        @ab_test = ab_test || {}
        @metric = metric
        @schema = schema
        @validation_mode = validation_mode

        emitter_tag = emitter_tag_for(emitter)
        return unless emitter_tag

        @tags ||= {}
        @tags[:emitter] = emitter_tag
      end

      # Forwards any unknown method to the shared client after merging instance defaults
      # (see #normalize_arguments) and, when a schema is set, validating the call.
      # Returns nil without sending when validation asks to drop the call, or when no
      # client is available.
      def method_missing(m, *args, **opts, &)
        args, opts = normalize_arguments(*args, **opts)

        # If schema validation fails, handle based on validation mode
        if @schema && should_validate?(args)
          validation_result = validate_metric_call(m, *args, **opts)
          return if validation_result == :drop
        end

        if ENV.fetch("DATADOG_DEBUG", false)
          warn "<CUSTOM METRIC to STATSD>: #{self}->#{m}(#{args.join(", ")}, #{opts.inspect})"
        end
        statsd&.send(m, *args, **opts, &)
      end

      def respond_to_missing?(method, *)
        statsd&.respond_to? method
      end

      # Merges the instance defaults into a metric call.
      #
      # - The metric name falls back to the constructor's +metric+ when it is missing or nil.
      # - Tags are built in this order, later entries winning: instance tags, instance
      #   ab_test tags, call ab_test tags, call tags.
      # - sample_rate falls back to the instance value when that value is not 1.0.
      #
      # @return [Array(Array, Hash)] normalized positional args and keyword opts.
      def normalize_arguments(*args, **opts)
        # Handle metric name - use constructor metric if none provided in method call
        normalized_args = args.dup

        if @metric
          if normalized_args.empty?
            normalized_args = [@metric]
          elsif normalized_args.first.nil?
            normalized_args[0] = @metric
          end
        end

        # Start with instance tags
        merged_tags = (@tags || {}).dup

        # Convert instance ab_test to tags
        (@ab_test || {}).each do |test_name, group|
          merged_tags[:ab_test_name] = test_name
          merged_tags[:ab_test_group] = group
        end

        # Handle ab_test from method call opts and remove it from opts
        normalized_opts = opts.dup
        if normalized_opts[:ab_test]
          normalized_opts[:ab_test].each do |test_name, group|
            merged_tags[:ab_test_name] = test_name
            merged_tags[:ab_test_group] = group
          end
          normalized_opts.delete(:ab_test)
        end

        # Merge with method call tags (method call tags take precedence)
        merged_tags = merged_tags.merge(normalized_opts[:tags]) if normalized_opts[:tags]

        # Set merged tags in opts if there are any
        normalized_opts[:tags] = merged_tags unless merged_tags.empty?

        # Handle sample_rate - use instance sample_rate if not provided in method call
        if @sample_rate && @sample_rate != 1.0 && !normalized_opts.key?(:sample_rate)
          normalized_opts[:sample_rate] = @sample_rate
        end

        [normalized_args, normalized_opts]
      end

      # Instance-level shortcut for Emitter.flush; all arguments are forwarded.
      def flush(...)
        self.class.flush(...)
      end

      # @return [Datadog::Statsd, nil] the shared class-level client.
      def statsd
        self.class.statsd
      end

      private

      # Copies the caller's tags (so they are never mutated) and merges global tags over them.
      # Returns nil when there are neither caller tags nor global tags.
      def build_instance_tags(tags)
        global = self.class.global_tags.to_h
        return tags&.dup if global.empty?

        (tags || {}).merge(global)
      end

      # Derives the :emitter tag value from a name, module or object; nil means "no tag".
      def emitter_tag_for(emitter)
        name =
          case emitter
          when String, Symbol then emitter.to_s
          when Module then emitter.name
          else emitter&.class&.name
          end
        return nil if name.nil? || name == "Object"

        name.gsub("::", ".").underscore.downcase
      end

      def should_validate?(args)
        !args.empty? && args.first && @validation_mode != :off
      end

      # Validates a forwarded call against the schema.
      # @return [Symbol, nil] :drop when the call must be skipped; nil otherwise.
      def validate_metric_call(metric_method, *args, **opts)
        return unless @schema && !args.empty?

        metric_name = args.first
        return unless metric_name

        metric_type = normalize_metric_type(metric_method)
        provided_tags = opts[:tags] || {}

        begin
          validate_metric_exists(metric_name, metric_type)
          validate_metric_tags(metric_name, provided_tags)
        rescue Datadog::Statsd::Schema::SchemaError => e
          handle_validation_error(e)
        end
      end

      # @raise [Datadog::Statsd::Schema::UnknownMetricError] when the metric is not in the schema.
      # @raise [Datadog::Statsd::Schema::InvalidMetricTypeError] when the schema type differs.
      def validate_metric_exists(metric_name, metric_type)
        # Try to find the metric in the schema
        all_metrics = @schema.all_metrics

        # Look for exact match first
        metric_info = all_metrics[metric_name.to_s]

        unless metric_info
          # Look for partial matches to provide better error messages
          suggestions = find_metric_suggestions(metric_name, all_metrics.keys)
          error_message = "Unknown metric '#{metric_name}'"
          error_message += ". Did you mean: #{suggestions.join(", ")}?" if suggestions.any?
          error_message += ". Available metrics: #{all_metrics.keys.first(5).join(", ")}"
          error_message += ", ..." if all_metrics.size > 5

          raise Datadog::Statsd::Schema::UnknownMetricError.new(error_message, metric: metric_name)
        end

        # Validate metric type matches
        expected_type = metric_info[:definition].type
        return unless expected_type != metric_type

        error_message = "Invalid metric type for '#{metric_name}'. Expected '#{expected_type}', got '#{metric_type}'"
        raise Datadog::Statsd::Schema::InvalidMetricTypeError.new(
          error_message,
          namespace: metric_info[:namespace_path].join("."),
          metric: metric_name
        )
      end

      # Checks required tags, allowed tags (framework tags exempt) and each tag value that has
      # a definition among the namespace's effective tags.
      def validate_metric_tags(metric_name, provided_tags)
        all_metrics = @schema.all_metrics
        metric_info = all_metrics[metric_name.to_s]
        return unless metric_info

        metric_definition = metric_info[:definition]
        namespace = metric_info[:namespace]

        # Get effective tags including inherited ones from namespace
        effective_tags = namespace.effective_tags

        # Check for missing required tags
        missing_required = metric_definition.missing_required_tags(provided_tags)
        if missing_required.any?
          error_message = "Missing required tags for metric '#{metric_name}': #{missing_required.join(", ")}"
          error_message += ". Required tags: #{metric_definition.required_tags.join(", ")}"

          raise Datadog::Statsd::Schema::MissingRequiredTagError.new(
            error_message,
            namespace: metric_info[:namespace_path].join("."),
            metric: metric_name
          )
        end

        # Check for invalid tags (if metric has allowed_tags restrictions),
        # excluding framework tags such as 'emitter'.
        user_provided_tags = provided_tags.reject { |key, _| FRAMEWORK_TAGS.include?(key.to_sym) }

        invalid_tags = metric_definition.invalid_tags(user_provided_tags)
        if invalid_tags.any?
          error_message = "Invalid tags for metric '#{metric_name}': #{invalid_tags.join(", ")}"
          if metric_definition.allowed_tags.any?
            error_message += ". Allowed tags: #{metric_definition.allowed_tags.join(", ")}"
          end

          raise Datadog::Statsd::Schema::InvalidTagError.new(
            error_message,
            namespace: metric_info[:namespace_path].join("."),
            metric: metric_name
          )
        end

        # Validate tag values that have a schema definition (framework tags included when defined).
        provided_tags.each do |tag_name, tag_value|
          tag_definition = effective_tags[tag_name.to_sym]
          next unless tag_definition

          validate_tag_value(metric_name, tag_name, tag_value, tag_definition, metric_info)
        end
      end

      # @raise [Datadog::Statsd::Schema::InvalidTagError] on a type, value or custom-check failure.
      def validate_tag_value(metric_name, tag_name, tag_value, tag_definition, metric_info)
        # Type validation
        case tag_definition.type
        when :integer
          # \A...\z anchors the whole string; ^...$ would accept "12\nabc".
          unless tag_value.is_a?(Integer) || (tag_value.is_a?(String) && tag_value.match?(/\A\d+\z/))
            raise Datadog::Statsd::Schema::InvalidTagError.new(
              "Tag '#{tag_name}' for metric '#{metric_name}' must be an integer, got #{tag_value.class}",
              namespace: metric_info[:namespace_path].join("."),
              metric: metric_name,
              tag: tag_name
            )
          end
        when :symbol
          unless tag_value.is_a?(Symbol) || tag_value.is_a?(String)
            raise Datadog::Statsd::Schema::InvalidTagError.new(
              "Tag '#{tag_name}' for metric '#{metric_name}' must be a symbol or string, got #{tag_value.class}",
              namespace: metric_info[:namespace_path].join("."),
              metric: metric_name,
              tag: tag_name
            )
          end
        end

        # Value validation
        if tag_definition.values
          normalized_value = tag_value.to_s
          allowed_values = Array(tag_definition.values).map(&:to_s)

          unless allowed_values.include?(normalized_value) ||
                 value_matches_pattern?(normalized_value, tag_definition.values)
            raise Datadog::Statsd::Schema::InvalidTagError.new(
              "Invalid value '#{tag_value}' for tag '#{tag_name}' in metric '#{metric_name}'. Allowed values: #{allowed_values.join(", ")}",
              namespace: metric_info[:namespace_path].join("."),
              metric: metric_name,
              tag: tag_name
            )
          end
        end

        # Custom validation
        return unless tag_definition.validate && tag_definition.validate.respond_to?(:call)
        return if tag_definition.validate.call(tag_value)

        raise Datadog::Statsd::Schema::InvalidTagError.new(
          "Custom validation failed for tag '#{tag_name}' with value '#{tag_value}' in metric '#{metric_name}'",
          namespace: metric_info[:namespace_path].join("."),
          metric: metric_name,
          tag: tag_name
        )
      end

      # True when any Regexp among +patterns+ matches +value+; non-Regexp entries never match.
      def value_matches_pattern?(value, patterns)
        Array(patterns).any? do |pattern|
          case pattern
          when Regexp
            value.match?(pattern)
          else
            false
          end
        end
      end

      # Up to three known metric names that contain, are contained in, or are within edit
      # distance 2 of +metric_name+. Comparisons are done on strings so Symbol names work.
      def find_metric_suggestions(metric_name, available_metrics)
        wanted = metric_name.to_s
        suggestions = available_metrics.select do |available|
          candidate = available.to_s
          candidate.include?(wanted) || wanted.include?(candidate) ||
            levenshtein_distance(wanted, candidate) <= 2
        end
        suggestions.first(3) # Limit to 3 suggestions
      end

      # Classic dynamic-programming edit distance (insert/delete/substitute, cost 1 each).
      def levenshtein_distance(str1, str2)
        return str2.length if str1.empty?
        return str1.length if str2.empty?

        matrix = Array.new(str1.length + 1) { Array.new(str2.length + 1) }

        (0..str1.length).each { |i| matrix[i][0] = i }
        (0..str2.length).each { |j| matrix[0][j] = j }

        (1..str1.length).each do |i|
          (1..str2.length).each do |j|
            cost = str1[i - 1] == str2[j - 1] ? 0 : 1
            matrix[i][j] = [
              matrix[i - 1][j] + 1,     # deletion
              matrix[i][j - 1] + 1,     # insertion
              matrix[i - 1][j - 1] + cost # substitution
            ].min
          end
        end

        matrix[str1.length][str2.length]
      end

      # Maps a statsd method name to the schema metric type: increment/decrement/count are
      # :counter; every other name maps to itself as a Symbol.
      def normalize_metric_type(method_name)
        type = method_name.to_sym
        %i[increment decrement count].include?(type) ? :counter : type
      end

      # Applies the validation mode to a schema error.
      # @return [Symbol, nil] :drop in :drop mode, nil in :warn/:off.
      # @raise the given error in :strict mode (and for unknown modes).
      def handle_validation_error(error)
        case @validation_mode
        when :strict
          if colored_output?
            warn "Schema Validation Error:\n • ".yellow + error.message.to_s.red
          else
            warn "Schema Validation Error: #{error.message}"
          end
          raise error
        when :warn
          if colored_output?
            warn "Schema Validation Warning:\n • ".yellow + error.message.to_s.bold.yellow
          else
            warn "Schema Validation Warning: #{error.message}"
          end
          nil # Continue execution
        when :drop
          :drop # Signal to drop the metric
        when :off
          nil # No validation - continue execution
        else
          raise error
        end
      end

      # Colour only outside tests and only when colored2 actually patched String (its
      # require at the top of the file is optional).
      def colored_output?
        !Datadog::Statsd::Schema.in_test && "".respond_to?(:yellow)
      end
    end
  end
end