class Kafka::Protocol::MetadataResponse
Isr => [int32]
Replicas => [int32]
Leader => int32
PartitionId => int32
PartitionErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader Replicas Isr
TopicErrorCode => int16
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata]
Port => int32
Host => string
NodeId => int32
Broker => NodeId Host Port (any number of brokers may be returned)
MetadataResponse => [Broker][TopicMetadata]
## API Specification
leader.
the subset of replicas that are “in sync”, i.e. have fully caught up with the
as well as a list of node ids for the set of replicas, are given. The ‘isr` list is
* For each topic partition the node id of the broker acting as partition leader,
* For each broker a node id, host, and port is provided.
the cluster.
The response contains information on the brokers, topics, and partitions in
A response to a {MetadataRequest}.
def self.decode(decoder)
-
(MetadataResponse)
- the metadata response.
Parameters:
-
decoder
(Decoder
) --
def self.decode(decoder) brokers = decoder.array do node_id = decoder.int32 host = decoder.string port = decoder.int32 rack = decoder.string BrokerInfo.new( node_id: node_id, host: host, port: port ) end controller_id = decoder.int32 topics = decoder.array do topic_error_code = decoder.int16 topic_name = decoder.string is_internal = decoder.boolean partitions = decoder.array do PartitionMetadata.new( partition_error_code: decoder.int16, partition_id: decoder.int32, leader: decoder.int32, replicas: decoder.array { decoder.int32 }, isr: decoder.array { decoder.int32 }, ) end TopicMetadata.new( topic_error_code: topic_error_code, topic_name: topic_name, partitions: partitions, ) end new(brokers: brokers, controller_id: controller_id, topics: topics) end
def find_broker(node_id)
-
(BrokerInfo)
- information about the broker.
Parameters:
-
node_id
(Integer
) -- the node id of the broker.
def find_broker(node_id) broker = @brokers.find {|broker| broker.node_id == node_id } raise Kafka::NoSuchBroker, "No broker with id #{node_id}" if broker.nil? broker end
def find_leader_id(topic, partition)
-
(Integer)
- the node id of the leader.
Parameters:
-
partition
(Integer
) -- the partition number. -
topic
(String
) -- the name of the topic.
def find_leader_id(topic, partition) topic_info = @topics.find {|t| t.topic_name == topic } if topic_info.nil? raise UnknownTopicOrPartition, "no topic #{topic}" end Protocol.handle_error(topic_info.topic_error_code) partition_info = topic_info.partitions.find {|p| p.partition_id == partition } if partition_info.nil? raise UnknownTopicOrPartition, "no partition #{partition} in topic #{topic}" end begin Protocol.handle_error(partition_info.partition_error_code) rescue ReplicaNotAvailable # This error can be safely ignored per the protocol specification. end partition_info.leader end
def initialize(brokers:, controller_id:, topics:)
def initialize(brokers:, controller_id:, topics:) @brokers = brokers @controller_id = controller_id @topics = topics end
def partitions_for(topic_name)
def partitions_for(topic_name) topic = @topics.find {|t| t.topic_name == topic_name } if topic.nil? raise UnknownTopicOrPartition, "unknown topic #{topic_name}" end Protocol.handle_error(topic.topic_error_code) topic.partitions end