class Kafka::Consumer
# Polls the fetcher once and returns whatever batches it has ready.
#
# Before polling, the consumer joins its group if it is not currently a
# member, gives the heartbeat a chance to fire, and resumes paused
# partitions via `resume_paused_partitions!`.
#
# @return [Object, nil] an empty array when the consumer is not running or
#   the fetcher has no data; otherwise the payload delivered by the fetcher
#   under the `:batches` tag. Any tag other than `:batches` / `:exception`
#   falls through the `case` and yields nil.
# @raise [FetchError] when a ConnectionError surfaces while fetching.
# @raise [Exception] whatever exception object the fetcher hands back under
#   the `:exception` tag (unless one of the rescue clauses below handles it).
def fetch_batches
  # Return early if the consumer has been stopped.
  return [] unless @running

  join_group unless @group.member?

  @heartbeat.send_if_necessary

  resume_paused_partitions!

  unless @fetcher.data?
    @logger.debug "No batches to process"
    # Back off briefly so an idle consumer does not spin on the fetcher.
    sleep 2
    return []
  end

  tag, message = @fetcher.poll

  case tag
  when :batches
    message
  when :exception
    # The fetcher passes errors along as values; re-raise here so the rescue
    # clauses below can deal with them.
    raise message
  end
rescue OffsetOutOfRange => e
  @logger.error "Invalid offset #{e.offset} for #{e.topic}/#{e.partition}, resetting to default offset"

  @offset_manager.seek_to_default(e.topic, e.partition)

  # NOTE(review): this retry is unbounded; it presumably relies on
  # seek_to_default making the next fetch succeed — confirm.
  retry
rescue ConnectionError => e
  @logger.error "Connection error while fetching messages: #{e}"

  raise FetchError, e
end