class Kafka::Consumer
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
Returns:
- (nil)
Other tags:
- Yieldparam: batch -- a message batch fetched from Kafka.
Parameters:
- min_bytes (Integer) -- the minimum number of bytes to read before returning messages from each broker.
- max_bytes (Integer) -- the maximum number of bytes to read before returning messages from each broker.
- max_wait_time (Integer, Float) -- the maximum duration of time to wait before returning messages from each broker.
- automatically_mark_as_processed (Boolean) -- whether to automatically mark the batch's messages as processed when the block returns without raising.
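
A minimal usage sketch (the broker address, client id, group id, and topic name are placeholders, not part of this API; the full method source follows below):

require "kafka"

kafka = Kafka.new(["kafka1:9092"], client_id: "my-app")
consumer = kafka.consumer(group_id: "my-group")
consumer.subscribe("my-topic")

# Each yielded batch covers a single topic/partition slice; an exception raised
# inside the block aborts processing and surfaces as a Kafka::ProcessingError.
consumer.each_batch(min_bytes: 1, max_wait_time: 5) do |batch|
  puts "#{batch.topic}/#{batch.partition}: #{batch.messages.count} messages"
  batch.messages.each do |message|
    puts message.value
  end
end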
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      unless batch.empty?
        notification = {
          topic: batch.topic,
          partition: batch.partition,
          last_offset: batch.last_offset,
          offset_lag: batch.offset_lag,
          highwater_mark_offset: batch.highwater_mark_offset,
          message_count: batch.messages.count,
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_batch.consumer", notification)

        @instrumenter.instrument("process_batch.consumer", notification) do
          begin
            yield batch
            @current_offsets[batch.topic][batch.partition] = batch.last_offset
          rescue => e
            offset_range = (batch.first_offset..batch.last_offset)
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(batch.topic, batch.partition, offset_range)
          end
        end

        mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed

        # We've successfully processed a batch from the partition, so we can clear
        # the pause.
        pause_for(batch.topic, batch.partition).reset!
      end

      @offset_manager.commit_offsets_if_necessary
      @heartbeat.send_if_necessary

      return if !@running
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end
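
When automatically_mark_as_processed is false, offsets only advance for messages the caller marks explicitly. A hedged sketch, assuming the consumer set up above (handle is a hypothetical application method, not part of this API):

consumer.each_batch(automatically_mark_as_processed: false) do |batch|
  batch.messages.each do |message|
    handle(message)  # hypothetical processing step
    # Mark per message so a crash mid-batch does not mark unprocessed messages.
    consumer.mark_message_as_processed(message)
  end
end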