class Karafka::Persistence::Consumer

Module used to provide a persistent cache across batch requests for a given
topic and partition to store some additional details when the persistent mode
for a given topic is turned on.

def all

Returns:
  • (Hash) - current thread persistence scope hash with all the consumers
def all
  # @note The Hash does not need to be thread-safe, as it is only ever accessed
  # from the current thread's context
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
end
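
The per-thread scoping used by `all` can be illustrated with a minimal sketch. The key `DEMO_SCOPE_KEY` and the method `demo_scope` are hypothetical names chosen for this example (the actual value of `PERSISTENCE_SCOPE` is not shown here); only `Thread.current[]` and `Hash.new` with a default block are taken from the method above.

```ruby
# Hypothetical key for this sketch; stands in for PERSISTENCE_SCOPE.
DEMO_SCOPE_KEY = :demo_persistence_scope

# Lazily builds one auto-vivifying hash per thread, as `all` does above.
def demo_scope
  Thread.current[DEMO_SCOPE_KEY] ||= Hash.new { |hash, key| hash[key] = {} }
end

# Writing through a missing first-level key creates the inner hash on demand.
demo_scope['events'][0] = :consumer_a
main_scope = demo_scope

# A different thread builds its own, initially empty, scope hash.
other_scope = Thread.new { demo_scope }.value

puts main_scope['events'].inspect
puts other_scope.empty?
```

Because each thread sees only its own hash, no locking is needed around reads or writes. One detail worth keeping in mind: in Ruby, `Thread#[]` storage is fiber-local, so code running in a separate fiber of the same thread would also get a separate hash.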

def fetch(topic, partition)

Parameters:
  • topic (Karafka::Routing::Topic) -- topic instance for which we might cache
  • partition (Integer) -- partition number for which we want to cache

Returns:
  • (Karafka::BaseConsumer) - base consumer descendant
def fetch(topic, partition)
  # We always store a current instance for callback reasons
  if topic.persistent
    all[topic][partition] ||= topic.consumer.new
  else
    all[topic][partition] = topic.consumer.new
  end
end
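
The difference between the two branches of `fetch` can be seen in a small sketch with stand-in objects. `DemoConsumer`, `DemoTopic`, `DEMO_CACHE`, and `demo_fetch` are hypothetical names introduced for illustration; they are not part of Karafka, and the real method stores its data in the per-thread hash returned by `all` rather than in a constant.

```ruby
# Hypothetical stand-ins for a consumer class and Karafka::Routing::Topic.
DemoConsumer = Class.new
DemoTopic = Struct.new(:consumer, :persistent)

# Same shape as the hash returned by `all`, held in a constant for the demo.
DEMO_CACHE = Hash.new { |hash, key| hash[key] = {} }

# Mirrors the branching in `fetch`: reuse when persistent, rebuild otherwise.
def demo_fetch(topic, partition)
  if topic.persistent
    DEMO_CACHE[topic][partition] ||= topic.consumer.new
  else
    DEMO_CACHE[topic][partition] = topic.consumer.new
  end
end

persistent_topic = DemoTopic.new(DemoConsumer, true)
transient_topic = DemoTopic.new(DemoConsumer, false)

# Persistent topic: the same instance is returned for the same partition.
reused = demo_fetch(persistent_topic, 0).equal?(demo_fetch(persistent_topic, 0))

# Non-persistent topic: every call builds and stores a fresh instance.
rebuilt = !demo_fetch(transient_topic, 0).equal?(demo_fetch(transient_topic, 0))

# Each partition gets its own instance, even for a persistent topic.
per_partition = !demo_fetch(persistent_topic, 0).equal?(demo_fetch(persistent_topic, 1))

puts [reused, rebuilt, per_partition].inspect
```

In both branches the latest instance ends up in the cache, which matches the comment in `fetch` about always storing the current instance for callback reasons.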