class Kafka::Client

def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, &block)

Returns:
  • (nil) -

Parameters:
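  • max_bytes (Integer) -- the maximum number of bytes to include in the
    response for each partition that is fetched.
  • min_bytes (Integer) -- the minimum number of bytes to wait for. If set to
    zero, the broker responds immediately, even if no messages are available.
  • max_wait_time (Integer) -- the maximum amount of time to wait before
    the broker responds to a fetch request, in seconds.
  • start_from_beginning (Boolean) -- whether to start from the beginning
    of the topic or to consume only messages produced from now on.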
  • topic (String) -- the topic to consume messages from.
# Fetches messages from every partition of a topic in an endless loop and
# hands each message to the given block.
#
# Each pass builds a fresh FetchOperation, registers one fetch per partition
# that the cluster currently reports for the topic, executes it, and then
# moves the per-partition offset to one past the last offset of each
# returned batch.
#
# @param topic [String] the topic to consume messages from.
# @param start_from_beginning [Boolean] when truthy, partitions without a
#   recorded offset are fetched from `:earliest`; otherwise from `:latest`.
# @param max_wait_time [Integer] forwarded to FetchOperation as `max_wait_time`.
# @param min_bytes [Integer] forwarded to FetchOperation as `min_bytes`.
# @param max_bytes [Integer] forwarded to every `fetch_from_partition` call.
# @yield [message] every message of every fetched batch, batch by batch.
# @return [nil] when the block breaks out of the loop; the method never
#   finishes on its own.
def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, &block)
  initial_position = start_from_beginning ? :earliest : :latest

  # The default block does not store anything: a partition keeps resolving to
  # the initial position until a batch for it has been processed.
  next_offsets = Hash.new { initial_position }

  loop do
    fetch = FetchOperation.new(
      cluster: @cluster,
      logger: @logger,
      min_bytes: min_bytes,
      max_wait_time: max_wait_time,
    )

    # The partition list is looked up again on every pass.
    partition_ids = @cluster.partitions_for(topic).map(&:partition_id)
    partition_ids.each do |partition_id|
      resume_at = next_offsets[partition_id]
      fetch.fetch_from_partition(topic, partition_id, offset: resume_at, max_bytes: max_bytes)
    end

    fetch.execute.each do |fetched|
      fetched.messages.each(&block)
      # Continue right after the last offset covered by this batch.
      next_offsets[fetched.partition] = fetched.last_offset + 1
    end
  end
end