lib/kafka/cluster.rb



# frozen_string_literal: true

require "kafka/broker_pool"
require "set"

module Kafka

  # A cluster represents the state of a Kafka cluster. It needs to be initialized
  # with a non-empty list of seed brokers. The first seed broker that the cluster can connect
  # to will be asked for the cluster metadata, which allows the cluster to map topic
  # partitions to the current leader for those partitions.
  class Cluster

    # Initializes a Cluster with a set of seed brokers.
    #
    # No network I/O happens here: cluster metadata is loaded lazily by the
    # first method that needs it.
    #
    # @param seed_brokers [Array<URI>] bootstrap nodes; each must respond to
    #   `hostname` and `port`.
    # @param broker_pool [Kafka::BrokerPool]
    # @param logger [Logger]
    # @raise [ArgumentError] if `seed_brokers` is empty.
    def initialize(seed_brokers:, broker_pool:, logger:)
      if seed_brokers.empty?
        raise ArgumentError, "At least one seed broker must be configured"
      end

      @logger = logger
      @seed_brokers = seed_brokers
      @broker_pool = broker_pool
      @cluster_info = nil
      @stale = true

      # This is the set of topics we need metadata for. If empty, metadata for
      # all topics will be fetched.
      @target_topics = Set.new
    end

    # Adds a list of topics to the target list. Only the topics on this list will
    # be queried for metadata.
    #
    # Metadata is refreshed immediately when at least one topic is new; adding
    # topics that are already targeted does nothing.
    #
    # @param topics [Array<String>]
    # @return [nil]
    def add_target_topics(topics)
      new_topics = Set.new(topics) - @target_topics

      unless new_topics.empty?
        @logger.info "New topics added to target list: #{new_topics.to_a.join(', ')}"

        @target_topics.merge(new_topics)

        refresh_metadata!
      end

      nil
    end

    # Looks up the version info the cluster advertised for an API.
    #
    # @param api_key [Integer]
    # @return [Object, nil] the matching entry from {#apis}, or nil when the
    #   API was not advertised.
    def api_info(api_key)
      apis.find {|api| api.api_key == api_key }
    end

    # Whether the cluster advertises an API, optionally at a given version.
    #
    # @param api_key [Integer]
    # @param version [Integer, nil] when nil, any advertised version counts.
    # @return [Boolean]
    def supports_api?(api_key, version = nil)
      info = api_info(api_key)
      return false if info.nil?
      return true if version.nil?

      info.version_supported?(version)
    end

    # The API versions advertised by a randomly chosen broker.
    #
    # The result is memoized for the lifetime of this object; metadata
    # refreshes do not reset it. Error codes in the response are raised via
    # `Protocol.handle_error`.
    #
    # @return [Array] the advertised API entries.
    def apis
      @apis ||=
        begin
          response = random_broker.api_versions

          Protocol.handle_error(response.error_code)

          response.apis
        end
    end

    # Clears the list of target topics and refreshes metadata.
    #
    # @see #add_target_topics
    # @return [nil]
    def clear_target_topics
      @target_topics.clear
      refresh_metadata!

      nil
    end

    # Flags the cached metadata so the next {#refresh_metadata_if_necessary!}
    # re-fetches it.
    def mark_as_stale!
      @stale = true
    end

    # Drops the cached metadata and fetches it again right away.
    def refresh_metadata!
      @cluster_info = nil
      cluster_info
    end

    # Re-fetches metadata only if it has been marked stale.
    def refresh_metadata_if_necessary!
      refresh_metadata! if @stale
    end

    # Finds the broker acting as the leader of the given topic and partition.
    #
    # @param topic [String]
    # @param partition [Integer]
    # @return [Broker] the broker that's currently leader.
    def get_leader(topic, partition)
      connect_to_broker(get_leader_id(topic, partition))
    end

    # Finds and connects to the coordinator broker for a consumer group.
    #
    # The known brokers are asked in turn. If a broker reports that the
    # coordinator is not available yet, that broker is asked again after 1s,
    # with no upper bound on the number of attempts. A connection error moves
    # on to the next broker; any other error propagates.
    #
    # @param group_id [String]
    # @return [Broker] a connection to the group coordinator.
    # @raise [Kafka::Error] if no broker could provide the coordinator.
    def get_group_coordinator(group_id:)
      @logger.debug "Getting group coordinator for `#{group_id}`"

      refresh_metadata_if_necessary!

      cluster_info.brokers.each do |broker_info|
        begin
          broker = connect_to_broker(broker_info.node_id)
          response = broker.find_group_coordinator(group_id: group_id)

          Protocol.handle_error(response.error_code)

          coordinator_id = response.coordinator_id

          @logger.debug "Coordinator for group `#{group_id}` is #{coordinator_id}. Connecting..."

          # It's possible that a new broker is introduced to the cluster and
          # becomes the coordinator before we have a chance to refresh_metadata.
          coordinator = begin
            connect_to_broker(coordinator_id)
          rescue Kafka::NoSuchBroker
            @logger.debug "Broker #{coordinator_id} missing from broker cache, refreshing"
            refresh_metadata!
            connect_to_broker(coordinator_id)
          end

          @logger.debug "Connected to coordinator: #{coordinator} for group `#{group_id}`"

          return coordinator
        rescue GroupCoordinatorNotAvailable
          # `retry` re-runs the `begin` body above, i.e. asks the same broker again.
          @logger.debug "Coordinator not available; retrying in 1s"
          sleep 1
          retry
        rescue ConnectionError => e
          # `broker` is nil when connecting to this broker itself failed, so fall
          # back to its metadata entry to keep the log message informative.
          @logger.error "Failed to get group coordinator info from #{broker || broker_info}: #{e}"
        end
      end

      raise Kafka::Error, "Failed to find group coordinator"
    end

    # Returns the partition metadata for a topic, adding the topic to the
    # target list first (which refreshes metadata if it was not targeted yet).
    #
    # @param topic [String]
    # @return [Array] the topic's partition entries from the cached metadata.
    # @raise [Kafka::ProtocolError] re-raised after marking metadata stale, so
    #   the next call fetches fresh metadata.
    def partitions_for(topic)
      add_target_topics([topic])
      refresh_metadata_if_necessary!
      cluster_info.partitions_for(topic)
    rescue Kafka::ProtocolError
      mark_as_stale!
      raise
    end

    # Creates a topic through the controller broker, then waits until the
    # topic's partitions report no errors.
    #
    # While the leader or the topic is not yet visible, this polls every 1s
    # with no upper bound on the number of attempts.
    #
    # @param name [String]
    # @param num_partitions [Integer]
    # @param replication_factor [Integer]
    # @param timeout [Integer] passed through to the create-topics request.
    # @param config [Hash] topic configuration entries.
    # @raise errors from `Protocol.handle_error` other than the two waited on.
    def create_topic(name, num_partitions:, replication_factor:, timeout:, config:)
      options = {
        topics: {
          name => {
            num_partitions: num_partitions,
            replication_factor: replication_factor,
            config: config,
          }
        },
        timeout: timeout,
      }

      broker = controller_broker

      @logger.info "Creating topic `#{name}` using controller broker #{broker}"

      response = broker.create_topics(**options)

      response.errors.each do |_topic, error_code|
        Protocol.handle_error(error_code)
      end

      begin
        partitions_for(name).each do |info|
          Protocol.handle_error(info.partition_error_code)
        end
      rescue Kafka::LeaderNotAvailable
        @logger.warn "Leader not yet available for `#{name}`, waiting 1s..."
        # Partition-level errors are raised here, outside #partitions_for, so
        # nothing else marks the cached metadata stale. Without this, the retry
        # would keep re-reading the same cached error forever.
        mark_as_stale!
        sleep 1

        retry
      rescue Kafka::UnknownTopicOrPartition
        @logger.warn "Topic `#{name}` not yet created, waiting 1s..."
        mark_as_stale!
        sleep 1

        retry
      end

      @logger.info "Topic `#{name}` was created"
    end

    # Deletes a topic through the controller broker.
    #
    # @param name [String]
    # @param timeout [Integer] passed through to the delete-topics request.
    def delete_topic(name, timeout:)
      options = {
        topics: [name],
        timeout: timeout,
      }

      broker = controller_broker

      @logger.info "Deleting topic `#{name}` using controller broker #{broker}"

      response = broker.delete_topics(**options)

      response.errors.each do |_topic, error_code|
        Protocol.handle_error(error_code)
      end

      @logger.info "Topic `#{name}` was deleted"
    end

    # Fetches configuration values for a topic from the controller broker.
    #
    # @param name [String]
    # @param configs [Array<String>] config names to request; presumably an
    #   empty list requests all of them. TODO: confirm.
    # @return [Hash{String => String}] config name to value, taken from the
    #   first resource in the response.
    def describe_topic(name, configs = [])
      options = {
        resources: [[Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]]
      }
      broker = controller_broker

      @logger.info "Fetching topic `#{name}`'s configs using controller broker #{broker}"

      response = broker.describe_configs(**options)

      response.resources.each do |resource|
        Protocol.handle_error(resource.error_code, resource.error_message)
      end
      topic_description = response.resources.first
      topic_description.configs.each_with_object({}) do |config, hash|
        hash[config.name] = config.value
      end
    end

    # Alters a topic's configuration through the controller broker.
    #
    # @param name [String]
    # @param configs [Hash] config entries passed through to the request.
    # @return [nil]
    def alter_topic(name, configs = {})
      options = {
        resources: [[Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]]
      }

      broker = controller_broker

      @logger.info "Altering the config for topic `#{name}` using controller broker #{broker}"

      response = broker.alter_configs(**options)

      response.resources.each do |resource|
        Protocol.handle_error(resource.error_code, resource.error_message)
      end

      nil
    end

    # Sends a create-partitions request for a topic and marks metadata stale,
    # so the new partition layout is picked up on the next refresh.
    #
    # @param name [String]
    # @param num_partitions [Integer] sent as the partition count in the
    #   request; presumably the new total. TODO: confirm.
    # @param timeout [Integer]
    def create_partitions_for(name, num_partitions:, timeout:)
      options = {
        topics: [[name, num_partitions, nil]],
        timeout: timeout
      }

      broker = controller_broker

      @logger.info "Creating #{num_partitions} partition(s) for topic `#{name}` using controller broker #{broker}"

      response = broker.create_partitions(**options)

      response.errors.each do |_topic, error_code, error_message|
        Protocol.handle_error(error_code, error_message)
      end
      mark_as_stale!

      @logger.info "Topic `#{name}` was updated"
    end

    # Resolves offsets for several partitions of a topic, sending one
    # list-offsets request per leader broker.
    #
    # @param topic [String]
    # @param partitions [Array<Integer>]
    # @param offset [Symbol, Integer] `:earliest`, `:latest`, or a value
    #   passed through as the request's `time`.
    # @return [Hash{Integer => Object}] partition to resolved offset.
    # @raise [Kafka::ProtocolError] re-raised after marking metadata stale.
    def resolve_offsets(topic, partitions, offset)
      add_target_topics([topic])
      refresh_metadata_if_necessary!

      partitions_by_broker = partitions.group_by {|partition| get_leader(topic, partition) }

      # Symbolic offsets map to the numeric times sent in the request.
      time = case offset
             when :earliest then -2
             when :latest then -1
             else offset
             end

      offsets = {}

      partitions_by_broker.each do |broker, broker_partitions|
        response = broker.list_offsets(
          topics: {
            topic => broker_partitions.map {|partition|
              {
                partition: partition,
                time: time,
                max_offsets: 1,
              }
            }
          }
        )

        broker_partitions.each do |partition|
          offsets[partition] = response.offset_for(topic, partition)
        end
      end

      offsets
    rescue Kafka::ProtocolError
      mark_as_stale!
      raise
    end

    # Single-partition convenience wrapper around {#resolve_offsets}.
    def resolve_offset(topic, partition, offset)
      resolve_offsets(topic, [partition], offset).fetch(partition)
    end

    # Names of the topics in the cached metadata whose topic error code is 0.
    #
    # @return [Array<String>]
    def topics
      refresh_metadata_if_necessary!
      cluster_info.topics.select do |topic|
        topic.topic_error_code == 0
      end.map(&:topic_name)
    end

    # Lists all topics in the cluster.
    #
    # Unlike {#topics}, this bypasses the cache and sends a fresh metadata
    # request (with `topics: nil`) to a random broker.
    #
    # @return [Array<String>]
    def list_topics
      response = random_broker.fetch_metadata(topics: nil)
      response.topics.select do |topic|
        topic.topic_error_code == 0
      end.map(&:topic_name)
    end

    # Closes the broker pool.
    def disconnect
      @broker_pool.close
    end

    private

    # Node id of the leader for a topic partition, from the cached metadata.
    def get_leader_id(topic, partition)
      cluster_info.find_leader_id(topic, partition)
    end

    # Cached metadata, fetched on first use or after {#refresh_metadata!}.
    def cluster_info
      @cluster_info ||= fetch_cluster_info
    end

    # Fetches the cluster metadata.
    #
    # This is used to update the partition leadership information, among
    # other things. The seed brokers are tried in random order. The first one
    # that answers with a non-empty broker list wins, and the cache is then
    # marked fresh. Each temporary seed connection is disconnected afterwards.
    #
    # @raise [ConnectionError] if no seed broker produced usable metadata; the
    #   message lists why each seed failed.
    # @return [Protocol::MetadataResponse] the cluster metadata.
    def fetch_cluster_info
      errors = []

      @seed_brokers.shuffle.each do |node|
        @logger.info "Fetching cluster metadata from #{node}"

        begin
          broker = @broker_pool.connect(node.hostname, node.port)
          metadata = broker.fetch_metadata(topics: @target_topics)

          if metadata.brokers.empty?
            @logger.error "No brokers in cluster"
            # Record the reason so the final error explains why this seed was skipped.
            errors << [node, "No brokers in cluster"]
          else
            @logger.info "Discovered cluster metadata; nodes: #{metadata.brokers.join(', ')}"

            @stale = false

            return metadata
          end
        rescue Error => e
          @logger.error "Failed to fetch metadata from #{node}: #{e}"
          errors << [node, e]
        ensure
          broker.disconnect unless broker.nil?
        end
      end

      error_description = errors.map {|node, reason| "- #{node}: #{reason}" }.join("\n")

      raise ConnectionError, "Could not connect to any of the seed brokers:\n#{error_description}"
    end

    # Connection to a randomly chosen broker from the cached metadata.
    def random_broker
      node_id = cluster_info.brokers.sample.node_id
      connect_to_broker(node_id)
    end

    # Connection (via the broker pool) to the broker with the given node id.
    # NOTE(review): #get_group_coordinator rescues Kafka::NoSuchBroker around
    # this call, which suggests `find_broker` raises it for unknown ids. TODO:
    # confirm.
    def connect_to_broker(broker_id)
      info = cluster_info.find_broker(broker_id)

      @broker_pool.connect(info.host, info.port, node_id: info.node_id)
    end

    # Connection to the cluster's controller broker.
    def controller_broker
      connect_to_broker(cluster_info.controller_id)
    end
  end
end