require "honeybadger/instrumentation_helper"

module Honeybadger
  module Karafka
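    # Listener that reports Karafka errors to Honeybadger.
    #
    # A minimal manual wiring sketch (assuming you subscribe it yourself in
    # karafka.rb; adjust to your setup):
    #
    #   Karafka.monitor.subscribe(Honeybadger::Karafka::ErrorsListener.new)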
    class ErrorsListener
      # Sends error details to Honeybadger
      #
      # @param event [Karafka::Core::Monitoring::Event]
      def on_error_occurred(event)
        context = {
          type: event[:type]
        }

        if (consumer = event.payload[:caller]).respond_to?(:messages)
          messages = consumer.messages
          metadata = messages.metadata
          consumer_group_id = consumer.topic.consumer_group.id

          context[:topic] = metadata.topic
          context[:partition] = metadata.partition
          context[:consumer_group] = consumer_group_id
        end

        Honeybadger.notify(event[:error], context: context)
      end
    end

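    # Publishes Karafka and rdkafka metrics and events to Honeybadger Insights.
    # Relies on Honeybadger::InstrumentationHelper for metric_source, gauge,
    # increment_counter and histogram.
    #
    # Subscribed the same way as the errors listener (a sketch; adjust to your
    # application):
    #
    #   Karafka.monitor.subscribe(Honeybadger::Karafka::InsightsListener.new)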
    class InsightsListener
      include ::Honeybadger::InstrumentationHelper

      # Value object for storing the publishing details of a single rdkafka metric
      RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)

      # All the rdkafka metrics we want to publish
      #
      # By default we publish quite a lot, so this can be tuned.
      # Note that the ones with a `_d` suffix come from Karafka, not rdkafka or Kafka.
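      #
      # Each key_location is dug out of the librdkafka statistics hash emitted
      # via Karafka. For example, the broker-scoped %w[rtt avg] metric is read
      # roughly like this (a sketch trimmed to the keys we use):
      #
      #   statistics.fetch("brokers").each_value do |broker|
      #     broker["nodeid"]         # -1 for bootstrap nodes, which we skip
      #     broker.dig("rtt", "avg") # average broker round-trip time
      #   end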
      RD_KAFKA_METRICS = [
        # Client metrics
        RdKafkaMetric.new(:increment_counter, :root, "messages_consumed", "rxmsgs_d"),
        RdKafkaMetric.new(:increment_counter, :root, "messages_consumed_bytes", "rxmsg_bytes"),

        # Broker metrics
        RdKafkaMetric.new(:increment_counter, :brokers, "consume_attempts", "txretries_d"),
        RdKafkaMetric.new(:increment_counter, :brokers, "consume_errors", "txerrs_d"),
        RdKafkaMetric.new(:increment_counter, :brokers, "receive_errors", "rxerrs_d"),
        RdKafkaMetric.new(:increment_counter, :brokers, "connection_connects", "connects_d"),
        RdKafkaMetric.new(:increment_counter, :brokers, "connection_disconnects", "disconnects_d"),
        RdKafkaMetric.new(:gauge, :brokers, "network_latency_avg", %w[rtt avg]),
        RdKafkaMetric.new(:gauge, :brokers, "network_latency_p95", %w[rtt p95]),
        RdKafkaMetric.new(:gauge, :brokers, "network_latency_p99", %w[rtt p99]),

        # Topics metrics
        RdKafkaMetric.new(:gauge, :topics, "consumer_lags", "consumer_lag_stored"),
        RdKafkaMetric.new(:gauge, :topics, "consumer_lags_delta", "consumer_lag_stored_d")
      ].freeze

      # Metrics that sum values at the topic level rather than the partition level
      AGGREGATED_RD_KAFKA_METRICS = [
        # Topic aggregated metrics
        RdKafkaMetric.new(:gauge, :topics, "consumer_aggregated_lag", "consumer_lag_stored")
      ].freeze

      def initialize
        metric_source("karafka")
      end

      # Hooks up to Karafka instrumentation for emitted statistics
      #
      # @param event [Karafka::Core::Monitoring::Event]
      def on_statistics_emitted(event)
        if Honeybadger.config.load_plugin_insights_events?(:karafka)
          Honeybadger.event("statistics_emitted.karafka", event.payload)
        end

        return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

        statistics = event[:statistics]
        consumer_group_id = event[:consumer_group_id]

        base_tags = {consumer_group: consumer_group_id}

        RD_KAFKA_METRICS.each do |metric|
          report_metric(metric, statistics, base_tags)
        end

        report_aggregated_topics_metrics(statistics, consumer_group_id)
      end

      # Publishes aggregated topic-level metrics that are sums of the per-partition metrics
      #
      # @param statistics [Hash] hash with all the statistics emitted
      # @param consumer_group_id [String] consumer group in whose context we operate
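      #
      # The relevant part of the statistics payload roughly looks like this
      # (a trimmed sketch with an illustrative topic name):
      #
      #   "topics" => {
      #     "orders" => {
      #       "partitions" => {
      #         "0"  => { "consumer_lag" => 3, "consumer_lag_stored" => 3 },
      #         "-1" => {} # internal partition, skipped below
      #       }
      #     }
      #   }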
      def report_aggregated_topics_metrics(statistics, consumer_group_id)
        AGGREGATED_RD_KAFKA_METRICS.each do |metric|
          statistics.fetch("topics").each do |topic_name, topic_values|
            sum = 0

            topic_values["partitions"].each do |partition_name, partition_statistics|
              next if partition_name == "-1"
              # Skip until lag info is available
              next if partition_statistics["consumer_lag"] == -1
              next if partition_statistics["consumer_lag_stored"] == -1

              sum += partition_statistics.dig(*metric.key_location)
            end

            public_send(
              metric.type,
              metric.name,
              value: sum,
              consumer_group: consumer_group_id,
              topic: topic_name
            )
          end
        end
      end

      # Increases the errors count by 1
      #
      # @param event [Karafka::Core::Monitoring::Event]
      def on_error_occurred(event)
        extra_tags = {type: event[:type]}

        if event.payload[:caller].respond_to?(:messages)
          extra_tags.merge!(consumer_tags(event.payload[:caller]))
        end

        if Honeybadger.config.load_plugin_insights_events?(:karafka)
          Honeybadger.event("error.occurred.karafka", error: event[:error], **extra_tags)
        end

        if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
          increment_counter("error_occurred", value: 1, **extra_tags)
        end
      end

      # Reports how many messages we've polled and how much time we spent doing it
      #
      # @param event [Karafka::Core::Monitoring::Event]
      def on_connection_listener_fetch_loop_received(event)
        time_taken = event[:time]
        messages_count = event[:messages_buffer].size
        consumer_group_id = event[:subscription_group].consumer_group.id
        extra_tags = {consumer_group: consumer_group_id}

        if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
          histogram("listener_polling_time_taken", value: time_taken, **extra_tags)
          histogram("listener_polling_messages", value: messages_count, **extra_tags)
        end
      end

      # Here we report the majority of processing-related metrics, as we have
      # access to the consumer
      # @param event [Karafka::Core::Monitoring::Event]
      def on_consumer_consumed(event)
        consumer = event.payload[:caller]
        messages = consumer.messages
        metadata = messages.metadata

        tags = consumer_tags(consumer)

        if Honeybadger.config.load_plugin_insights_events?(:karafka)
          event_context = tags.merge({
            consumer: consumer.class.name,
            duration: event[:time],
            processing_lag: metadata.processing_lag,
            consumption_lag: metadata.consumption_lag,
            processed: messages.count
          })
          Honeybadger.event("consumer.consumed.karafka", event_context)
        end

        if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
          increment_counter("consumer_messages", value: messages.count, **tags)
          increment_counter("consumer_batches", value: 1, **tags)
          gauge("consumer_offset", value: metadata.last_offset, **tags)
          histogram("consumer_consumed_time_taken", value: event[:time], **tags)
          histogram("consumer_batch_size", value: messages.count, **tags)
          histogram("consumer_processing_lag", value: metadata.processing_lag, **tags)
          histogram("consumer_consumption_lag", value: metadata.consumption_lag, **tags)
        end
      end

      {
        revoked: :revoked,
        shutdown: :shutdown,
        ticked: :tick
      }.each do |after, name|
        class_eval <<~RUBY, __FILE__, __LINE__ + 1
          # Keeps track of user code execution
          #
          # @param event [Karafka::Core::Monitoring::Event]
          def on_consumer_#{after}(event)
            if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
              tags = consumer_tags(event.payload[:caller])
              increment_counter('consumer_#{name}', value: 1, **tags)
            end
          end
        RUBY
      end
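
      # For reference, the loop above defines methods equivalent to:
      #
      #   def on_consumer_revoked(event)
      #     if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
      #       tags = consumer_tags(event.payload[:caller])
      #       increment_counter('consumer_revoked', value: 1, **tags)
      #     end
      #   end
      #
      # as well as on_consumer_shutdown and on_consumer_ticked.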

      # Worker-related metrics
      # @param event [Karafka::Core::Monitoring::Event]
      def on_worker_process(event)
        jq_stats = event[:jobs_queue].statistics

        if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
          gauge("worker_total_threads", value: ::Karafka::App.config.concurrency)
          histogram("worker_processing", value: jq_stats[:busy])
          histogram("worker_enqueued_jobs", value: jq_stats[:enqueued])
        end
      end

      # We report this metric before and after processing for higher accuracy
      # Without this, the utilization would not be fully reflected
      # @param event [Karafka::Core::Monitoring::Event]
      def on_worker_processed(event)
        jq_stats = event[:jobs_queue].statistics

        if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
          histogram("worker_processing", value: jq_stats[:busy])
        end
      end

      private

      # Reports a given metric's statistics to Honeybadger
      # @param metric [RdKafkaMetric] metric value object
      # @param statistics [Hash] hash with all the statistics emitted
      # @param base_tags [Hash] base tags we want to start with
      def report_metric(metric, statistics, base_tags)
        case metric.scope
        when :root
          public_send(
            metric.type,
            metric.name,
            value: statistics.fetch(*metric.key_location),
            **base_tags
          )
        when :brokers
          statistics.fetch("brokers").each_value do |broker_statistics|
            # Skip bootstrap nodes
            # Bootstrap nodes have nodeid -1, other nodes have positive
            # node ids
            next if broker_statistics["nodeid"] == -1

            public_send(
              metric.type,
              metric.name,
              value: broker_statistics.dig(*metric.key_location),
              **base_tags.merge(broker: broker_statistics["nodename"])
            )
          end
        when :topics
          statistics.fetch("topics").each do |topic_name, topic_values|
            topic_values["partitions"].each do |partition_name, partition_statistics|
              next if partition_name == "-1"
              # Skip until lag info is available
              next if partition_statistics["consumer_lag"] == -1
              next if partition_statistics["consumer_lag_stored"] == -1

              # Skip if we do not own the fetch assignment
              next if partition_statistics["fetch_state"] == "stopped"
              next if partition_statistics["fetch_state"] == "none"

              public_send(
                metric.type,
                metric.name,
                value: partition_statistics.dig(*metric.key_location),
                **base_tags.merge({
                  topic: topic_name,
                  partition: partition_name
                })
              )
            end
          end
        else
          raise ArgumentError, metric.scope
        end
      end

      # Builds basic per-consumer tags for publication
      #
      # @param consumer [Karafka::BaseConsumer]
      # @return [Hash]
      def consumer_tags(consumer)
        messages = consumer.messages
        metadata = messages.metadata
        consumer_group_id = consumer.topic.consumer_group.id

        {
          topic: metadata.topic,
          partition: metadata.partition,
          consumer_group: consumer_group_id
        }
      end
    end
  end
end