class Kafka::Statsd::ConsumerSubscriber
def join_group(event)
def join_group(event) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) timing("consumer.#{client}.#{group_id}.join_group", event.duration) if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.join_group.errors") end end
def leave_group(event)
def leave_group(event) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) timing("consumer.#{client}.#{group_id}.leave_group", event.duration) if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.leave_group.errors") end end
def pause_status(event)
def pause_status(event) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition) duration = event.payload.fetch(:duration) gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.pause.duration", duration) end
def process_batch(event)
def process_batch(event) lag = event.payload.fetch(:offset_lag) messages = event.payload.fetch(:message_count) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition) if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.errors") else timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.latency", event.duration) count("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages", messages) end gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", lag) end
def process_message(event)
def process_message(event) offset_lag = event.payload.fetch(:offset_lag) create_time = event.payload.fetch(:create_time) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) topic = event.payload.fetch(:topic) partition = event.payload.fetch(:partition) time_lag = create_time && ((Time.now - create_time) * 1000).to_i if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_message.errors") else timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_message.latency", event.duration) increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages") end gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", offset_lag) # Not all messages have timestamps. if time_lag gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.time_lag", time_lag) end end
def sync_group(event)
def sync_group(event) client = event.payload.fetch(:client_id) group_id = event.payload.fetch(:group_id) timing("consumer.#{client}.#{group_id}.sync_group", event.duration) if event.payload.key?(:exception) increment("consumer.#{client}.#{group_id}.sync_group.errors") end end