class Karafka::BaseConsumer

Base consumer from which all Karafka consumers should inherit

def call

Executes the default consumer flow.
def call
  process
end

def client

Returns:
  • (Karafka::Connection::Client) - messages consuming client that can be used to
def client
  Persistence::Client.read
end

def consume

Other tags:
    Note: - This method needs bo be implemented in a subclass. We stub it here as a failover if
def consume
  raise NotImplementedError, 'Implement this in a subclass'
end

def params_batch=(messages)

Returns:
  • (Karafka::Params::ParamsBatch) - lazy loaded params batch

Parameters:
  • messages (Array, Array) -- messages with raw

Other tags:
    Note: - Until first params usage, it won't parse data at all
def params_batch=(messages)
  @params_batch = Karafka::Params::ParamsBatch.new(messages, topic.parser)
end

def topic

Returns:
  • (Karafka::Routing::Topic) - topic to which a given consumer is subscribed
def topic
  self.class.topic
end

def topic=(topic)

Returns:
  • (Karafka::Routing::Topic) - assigned topic

Parameters:
  • topic (Karafka::Routing::Topic) --
def topic=(topic)
  @topic = topic
  Consumers::Includer.call(self)
end