class Karafka::Fetcher

topics, which means that if there are no partitions, it won’t use them.
@note Creating multiple fetchers will result in having multiple connections to the same
Class used to run the Karafka consumer and handle shutting down, restarting etc

def call

so we don't have to terminate them
Fetch loop should never end, which means that we won't create more actor clusters
Starts listening on all the listeners asynchronously
def call
  threads = listeners.map do |listener|
    # We abort on exception because there should be an exception handling developed for
    # each listener running in separate threads, so the exceptions should never leak
    # and if that happens, it means that something really bad happened and we should stop
    # the whole process
    Thread
      .new { listener.call }
      .tap { |thread| thread.abort_on_exception = true }
  end
  # We aggregate threads here for a supervised shutdown process
  threads.each { |thread| Karafka::Server.consumer_threads << thread }
  threads.each(&:join)
# If anything crashes here, we need to raise the error and crush the runner because it means
# that something terrible happened
rescue StandardError => e
  Karafka.monitor.instrument('fetcher.call.error', caller: self, error: e)
  Karafka::App.stop!
  raise e
end

def listeners

Returns:
  • (Array) - listeners that will consume messages
def listeners
  @listeners ||= App.consumer_groups.active.map do |consumer_group|
    Karafka::Connection::Listener.new(consumer_group)
  end
end