module Karafka::AttributesMap

def api_adapter

Returns:
  • (Hash) - hash that maps each Ruby-Kafka API section to the settings that should be proxied to it

Other tags:
    Note: - All other settings will be passed to Kafka.new method invocation.
# Groups setting names by the section they are proxied to.
#
# @return [Hash{Symbol => Array<Symbol>}] setting names keyed by section
#   (:consumer, :subscribe, :consumption, :pause and :ignored)
def api_adapter
  consumer_keys = %i[
    session_timeout
    offset_commit_interval
    offset_commit_threshold
    offset_retention_time
    heartbeat_interval
    fetcher_max_queue_size
  ]
  subscribe_keys = %i[start_from_beginning max_bytes_per_partition]
  consumption_keys = %i[min_bytes max_bytes max_wait_time]
  pause_keys = %i[pause_timeout]
  # Options that live under the kafka config namespace and are kafka related
  # from the Karafka user perspective, but are not used directly with the
  # kafka api - they should not be proxied anywhere
  ignored_keys = %i[reconnect_timeout automatically_mark_as_consumed]

  {
    consumer: consumer_keys,
    subscribe: subscribe_keys,
    consumption: consumption_keys,
    pause: pause_keys,
    ignored: ignored_keys
  }
end

def consumer_group

Other tags:
    Note: - Some of these settings are extracted directly from the kafka namespace of the config.

Returns:
  • (Array) - properties that can be set on a per consumer group level
# Builds the list of setting names available on a per consumer group level.
#
# @return [Array<Symbol>] api_adapter setting names merged with the names read
#   from the kafka config namespace and :batch_fetching, without the
#   :subscribe section entries
def consumer_group
  adapter = api_adapter
  # The :subscribe section entries are removed from the consumer group list
  excluded = adapter[:subscribe]
  # @note The :ignored section stays in, as those values should be ignored only
  #   when proxying details to ruby-kafka. Karafka uses them internally.
  static_keys = adapter.values.flatten
  # This is a dirty hack around dry-configurable that reads the setting keys of
  # the kafka namespace before any values are set
  kafka_setting = Karafka::Setup::Config._settings.find { |setting| setting.name == :kafka }
  kafka_keys = kafka_setting.value.instance_variable_get('@klass').settings

  proxied = (static_keys + kafka_keys).uniq
  proxied + %i[batch_fetching] - excluded
end

def topic

Returns:
  • (Array) - properties that can be set on a per topic level
# Builds the list of setting names available on a per topic level.
#
# @return [Array<Symbol>] the :subscribe section names from api_adapter
#   followed by the topic specific names, without duplicates
def topic
  topic_specific = %i[backend name parser responder batch_consuming persistent]
  # Array union keeps the left-to-right order and drops duplicates
  api_adapter[:subscribe] | topic_specific
end