class Honeybadger::Karafka::InsightsListener

def consumer_tags(consumer)

Builds the tags used when reporting events and metrics for a consumer.
Parameters:
  • consumer (Karafka::BaseConsumer) -- consumer instance for which to build tags

Returns:
  • (Hash) -- tags with the consumer's topic, partition, and consumer group
def consumer_tags(consumer)
  messages = consumer.messages
  metadata = messages.metadata
  consumer_group_id = consumer.topic.consumer_group.id
  {
    topic: metadata.topic,
    partition: metadata.partition,
    consumer_group: consumer_group_id
  }
end
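
For illustration, a consumer reading partition 0 of an "orders" topic in an "analytics" consumer group (hypothetical names) would yield:

consumer_tags(consumer)
# => { topic: "orders", partition: 0, consumer_group: "analytics" }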

def initialize

Sets the metric source to "karafka" for all metrics reported by this listener.
def initialize
  metric_source("karafka")
end
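
A minimal sketch of subscribing the listener via Karafka's standard monitor API (assuming a typical karafka.rb setup):

# karafka.rb (sketch)
listener = Honeybadger::Karafka::InsightsListener.new
::Karafka.monitor.subscribe(listener)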

def on_connection_listener_fetch_loop_received(event)

Reports how many messages were polled and how long the polling took.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_connection_listener_fetch_loop_received(event)
  time_taken = event[:time]
  messages_count = event[:messages_buffer].size
  consumer_group_id = event[:subscription_group].consumer_group.id
  extra_tags = {consumer_group: consumer_group_id}
  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    histogram("listener_polling_time_taken", value: time_taken, **extra_tags)
    histogram("listener_polling_messages", value: messages_count, **extra_tags)
  end
end

def on_consumer_consumed(event)

Reports events and metrics for a consumed batch: batch size, offsets, and processing/consumption lag.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
  metadata = messages.metadata
  tags = consumer_tags(consumer)
  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    event_context = tags.merge({
      consumer: consumer.class.name,
      duration: event[:time],
      processing_lag: metadata.processing_lag,
      consumption_lag: metadata.consumption_lag,
      processed: messages.count
    })
    Honeybadger.event("consumer.consumed.karafka", event_context)
  end
  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    increment_counter("consumer_messages", value: messages.count, **tags)
    increment_counter("consumer_batches", value: 1, **tags)
    gauge("consumer_offset", value: metadata.last_offset, **tags)
    histogram("consumer_consumed_time_taken", value: event[:time], **tags)
    histogram("consumer_batch_size", value: messages.count, **tags)
    histogram("consumer_processing_lag", value: metadata.processing_lag, **tags)
    histogram("consumer_consumption_lag", value: metadata.consumption_lag, **tags)
  end
end
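
Assuming illustrative values (all names hypothetical; times and lags are in milliseconds, as reported by Karafka), a consumed batch produces an Insights event shaped like this:

Honeybadger.event("consumer.consumed.karafka", {
  topic: "orders",
  partition: 0,
  consumer_group: "analytics",
  consumer: "OrdersConsumer",
  duration: 12.4,        # time spent consuming the batch
  processing_lag: 5,     # time the batch waited before processing started
  consumption_lag: 30,   # time between message creation and consumption
  processed: 100         # number of messages in the batch
})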

def on_error_occurred(event)

Reports any error dispatched through Karafka's error notifications, tagged with the consumer's details when a consumer was involved.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_error_occurred(event)
  extra_tags = {type: event[:type]}
  if event.payload[:caller].respond_to?(:messages)
    extra_tags.merge!(consumer_tags(event.payload[:caller]))
  end
  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    Honeybadger.event("error.occurred.karafka", error: event[:error], **extra_tags)
  end
  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    increment_counter("error_occurred", value: 1, **extra_tags)
  end
end
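
A sketch of the resulting event for a failure raised inside a consumer; the type tag comes from Karafka's error notifications, and the value shown here is a hypothetical example:

Honeybadger.event(
  "error.occurred.karafka",
  error: StandardError.new("boom"),   # the error captured by Karafka
  type: "consumer.consume.error",     # hypothetical Karafka error type
  topic: "orders",
  partition: 0,
  consumer_group: "analytics"
)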

def on_statistics_emitted(event)

Hooks into librdkafka's statistics emission and reports the selected rdkafka metrics.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_statistics_emitted(event)
  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    Honeybadger.event("statistics_emitted.karafka", event.payload)
  end
  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)
  statistics = event[:statistics]
  consumer_group_id = event[:consumer_group_id]
  base_tags = {consumer_group: consumer_group_id}
  RD_KAFKA_METRICS.each do |metric|
    report_metric(metric, statistics, base_tags)
  end
  report_aggregated_topics_metrics(statistics, consumer_group_id)
end
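
RD_KAFKA_METRICS is defined elsewhere in this class; from how report_metric consumes it, each entry is a value object with a type, scope, name, and key location. A sketch, with hypothetical entries:

# Sketch: each metric maps a location in librdkafka's statistics hash
# to a reporting method (increment_counter, gauge, or histogram).
RdKafkaMetric = Struct.new(:type, :scope, :name, :key_location)

# Hypothetical entries:
RdKafkaMetric.new(:increment_counter, :root, "messages_consumed", %w[rxmsgs])
RdKafkaMetric.new(:gauge, :topics, "consumer_lags", %w[consumer_lag_stored])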

def on_worker_process(event)

Reports the worker thread pool's usage before a job is processed.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_worker_process(event)
  jq_stats = event[:jobs_queue].statistics
  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    gauge("worker_total_threads", value: ::Karafka::App.config.concurrency)
    histogram("worker_processing", value: jq_stats[:busy])
    histogram("worker_enqueued_jobs", value: jq_stats[:enqueued])
  end
end
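
The jobs queue statistics used here expose at least the :busy and :enqueued counts (inferred from the lookups above); a hypothetical reading:

jq_stats = event[:jobs_queue].statistics
# => { busy: 3, enqueued: 7 }  (hypothetical values)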

def on_worker_processed(event)

Reports the worker thread pool's usage after a job has been processed.
Parameters:
  • event (Karafka::Core::Monitoring::Event) -- event details including payload
def on_worker_processed(event)
  jq_stats = event[:jobs_queue].statistics
  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    histogram("worker_processing", value: jq_stats[:busy])
  end
end

def report_aggregated_topics_metrics(statistics, consumer_group_id)

Publishes per-topic metrics aggregated across all of the topic's partitions.
Parameters:
  • statistics (Hash) -- hash with all the statistics emitted
  • consumer_group_id (String) -- ID of the consumer group in whose context we operate
def report_aggregated_topics_metrics(statistics, consumer_group_id)
  AGGREGATED_RD_KAFKA_METRICS.each do |metric|
    statistics.fetch("topics").each do |topic_name, topic_values|
      sum = 0
      topic_values["partitions"].each do |partition_name, partition_statistics|
        next if partition_name == "-1"
        # Skip until lag info is available
        next if partition_statistics["consumer_lag"] == -1
        next if partition_statistics["consumer_lag_stored"] == -1
        sum += partition_statistics.dig(*metric.key_location)
      end
      public_send(
        metric.type,
        metric.name,
        value: sum,
        consumer_group: consumer_group_id,
        topic: topic_name
      )
    end
  end
end
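
An aggregated entry (hypothetical, using the RdKafkaMetric sketch above) would sum a per-partition statistic, such as the stored consumer lag, across all owned partitions of a topic:

# Hypothetical AGGREGATED_RD_KAFKA_METRICS entry:
RdKafkaMetric.new(:gauge, :topics, "consumer_aggregated_lag", %w[consumer_lag_stored])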

def report_metric(metric, statistics, base_tags)

Reports a single rdkafka metric according to its scope (root, brokers, or topics).
Parameters:
  • metric (RdKafkaMetric) -- metric value object
  • statistics (Hash) -- hash with all the statistics emitted
  • base_tags (Hash) -- base tags we want to start with
def report_metric(metric, statistics, base_tags)
  case metric.scope
  when :root
    public_send(
      metric.type,
      metric.name,
      value: statistics.fetch(*metric.key_location),
      **base_tags
    )
  when :brokers
    statistics.fetch("brokers").each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics["nodeid"] == -1
      public_send(
        metric.type,
        metric.name,
        value: broker_statistics.dig(*metric.key_location),
        **base_tags.merge(broker: broker_statistics["nodename"])
      )
    end
  when :topics
    statistics.fetch("topics").each do |topic_name, topic_values|
      topic_values["partitions"].each do |partition_name, partition_statistics|
        next if partition_name == "-1"
        # Skip until lag info is available
        next if partition_statistics["consumer_lag"] == -1
        next if partition_statistics["consumer_lag_stored"] == -1
        # Skip if we do not own the fetch assignment
        next if partition_statistics["fetch_state"] == "stopped"
        next if partition_statistics["fetch_state"] == "none"
        public_send(
          metric.type,
          metric.name,
          value: partition_statistics.dig(*metric.key_location),
          **base_tags.merge({
            topic: topic_name,
            partition: partition_name
          })
        )
      end
    end
  else
    raise ArgumentError, metric.scope
  end
end
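
Put together, a root-scoped metric dispatches through public_send to one of the reporting helpers; for the hypothetical messages_consumed entry sketched earlier, the resolved call would look like:

# Equivalent to:
increment_counter(
  "messages_consumed",
  value: statistics.fetch("rxmsgs"),
  consumer_group: "analytics"   # hypothetical tag value
)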