class Kafka::Client

def alter_topic(name, configs = {})

Returns:
  • (nil) -

Parameters:
  • configs (Hash) -- hash of desired config keys and values.
  • name (String) -- the name of the topic.

Other tags:
    Example: Altering the cleanup policy config of a topic (see the sketch below)
    Note: This is an alpha level API and is subject to change.
def alter_topic(name, configs = {})
  @cluster.alter_topic(name, configs)
end
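
A usage sketch, assuming kafka is an already-initialized Kafka::Client; the topic name and config values are illustrative and must match Kafka's topic-level config keys:

  # Switch the topic to log compaction and cap the message size (placeholder values).
  kafka.alter_topic("my-topic", "cleanup.policy" => "compact", "max.message.bytes" => "100000")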

def apis

def apis
  @cluster.apis
end

def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)

Returns:
  • (AsyncProducer) -

Other tags:
    See: AsyncProducer -

Parameters:
  • delivery_interval (Integer) -- if greater than zero, the number of seconds between automatic message deliveries.
  • delivery_threshold (Integer) -- if greater than zero, the number of buffered messages that will automatically trigger a delivery.
  • max_queue_size (Integer) -- the maximum number of messages allowed in the queue.
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)
  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
    instrumenter: @instrumenter,
    logger: @logger,
  )
end
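
A usage sketch, assuming kafka is an already-initialized Kafka::Client; the topic name and tuning values are illustrative:

  # Deliver buffered messages every 30 seconds, or as soon as 100 are queued.
  producer = kafka.async_producer(delivery_interval: 30, delivery_threshold: 100)

  producer.produce("hello", topic: "greetings")  # returns immediately; delivery happens in the background

  producer.shutdown  # flush pending messages before the process exits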

def close

Returns:
  • (nil) -
def close
  @cluster.disconnect
end

def consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0, heartbeat_interval: 10, offset_retention_time: nil)

Returns:
  • (Consumer) -

Parameters:
  • offset_retention_time (Integer) -- the time period that committed offsets will be retained, in seconds. Defaults to the broker setting.
  • heartbeat_interval (Integer) -- the interval between heartbeats; must be less than the session window.
  • offset_commit_threshold (Integer) -- the number of messages that can be processed before their offsets are committed. If zero, offset commits are not triggered by message processing.
  • offset_commit_interval (Integer) -- the interval between offset commits, in seconds.
  • session_timeout (Integer) -- the number of seconds after which, if a client hasn't contacted the Kafka cluster, it will be kicked out of the group.
  • group_id (String) -- the id of the group that the consumer should join.
def consumer(group_id:, session_timeout: 30, offset_commit_interval: 10, offset_commit_threshold: 0, heartbeat_interval: 10, offset_retention_time: nil)
  cluster = initialize_cluster
  instrumenter = DecoratingInstrumenter.new(@instrumenter, {
    group_id: group_id,
  })
  # The Kafka protocol expects the retention time to be in ms.
  retention_time = (offset_retention_time && offset_retention_time * 1_000) || -1
  group = ConsumerGroup.new(
    cluster: cluster,
    logger: @logger,
    group_id: group_id,
    session_timeout: session_timeout,
    retention_time: retention_time,
    instrumenter: instrumenter,
  )
  fetcher = Fetcher.new(
    cluster: initialize_cluster,
    logger: @logger,
    instrumenter: instrumenter,
  )
  offset_manager = OffsetManager.new(
    cluster: cluster,
    group: group,
    fetcher: fetcher,
    logger: @logger,
    commit_interval: offset_commit_interval,
    commit_threshold: offset_commit_threshold,
    offset_retention_time: offset_retention_time
  )
  heartbeat = Heartbeat.new(
    group: group,
    interval: heartbeat_interval,
  )
  Consumer.new(
    cluster: cluster,
    logger: @logger,
    instrumenter: instrumenter,
    group: group,
    offset_manager: offset_manager,
    fetcher: fetcher,
    session_timeout: session_timeout,
    heartbeat: heartbeat,
  )
end
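
A usage sketch, assuming kafka is an already-initialized Kafka::Client; the group id and topic are illustrative:

  consumer = kafka.consumer(group_id: "my-group")
  consumer.subscribe("events")

  # Blocks, yielding each fetched message; offsets are committed automatically
  # according to offset_commit_interval / offset_commit_threshold.
  consumer.each_message do |message|
    puts "#{message.topic}/#{message.partition} @ #{message.offset}: #{message.value}"
  end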

def create_partitions_for(name, num_partitions: 1, timeout: 30)

Returns:
  • (nil) -

Parameters:
  • timeout (Integer) -- a duration of time to wait for the new partitions to be added.
  • num_partitions (Integer) -- the number of desired partitions for the topic.
  • name (String) -- the name of the topic.
def create_partitions_for(name, num_partitions: 1, timeout: 30)
  @cluster.create_partitions_for(name, num_partitions: num_partitions, timeout: timeout)
end
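
A usage sketch, assuming kafka is an already-initialized Kafka::Client; note that Kafka only allows the partition count to grow:

  # Grow "events" to 16 partitions, waiting up to 30 seconds.
  kafka.create_partitions_for("events", num_partitions: 16, timeout: 30)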

def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config: {})

Returns:
  • (nil) -

Raises:
  • (Kafka::TopicAlreadyExists) - if the topic already exists.

Parameters:
  • config (Hash) -- topic configuration entries. See Kafka's topic-level configs for the available keys.
  • timeout (Integer) -- a duration of time to wait for the topic to be completely created.
  • replication_factor (Integer) -- the replication factor of the topic.
  • num_partitions (Integer) -- the number of partitions that should be created in the topic.
  • name (String) -- the name of the topic.

Other tags:
    Example: Creating a topic with log compaction (see the sketch below)
def create_topic(name, num_partitions: 1, replication_factor: 1, timeout: 30, config: {})
  @cluster.create_topic(
    name,
    num_partitions: num_partitions,
    replication_factor: replication_factor,
    timeout: timeout,
    config: config,
  )
end
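
A sketch of the log-compaction example referenced above, assuming kafka is an already-initialized Kafka::Client; names and sizing are illustrative:

  kafka.create_topic(
    "users",
    num_partitions: 3,
    replication_factor: 2,
    config: { "cleanup.policy" => "compact" },
  )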

def delete_topic(name, timeout: 30)

Returns:
  • (nil) -

Parameters:
  • timeout (Integer) -- a duration of time to wait for the topic to be completely marked deleted.
  • name (String) -- the name of the topic.
def delete_topic(name, timeout: 30)
  @cluster.delete_topic(name, timeout: timeout)
end
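
A usage sketch, assuming kafka is an already-initialized Kafka::Client and that the brokers permit topic deletion (delete.topic.enable):

  kafka.delete_topic("events", timeout: 30)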

def deliver_message(value, key: nil, topic:, partition: nil, partition_key: nil, retries: 1)

Returns:
  • (nil) -

Parameters:
  • retries (Integer) -- the number of times to retry the delivery before giving up.
  • partition_key (String) -- a value used to deterministically choose a partition to write to.
  • partition (Integer, nil) -- the partition that the message should be written to, if any.
  • topic (String) -- the topic that the message should be written to.
  • key (String, nil) -- the message key.
  • value (String, nil) -- the message value.
def deliver_message(value, key: nil, topic:, partition: nil, partition_key: nil, retries: 1)
  create_time = Time.now
  message = PendingMessage.new(
    value,
    key,
    topic,
    partition,
    partition_key,
    create_time,
  )
  if partition.nil?
    partition_count = @cluster.partitions_for(topic).count
    partition = Partitioner.partition_for_key(partition_count, message)
  end
  buffer = MessageBuffer.new
  buffer.write(
    value: message.value,
    key: message.key,
    topic: message.topic,
    partition: partition,
    create_time: message.create_time,
  )
  @cluster.add_target_topics([topic])
  compressor = Compressor.new(
    instrumenter: @instrumenter,
  )
  operation = ProduceOperation.new(
    cluster: @cluster,
    buffer: buffer,
    required_acks: 1,
    ack_timeout: 10,
    compressor: compressor,
    logger: @logger,
    instrumenter: @instrumenter,
  )
  attempt = 1
  begin
    operation.execute
    unless buffer.empty?
      raise DeliveryFailed.new(nil, [message])
    end
  rescue Kafka::Error => e
    @cluster.mark_as_stale!
    if attempt >= (retries + 1)
      raise
    else
      attempt += 1
      @logger.warn "Error while delivering message, #{e.class}: #{e.message}; retrying after 1s..."
      sleep 1
      retry
    end
  end
end
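
A usage sketch, assuming kafka is an already-initialized Kafka::Client; deliver_message blocks until the write is acknowledged, so it suits low-volume use:

  kafka.deliver_message("Hello, World!", topic: "greetings")

  # Route related messages to the same partition with a partition key.
  kafka.deliver_message("order shipped", topic: "orders", partition_key: "order-123")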

def describe_topic(name, configs = [])

Returns:
  • (Hash) -

Parameters:
  • configs (Array) -- array of desired config names.
  • name (String) -- the name of the topic.

Other tags:
    Example: Describing the cleanup policy config of a topic (see the sketch below)
    Note: This is an alpha level API and is subject to change.
def describe_topic(name, configs = [])
  @cluster.describe_topic(name, configs)
end
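
A sketch of the cleanup-policy example referenced above, assuming kafka is an already-initialized Kafka::Client; the returned value is illustrative:

  kafka.describe_topic("my-topic", ["cleanup.policy"])
  #=> { "cleanup.policy" => "delete" }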

def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, &block)

Returns:
  • (nil) -

Parameters:
  • max_bytes (Integer) -- the maximum number of bytes to include in the message set for each partition.
  • min_bytes (Integer) -- the minimum number of bytes to wait for. If set to zero, the broker will respond immediately, but the response may be empty.
  • max_wait_time (Integer) -- the maximum amount of time to wait before the server responds, in seconds.
  • start_from_beginning (Boolean) -- whether to start from the beginning of the topic or just subscribe to new messages being produced.
  • topic (String) -- the topic to consume messages from.
def each_message(topic:, start_from_beginning: true, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, &block)
  default_offset ||= start_from_beginning ? :earliest : :latest
  offsets = Hash.new { default_offset }
  loop do
    operation = FetchOperation.new(
      cluster: @cluster,
      logger: @logger,
      min_bytes: min_bytes,
      max_wait_time: max_wait_time,
    )
    @cluster.partitions_for(topic).map(&:partition_id).each do |partition|
      partition_offset = offsets[partition]
      operation.fetch_from_partition(topic, partition, offset: partition_offset, max_bytes: max_bytes)
    end
    batches = operation.execute
    batches.each do |batch|
      batch.messages.each(&block)
      offsets[batch.partition] = batch.last_offset + 1
    end
  end
end
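
A usage sketch, assuming kafka is an already-initialized Kafka::Client; this loops forever and does not use a consumer group, so no offsets are committed:

  kafka.each_message(topic: "events", start_from_beginning: true) do |message|
    puts message.offset, message.key, message.value
  end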

def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, retries: 1)

Returns:
  • (Array) - the messages returned from the broker.

Parameters:
  • max_bytes (Integer) -- the maximum number of bytes to include in the message set for the partition.
  • min_bytes (Integer) -- the minimum number of bytes to wait for. If set to zero, the broker will respond immediately, but the response may be empty.
  • max_wait_time (Integer) -- the maximum amount of time to wait before the server responds, in seconds.
  • offset (Integer, Symbol) -- the offset to start reading from. Default is the latest offset.
  • partition (Integer) -- the partition that messages should be fetched from.
  • topic (String) -- the topic that messages should be fetched from.
def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576, retries: 1)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )
  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)
  attempt = 1
  begin
    operation.execute.flat_map {|batch| batch.messages }
  rescue Kafka::Error => e
    @cluster.mark_as_stale!
    if attempt >= (retries + 1)
      raise
    else
      attempt += 1
      @logger.warn "Error while fetching messages, #{e.class}: #{e.message}; retrying..."
      retry
    end
  end
end
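
A usage sketch, assuming kafka is an already-initialized Kafka::Client; topic, partition, and offset are illustrative:

  messages = kafka.fetch_messages(topic: "events", partition: 0, offset: :earliest)

  messages.each do |message|
    puts "#{message.offset}: #{message.value}"
  end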

def has_topic?(topic)

def has_topic?(topic)
  @cluster.clear_target_topics
  @cluster.add_target_topics([topic])
  @cluster.topics.include?(topic)
end

def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
               sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil,
               sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
               sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, ssl_ca_certs_from_system: false)

Returns:
  • (Client) -

Parameters:
  • sasl_scram_mechanism (String, nil) -- Scram mechanism, either "sha256" or "sha512"
  • sasl_scram_password (String, nil) -- SCRAM password
  • sasl_scram_username (String, nil) -- SCRAM username
  • sasl_gssapi_keytab (String, nil) -- a KRB5 keytab filepath
  • sasl_gssapi_principal (String, nil) -- a KRB5 principal
  • ssl_client_cert_key (String, nil) -- a PEM encoded client cert key to use with an SSL connection. Must be used in combination with ssl_client_cert.
  • ssl_client_cert (String, nil) -- a PEM encoded client cert to use with an SSL connection. Must be used in combination with ssl_client_cert_key.
  • ssl_ca_cert_file_path (String, nil) -- a path on the filesystem to a PEM encoded CA cert to use with an SSL connection.
  • ssl_ca_cert (String, Array, nil) -- a PEM encoded CA cert, or an Array of PEM encoded CA certs, to use with an SSL connection.
  • socket_timeout (Integer, nil) -- the timeout setting for socket connections. See BrokerPool#initialize.
  • connect_timeout (Integer, nil) -- the timeout setting for connecting to brokers. See BrokerPool#initialize.
  • logger (Logger) -- the logger that should be used by the client.
  • client_id (String) -- the identifier for this application.
  • seed_brokers (Array, String) -- the list of brokers used to initialize the client. Either an Array of connections, or a comma separated string of connections.
def initialize(seed_brokers:, client_id: "ruby-kafka", logger: nil, connect_timeout: nil, socket_timeout: nil,
               ssl_ca_cert_file_path: nil, ssl_ca_cert: nil, ssl_client_cert: nil, ssl_client_cert_key: nil,
               sasl_gssapi_principal: nil, sasl_gssapi_keytab: nil,
               sasl_plain_authzid: '', sasl_plain_username: nil, sasl_plain_password: nil,
               sasl_scram_username: nil, sasl_scram_password: nil, sasl_scram_mechanism: nil, ssl_ca_certs_from_system: false)
  @logger = logger || Logger.new(nil)
  @instrumenter = Instrumenter.new(client_id: client_id)
  @seed_brokers = normalize_seed_brokers(seed_brokers)
  ssl_context = SslContext.build(
    ca_cert_file_path: ssl_ca_cert_file_path,
    ca_cert: ssl_ca_cert,
    client_cert: ssl_client_cert,
    client_cert_key: ssl_client_cert_key,
    ca_certs_from_system: ssl_ca_certs_from_system,
  )
  sasl_authenticator = SaslAuthenticator.new(
    sasl_gssapi_principal: sasl_gssapi_principal,
    sasl_gssapi_keytab: sasl_gssapi_keytab,
    sasl_plain_authzid: sasl_plain_authzid,
    sasl_plain_username: sasl_plain_username,
    sasl_plain_password: sasl_plain_password,
    sasl_scram_username: sasl_scram_username,
    sasl_scram_password: sasl_scram_password,
    sasl_scram_mechanism: sasl_scram_mechanism,
    logger: @logger
  )
  if sasl_authenticator.enabled? && ssl_context.nil?
    raise ArgumentError, "SASL authentication requires that SSL is configured"
  end
  @connection_builder = ConnectionBuilder.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    ssl_context: ssl_context,
    logger: @logger,
    instrumenter: @instrumenter,
    sasl_authenticator: sasl_authenticator
  )
  @cluster = initialize_cluster
end
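
A construction sketch using the documented keyword arguments; broker addresses and credentials are placeholders (the top-level Kafka.new convenience method typically forwards here):

  require "kafka"

  # Plaintext connection to two seed brokers.
  kafka = Kafka::Client.new(
    seed_brokers: ["kafka1:9092", "kafka2:9092"],
    client_id: "my-app",
  )

  # SSL plus SASL/PLAIN; per the guard above, SASL requires an SSL context.
  secure_kafka = Kafka::Client.new(
    seed_brokers: ["kafka1:9093"],
    client_id: "my-app",
    ssl_ca_cert: File.read("ca.pem"),
    sasl_plain_username: "user",
    sasl_plain_password: "secret",
  )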

def initialize_cluster

def initialize_cluster
  broker_pool = BrokerPool.new(
    connection_builder: @connection_builder,
    logger: @logger,
  )
  Cluster.new(
    seed_brokers: @seed_brokers,
    broker_pool: broker_pool,
    logger: @logger,
  )
end

def last_offset_for(topic, partition)

Returns:
  • (Integer) - the offset of the last message in the partition, or -1 if there are no messages in the partition.

Parameters:
  • partition (Integer) --
  • topic (String) --
def last_offset_for(topic, partition)
  # The offset resolution API will return the offset of the "next" message to
  # be written when resolving the "latest" offset, so we subtract one.
  @cluster.resolve_offset(topic, partition, :latest) - 1
end

def last_offsets_for(*topics)

Returns:
  • (Hash<String, Hash<Integer, Integer>>) - a hash mapping topic names to hashes of partition id to latest offset.

Parameters:
  • topics (Array) -- topic names.
def last_offsets_for(*topics)
  @cluster.add_target_topics(topics)
  topics.map {|topic|
    partition_ids = @cluster.partitions_for(topic).collect(&:partition_id)
    partition_offsets = @cluster.resolve_offsets(topic, partition_ids, :latest)
    [topic, partition_offsets.collect { |k, v| [k, v - 1] }.to_h]
  }.to_h
end
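
A usage sketch, assuming kafka is an already-initialized Kafka::Client; topic names and offsets are illustrative:

  kafka.last_offsets_for("topic-1", "topic-2")
  #=> { "topic-1" => { 0 => 100, 1 => 200 }, "topic-2" => { 0 => 50 } }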

def normalize_seed_brokers(seed_brokers)

def normalize_seed_brokers(seed_brokers)
  if seed_brokers.is_a?(String)
    seed_brokers = seed_brokers.split(",")
  end
  seed_brokers.map {|str| BrokerUri.parse(str) }
end

def partitions_for(topic)

Returns:
  • (Integer) - the number of partitions in the topic.

Parameters:
  • topic (String) --
def partitions_for(topic)
  @cluster.partitions_for(topic).count
end

def producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)

Returns:
  • (Kafka::Producer) - the Kafka producer.

Parameters:
  • compression_threshold (Integer) -- the number of messages that needs to be in a message set before it should be compressed. Note that message sets are per-partition rather than per-topic or per-producer.
  • compression_codec (Symbol, nil) -- the name of the compression codec to use, or nil if no compression should be performed.
  • max_buffer_bytesize (Integer) -- the maximum size of the buffer in bytes.
  • max_buffer_size (Integer) -- the number of messages allowed in the buffer before new writes will raise BufferOverflow exceptions.
  • retry_backoff (Integer) -- the number of seconds to wait between retries.
  • max_retries (Integer) -- the number of retries that should be attempted before giving up sending messages to the cluster. Does not include the original attempt.
  • required_acks (Integer, Symbol) -- The number of replicas that must acknowledge a write, or :all if all in-sync replicas must acknowledge.
  • ack_timeout (Integer) -- The number of seconds a broker can wait for replicas to acknowledge a write before responding with a timeout.
def producer(compression_codec: nil, compression_threshold: 1, ack_timeout: 5, required_acks: :all, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000, max_buffer_bytesize: 10_000_000)
  compressor = Compressor.new(
    codec_name: compression_codec,
    threshold: compression_threshold,
    instrumenter: @instrumenter,
  )
  Producer.new(
    cluster: initialize_cluster,
    logger: @logger,
    instrumenter: @instrumenter,
    compressor: compressor,
    ack_timeout: ack_timeout,
    required_acks: required_acks,
    max_retries: max_retries,
    retry_backoff: retry_backoff,
    max_buffer_size: max_buffer_size,
    max_buffer_bytesize: max_buffer_bytesize,
  )
end
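
A usage sketch, assuming kafka is an already-initialized Kafka::Client; the codec and topic are illustrative:

  producer = kafka.producer(required_acks: :all, max_retries: 3, compression_codec: :gzip)

  # Writes are buffered locally until deliver_messages is called.
  producer.produce("hello", topic: "greetings", key: "greeting-1")
  producer.deliver_messages

  producer.shutdown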

def supports_api?(api_key, version = nil)

Returns:
  • (Boolean) -

Parameters:
  • version (Integer) -- API version.
  • api_key (Integer) -- API key.
def supports_api?(api_key, version = nil)
  @cluster.supports_api?(api_key, version)
end

def topics

Returns:
  • (Array) - the list of topic names.
def topics
  @cluster.list_topics
end