class Kafka::Producer


end
producer.shutdown
producer.send_messages
# Make sure to send any remaining messages.
ensure
end
producer.send_messages if index % 10 == 0
# Send messages for every 10 lines.
producer.produce(line, topic: topic)
$stdin.each_with_index do |line, index|
begin
producer = kafka.get_producer
)
logger: logger,
client_id: “simple-producer”,
seed_brokers: brokers,
kafka = Kafka.new(
topic = “random-messages”
# cluster to auto-create topics.
# Make sure to create this topic in your Kafka cluster or configure the
brokers = ENV.fetch(“KAFKA_BROKERS”).split(“,”)
logger = Logger.new($stderr)
require “kafka”
to Kafka:
This is an example of an application which reads lines from stdin and writes them
## Example
We do this for as long as ‘max_retries` permits.
not, we do another round of requests, this time with just the remaining messages.
After this, we check if the buffer is empty. If it is, we’re all done. If it’s
from the buffer – otherwise, we log the error and keep the messages in the buffer.
write to a given partition was successful, we clear the corresponding messages
response and inspect the error code for each partition that we wrote to. If the
request to each broker. A request can be a partial success, so we go through the
group the buffered messages by the broker they need to be sent to and fire off a
for all topics/partitions. Whenever we want to send messages to the cluster, we
The design of the error handling is based on having a {MessageBuffer} hold messages
## Error Handling and Retries
message delays depends on your use case.
try to avoid sending messages after every write. The tradeoff between throughput and
Buffering messages and sending them in batches greatly improves performance, so
to periodically call {#send_messages} or set ‘max_buffer_size` to an appropriate value.
buffer has reached this size will result in a BufferOverflow exception. Make sure
a maximum buffer size (default is 1,000 messages) and writing messages after the
The producer buffers pending messages until {#send_messages} is called. Note that there is
## Buffering
you can pass in.
`logger` options to `#get_producer`. See {#initialize} for the list of other options
different producers. This also means that you don’t need to pass the ‘broker_pool` and
This is done in order to share a logger as well as a pool of broker connections across
producer = kafka.get_producer
# Will instantiate Kafka::Producer
kafka = Kafka.new(…)
# Will instantiate Kafka::Client
do it for you, e.g.
Typically you won’t instantiate this class yourself, but rather have {Kafka::Client}
Allows sending messages to a Kafka cluster.

def buffer_size

Returns:
  • (Integer) - buffer size.
def buffer_size
  @buffer.size
end

def initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)

Parameters:
  • max_buffer_size (Integer) -- the number of messages allowed in the buffer
  • retry_backoff (Integer) -- the number of seconds to wait between retries.
  • max_retries (Integer) -- the number of retries that should be attempted
  • required_acks (Integer) -- The number of replicas that must acknowledge
  • ack_timeout (Integer) -- The number of seconds a broker can wait for
  • logger (Logger) -- the logger that should be used. Typically passed
  • broker_pool (BrokerPool) -- the broker pool representing the cluster.
def initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
  @broker_pool = broker_pool
  @logger = logger
  @required_acks = required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @buffer = MessageBuffer.new
end

def produce(value, key: nil, topic:, partition: nil, partition_key: nil)

Returns:
  • (nil) -

Raises:
  • (BufferOverflow) - if the maximum buffer size has been reached.

Parameters:
  • partition_key (String) -- the key that should be used to assign a partition.
  • partition (Integer) -- the partition that the message should be written to.
  • topic (String) -- the topic that the message should be written to.
  • key (String) -- the message key.
  • value (String) -- the message data.
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
  unless buffer_size < @max_buffer_size
    raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
  end
  if partition.nil?
    # If no explicit partition key is specified we use the message key instead.
    partition_key ||= key
    partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
    partition = partitioner.partition_for_key(partition_key)
  end
  message = Protocol::Message.new(key: key, value: value)
  @buffer.write(message, topic: topic, partition: partition)
  partition
end

def send_messages

Returns:
  • (nil) -

Raises:
  • (FailedToSendMessages) - if not all messages could be successfully sent.
def send_messages
  attempt = 0
  transmission = Transmission.new(
    broker_pool: @broker_pool,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    logger: @logger,
  )
  loop do
    @logger.info "Sending #{@buffer.size} messages"
    attempt += 1
    transmission.send_messages
    if @buffer.empty?
      @logger.info "Successfully transmitted all messages"
      break
    elsif attempt <= @max_retries
      @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
      @logger.info "Waiting #{@retry_backoff}s before retrying"
      sleep @retry_backoff
    else
      @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
      break
    end
  end
  if @required_acks == 0
    # No response is returned by the brokers, so we can't know which messages
    # have been successfully written. Our only option is to assume that they all
    # have.
    @buffer.clear
  end
  unless @buffer.empty?
    partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
    raise FailedToSendMessages, "Failed to send messages to #{partitions}"
  end
end

def shutdown

Returns:
  • (nil) -
def shutdown
  @broker_pool.shutdown
end