class Kafka::Client
def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, &block)
-
(nil)
-
Parameters:
-
max_bytes (Integer) -- the maximum number of bytes to include in the fetch response for each partition. Defaults to 1048576 (1 MiB).
-
min_bytes (Integer) -- the minimum number of bytes to wait for. If set to zero, the broker responds immediately, even if the response is empty. Defaults to 1.
-
max_wait_time (Integer) -- the maximum amount of time to wait before the fetch request returns, even if fewer than min_bytes are available. Defaults to 5.
-
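start_from_beginning (Boolean) -- whether to start from the beginning of the topic (the :earliest offset) instead of reading only newly produced messages (the :latest offset). Defaults to true.
-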
topic (String) -- the topic to consume messages from.
# Continuously fetches messages from every partition of a topic and yields
# each one to the given block.
#
# On every iteration a new FetchOperation is built, one fetch is registered
# per partition currently reported by the cluster for `topic`, and the
# operation is executed. Every message of every returned batch is passed to
# the block; afterwards the partition's next fetch offset is set to the
# batch's last offset plus one.
#
# The loop has no exit condition of its own: it runs until the block breaks
# or throws, or until an exception propagates out of the method.
#
# @param topic [String] the topic to consume messages from.
# @param start_from_beginning [Boolean] when true, partitions that have not
#   been fetched from yet start at the `:earliest` offset; otherwise at
#   `:latest`.
# @param max_wait_time [Integer] forwarded unchanged to FetchOperation.
# @param min_bytes [Integer] forwarded unchanged to FetchOperation.
# @param max_bytes [Integer] forwarded unchanged to every per-partition fetch.
# @yield [message] each message of each fetched batch, in batch order.
# @return [void] does not return under normal operation (see above).
def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1_048_576, &block)
  default_offset = start_from_beginning ? :earliest : :latest

  # The block form only *returns* the default for unknown partitions, it does
  # not store it; a partition gets a concrete entry once a batch for it has
  # been processed below.
  offsets = Hash.new { default_offset }

  loop do
    operation = FetchOperation.new(
      cluster: @cluster,
      logger: @logger,
      min_bytes: min_bytes,
      max_wait_time: max_wait_time,
    )

    # The partition list is re-read on every iteration rather than cached.
    @cluster.partitions_for(topic).map(&:partition_id).each do |partition|
      operation.fetch_from_partition(topic, partition, offset: offsets[partition], max_bytes: max_bytes)
    end

    operation.execute.each do |batch|
      batch.messages.each(&block)

      # NOTE(review): assumes `last_offset` is an Integer for every returned
      # batch, including one without messages -- confirm against the batch
      # class, which is not visible from here.
      offsets[batch.partition] = batch.last_offset + 1
    end
  end
end