class Kafka::Statsd::ConsumerSubscriber
def process_batch(event)
def process_batch(event) lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition) if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.errors") else timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.latency", event.duration) count("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages", messages) end gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", lag) end