lib/kafka/protocol.rb



# frozen_string_literal: true

module Kafka
  # The protocol layer of the library.
  #
  # The Kafka protocol (https://kafka.apache.org/protocol) defines a set of API
  # requests, each with a well-known numeric API key, as well as a set of error
  # codes with specific meanings.
  #
  # This module, and the classes contained in it, implement the client side of
  # the protocol.
  module Protocol
    # The replica id of non-brokers is always -1.
    REPLICA_ID = -1

    # Numeric API keys, one per request type known to this module.
    PRODUCE_API = 0
    FETCH_API = 1
    LIST_OFFSET_API = 2
    TOPIC_METADATA_API = 3
    OFFSET_COMMIT_API = 8
    OFFSET_FETCH_API = 9
    GROUP_COORDINATOR_API = 10
    JOIN_GROUP_API = 11
    HEARTBEAT_API = 12
    LEAVE_GROUP_API = 13
    SYNC_GROUP_API = 14
    SASL_HANDSHAKE_API = 17
    API_VERSIONS_API = 18
    CREATE_TOPICS_API = 19
    DELETE_TOPICS_API = 20
    DESCRIBE_CONFIGS_API = 32
    ALTER_CONFIGS_API = 33
    CREATE_PARTITIONS_API = 37

    # A mapping from numeric API keys to symbolic API names.
    #
    # Every API key constant declared above has an entry here, so that
    # {api_name} only falls back to `:unknown` for keys this module does not
    # know about.
    APIS = {
      PRODUCE_API => :produce,
      FETCH_API => :fetch,
      LIST_OFFSET_API => :list_offset,
      TOPIC_METADATA_API => :topic_metadata,
      OFFSET_COMMIT_API => :offset_commit,
      OFFSET_FETCH_API => :offset_fetch,
      GROUP_COORDINATOR_API => :group_coordinator,
      JOIN_GROUP_API => :join_group,
      HEARTBEAT_API => :heartbeat,
      LEAVE_GROUP_API => :leave_group,
      SYNC_GROUP_API => :sync_group,
      SASL_HANDSHAKE_API => :sasl_handshake,
      API_VERSIONS_API => :api_versions,
      CREATE_TOPICS_API => :create_topics,
      DELETE_TOPICS_API => :delete_topics,
      # NOTE(review): the `_api` suffix is inconsistent with the other names;
      # kept as-is because callers may already match on this symbol.
      DESCRIBE_CONFIGS_API => :describe_configs_api,
      ALTER_CONFIGS_API => :alter_configs,
      CREATE_PARTITIONS_API => :create_partitions
    }.freeze

    # A mapping from numeric error codes to exception classes.
    #
    # Code 0 (no error) is intentionally absent; {handle_error} treats it as
    # success before consulting this table. Codes without an entry are
    # reported as UnknownError by {handle_error}.
    ERRORS = {
      -1 => UnknownError,
       1 => OffsetOutOfRange,
       2 => CorruptMessage,
       3 => UnknownTopicOrPartition,
       4 => InvalidMessageSize,
       5 => LeaderNotAvailable,
       6 => NotLeaderForPartition,
       7 => RequestTimedOut,
       8 => BrokerNotAvailable,
       9 => ReplicaNotAvailable,
      10 => MessageSizeTooLarge,
      12 => OffsetMetadataTooLarge,
      15 => GroupCoordinatorNotAvailable,
      16 => NotCoordinatorForGroup,
      17 => InvalidTopic,
      18 => RecordListTooLarge,
      19 => NotEnoughReplicas,
      20 => NotEnoughReplicasAfterAppend,
      21 => InvalidRequiredAcks,
      22 => IllegalGeneration,
      25 => UnknownMemberId,
      26 => InvalidSessionTimeout,
      27 => RebalanceInProgress,
      28 => InvalidCommitOffsetSize,
      29 => TopicAuthorizationFailed,
      30 => GroupAuthorizationFailed,
      31 => ClusterAuthorizationFailed,
      32 => InvalidTimestamp,
      33 => UnsupportedSaslMechanism,
      34 => InvalidSaslState,
      35 => UnsupportedVersion,
      36 => TopicAlreadyExists,
      37 => InvalidPartitions,
      38 => InvalidReplicationFactor,
      39 => InvalidReplicaAssignment,
      40 => InvalidConfig,
      41 => NotController,
      42 => InvalidRequest
    }.freeze

    # Numeric resource type codes and a mapping from each code to its
    # symbolic name.
    # https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
    RESOURCE_TYPE_UNKNOWN = 0
    RESOURCE_TYPE_ANY = 1
    RESOURCE_TYPE_TOPIC = 2
    RESOURCE_TYPE_GROUP = 3
    RESOURCE_TYPE_CLUSTER = 4
    RESOURCE_TYPE_TRANSACTIONAL_ID = 5
    RESOURCE_TYPE_DELEGATION_TOKEN = 6
    RESOURCE_TYPES = {
      RESOURCE_TYPE_UNKNOWN => :unknown,
      RESOURCE_TYPE_ANY => :any,
      RESOURCE_TYPE_TOPIC => :topic,
      RESOURCE_TYPE_GROUP => :group,
      RESOURCE_TYPE_CLUSTER => :cluster,
      RESOURCE_TYPE_TRANSACTIONAL_ID => :transactional_id,
      RESOURCE_TYPE_DELEGATION_TOKEN => :delegation_token,
    }.freeze

    # Handles an error code by either doing nothing (if there was no error) or
    # by raising an appropriate exception.
    #
    # @param error_code [Integer] the error code; 0 means "no error".
    # @param error_message [String, nil] optional message used for the raised
    #   exception.
    # @raise [ProtocolError] the class registered in {ERRORS} for the code, or
    #   UnknownError when the code has no entry.
    # @return [nil]
    def self.handle_error(error_code, error_message = nil)
      # No errors, yay!
      return if error_code == 0

      error_class = ERRORS[error_code]
      raise error_class, error_message if error_class

      raise UnknownError, "Unknown error with code #{error_code} #{error_message}"
    end

    # Returns the symbolic name for an API key.
    #
    # @param api_key [Integer]
    # @return [Symbol] the name registered in {APIS}, or `:unknown` if the key
    #   has no entry.
    def self.api_name(api_key)
      APIS.fetch(api_key, :unknown)
    end
  end
