module Karafka::AttributesMap
def api_adapter
-
(Hash) - hash with proper sections on what to proxy where in Ruby-Kafka
Other tags:
- Note: All other settings will be passed to the Kafka.new method invocation.
# Groups kafka-related setting names by the section they are proxied to.
# @return [Hash] hash with proper sections on what to proxy where in Ruby-Kafka
# @note All other settings will be passed to Kafka.new method invocation.
def api_adapter
  consumer_options = %i[
    session_timeout
    offset_commit_interval
    offset_commit_threshold
    offset_retention_time
    heartbeat_interval
    fetcher_max_queue_size
  ]

  # Options that live under the kafka config namespace and are kafka related
  # from the Karafka user perspective, but are not used directly with the
  # kafka api. They should not be proxied anywhere.
  ignored_options = %i[reconnect_timeout automatically_mark_as_consumed]

  {
    consumer: consumer_options,
    subscribe: %i[start_from_beginning max_bytes_per_partition],
    consumption: %i[min_bytes max_bytes max_wait_time],
    pause: %i[pause_timeout],
    ignored: ignored_options
  }
end
def consumer_group
- Note: There are settings directly extracted from the config kafka namespace.
Returns:
-
(Array<Symbol>) - properties that can be set on a per consumer group level
# Lists the setting names that may be configured for a single consumer group.
# @return [Array<Symbol>] properties that can be set on a per consumer group level
# @note There are settings directly extracted from the config kafka namespace.
# @note The api_adapter[:ignored] values are kept on purpose: they should be
#   ignored only when proxying details to ruby-kafka, while Karafka itself
#   uses them internally.
def consumer_group
  # Subscribe-level options are the only ones removed from the result
  excluded = api_adapter[:subscribe]
  adapter_defined = api_adapter.values.flatten
  karafka_specific = %i[batch_fetching]

  # This is a dirty and bad hack of dry-configurable to get the keys before
  # any values are set: it reaches into the :kafka setting's value and reads
  # its @klass instance variable to ask for the declared settings.
  kafka_setting = Karafka::Setup::Config._settings.find do |setting|
    setting.name == :kafka
  end
  kafka_keys = kafka_setting.value.instance_variable_get('@klass').settings

  combined = (adapter_defined + kafka_keys).uniq
  combined + karafka_specific - excluded
end
def topic
-
(Array<Symbol>) - properties that can be set on a per topic level
# Lists the setting names that may be configured for a single topic.
# @return [Array<Symbol>] properties that can be set on a per topic level
def topic
  subscribe_options = api_adapter[:subscribe]
  topic_only_options = %i[
    backend
    name
    parser
    responder
    batch_consuming
    persistent
  ]

  # Subscribe options come first; duplicates are dropped, keeping the first occurrence
  (subscribe_options + topic_only_options).uniq
end