class Karafka::Connection::Client

features that we provide/might provide in future and to hide the internal implementation
Class used as a wrapper around Ruby-Kafka client to simplify additional

def fetch_loop

Other tags:
    Note: - This will yield with raw messages - no preprocessing or reformatting.

Other tags:
    Yieldparam: kafka - fetched messages
def fetch_loop
  settings = ApiAdapter.consumption(consumer_group)
  if consumer_group.batch_fetching
    kafka_consumer.each_batch(*settings) { |batch| yield(batch.messages) }
  else
    # always yield an array of messages, so we have consistent API (always a batch)
    kafka_consumer.each_message(*settings) { |message| yield([message]) }
  end
rescue Kafka::ProcessingError => error
  # If there was an error during consumption, we have to log it, pause current partition
  # and process other things
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error.cause
  )
  pause(error.topic, error.partition)
  retry
  # This is on purpose - see the notes for this method
  # rubocop:disable RescueException
rescue Exception => error
  # rubocop:enable RescueException
  Karafka.monitor.instrument(
    'connection.client.fetch_loop.error',
    caller: self,
    error: error
  )
  retry
end

def initialize(consumer_group)

Returns:
  • (Karafka::Connection::Client) - group consumer that can subscribe to

Parameters:
  • consumer_group (Karafka::Routing::ConsumerGroup) -- consumer group for which
def initialize(consumer_group)
  @consumer_group = consumer_group
  Persistence::Client.write(self)
end

def kafka_consumer

Returns:
  • (Kafka::Consumer) - returns a ready to consume Kafka consumer
def kafka_consumer
  # @note We don't cache the connection internally because we cache kafka_consumer that uses
  #   kafka client object instance
  @kafka_consumer ||= Builder.call.consumer(
    *ApiAdapter.consumer(consumer_group)
  ).tap do |consumer|
    consumer_group.topics.each do |topic|
      consumer.subscribe(*ApiAdapter.subscribe(topic))
    end
  end
rescue Kafka::ConnectionError
  # If we would not wait it would totally spam log file with failed
  # attempts if Kafka is down
  sleep(consumer_group.reconnect_timeout)
  # We don't log and just reraise - this will be logged
  # down the road
  raise
end

def mark_as_consumed(params)

Parameters:
  • params (Karafka::Params::Params) -- message that we want to mark as processed

Other tags:
    Note: - In opposite to ruby-kafka, we commit the offset for each manual marking to be sure
def mark_as_consumed(params)
  kafka_consumer.mark_message_as_processed(
    *ApiAdapter.mark_message_as_processed(params)
  )
  # Trigger an immediate, blocking offset commit in order to minimize the risk of crashing
  # before the automatic triggers have kicked in.
  kafka_consumer.commit_offsets
end

def pause(topic, partition)

Parameters:
  • partition (Integer) -- number partition that we want to pause
  • topic (String) -- topic that we want to pause
def pause(topic, partition)
  kafka_consumer.pause(*ApiAdapter.pause(topic, partition, consumer_group))
end

def stop

Other tags:
    Note: - Stopping running consumers without a really important reason is not recommended
def stop
  @kafka_consumer&.stop
  @kafka_consumer = nil
end