class Karafka::Connection::Client

def kafka_consumer

Returns:
  • (Kafka::Consumer) - returns a ready to consume Kafka consumer
def kafka_consumer
  # @note We don't cache the connection internally because we cache kafka_consumer that uses
  #   kafka client object instance
  @kafka_consumer ||= Builder.call.consumer(
    *ApiAdapter.consumer(consumer_group)
  ).tap do |consumer|
    consumer_group.topics.each do |topic|
      consumer.subscribe(*ApiAdapter.subscribe(topic))
    end
  end
rescue Kafka::ConnectionError
  # If we would not wait it would totally spam log file with failed
  # attempts if Kafka is down
  sleep(consumer_group.reconnect_timeout)
  # We don't log and just reraise - this will be logged
  # down the road
  raise
end