module Karafka::Connection::ApiAdapter

def client

Other tags:
    Note: - We return an array, so we can inject any arguments we want in case the underlying driver changes

Returns:
  • (Array) - Array with all the client arguments: the seed brokers followed by a hash with all the remaining settings
# Builds the arguments used to create the Kafka client.
#
# @return [Array] two-element array: the value stored under :seed_brokers followed by
#   a hash with all the remaining (non-nil) client settings
# @note An array is returned so additional arguments can be injected when needed
def client
  # Defaults that are always passed to the client
  settings = {
    logger: ::Karafka.logger,
    client_id: ::Karafka::App.config.client_id
  }

  # Names listed in the api_adapter special cases mapping. Their values go somewhere
  # else, not to the client directly. The list does not depend on the iterated setting,
  # so it is built once instead of once per config key.
  adapter_specific = AttributesMap.api_adapter.values.flatten

  # Everything that is "typical" (not listed in the special cases mapping) is passed
  # to the client as is
  kafka_configs.each do |setting_name, setting_value|
    next if adapter_specific.include?(setting_name)

    settings[setting_name] = setting_value
  end

  settings_hash = sanitize(settings)
  # Normalization for the way Kafka::Client accepts arguments from  0.5.3:
  # seed brokers are extracted from the hash and passed as the first argument
  seed_brokers = settings_hash.delete(:seed_brokers)
  [seed_brokers, settings_hash]
end

def consumer(consumer_group)

Returns:
  • (Array) - array with all the consumer arguments: a single hash with all the consumer settings

Parameters:
  • consumer_group (Karafka::Routing::ConsumerGroup) -- consumer group details
# Builds the arguments for a consumer of a given consumer group.
#
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array] one-element array with a hash of the non-nil consumer settings
def consumer(consumer_group)
  # The group id is set upfront, so fetch_for will not overwrite it
  base = { group_id: consumer_group.id }
  [sanitize(fetch_for(:consumer, consumer_group, base))]
end

def consumption(consumer_group)

Returns:
  • (Array) - Array with all the arguments required by consuming method

Parameters:
  • consumer_group (Karafka::Routing::ConsumerGroup) -- consumer group details
# Builds the arguments required by the consuming method for a given consumer group.
#
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array] one-element array with a hash of the non-nil consumption settings
def consumption(consumer_group)
  # This setting is read from a differently named consumer group attribute
  preset = { automatically_mark_as_processed: consumer_group.automatically_mark_as_consumed }
  settings = fetch_for(:consumption, consumer_group, preset)
  [sanitize(settings)]
end

def fetch_for(namespace_key, route_layer, preexisting_settings = {})

Parameters:
  • preexisting_settings (Hash) -- hash with some preexisting settings that might have been fetched differently (they are not overwritten)
  • route_layer (Object) -- route topic or consumer group
  • namespace_key (Symbol) -- namespace from attributes map config adapter hash
# Collects the settings that belong to a given namespace from a routing layer object.
#
# @param namespace_key [Symbol] namespace from the attributes map api adapter hash
# @param route_layer [Object] route topic or consumer group; it has to respond to every
#   setting name of the namespace
# @param preexisting_settings [Hash] settings that were already fetched differently;
#   they are never overwritten
# @return [Hash] the very same preexisting_settings hash, filled in place with the
#   missing settings of the namespace
def fetch_for(namespace_key, route_layer, preexisting_settings = {})
  # Setting names of the requested namespace; looked up once, as it is loop-invariant
  namespace_settings = AttributesMap.api_adapter[namespace_key]

  kafka_configs.each_key do |setting_name|
    # Ignore settings that are not related to our namespace
    next unless namespace_settings.include?(setting_name)
    # Ignore settings that are already initialized
    # In case they are in preexisting settings fetched differently
    next if preexisting_settings.key?(setting_name)

    # Fetch the setting from the given layer object
    preexisting_settings[setting_name] = route_layer.send(setting_name)
  end

  preexisting_settings
end

def kafka_configs

Returns:
  • (Hash) - Kafka config details as a hash
# Reads the kafka section of the application config.
#
# @return [Hash] Kafka config details as a hash
def kafka_configs
  kafka_section = ::Karafka::App.config.kafka
  kafka_section.to_h
end

def mark_message_as_processed(params)

Other tags:
    Note: - When the default topic mapper is used, there is no need for any conversion, as the default mapper does not change anything

Returns:
  • (Array) - array with all the details needed by ruby-kafka to mark message as processed

Parameters:
  • params (Karafka::Params::Params) -- params instance
# Builds the arguments used to mark a message as processed.
#
# @param params [Karafka::Params::Params] params instance
# @return [Array] one-element array: the params themselves when the default topic mapper
#   is configured, otherwise a copy of the params with the 'topic' key remapped
def mark_message_as_processed(params)
  # Majority of non heroku users don't use custom topic mappers, and the default mapper
  # does not change anything, so the params can be passed through untouched
  default_mapper = Karafka::App.config.topic_mapper == Karafka::Routing::TopicMapper
  return [params] if default_mapper

  # tap is deliberately not used here, as it was noted to be around 13% slower
  remapped = params.dup
  remapped['topic'] = Karafka::App.config.topic_mapper.outgoing(params.topic)
  [remapped]
end

def pause(topic, partition, consumer_group)

Returns:
  • (Array) - array with all the details required to pause kafka consumer

Parameters:
  • consumer_group (Karafka::Routing::ConsumerGroup) -- consumer group details
  • partition (Integer) -- number partition that we want to pause
  • topic (String) -- topic that we want to pause
# Builds the arguments required to pause consumption of a topic partition.
#
# @param topic [String] topic that we want to pause
# @param partition [Integer] number partition that we want to pause
# @param consumer_group [Karafka::Routing::ConsumerGroup] consumer group details
# @return [Array] mapped topic name, partition and a hash with the pause timeout
def pause(topic, partition, consumer_group)
  # The topic goes through the configured topic mapper before it is passed further
  mapped_topic = Karafka::App.config.topic_mapper.outgoing(topic)
  options = { timeout: consumer_group.pause_timeout }
  [mapped_topic, partition, options]
end

def sanitize(settings)

Returns:
  • (Hash) - settings without keys that have nil values (none of the Karafka options should be nil)

Parameters:
  • settings (Hash) -- settings that may contain nil values
# Removes all the entries with nil values from the settings.
#
# @param settings [Hash] settings that may contain nil values
# @return [Hash] new hash with only the entries whose values are not nil
def sanitize(settings)
  # Only nil is dropped; other values (including false) are kept
  settings.select { |_name, setting| !setting.nil? }
end

def subscribe(topic)

Returns:
  • (Array) - array with the mapped topic name and a hash with all the settings required by kafka consumer#subscribe method

Parameters:
  • topic (Karafka::Routing::Topic) -- topic that holds details for a given subscription
# Builds the arguments required to subscribe to a given topic.
#
# @param topic [Karafka::Routing::Topic] topic that holds details for a given subscription
# @return [Array] mapped topic name followed by a hash with the non-nil subscription
#   settings
def subscribe(topic)
  raw_settings = fetch_for(:subscribe, topic)
  # The topic name goes through the configured topic mapper before it is passed further
  mapped_name = Karafka::App.config.topic_mapper.outgoing(topic.name)
  [mapped_name, sanitize(raw_settings)]
end