class Kafka::Consumer
def consumer_loop
# Runs the consumer's main loop, yielding to the given block once per
# iteration for as long as @running is truthy.
#
# Before looping, @running is set to true and the fetcher is started. Each
# iteration is wrapped in a "loop.consumer" instrumentation event. Errors
# raised during an iteration are handled as follows and the loop continues:
#
# * HeartbeatError, OffsetCommitError - calls join_group.
# * RebalanceInProgress - logs a warning, then calls join_group.
# * FetchError, NotLeaderForPartition, UnknownTopicOrPartition - marks the
#   cluster as stale.
# * LeaderNotAvailable - logs an error, marks the cluster as stale and
#   sleeps for one second before the next iteration.
# * ConnectionError - logs an error and marks the cluster as stale.
# * SignalException - logs a warning and clears @running, ending the loop.
#
# Any other exception propagates to the caller once cleanup has run.
#
# Cleanup always stops the fetcher, makes a final offsets commit, attempts
# to leave the group and clears @running. A failure while leaving the group
# is logged and otherwise ignored.
#
# @yield one iteration of the consumer's work
# @return [nil]
def consumer_loop
  @running = true
  @fetcher.start

  while @running
    begin
      @instrumenter.instrument("loop.consumer") do
        yield
      end
    rescue HeartbeatError, OffsetCommitError
      join_group
    rescue RebalanceInProgress
      @logger.warn "Group rebalance in progress, re-joining..."
      join_group
    rescue FetchError, NotLeaderForPartition, UnknownTopicOrPartition
      @cluster.mark_as_stale!
    rescue LeaderNotAvailable
      @logger.error "Leader not available; waiting 1s before retrying"
      @cluster.mark_as_stale!
      sleep 1
    rescue ConnectionError => e
      @logger.error "Connection error #{e.class}: #{e.message}"
      @cluster.mark_as_stale!
    rescue SignalException => e
      @logger.warn "Received signal #{e.message}, shutting down"
      @running = false
    end
  end
ensure
  begin
    @fetcher.stop
    make_final_offsets_commit!
  ensure
    # Nested so that a failure while stopping the fetcher or committing the
    # final offsets cannot skip leaving the group or leave @running set; the
    # original error still propagates afterwards.
    begin
      # In order to quickly have the consumer group re-balance itself, it's
      # important that members explicitly tell Kafka when they're leaving.
      @group.leave
    rescue StandardError => e
      # Leaving is best-effort: record the failure instead of raising.
      @logger.warn "Failed to leave consumer group: #{e.class}: #{e.message}"
    ensure
      @running = false
    end
  end
end