class Kafka::Client

# Creates a new asynchronous producer that wraps a synchronous producer
# built from +options+ (see {#producer}).
#
# @param delivery_interval [Integer] forwarded to {AsyncProducer};
#   presumably the number of seconds between automatic deliveries, with 0
#   disabling interval-based delivery — confirm in AsyncProducer.
# @param delivery_threshold [Integer] forwarded to {AsyncProducer};
#   presumably the buffered-message count that triggers a delivery, with 0
#   disabling threshold-based delivery — confirm in AsyncProducer.
# @param max_queue_size [Integer] forwarded to {AsyncProducer}; the maximum
#   number of messages allowed in the queue.
# @param options [Hash] any other keyword arguments are passed on to
#   {#producer} when building the underlying synchronous producer.
# @return [AsyncProducer]
def async_producer(delivery_interval: 0, delivery_threshold: 0, max_queue_size: 1000, **options)
  sync_producer = producer(**options)

  AsyncProducer.new(
    sync_producer: sync_producer,
    delivery_interval: delivery_interval,
    delivery_threshold: delivery_threshold,
    max_queue_size: max_queue_size,
  )
end

# Closes all connections to the Kafka brokers by disconnecting the
# underlying cluster.
#
# @return the result of +Cluster#disconnect+.
def close
  @cluster.disconnect
end

# Fetches a batch of messages from a single partition of a topic.
#
# @note This API is still alpha level. Don't try to use it in production.
#
# @param topic [String] the topic that messages should be fetched from.
# @param partition [Integer] the partition that messages should be fetched from.
# @param offset [Integer, Symbol] the offset to start reading from. Default is
#   +:latest+.
# @param max_wait_time [Integer] the maximum amount of time to wait before
#   the broker responds (presumably seconds — confirm against FetchOperation).
# @param min_bytes [Integer] the minimum number of bytes to wait for. If set to
#   zero the broker can respond immediately (assumed — confirm).
# @param max_bytes [Integer] the maximum number of bytes to include in the
#   response for this partition.
# @return [Array] the messages returned from the broker.
def fetch_messages(topic:, partition:, offset: :latest, max_wait_time: 5, min_bytes: 1, max_bytes: 1048576)
  operation = FetchOperation.new(
    cluster: @cluster,
    logger: @logger,
    min_bytes: min_bytes,
    max_wait_time: max_wait_time,
  )

  operation.fetch_from_partition(topic, partition, offset: offset, max_bytes: max_bytes)

  operation.execute
end

# Initializes a new Kafka client.
#
# @param seed_brokers [Array] the list of brokers used to initialize the
#   client's cluster metadata.
# @param client_id [String] the identifier for this application.
# @param logger [Logger] the logger shared with the broker pool and cluster.
# @param connect_timeout [Integer, nil] the timeout setting for connecting;
#   +nil+ presumably means the connection default applies — confirm in
#   BrokerPool.
# @param socket_timeout [Integer, nil] the timeout setting for socket
#   operations; +nil+ presumably means the connection default applies —
#   confirm in BrokerPool.
# @return [Client]
def initialize(seed_brokers:, client_id: DEFAULT_CLIENT_ID, logger: DEFAULT_LOGGER, connect_timeout: nil, socket_timeout: nil)
  @logger = logger

  broker_pool = BrokerPool.new(
    client_id: client_id,
    connect_timeout: connect_timeout,
    socket_timeout: socket_timeout,
    logger: logger,
  )

  @cluster = Cluster.new(
    seed_brokers: seed_brokers,
    broker_pool: broker_pool,
    logger: logger,
  )
end

# Counts the partitions in a topic.
#
# @param topic [String] the name of the topic to inspect.
# @return [Integer] the number of partitions in the topic.
def partitions_for(topic)
  @cluster.partitions_for(topic).count
end

# Initializes a new Kafka producer bound to this client's cluster and logger.
#
# @see Producer#initialize
# @param options [Hash] keyword arguments forwarded to +Producer.new+.
# @return [Kafka::Producer] the Kafka producer.
def producer(**options)
  Producer.new(cluster: @cluster, logger: @logger, **options)
end

def topics

Returns:
  • (Array) - the list of topic names.
def topics
  @cluster.topics
end