class Kafka::Datadog::ConsumerSubscriber
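# Translates ruby-kafka consumer instrumentation events into Datadog StatsD
# metrics: timings for group membership operations, a histogram for loop
# durations, counters for processed messages and errors, and gauges for
# offsets, lag, and pause durations.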

# Records how long joining the consumer group took, and counts any errors
# raised while joining.
def join_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }
  timing("consumer.join_group", event.duration, tags: tags)
  if event.payload.key?(:exception)
    increment("consumer.join_group.errors", tags: tags)
  end
end

# Records how long leaving the consumer group took, and counts any errors
# raised while leaving.
def leave_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }
  timing("consumer.leave_group", event.duration, tags: tags)
  if event.payload.key?(:exception)
    increment("consumer.leave_group.errors", tags: tags)
  end
end

# Records the duration of each consumer loop iteration as a histogram.
def loop(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }
  histogram("consumer.loop.duration", event.duration, tags: tags)
end

# Gauges how long the given topic/partition has currently been paused.
def pause_status(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }
  duration = event.payload.fetch(:duration)
  gauge("consumer.pause.duration", duration, tags: tags)
end

# Reports batch processing latency and message counts (or an error count if
# processing raised), plus the consumer's current offset and lag.
def process_batch(event)
  offset = event.payload.fetch(:last_offset)
  lag = event.payload.fetch(:offset_lag)
  messages = event.payload.fetch(:message_count)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }
  if event.payload.key?(:exception)
    increment("consumer.process_batch.errors", tags: tags)
  else
    timing("consumer.process_batch.latency", event.duration, tags: tags)
    count("consumer.messages", messages, tags: tags)
  end
  gauge("consumer.offset", offset, tags: tags)
  gauge("consumer.lag", lag, tags: tags)
end

# Reports per-message processing latency and throughput (or an error count if
# processing raised), plus the consumer's offset, lag, and time lag.
def process_message(event)
  offset = event.payload.fetch(:offset)
  offset_lag = event.payload.fetch(:offset_lag)
  create_time = event.payload.fetch(:create_time)
  time_lag = create_time && ((Time.now - create_time) * 1000).to_i
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
    topic: event.payload.fetch(:topic),
    partition: event.payload.fetch(:partition),
  }
  if event.payload.key?(:exception)
    increment("consumer.process_message.errors", tags: tags)
  else
    timing("consumer.process_message.latency", event.duration, tags: tags)
    increment("consumer.messages", tags: tags)
  end
  gauge("consumer.offset", offset, tags: tags)
  gauge("consumer.lag", offset_lag, tags: tags)
  # Not all messages have timestamps.
  if time_lag
    gauge("consumer.time_lag", time_lag, tags: tags)
  end
end
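
# For illustration only: ruby-kafka instruments message processing through
# ActiveSupport::Notifications, so the event handled above would be emitted
# roughly like this (the event name and values are assumptions; the payload
# keys mirror the fetches in process_message):
#
#   ActiveSupport::Notifications.instrument(
#     "process_message.consumer.kafka",
#     client_id:   "my-client",
#     group_id:    "my-group",
#     topic:       "events",
#     partition:   0,
#     offset:      42,
#     offset_lag:  3,
#     create_time: message_create_time, # may be nil; time_lag is then skipped
#   ) { handle_message(message) }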

# Records how long the group sync took, and counts any errors raised while
# syncing.
def sync_group(event)
  tags = {
    client: event.payload.fetch(:client_id),
    group_id: event.payload.fetch(:group_id),
  }
  timing("consumer.sync_group", event.duration, tags: tags)
  if event.payload.key?(:exception)
    increment("consumer.sync_group.errors", tags: tags)
  end
end
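
A minimal usage sketch, assuming the standard ruby-kafka Datadog integration: requiring kafka/datadog attaches this subscriber to the library's consumer instrumentation events, so the handlers above run automatically as the consumer works. The Kafka::Datadog.host, .port, and .namespace setters and the handle helper are illustrative assumptions, not part of the class shown here.

require "kafka"
require "kafka/datadog"

# Point the integration at the statsd agent (setters assumed from the
# integration's configuration API; adjust to your environment).
Kafka::Datadog.host = "127.0.0.1"
Kafka::Datadog.port = 8125
Kafka::Datadog.namespace = "ruby_kafka"

kafka = Kafka.new(["kafka1:9092"], client_id: "my-client")
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("events")

# Joining the group fires join_group/sync_group events; each message fires a
# process_message event, which the subscriber turns into latency, offset,
# lag, and throughput metrics.
consumer.each_message do |message|
  handle(message) # hypothetical application handler
end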