class Karafka::Persistence::Topic
Local cache for routing topics.
We use it in order not to build string instances and remap the incoming topic upon each message / message batch received.
def self.fetch(group_id, raw_topic_name)
Returns:
- (Karafka::Routing::Topic) -- remapped topic representation that can be used further
Parameters:
- group_id (String) -- group id for which we fetch a topic representation
- raw_topic_name (String) -- raw topic name (before remapping) for which we fetch a topic representation
def self.fetch(group_id, raw_topic_name)
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

  Thread.current[PERSISTENCE_SCOPE][group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end
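The caching pattern used by fetch can be tried in isolation. The following is a minimal sketch, not Karafka's implementation: DemoMapper, DemoRouter, DemoTopicCache, the :demo_topic_cache key and the "prefix." namespace are hypothetical stand-ins for the topic mapper, Routing::Router and PERSISTENCE_SCOPE. It shows a two-level hash stored under Thread.current, so the mapping and the router lookup run only once per group id and raw topic name.

```ruby
# Hypothetical stand-in for the configured topic mapper.
class DemoMapper
  # Strips an assumed "prefix." namespace from the incoming topic name.
  def incoming(raw_topic_name)
    raw_topic_name.sub(/\Aprefix\./, '')
  end
end

# Hypothetical stand-in for the router; counts lookups so cache hits are visible.
class DemoRouter
  attr_reader :lookups

  def initialize
    @lookups = 0
  end

  def find(topic_id)
    @lookups += 1
    "topic:#{topic_id}"
  end
end

# Same memoization shape as the fetch method above, with injected collaborators.
class DemoTopicCache
  SCOPE = :demo_topic_cache

  def initialize(mapper, router)
    @mapper = mapper
    @router = router
  end

  def fetch(group_id, raw_topic_name)
    # Outer hash lives in Thread.current storage (fiber-local in Ruby).
    # Its default block creates the inner per-group hash on first access.
    Thread.current[SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }

    # The begin block runs only when no truthy value is cached for this pair.
    Thread.current[SCOPE][group_id][raw_topic_name] ||= begin
      mapped_topic_name = @mapper.incoming(raw_topic_name)
      @router.find("#{group_id}_#{mapped_topic_name}")
    end
  end
end

router = DemoRouter.new
cache = DemoTopicCache.new(DemoMapper.new, router)

first = cache.fetch('group', 'prefix.events')
second = cache.fetch('group', 'prefix.events')

puts first
puts router.lookups
```

Because the cache is stored under Thread.current, each thread builds its own copy and no locking is needed, at the cost of one lookup per thread for every pair. Note also that ||= only caches truthy values, so in this sketch a lookup that returned nil would be repeated on every call.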