end

require "kafka/protocol/metadata_request"
require "kafka/protocol/metadata_response"
require "kafka/protocol/produce_request"
require "kafka/protocol/produce_response"
require "kafka/protocol/fetch_request"
require "kafka/protocol/fetch_response"
require "kafka/protocol/list_offset_request"
require "kafka/protocol/list_offset_response"
require "kafka/protocol/group_coordinator_request"
require "kafka/protocol/group_coordinator_response"
require "kafka/protocol/join_group_request"
require "kafka/protocol/join_group_response"
require "kafka/protocol/sync_group_request"
require "kafka/protocol/sync_group_response"
require "kafka/protocol/leave_group_request"
require "kafka/protocol/leave_group_response"
require "kafka/protocol/heartbeat_request"
require "kafka/protocol/heartbeat_response"
require "kafka/protocol/offset_fetch_request"
require "kafka/protocol/offset_fetch_response"
require "kafka/protocol/offset_commit_request"
require "kafka/protocol/offset_commit_response"
require "kafka/protocol/api_versions_request"
require "kafka/protocol/api_versions_response"
require "kafka/protocol/sasl_handshake_request"
require "kafka/protocol/sasl_handshake_response"
require "kafka/protocol/create_topics_request"
require "kafka/protocol/create_topics_response"
require "kafka/protocol/delete_topics_request"
require "kafka/protocol/delete_topics_response"
require "kafka/protocol/describe_configs_request"
require "kafka/protocol/describe_configs_response"
require "kafka/protocol/alter_configs_request"
require "kafka/protocol/alter_configs_response"
require "kafka/protocol/create_partitions_request"
require "kafka/protocol/create_partitions_response"