module Karafka::Connection::ApiAdapter
def client
- Note: - We return array, so we can inject any arguments we want, in case of changes in the
Returns:
-
(Array
- Array with all the client arguments including hash with all)
# Builds the arguments used to create the Kafka client.
#
# Starts from the logger and client id, then adds every kafka config entry that
# is not claimed by one of the api_adapter special-case namespaces.
#
# @return [Array] two-element array: the seed brokers (removed from the settings
#   hash) followed by the hash with all the remaining, non-nil settings
# @note An array is returned so the argument list can be adjusted in one place.
def client
  # Settings listed in the api_adapter mapping go somewhere else, not to the
  # client directly. The list does not change while we iterate, so it is built
  # once instead of once per config entry.
  routed_elsewhere = AttributesMap.api_adapter.values.flatten

  # Defaults that are always passed, on top of the "typical" kafka settings
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  kafka_configs.each do |setting_name, setting_value|
    # Only what is left after removing the special cases is passed to the client
    next if routed_elsewhere.include?(setting_name)

    settings[setting_name] = setting_value
  end

  settings_hash = sanitize(settings)

  # Normalization for the way Kafka::Client accepts arguments from 0.5.3:
  # seed brokers go first as a separate argument, the rest stays in the hash
  [settings_hash.delete(:seed_brokers), settings_hash]
end
def consumer(consumer_group)
-
(Array
- array with all the consumer arguments including hash with all)
Parameters:
-
consumer_group
(Karafka::Routing::ConsumerGroup
) -- consumer group details
# Builds the consumer arguments for a given consumer group.
#
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array<Hash>] single-element array holding the hash with all the
#   consumer settings (group id included, nil values removed)
def consumer(consumer_group)
  # The group id is seeded upfront, so fetch_for will not overwrite it
  seeded = { group_id: consumer_group.id }
  consumer_settings = fetch_for(:consumer, consumer_group, seeded)

  [sanitize(consumer_settings)]
end
def consumption(consumer_group)
-
(Array
- Array with all the arguments required by consuming method)
Parameters:
-
consumer_group
(Karafka::Routing::ConsumerGroup
) -- consumer group details
# Builds the arguments required by the consuming method.
#
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array<Hash>] single-element array holding the hash with all the
#   consumption settings (nil values removed)
def consumption(consumer_group)
  # The consumer group attribute is exposed under a different option name,
  # so it is seeded by hand before the remaining settings are fetched
  seeded = {
    automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed
  }
  consumption_settings = fetch_for(:consumption, consumer_group, seeded)

  [sanitize(consumption_settings)]
end
def fetch_for(namespace_key, route_layer, preexisting_settings = {})
-
preexisting_settings
(Hash
) -- hash with some preexisting settings that might have -
route_layer
(Object
) -- route topic or consumer group -
namespace_key
(Symbol
) -- namespace from attributes map config adapter hash
# Collects, from a routing layer object, the values of all the kafka settings
# that belong to a given api_adapter namespace.
#
# @param namespace_key [Symbol] namespace from the attributes map api_adapter hash
# @param route_layer [Object] route topic or consumer group; it has to respond
#   to every setting name listed for the namespace
# @param preexisting_settings [Hash] settings that were already resolved; they
#   are never overwritten. This hash is filled in place and returned.
# @return [Hash] preexisting settings extended with the namespace settings
def fetch_for(namespace_key, route_layer, preexisting_settings = {})
  # The namespace mapping does not change while we iterate, so look it up once
  namespace_settings = AttributesMap.api_adapter[namespace_key]

  kafka_configs.each_key do |setting_name|
    # Ignore settings that are not related to our namespace
    next unless namespace_settings.include?(setting_name)
    # Ignore settings that are already initialized, in case they were fetched
    # differently and handed over as preexisting settings
    next if preexisting_settings.key?(setting_name)

    # Fetch the setting from the given layer object. Per the original note, the
    # objects can handle the fallback to the kafka settings on their own
    preexisting_settings[setting_name] = route_layer.send(setting_name)
  end

  preexisting_settings
end
def kafka_configs
-
(Hash)
- Kafka config details as a hash
# Exposes the kafka section of the application configuration as a hash.
#
# @return [Hash] kafka config details as a hash
def kafka_configs
  kafka_section = ::Karafka::App.config.kafka
  kafka_section.to_h
end
def mark_message_as_processed(params)
- Note: - When default empty topic mapper is used, no need for any conversion as the
Returns:
-
(Array)
- array with all the details needed by ruby-kafka to mark message
Parameters:
-
params
(Karafka::Params::Params
) -- params instance
# Builds the arguments needed to mark a message as processed.
#
# @param params [Karafka::Params::Params] params instance
# @return [Array] single-element array with the details needed by ruby-kafka to
#   mark the message: the params themselves, or a copy with a remapped topic
# @note When the default topic mapper is configured, no conversion is done and
#   the very same params object is returned inside the array.
def mark_message_as_processed(params)
  mapper = Karafka::App.config.topic_mapper

  # Majority of non heroku users don't use custom topic mappers, so there is
  # nothing to change when the default (pass-through) mapper is in place
  return [params] if mapper == Karafka::Routing::TopicMapper

  # A plain dup plus assignment is used on purpose: the original author measured
  # the tap based version as around 13% slower
  remapped = params.dup
  remapped['topic'] = mapper.outgoing(params.topic)

  [remapped]
end
def pause(topic, partition, consumer_group)
-
(Array)
- array with all the details required to pause kafka consumer
Parameters:
-
consumer_group
(Karafka::Routing::ConsumerGroup
) -- consumer group details -
partition
(Integer
) -- number of the partition that we want to pause -
topic
(String
) -- topic that we want to pause
# Builds the arguments required to pause a kafka consumer on a topic partition.
#
# @param topic [String] topic that we want to pause
# @param partition [Integer] number of the partition that we want to pause
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array] the topic name run through the configured topic mapper, the
#   partition and a hash with the pause timeout of the consumer group
def pause(topic, partition, consumer_group)
  mapped_topic = Karafka::App.config.topic_mapper.outgoing(topic)
  pause_options = { timeout: consumer_group.pause_timeout }

  [mapped_topic, partition, pause_options]
end
def sanitize(settings)
-
(Hash)
- settings without the keys whose values are nil (none of the Karafka options should be nil)
Parameters:
-
settings
(Hash
) -- settings that may contain nil values
# Removes every entry whose value is nil (none of the Karafka options should be
# passed further as nil). Other falsy values, like false, are kept.
#
# @param settings [Hash] settings that may contain nil values
# @return [Hash] new hash without the nil valued entries; the input is not modified
def sanitize(settings)
  settings.select do |_name, setting|
    !setting.nil?
  end
end
def subscribe(topic)
-
(Array)
- array with the mapped topic name and a hash with all the settings required by kafka consumer#subscribe method
Parameters:
-
topic
(Karafka::Routing::Topic
) -- topic that holds details for a given subscription
# Builds the arguments required by the kafka consumer#subscribe method.
#
# @param topic [Karafka::Routing::Topic] topic that holds details for a given
#   subscription
# @return [Array] the topic name run through the configured topic mapper,
#   followed by the hash with the subscription settings (nil values removed)
def subscribe(topic)
  subscription_settings = fetch_for(:subscribe, topic)
  mapped_name = Karafka::App.config.topic_mapper.outgoing(topic.name)

  [mapped_name, sanitize(subscription_settings)]
end