class Kafka::Consumer

def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)

Returns:
  • (nil) -

Other tags:
    Yieldparam: batch - a message batch fetched from Kafka.

Parameters:
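  • automatically_mark_as_processed (Boolean) -- whether to automatically mark the last message of each non-empty batch as processed once the block has returned without raising.
  • max_wait_time (Integer, Float) -- the maximum duration of time to wait before a fetch returns; forwarded to the fetcher configuration.
  • max_bytes (Integer) -- the maximum number of bytes to read before a fetch returns; forwarded to the fetcher configuration.
  • min_bytes (Integer) -- the minimum number of bytes to read before a fetch returns; forwarded to the fetcher configuration.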
# Fetches message batches inside the consumer loop and yields every non-empty
# batch to the given block.
#
# For each non-empty batch, two instrumentation events are emitted
# ("start_process_batch.consumer" up front, then "process_batch.consumer"
# wrapping the block call). After the block returns, the batch's last offset
# is recorded in `@current_offsets`, the last message is optionally marked as
# processed, and the pause state for the batch's topic/partition is reset.
# Offsets are committed and a heartbeat is sent (each "if necessary") after
# every batch, empty or not.
#
# @param min_bytes [Integer] forwarded to the fetcher configuration.
# @param max_bytes [Integer] forwarded to the fetcher configuration.
# @param max_wait_time [Integer, Float] forwarded to the fetcher configuration.
# @param automatically_mark_as_processed [Boolean] when true, the last message
#   of each non-empty batch is passed to `mark_message_as_processed` after the
#   block has returned without raising.
# @yieldparam batch a non-empty message batch returned by `fetch_batches`.
# @raise [ProcessingError] when the block (or the offset bookkeeping right
#   after it) raises a StandardError; the original error is logged with its
#   backtrace before being replaced.
# @return [nil] when the method exits because `@running` is falsy; otherwise
#   whatever `consumer_loop` returns.
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(min_bytes: min_bytes, max_bytes: max_bytes, max_wait_time: max_wait_time)

  consumer_loop do
    fetch_batches.each do |batch|
      unless batch.empty?
        payload = {
          topic: batch.topic,
          partition: batch.partition,
          last_offset: batch.last_offset,
          offset_lag: batch.offset_lag,
          highwater_mark_offset: batch.highwater_mark_offset,
          message_count: batch.messages.count,
        }

        # Fire the "start" event right away; subscribers should not have to
        # wait for the caller's block to finish before hearing about the batch.
        @instrumenter.instrument("start_process_batch.consumer", payload)

        @instrumenter.instrument("process_batch.consumer", payload) do
          begin
            yield batch
            @current_offsets[batch.topic][batch.partition] = batch.last_offset
          rescue => error
            range = batch.first_offset..batch.last_offset
            where = "#{batch.topic}/#{batch.partition} in offset range #{range}"
            trace = error.backtrace.join("\n")
            @logger.error("Exception raised when processing #{where} -- #{error.class}: #{error}\n#{trace}")

            # Callers only ever see a ProcessingError that identifies the
            # failing topic, partition and offset range.
            raise ProcessingError.new(batch.topic, batch.partition, range)
          end
        end

        if automatically_mark_as_processed
          mark_message_as_processed(batch.messages.last)
        end

        # A batch from this partition went through without raising, so the
        # partition's pause can be cleared.
        pause_for(batch.topic, batch.partition).reset!
      end

      # Runs for empty batches too.
      @offset_manager.commit_offsets_if_necessary
      @heartbeat.send_if_necessary

      return unless @running
    end

    # Even when nothing was received, committing here covers messages
    # processed in the last set of batches, and keeps offsets retained when no
    # messages have been read since the offset retention period elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end