module Karafka::Params::Dsl::ClassMethods

def build(message, parser)

Other tags:
    Example: Build params instance from a Kafka::FetchedMessage object -
    Example: Build params instance from a hash -

Returns:
  • (Karafka::Params::Params) - Karafka params object not yet used parser for

Parameters:
  • parser (Class) -- parser class that we will use to unparse data
  • message (Kafka::FetchedMessage, Hash) -- message that we get out of Kafka
def build(message, parser)
  instance = new
  instance['parser'] = parser
  # Non kafka fetched message can happen when we interchange data with an
  # additional backend
  if message.is_a?(Kafka::FetchedMessage)
    instance.send(
      :merge!,
      'value' => message.value,
      'partition' => message.partition,
      'offset' => message.offset,
      'key' => message.key,
      'create_time' => message.create_time,
      'receive_time' => Time.now,
      # When we get raw messages, they might have a topic, that was modified by a
      # topic mapper. We need to "reverse" this change and map back to the non-modified
      # format, so our internal flow is not corrupted with the mapping
      'topic' => Karafka::App.config.topic_mapper.incoming(message.topic)
    )
  else
    instance.send(:merge!, message)
  end
  instance
end