class Kafka::Protocol::ProduceRequest

def encode(encoder)

# Serializes this produce request by writing its fields to +encoder+.
#
# The fields are written in this order: +@required_acks+ as an int16,
# +@timeout+ as an int32, then an array built from +@messages_for_topics+.
# Every topic entry consists of the topic string followed by an array of
# partition entries; every partition entry is the partition as an int32
# followed by that partition's encoded message set, written as bytes.
#
# @param encoder [#write_int16, #write_int32, #write_string, #write_array, #write_bytes]
#   the object the request fields are written to.
# @return the value returned by the outer +encoder.write_array+ call.
def encode(encoder)
  encoder.write_int16(@required_acks)
  encoder.write_int32(@timeout)

  encoder.write_array(@messages_for_topics) do |topic_name, partitions|
    encoder.write_string(topic_name)

    encoder.write_array(partitions) do |partition_id, set|
      encoder.write_int32(partition_id)

      # The bytesize of a message set has to precede its data in the request,
      # so the whole set is first encoded into a buffer of its own and that
      # buffer is then handed to the encoder as bytes.
      encoder.write_bytes(Encoder.encode_with(set))
    end
  end
end