class Karafka::Persistence::Topic

message / message batches received
We use it in order not to build string instances and remap incoming topic upon each
Local cache for routing topics

def self.fetch(group_id, raw_topic_name)

Returns:
  • (Karafka::Routing::Topic) - remapped topic representation that can be used further

Parameters:
  • raw_topic_name (String) -- raw topic name (before remapping) for which we fetch a
  • group_id (String) -- group id for which we fetch a topic representation
def self.fetch(group_id, raw_topic_name)
  Thread.current[PERSISTENCE_SCOPE] ||= Hash.new { |hash, key| hash[key] = {} }
  Thread.current[PERSISTENCE_SCOPE][group_id][raw_topic_name] ||= begin
    # We map from incoming topic name, as it might be namespaced, etc.
    # @see topic_mapper internal docs
    mapped_topic_name = Karafka::App.config.topic_mapper.incoming(raw_topic_name)
    Routing::Router.find("#{group_id}_#{mapped_topic_name}")
  end
end