class Kafka::BrokerPool
def close
def close @brokers.each do |id, broker| @logger.info "Disconnecting broker #{id}" broker.disconnect end end
def connect(host, port, node_id: nil)
def connect(host, port, node_id: nil) if @brokers.key?(node_id) broker = @brokers.fetch(node_id) return broker if broker.address_match?(host, port) broker.disconnect @brokers[node_id] = nil end broker = Broker.new( connection_builder: @connection_builder, host: host, port: port, node_id: node_id, logger: @logger, ) @brokers[node_id] = broker unless node_id.nil? broker end
def initialize(connection_builder:, logger:)
def initialize(connection_builder:, logger:) @logger = logger @connection_builder = connection_builder @brokers = {} end