class Kafka::Consumer

def fetch_batches

def fetch_batches
  # Return early if the consumer has been stopped.
  return [] if !@running
  join_group unless @group.member?
  @heartbeat.send_if_necessary
  resume_paused_partitions!
  if !@fetcher.data?
    @logger.debug "No batches to process"
    sleep 2
    []
  else
    tag, message = @fetcher.poll
    case tag
    when :batches
      message
    when :exception
      raise message
    end
  end
rescue OffsetOutOfRange => e
  @logger.error "Invalid offset #{e.offset} for #{e.topic}/#{e.partition}, resetting to default offset"
  @offset_manager.seek_to_default(e.topic, e.partition)
  retry
rescue ConnectionError => e
  @logger.error "Connection error while fetching messages: #{e}"
  raise FetchError, e
end