class Kafka::Protocol::Record

def self.decode(decoder)

def self.decode(decoder)
  record_decoder = Decoder.from_string(decoder.varint_bytes)
  attributes = record_decoder.int8
  timestamp_delta = record_decoder.varint
  offset_delta = record_decoder.varint
  key = record_decoder.varint_string
  value = record_decoder.varint_bytes
  headers = {}
  record_decoder.varint_array do
    header_key = record_decoder.varint_string
    header_value = record_decoder.varint_bytes
    headers[header_key] = header_value
  end
  new(
    key: key,
    value: value,
    headers: headers,
    attributes: attributes,
    offset_delta: offset_delta,
    timestamp_delta: timestamp_delta
  )
end

def ==(other)

def ==(other)
  offset_delta == other.offset_delta &&
    timestamp_delta == other.timestamp_delta &&
    offset == other.offset &&
    is_control_record == other.is_control_record
end

def encode(encoder)

def encode(encoder)
  record_buffer = StringIO.new
  record_encoder = Encoder.new(record_buffer)
  record_encoder.write_int8(@attributes)
  record_encoder.write_varint(@timestamp_delta)
  record_encoder.write_varint(@offset_delta)
  record_encoder.write_varint_string(@key)
  record_encoder.write_varint_bytes(@value)
  record_encoder.write_varint_array(@headers.to_a) do |header_key, header_value|
    record_encoder.write_varint_string(header_key.to_s)
    record_encoder.write_varint_bytes(header_value.to_s)
  end
  encoder.write_varint_bytes(record_buffer.string)
end

def initialize(

def initialize(
  key: nil,
  value:,
  headers: {},
  attributes: 0,
  offset_delta: 0,
  offset: 0,
  timestamp_delta: 0,
  create_time: Time.now,
  is_control_record: false
)
  @key = key
  @value = value
  @headers = headers
  @attributes = attributes
  @offset_delta = offset_delta
  @offset = offset
  @timestamp_delta = timestamp_delta
  @create_time = create_time
  @is_control_record = is_control_record
  @bytesize = @key.to_s.bytesize + @value.to_s.bytesize
end