class Karafka::Connection::Client
def kafka_consumer
-
(Kafka::Consumer)
- returns a ready to consume Kafka consumer
def kafka_consumer # @note We don't cache the connection internally because we cache kafka_consumer that uses # kafka client object instance @kafka_consumer ||= Builder.call.consumer( *ApiAdapter.consumer(consumer_group) ).tap do |consumer| consumer_group.topics.each do |topic| consumer.subscribe(*ApiAdapter.subscribe(topic)) end end rescue Kafka::ConnectionError # If we would not wait it would totally spam log file with failed # attempts if Kafka is down sleep(consumer_group.reconnect_timeout) # We don't log and just reraise - this will be logged # down the road raise end