class Kafka::Consumer

def consumer_loop

def consumer_loop
  @running = true
  @fetcher.start
  while @running
    begin
      @instrumenter.instrument("loop.consumer") do
        yield
      end
    rescue HeartbeatError, OffsetCommitError
      join_group
    rescue RebalanceInProgress
      @logger.warn "Group rebalance in progress, re-joining..."
      join_group
    rescue FetchError, NotLeaderForPartition, UnknownTopicOrPartition
      @cluster.mark_as_stale!
    rescue LeaderNotAvailable => e
      @logger.error "Leader not available; waiting 1s before retrying"
      @cluster.mark_as_stale!
      sleep 1
    rescue ConnectionError => e
      @logger.error "Connection error #{e.class}: #{e.message}"
      @cluster.mark_as_stale!
    rescue SignalException => e
      @logger.warn "Received signal #{e.message}, shutting down"
      @running = false
    end
  end
ensure
  @fetcher.stop
  # In order to quickly have the consumer group re-balance itself, it's
  # important that members explicitly tell Kafka when they're leaving.
  make_final_offsets_commit!
  @group.leave rescue nil
  @running = false
end