class Kafka::Protocol::MetadataResponse


Isr => [int32]
Replicas => [int32]
Leader => int32
PartitionId => int32
PartitionErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
TopicErrorCode => int16
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
Port => int32
Host => string
NodeId => int32
Broker => NodeId Host Port (any number of brokers may be returned)
MetadataResponse => [Broker][TopicMetadata]

## API Specification
leader.
the subset of replicas that are “in sync”, i.e. have fully caught up with the
as well as a list of node ids for the set of replicas, are given. The ‘isr` list is
* For each topic partition the node id of the broker acting as partition leader,
* For each broker a node id, host, and port is provided.
the cluster.
The response contains information on the brokers, topics, and partitions in
A response to a {MetadataRequest}.

def self.decode(decoder)

Returns:
  • (MetadataResponse) - the metadata response.

Parameters:
  • decoder (Decoder) --
def self.decode(decoder)
  brokers = decoder.array do
    node_id = decoder.int32
    host = decoder.string
    port = decoder.int32
    rack = decoder.string
    BrokerInfo.new(
      node_id: node_id,
      host: host,
      port: port
    )
  end
  controller_id = decoder.int32
  topics = decoder.array do
    topic_error_code = decoder.int16
    topic_name = decoder.string
    is_internal = decoder.boolean
    partitions = decoder.array do
      PartitionMetadata.new(
        partition_error_code: decoder.int16,
        partition_id: decoder.int32,
        leader: decoder.int32,
        replicas: decoder.array { decoder.int32 },
        isr: decoder.array { decoder.int32 },
      )
    end
    TopicMetadata.new(
      topic_error_code: topic_error_code,
      topic_name: topic_name,
      partitions: partitions,
    )
  end
  new(brokers: brokers, controller_id: controller_id, topics: topics)
end

def find_broker(node_id)

Returns:
  • (BrokerInfo) - information about the broker.

Parameters:
  • node_id (Integer) -- the node id of the broker.
def find_broker(node_id)
  broker = @brokers.find {|broker| broker.node_id == node_id }
  raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil?
  broker
end

def find_leader_id(topic, partition)

Returns:
  • (Integer) - the node id of the leader.

Parameters:
  • partition (Integer) -- the partition number.
  • topic (String) -- the name of the topic.
def find_leader_id(topic, partition)
  topic_info = @topics.find {|t| t.topic_name == topic }
  if topic_info.nil?
    raise UnknownTopicOrPartition, "no topic #{topic}"
  end
  Protocol.handle_error(topic_info.topic_error_code)
  partition_info = topic_info.partitions.find {|p| p.partition_id == partition }
  if partition_info.nil?
    raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}"
  end
  begin
    Protocol.handle_error(partition_info.partition_error_code)
  rescue ReplicaNotAvailable
    # This error can be safely ignored per the protocol specification.
  end
  partition_info.leader
end

def initialize(brokers:, controller_id:, topics:)

def initialize(brokers:, controller_id:, topics:)
  @brokers = brokers
  @controller_id = controller_id
  @topics = topics
end

def partitions_for(topic_name)

def partitions_for(topic_name)
  topic = @topics.find {|t| t.topic_name == topic_name }
  if topic.nil?
    raise UnknownTopicOrPartition, "unknown topic #{topic_name}"
  end
  Protocol.handle_error(topic.topic_error_code)
  topic.partitions
end