module Karafka::Params::Dsl::ClassMethods
def build(message, parser)
- Example: Build params instance from a Kafka::FetchedMessage object -
Example: Build params instance from a hash -
Returns:
- (Karafka::Params::Params) - Karafka params object on which the parser has not yet been used
Parameters:
- parser (Class) -- parser class that we will use to parse data
- message (Kafka::FetchedMessage, Hash) -- message that we get out of Kafka
# Builds a params instance out of an incoming message and remembers which
# parser should be used for it. The parser is only stored here, it is not
# invoked by this method.
#
# @param message [Kafka::FetchedMessage, Hash] message that we get out of Kafka,
#   or a plain hash when data is interchanged with an additional backend
# @param parser [Class] parser class stored under the 'parser' key
# @return [Karafka::Params::Params] freshly built params object with the
#   message details merged in
def build(message, parser)
  params = new
  params['parser'] = parser

  # Anything that is not a Kafka fetched message (this can happen when data
  # is interchanged with an additional backend) is merged in exactly as given.
  # NOTE(review): merge! is called through send, presumably because it is not
  # public on the params class - confirm against the class definition.
  unless message.is_a?(Kafka::FetchedMessage)
    params.send(:merge!, message)
    return params
  end

  params.send(
    :merge!,
    'value' => message.value,
    'partition' => message.partition,
    'offset' => message.offset,
    'key' => message.key,
    'create_time' => message.create_time,
    # Moment at which this params object was built, not a Kafka-side timestamp
    'receive_time' => Time.now,
    # A raw message may carry a topic name that was altered by the topic
    # mapper. It is mapped back to the non-modified format here so the
    # internal flow keeps working with the original topic names.
    'topic' => Karafka::App.config.topic_mapper.incoming(message.topic)
  )

  params
end