class Karafka::Connection::Listener

a raw Kafka::FetchedMessage
@note Listener itself does nothing with the message - it will return to the block
@note It does not loop on itself - it needs to be executed in a loop
A single listener that listens to incoming messages from a single route

def call

Runs prefetch callbacks and executes the main listener fetch loop
def call
  Karafka::Callbacks.before_fetch_loop(
    @consumer_group,
    client
  )
  fetch_loop
end

def client

Returns:
  • (Karafka::Connection::Client) - wrapped kafka consuming client for a given topic
def client
  @client ||= Client.new(@consumer_group)
end

def fetch_loop

Other tags:
    Note: - We catch all the errors here, so they don't affect other listeners (or this one)
    Note: - This will yield with a raw message - no preprocessing or reformatting

Other tags:
    Yieldparam: kafka - fetched messages
    Yieldparam: consumer - group id
def fetch_loop
  client.fetch_loop do |raw_messages|
    # @note What happens here is a delegation of processing to a proper processor based
    #   on the incoming messages characteristics
    Karafka::Connection::Delegator.call(@consumer_group.id, raw_messages)
  end
  # This is on purpose - see the notes for this method
  # rubocop:disable RescueException
rescue Exception => e
  Karafka.monitor.instrument('connection.listener.fetch_loop.error', caller: self, error: e)
  # rubocop:enable RescueException
  @client&.stop
  retry if @client
end

def initialize(consumer_group)

Returns:
  • (Karafka::Connection::Listener) - listener instance

Parameters:
  • consumer_group (Karafka::Routing::ConsumerGroup) -- consumer group that holds details
def initialize(consumer_group)
  @consumer_group = consumer_group
end