class Karafka::Connection::Listener
A single listener that listens to incoming messages from a single route.
@note It does not loop on itself - it needs to be executed in a loop.
@note Listener itself does nothing with the message - it will return a raw Kafka::FetchedMessage to the block.
def call
    def call
      Karafka::Callbacks.before_fetch_loop(@consumer_group, client)
      fetch_loop
    end
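The order of operations in `call` (run the pre-loop callback once, then enter the fetch loop) can be illustrated with a minimal, self-contained sketch. `RecordingCallbacks` and `SequencedListener` are hypothetical stand-ins invented for this illustration; they are not part of Karafka and only record the order in which things happen.

```ruby
# Hypothetical stand-in for Karafka::Callbacks: it only records what was called.
class RecordingCallbacks
  attr_reader :events

  def initialize
    @events = []
  end

  def before_fetch_loop(consumer_group, client)
    @events << [:before_fetch_loop, consumer_group, client]
  end
end

# Hypothetical listener that mirrors the shape of #call shown above.
class SequencedListener
  def initialize(consumer_group, callbacks)
    @consumer_group = consumer_group
    @callbacks = callbacks
  end

  def call
    # The callback receives the consumer group and the (lazily built) client
    @callbacks.before_fetch_loop(@consumer_group, client)
    fetch_loop
  end

  private

  def client
    # A symbol is used instead of a real Kafka client (assumption for the sketch)
    @client ||= :fake_client
  end

  def fetch_loop
    # The real method blocks while fetching; here we only note that it started
    @callbacks.events << [:fetch_loop]
  end
end

callbacks = RecordingCallbacks.new
SequencedListener.new(:example_group, callbacks).call
```

After the call, `callbacks.events` holds the callback entry first and the fetch loop entry second, which is the ordering the original method relies on.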
def client
Returns:
- (Karafka::Connection::Client) - wrapped Kafka consuming client for a given topic
    def client
      @client ||= Client.new(@consumer_group)
    end
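The `||=` in `client` memoizes the client: it is built on first access and reused afterwards. A minimal sketch of that behaviour follows, assuming a hypothetical `FakeClient` that merely counts instantiations (the real `Karafka::Connection::Client` is not used here).

```ruby
# Hypothetical stand-in for Karafka::Connection::Client; it only counts
# how many instances have been created.
class FakeClient
  class << self
    attr_accessor :instances
  end
  self.instances = 0

  def initialize(consumer_group)
    @consumer_group = consumer_group
    self.class.instances += 1
  end
end

# Hypothetical listener reduced to the memoized accessor.
class MemoizingListener
  def initialize(consumer_group)
    @consumer_group = consumer_group
  end

  def client
    # Builds the client on first access, returns the cached one afterwards
    @client ||= FakeClient.new(@consumer_group)
  end
end

listener = MemoizingListener.new(:example_group)
first = listener.client
second = listener.client
```

Both calls return the same object and `FakeClient.instances` stays at 1. Nothing is instantiated until `client` is first called, so a listener that never runs never opens a connection in this sketch.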
def fetch_loop
Note: We catch all the errors here, so they don't affect other listeners (or this one).
Note: This will yield with a raw message - no preprocessing or reformatting.
Other tags:
- Yieldparam: Kafka fetched messages
- Yieldparam: consumer group id
    def fetch_loop
      client.fetch_loop do |raw_messages|
        # @note What happens here is a delegation of processing to a proper processor based
        #   on the incoming messages characteristics
        Karafka::Connection::Delegator.call(@consumer_group.id, raw_messages)
      end
      # This is on purpose - see the notes for this method
      # rubocop:disable RescueException
    rescue Exception => e
      Karafka.monitor.instrument('connection.listener.fetch_loop.error', caller: self, error: e)
      # rubocop:enable RescueException
      @client&.stop
      retry if @client
    end
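The rescue-stop-retry flow in `fetch_loop` can be sketched in isolation. `FlakyClient` and `RetryingListener` below are hypothetical names made up for this illustration, and the sketch rescues `StandardError` rather than `Exception` (unlike the original) so that it cannot swallow `Interrupt` or `SystemExit` when run as a script.

```ruby
# Hypothetical client that fails a fixed number of times before yielding a batch.
class FlakyClient
  attr_reader :stops

  def initialize(failures)
    @failures = failures
    @stops = 0
  end

  def fetch_loop
    if @failures.positive?
      @failures -= 1
      raise IOError, 'simulated connection problem'
    end
    yield [:raw_message]
  end

  def stop
    @stops += 1
  end
end

# Hypothetical listener that mirrors the error handling of #fetch_loop.
class RetryingListener
  attr_reader :errors, :batches

  def initialize(client)
    @client = client
    @errors = []
    @batches = []
  end

  def fetch_loop
    @client.fetch_loop { |raw_messages| @batches << raw_messages }
  rescue StandardError => e
    # Record the error (the original instruments it via Karafka.monitor)
    @errors << e
    # Stop the client, then re-run the method body from the top
    @client&.stop
    retry if @client
  end
end

client = FlakyClient.new(2)
listener = RetryingListener.new(client)
listener.fetch_loop
```

With two simulated failures, the listener records two errors, stops the client twice, and then receives one batch on the third attempt. As in the original, `retry` only runs while `@client` is set, so a failure that happens before any client exists would end the method instead of looping.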
def initialize(consumer_group)
Returns:
- (Karafka::Connection::Listener) - listener instance
Parameters:
- consumer_group (Karafka::Routing::ConsumerGroup) - consumer group that holds details
    def initialize(consumer_group)
      @consumer_group = consumer_group
    end