class Kafka::Statsd::ConsumerSubscriber

def process_batch(event)

def process_batch(event)
  lag = event.payload.fetch(:offset_lag)
  messages = event.payload.fetch(:message_count)
  client = event.payload.fetch(:client_id)
  group_id = event.payload.fetch(:group_id)
  topic = event.payload.fetch(:topic)
  partition = event.payload.fetch(:partition)
  if event.payload.key?(:exception)
    increment("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.errors")
  else
    timing("consumer.#{client}.#{group_id}.#{topic}.#{partition}.process_batch.latency", event.duration)
    count("consumer.#{client}.#{group_id}.#{topic}.#{partition}.messages", messages)
  end
  gauge("consumer.#{client}.#{group_id}.#{topic}.#{partition}.lag", lag)
end