class Kafka::Statsd::AsyncProducerSubscriber
# Handles a `buffer_overflow` event by bumping an error counter that is
# keyed by client and topic.
#
# Reads `:client_id` and `:topic` from `event.payload` via `fetch`, so a
# missing key is raised by `fetch` rather than silently turned into `nil`
# (presumably the payload is a Hash, which would make that a KeyError —
# confirm against the emitter).
#
# @param event [#payload] event whose payload responds to `fetch`
# @return whatever `increment` returns; `increment` is defined outside
#   this view
def buffer_overflow(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)

  increment("async_producer.#{client}.#{topic}.produce.errors")
end
# Handles a `drop_messages` event by recording how many messages were
# dropped for the given client.
#
# Reads `:client_id` and `:message_count` from `event.payload` via `fetch`,
# so a missing key is raised by `fetch` instead of being reported as `nil`.
#
# @param event [#payload] event whose payload responds to `fetch`
# @return whatever `count` returns; `count` is defined outside this view
def drop_messages(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  message_count = payload.fetch(:message_count)

  count("async_producer.#{client}.dropped_messages", message_count)
end
# Handles an `enqueue_message` event by reporting the current queue size
# and the queue fill ratio, both keyed by client and topic.
#
# Reads `:client_id`, `:topic`, `:queue_size` and `:max_queue_size` from
# `event.payload` via `fetch`, so a missing key is raised by `fetch`.
# The fill ratio is `queue_size / max_queue_size` computed in floating
# point; a `max_queue_size` of zero is not guarded against here.
#
# @param event [#payload] event whose payload responds to `fetch`
# @return whatever the last `timing` call returns; `timing` is defined
#   outside this view
def enqueue_message(event)
  payload = event.payload
  client = payload.fetch(:client_id)
  topic = payload.fetch(:topic)
  queue_size = payload.fetch(:queue_size)
  max_queue_size = payload.fetch(:max_queue_size)

  # Convert both operands so integer inputs do not truncate the ratio.
  queue_fill_ratio = queue_size.to_f / max_queue_size.to_f

  # This gets us the avg/max queue size per producer.
  timing("async_producer.#{client}.#{topic}.queue.size", queue_size)

  # This gets us the avg/max queue fill ratio per producer.
  timing("async_producer.#{client}.#{topic}.queue.fill_ratio", queue_fill_ratio)
end