module Karafka::Patches::RubyKafka

def consumer_loop

thread)
over the stopping process that we need (since we're the once that initiate it for each
have to take care of. That way, nothing like that ever happens but we get the control
We don't wan't to use poll ruby-kafka api as it brings many more problems that we would
ruby-kafka fetch loop does not allow that directly
stop, so we can perform stop commit or anything else that we need since
This patch allows us to inject business logic in between fetches and before the consumer
def consumer_loop
  super do
    consumers = Karafka::Persistence::Consumer
                .all
                .values
                .flat_map(&:values)
                .select { |ctrl| ctrl.respond_to?(:run_callbacks) }
    if Karafka::App.stopped?
      consumers.each { |ctrl| ctrl.run_callbacks :before_stop }
      Karafka::Persistence::Client.read.stop
    else
      consumers.each { |ctrl| ctrl.run_callbacks :before_poll }
      yield
      consumers.each { |ctrl| ctrl.run_callbacks :after_poll }
    end
  end
end