class Honeybadger::Karafka::InsightsListener
def consumer_tags(consumer)
-
(Hash
-)
Parameters:
-
consumer
(Karafka::BaseConsumer
) --
# Builds the tags that describe where a consumer's current batch came from.
#
# @param consumer [Karafka::BaseConsumer] consumer whose batch metadata and
#   topic are read
# @return [Hash] tags with +:topic+, +:partition+ and +:consumer_group+ keys
def consumer_tags(consumer)
  batch_metadata = consumer.messages.metadata
  group_id = consumer.topic.consumer_group.id

  {
    topic: batch_metadata.topic,
    partition: batch_metadata.partition,
    consumer_group: group_id
  }
end
def initialize
# Sets up the listener by declaring "karafka" as its metric source.
def initialize
  metric_source("karafka")
end
def on_connection_listener_fetch_loop_received(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Records how long a listener poll took and how many messages it returned.
#
# @param event [Karafka::Core::Monitoring::Event] event exposing +:time+,
#   +:messages_buffer+ and +:subscription_group+
def on_connection_listener_fetch_loop_received(event)
  # The event is read up front, so a malformed event raises even when
  # metrics are disabled.
  polling_time = event[:time]
  polled_count = event[:messages_buffer].size
  group_tags = {consumer_group: event[:subscription_group].consumer_group.id}

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  histogram("listener_polling_time_taken", value: polling_time, **group_tags)
  histogram("listener_polling_messages", value: polled_count, **group_tags)
end
def on_consumer_consumed(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Reports a consumed batch: one Insights event (when events are enabled) and
# a set of counters, a gauge and histograms (when metrics are enabled).
#
# @param event [Karafka::Core::Monitoring::Event] event whose payload carries
#   the consumer under +:caller+ and the elapsed time under +:time+
def on_consumer_consumed(event)
  consumer = event.payload[:caller]
  messages = consumer.messages
  metadata = messages.metadata
  tags = consumer_tags(consumer)

  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    # The event carries the shared tags plus batch-level details.
    details = {
      consumer: consumer.class.name,
      duration: event[:time],
      processing_lag: metadata.processing_lag,
      consumption_lag: metadata.consumption_lag,
      processed: messages.count
    }
    Honeybadger.event("consumer.consumed.karafka", tags.merge(details))
  end

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  increment_counter("consumer_messages", value: messages.count, **tags)
  increment_counter("consumer_batches", value: 1, **tags)
  gauge("consumer_offset", value: metadata.last_offset, **tags)
  histogram("consumer_consumed_time_taken", value: event[:time], **tags)
  histogram("consumer_batch_size", value: messages.count, **tags)
  histogram("consumer_processing_lag", value: metadata.processing_lag, **tags)
  histogram("consumer_consumption_lag", value: metadata.consumption_lag, **tags)
end
def on_error_occurred(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Reports an error as an Insights event and/or an error counter, depending on
# which plugin features are enabled.
#
# @param event [Karafka::Core::Monitoring::Event] event exposing +:type+ and
#   +:error+, with the originating object under +:caller+ in its payload
def on_error_occurred(event)
  error_tags = {type: event[:type]}
  origin = event.payload[:caller]

  # Consumer tags are added only when the caller exposes #messages.
  error_tags.merge!(consumer_tags(origin)) if origin.respond_to?(:messages)

  if Honeybadger.config.load_plugin_insights_events?(:karafka)
    Honeybadger.event("error.occurred.karafka", error: event[:error], **error_tags)
  end

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  increment_counter("error_occurred", value: 1, **error_tags)
end
def on_statistics_emitted(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Forwards emitted statistics as an Insights event (when events are enabled)
# and turns them into metrics (when metrics are enabled).
#
# @param event [Karafka::Core::Monitoring::Event] event exposing
#   +:statistics+ and +:consumer_group_id+
def on_statistics_emitted(event)
  events_enabled = Honeybadger.config.load_plugin_insights_events?(:karafka)
  Honeybadger.event("statistics_emitted.karafka", event.payload) if events_enabled

  if Honeybadger.config.load_plugin_insights_metrics?(:karafka)
    statistics = event[:statistics]
    consumer_group_id = event[:consumer_group_id]
    base_tags = {consumer_group: consumer_group_id}

    # Per-metric reporting first, then the per-topic aggregates.
    RD_KAFKA_METRICS.each { |metric| report_metric(metric, statistics, base_tags) }
    report_aggregated_topics_metrics(statistics, consumer_group_id)
  end
end
def on_worker_process(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Reports worker metrics: configured concurrency plus the busy and enqueued
# figures from the jobs queue statistics.
#
# @param event [Karafka::Core::Monitoring::Event] event exposing +:jobs_queue+
def on_worker_process(event)
  # Statistics are read before the config check.
  queue_stats = event[:jobs_queue].statistics

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  gauge("worker_total_threads", value: ::Karafka::App.config.concurrency)
  histogram("worker_processing", value: queue_stats[:busy])
  histogram("worker_enqueued_jobs", value: queue_stats[:enqueued])
end
def on_worker_processed(event)
-
event
(Karafka::Core::Monitoring::Event
) --
# Reports the busy figure from the jobs queue statistics.
#
# @param event [Karafka::Core::Monitoring::Event] event exposing +:jobs_queue+
def on_worker_processed(event)
  # Statistics are read before the config check.
  queue_stats = event[:jobs_queue].statistics

  return unless Honeybadger.config.load_plugin_insights_metrics?(:karafka)

  histogram("worker_processing", value: queue_stats[:busy])
end
def report_aggregated_topics_metrics(statistics, consumer_group_id)
-
consumer_group_id
(String
) -- consumer group in whose context we operate -
statistics
(Hash
) -- hash with all the statistics emitted
# Publishes one value per topic for every aggregated metric by summing that
# metric across the topic's partitions.
#
# @param statistics [Hash] hash with all the statistics emitted
# @param consumer_group_id [String] consumer group the statistics belong to
def report_aggregated_topics_metrics(statistics, consumer_group_id)
  AGGREGATED_RD_KAFKA_METRICS.each do |metric|
    statistics.fetch("topics").each do |topic_name, topic_values|
      total = topic_values["partitions"].inject(0) do |acc, (partition_name, partition_statistics)|
        next acc if partition_name == "-1"
        # Skip until lag info is available
        next acc if partition_statistics["consumer_lag"] == -1
        next acc if partition_statistics["consumer_lag_stored"] == -1

        acc + partition_statistics.dig(*metric.key_location)
      end

      public_send(
        metric.type,
        metric.name,
        value: total,
        consumer_group: consumer_group_id,
        topic: topic_name
      )
    end
  end
end
def report_metric(metric, statistics, base_tags)
-
base_tags
(Hash
) -- base tags we want to start with -
statistics
(Hash
) -- hash with all the statistics emitted -
metric
(RdKafkaMetric
) -- metric value object
# Publishes a single metric taken from the emitted statistics, according to
# the metric's scope.
#
# * +:root+    - one value read from the top of the statistics hash
# * +:brokers+ - one value per broker, tagged with +:broker+
# * +:topics+  - one value per partition, tagged with +:topic+ and +:partition+
#
# @param metric [RdKafkaMetric] metric value object exposing +type+, +name+,
#   +scope+ and +key_location+
# @param statistics [Hash] hash with all the statistics emitted
# @param base_tags [Hash] base tags every published value starts with
# @raise [ArgumentError] when the metric scope is not one of the above
# @raise [KeyError] when a +:root+ key path is missing from the statistics
def report_metric(metric, statistics, base_tags)
  case metric.scope
  when :root
    # Walk the key path one level at a time. Passing a multi-element path
    # straight to Hash#fetch would treat the second element as a default
    # value. Fetching per level also keeps the KeyError on missing keys.
    root_value = Array(metric.key_location).reduce(statistics) do |node, key|
      node.fetch(key)
    end

    public_send(
      metric.type,
      metric.name,
      value: root_value,
      **base_tags
    )
  when :brokers
    statistics.fetch("brokers").each_value do |broker_statistics|
      # Skip bootstrap nodes
      # Bootstrap nodes have nodeid -1, other nodes have positive
      # node ids
      next if broker_statistics["nodeid"] == -1

      public_send(
        metric.type,
        metric.name,
        value: broker_statistics.dig(*metric.key_location),
        **base_tags.merge(broker: broker_statistics["nodename"])
      )
    end
  when :topics
    statistics.fetch("topics").each do |topic_name, topic_values|
      topic_values["partitions"].each do |partition_name, partition_statistics|
        next if partition_name == "-1"
        # Skip until lag info is available
        next if partition_statistics["consumer_lag"] == -1
        next if partition_statistics["consumer_lag_stored"] == -1

        # Skip if we do not own the fetch assignment
        next if partition_statistics["fetch_state"] == "stopped"
        next if partition_statistics["fetch_state"] == "none"

        public_send(
          metric.type,
          metric.name,
          value: partition_statistics.dig(*metric.key_location),
          **base_tags.merge({
            topic: topic_name,
            partition: partition_name
          })
        )
      end
    end
  else
    raise ArgumentError, metric.scope
  end
end