class Karafka::Persistence::Consumer
Module used to provide a persistent cache across batch requests for a given
topic and partition to store some additional details when the persistent mode
for a given topic is turned on.
def all
Returns:
- (Hash) - current thread persistence scope hash with all the consumers
def all
  # @note This does not need to be threadsafe (Hash) as it is always executed in a
  # current thread context
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
end
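The thread-scoped hash can be illustrated in isolation. The following is a minimal sketch, not the library's code: SCOPE_KEY and the scope method are hypothetical names chosen for this example, since the actual value of PERSISTENCE_SCOPE is not shown here. It assumes only standard Ruby behavior: Hash.new with a block creates the inner hash on first access, and Thread.current[] storage is not shared between threads.

```ruby
# Hypothetical key; the real PERSISTENCE_SCOPE value is not shown in this section.
SCOPE_KEY = :sketch_persistence_scope

# Returns a nested hash that belongs to the calling thread only.
# The default block creates an empty inner hash the first time a key is read.
def scope
  Thread.current[SCOPE_KEY] ||= Hash.new { |hash, key| hash[key] = {} }
end

# Store a value under topic :events, partition 0 in the main thread.
scope[:events][0] = :consumer_a

# A different thread builds its own scope and sees nothing stored by the main thread.
other_thread_view = Thread.new { scope[:events][0] }.value
```

Because each thread only ever reads and writes its own hash, no locking is needed, which matches the note in the method body.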
def fetch(topic, partition)
Parameters:
- topic (Karafka::Routing::Topic) -- topic instance for which we might cache
- partition (Integer) -- number of partition for which we want to cache
Returns:
- (Karafka::BaseConsumer) - base consumer descendant
def fetch(topic, partition)
  # We always store a current instance for callback reasons
  if topic.persistent
    all[topic][partition] ||= topic.consumer.new
  else
    all[topic][partition] = topic.consumer.new
  end
end
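The difference between the two branches of fetch is easiest to see with stand-in objects. The sketch below is an illustration under stated assumptions: FakeTopic, FakeConsumer, all_consumers and fetch_consumer are hypothetical names invented for this example, and FakeTopic only mimics the two attributes the method reads (persistent and consumer). It does not use the real Karafka::Routing::Topic.

```ruby
# Hypothetical stand-ins for a consumer class and a routing topic.
FakeConsumer = Class.new
FakeTopic = Struct.new(:name, :persistent, :consumer)

# Thread-scoped store keyed by topic, then by partition (hypothetical key name).
def all_consumers
  Thread.current[:sketch_consumers] ||= Hash.new { |hash, key| hash[key] = {} }
end

# Mirrors the branching shown above: reuse the instance when the topic is
# persistent, otherwise overwrite the slot with a fresh instance on every call.
def fetch_consumer(topic, partition)
  if topic.persistent
    all_consumers[topic][partition] ||= topic.consumer.new
  else
    all_consumers[topic][partition] = topic.consumer.new
  end
end

persistent = FakeTopic.new('events', true, FakeConsumer)
transient = FakeTopic.new('logs', false, FakeConsumer)

# Same object on repeated calls for a persistent topic and partition.
persistent_reused = fetch_consumer(persistent, 0).equal?(fetch_consumer(persistent, 0))
# A new object on every call for a non-persistent topic.
transient_reused = fetch_consumer(transient, 0).equal?(fetch_consumer(transient, 0))
# Different partitions of the same topic never share an instance.
partition_shared = fetch_consumer(persistent, 0).equal?(fetch_consumer(persistent, 1))
```

In both branches the most recent instance stays in the hash, so it can still be looked up later even when the topic is not persistent.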