class Kafka::Client

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, retries: 1)

Returns:
  • (Array) - the messages returned from the broker.

Parameters:
  • max_bytes (Integer) -- the maximum number of bytes to include in the response message. Default is 1048576 (1 MB).
  • min_bytes (Integer) -- the minimum number of bytes to wait for. If set to zero, the broker will respond immediately, but the response may be empty. Default is 1.
  • max_wait_time (Integer) -- the maximum amount of time to wait before the server responds, in seconds. Default is 5.
  • offset (Integer, Symbol) -- the offset to start reading from. Default is the latest offset (:latest).
  • partition (Integer) -- the partition that messages should be fetched from.
  • topic (String) -- the topic that messages should be fetched from.
# Fetches messages from a single partition of a topic.
#
# One FetchOperation is built and reused for every attempt. When executing it
# raises a Kafka::Error, the cluster is marked as stale and the fetch is tried
# again, up to `retries` additional times, before the error is re-raised.
#
# @param topic [String] the topic that messages should be fetched from.
# @param partition [Integer] the partition that messages should be fetched from.
# @param offset [Integer, Symbol] the offset to start reading from; defaults
#   to `:latest`.
# @param max_wait_time [Integer] the maximum amount of time to wait; handed to
#   the FetchOperation unchanged (unit is not visible here -- presumably
#   seconds, confirm against FetchOperation).
# @param min_bytes [Integer] the minimum number of bytes to wait for; handed to
#   the FetchOperation unchanged.
# @param max_bytes [Integer] the maximum number of bytes; given both to the
#   FetchOperation itself and to the per-partition fetch request.
# @param retries [Integer] how many extra attempts are made after the first
#   one fails with a Kafka::Error.
# @return [Array] the messages of every returned batch, flattened into a
#   single list.
# @raise [Kafka::Error] the last failure, once the first attempt and all
#   retries have failed.
def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, retries: 1)
  settings = {
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  }
  fetcher = FetchOperation.new(**settings)
  fetcher.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  tries = 1

  begin
    batches = fetcher.execute
    batches.flat_map { |fetched| fetched.messages }
  rescue Kafka::Error => failure
    # Mark the cluster stale on every failure, including the final one.
    @cluster.mark_as_stale!

    # Out of attempts: re-raise the error currently being handled.
    raise if tries >= retries + 1

    tries += 1
    @logger.warn "Error while fetching messages, #{failure.class}: #{failure.message}; retrying..."
    retry
  end
end