class Karafka::Connection::Client
Class used as a wrapper around the Ruby-Kafka client to simplify additional
features that we provide or might provide in the future, and to hide the internal implementation
def fetch_loop
- Note: This will yield raw messages - no preprocessing or reformatting.
Other tags:
- Yieldparam: kafka - fetched messages
def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)

  if consumer_group.batch_fetching
    kafka_consumer.each_batch(*settings) { |batch| yield(batch.messages) }
  else
    # Always yield an array of messages, so we have a consistent API (always a batch)
    kafka_consumer.each_message(*settings) { |message| yield([message]) }
  end
rescue Kafka::ProcessingError => error
  # If there was an error during consumption, we have to log it, pause the current
  # partition and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error.cause
  )
  pause(error.topic, error.partition)
  retry
# This is on purpose - see the notes for this method
# rubocop:disable RescueException
rescue Exception => error
  # rubocop:enable RescueException
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error
  )
  retry
end
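The key design decision above is that both branches yield an array: `each_batch` yields `batch.messages`, while `each_message` wraps the single message in `[message]`, so downstream code always sees a batch. A minimal stand-alone sketch of this normalization (the `StubConsumer` and the `Struct`-based messages are hypothetical stand-ins, not part of Karafka or ruby-kafka):

```ruby
# Hypothetical stand-ins used only to illustrate the "always yield a batch"
# normalization performed by fetch_loop.
StubMessage = Struct.new(:value)
StubBatch = Struct.new(:messages)

class StubConsumer
  def initialize(batches)
    @batches = batches
  end

  # Yields whole batches, like Kafka::Consumer#each_batch
  def each_batch
    @batches.each { |batch| yield(batch) }
  end

  # Yields single messages, like Kafka::Consumer#each_message
  def each_message
    @batches.each { |batch| batch.messages.each { |message| yield(message) } }
  end
end

# Normalized fetch loop: regardless of the fetching mode, the block
# always receives an Array of messages
def fetch_loop(consumer, batch_fetching:)
  if batch_fetching
    consumer.each_batch { |batch| yield(batch.messages) }
  else
    consumer.each_message { |message| yield([message]) }
  end
end

consumer = StubConsumer.new(
  [StubBatch.new([StubMessage.new('a'), StubMessage.new('b')])]
)

batch_mode = []
fetch_loop(consumer, batch_fetching: true) { |msgs| batch_mode << msgs.map(&:value) }

single_mode = []
fetch_loop(consumer, batch_fetching: false) { |msgs| single_mode << msgs.map(&:value) }

# Batch mode yields one array of two messages; single mode yields
# two arrays of one message each - the block's contract is identical.
```

Consumers built on top of this loop never need to branch on the fetching mode themselves.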
def initialize(consumer_group)
Returns:
- (Karafka::Connection::Client) - group consumer that can subscribe to multiple topics

Parameters:
- consumer_group (Karafka::Routing::ConsumerGroup) - consumer group for which we create the client
def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end
def kafka_consumer
Returns:
- (Kafka::Consumer) - a ready to consume Kafka consumer
def kafka_consumer
  # @note We don't cache the connection internally because we cache kafka_consumer
  #   that uses the kafka client object instance
  @kafka_consumer ||= Builder.call.consumer(
    *ApiAdapter.consumer(consumer_group)
  ).tap do |consumer|
    consumer_group.topics.each do |topic|
      consumer.subscribe(*ApiAdapter.subscribe(topic))
    end
  end
rescue Kafka::ConnectionError
  # If we did not wait, this would totally spam the log file with failed
  # attempts when Kafka is down
  sleep(consumer_group.reconnect_timeout)
  # We don't log and just re-raise - this will be logged
  # down the road
  raise
end
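Two patterns are at work here: `||=` memoization so the consumer is built lazily and reused, and a sleep-before-reraise on connection errors so a down broker does not produce a tight retry loop. A self-contained sketch of both, assuming hypothetical names (`LazyClient`, `FakeConnectionError`, the builder lambda) that stand in for the real Builder and `Kafka::ConnectionError`:

```ruby
# Hypothetical stand-in for Kafka::ConnectionError
class FakeConnectionError < StandardError; end

class LazyClient
  # Karafka reads this timeout from the consumer group configuration;
  # a tiny value is used here just for the sketch
  RECONNECT_TIMEOUT = 0.01

  def initialize(builder)
    @builder = builder
  end

  # Builds the consumer on first use and caches it afterwards
  def consumer
    @consumer ||= @builder.call
  rescue FakeConnectionError
    # Without this sleep, a down broker would cause a tight retry loop
    # that spams the logs with failed attempts
    sleep(RECONNECT_TIMEOUT)
    # Re-raise without logging - logging happens further down the road
    raise
  end
end

calls = 0
client = LazyClient.new(-> { calls += 1; :consumer_instance })

client.consumer
client.consumer
# The builder ran only once despite two calls, thanks to ||= memoization

failing = LazyClient.new(-> { raise FakeConnectionError })
rescued = false
begin
  failing.consumer
rescue FakeConnectionError
  # The error still reaches the caller - the sleep only throttles retries
  rescued = true
end
```

The re-raise keeps error handling in one place (the caller's loop) while the sleep bounds how fast that loop can spin.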
def mark_as_consumed(params)
Parameters:
- params (Karafka::Params::Params) - message that we want to mark as processed

Other tags:
- Note: In contrast to ruby-kafka, we commit the offset for each manual marking, to be sure the offset is persisted even if the process crashes before the automatic commit triggers kick in.
def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
  # Trigger an immediate, blocking offset commit in order to minimize the risk of
  # crashing before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end
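The essential ordering is mark first, then force a blocking commit, rather than letting ruby-kafka's interval-based auto-commit pick the mark up later. A sketch of that ordering with a hypothetical recording stub in place of the real consumer:

```ruby
# Hypothetical stub that records which calls happen, in which order,
# standing in for the ruby-kafka consumer.
class RecordingConsumer
  attr_reader :ops

  def initialize
    @ops = []
  end

  def mark_message_as_processed(offset)
    @ops << [:mark, offset]
  end

  def commit_offsets
    @ops << [:commit]
  end
end

def mark_as_consumed(consumer, offset)
  consumer.mark_message_as_processed(offset)
  # Blocking commit right away instead of waiting for the automatic,
  # interval-based commit - minimizes loss if the process crashes
  consumer.commit_offsets
end

consumer = RecordingConsumer.new
mark_as_consumed(consumer, 42)
# consumer.ops now records the mark followed immediately by the commit
```

The trade-off is extra commit traffic per marking in exchange for a smaller window in which a crash can lose progress.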
def pause(topic, partition)
Parameters:
- topic (String) - topic that we want to pause
- partition (Integer) - number of the partition that we want to pause
def pause(topic, partition)
  kafka_consumer.pause(*ApiAdapter.pause(topic, partition, consumer_group))
end
def stop
- Note: Stopping running consumers without a really important reason is not recommended
def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end
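The `&.stop` plus `nil` reset idiom makes `stop` safe to call whether or not a consumer was ever built, and idempotent across repeated calls; resetting to `nil` also lets the memoized `kafka_consumer` lazily rebuild a fresh consumer later. A small sketch with hypothetical names (`StoppableClient`, the singleton-method fake):

```ruby
class StoppableClient
  def initialize(consumer = nil)
    @consumer = consumer
  end

  def stop
    # &. skips the call entirely when @consumer is nil
    # (never connected, or already stopped)
    @consumer&.stop
    # Resetting to nil makes a second stop a no-op and lets a later
    # accessor rebuild a fresh consumer via ||=
    @consumer = nil
  end
end

stops = 0
fake_consumer = Object.new
fake_consumer.define_singleton_method(:stop) { stops += 1 }

client = StoppableClient.new(fake_consumer)
client.stop
client.stop # second call is a no-op thanks to the nil reset

never_connected = StoppableClient.new
never_connected.stop # safe: &. short-circuits on nil
```

Without the safe navigation operator, calling `stop` before the first fetch would raise `NoMethodError` on `nil`.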