class Kafka::Producer


end
producer.shutdown
producer.deliver_messages
# Make sure to send any remaining messages.
ensure
end
producer.deliver_messages if index % 10 == 0
# Send messages for every 10 lines.
producer.produce(line, topic: topic)
$stdin.each_with_index do |line, index|
begin
producer = kafka.producer
kafka = Kafka.new(brokers, client_id: “simple-producer”, logger: logger)
topic = “random-messages”
# cluster to auto-create topics.
# Make sure to create this topic in your Kafka cluster or configure the
brokers = ENV.fetch(“KAFKA_BROKERS”).split(“,”)
logger = Logger.new($stderr)
require “kafka”
to Kafka:
This is an example of an application which reads lines from stdin and writes them
## Example
* ‘attempts` – the number of attempts made to deliver the messages.
delivered.
* `delivered_message_count` – the number of messages that were successfully
deliver. Note that not all messages may get delivered.
* `message_count` – the total number of messages that the producer tried to
`deliver_messages.producer.kafka` will be emitted with the following payload:
After {#deliver_messages} completes, the notification
* `max_buffer_size` – the maximum allowed buffer size for the producer.
* `buffer_size` – the buffer size after adding the message.
* `topic` – the topic that was produced to.
* `key` – the message key.
* `value` – the message value.
will be emitted with the following payload:
Whenever {#produce} is called, the notification `produce_message.producer.kafka`
## Instrumentation
an integer value higher than one.
codec. To increase the compression threshold, set `compression_threshold` to
By default, all message sets will be compressed if you specify a compression
* `:gzip` for [gzip](en.wikipedia.org/wiki/Gzip) compression.
* `:snappy` for [Snappy](google.github.io/snappy/) compression.
name of one of the algorithms allowed by Kafka:
Compression is enabled by passing the `compression_codec` parameter with the
partition will the messages be compressed.
`compression_threshold`. Only if the defined number of messages are buffered for a
it’s possible to configure a threshold for when to enable compression by setting
Since many workloads have variations in throughput and distribution across partitions,
sets will be larger by the time they’re compressed.
means that compressions works better the larger your buffers get, since the message
rather than on individual messages. This improves the compression rate and generally
bandwidth and space usage. Compression in Kafka is done on entire messages sets
Depending on what kind of data you produce, enabling compression may yield improved
## Compression
We do this for as long as ‘max_retries` permits.
not, we do another round of requests, this time with just the remaining messages.
After this, we check if the buffer is empty. If it is, we’re all done. If it’s
from the buffer – otherwise, we log the error and keep the messages in the buffer.
write to a given partition was successful, we clear the corresponding messages
response and inspect the error code for each partition that we wrote to. If the
request to each broker. A request can be a partial success, so we go through the
group the buffered messages by the broker they need to be sent to and fire off a
for all topics/partitions. Whenever we want to send messages to the cluster, we
The design of the error handling is based on having a {MessageBuffer} hold messages
## Error Handling and Retries
message delays depends on your use case.
try to avoid sending messages after every write. The tradeoff between throughput and
Buffering messages and sending them in batches greatly improves performance, so
to periodically call {#deliver_messages} or set ‘max_buffer_size` to an appropriate value.
buffer has reached this size will result in a BufferOverflow exception. Make sure
a maximum buffer size (default is 1,000 messages) and writing messages after the
The producer buffers pending messages until {#deliver_messages} is called. Note that there is
## Buffering
you can pass in.
`logger` options to `#producer`. See {#initialize} for the list of other options
different producers. This also means that you don’t need to pass the ‘cluster` and
This is done in order to share a logger as well as a pool of broker connections across
producer = kafka.producer
# Will instantiate Kafka::Producer
kafka = Kafka.new([“kafka1:9092”, “kafka2:9092”])
# Will instantiate Kafka::Client
do it for you, e.g.
Typically you won’t instantiate this class yourself, but rather have {Kafka::Client}
Allows sending messages to a Kafka cluster.

def assign_partitions!

def assign_partitions!
  failed_messages = []
  topics_with_failures = Set.new
  @pending_message_queue.each do |message|
    partition = message.partition
    begin
      # If a message for a topic fails to receive a partition all subsequent
      # messages for the topic should be retried to preserve ordering
      if topics_with_failures.include?(message.topic)
        failed_messages << message
        next
      end
      if partition.nil?
        partition_count = @cluster.partitions_for(message.topic).count
        partition = Partitioner.partition_for_key(partition_count, message)
      end
      @buffer.write(
        value: message.value,
        key: message.key,
        topic: message.topic,
        partition: partition,
        create_time: message.create_time,
      )
    rescue Kafka::Error => e
      @instrumenter.instrument("topic_error.producer", {
        topic: message.topic,
        exception: [e.class.to_s, e.message],
      })
      topics_with_failures << message.topic
      failed_messages << message
    end
  end
  if failed_messages.any?
    failed_messages.group_by(&:topic).each do |topic, messages|
      @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}"
    end
    @cluster.mark_as_stale!
  end
  @pending_message_queue.replace(failed_messages)
end

def buffer_bytesize

def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end

def buffer_messages

def buffer_messages
  messages = []
  @pending_message_queue.each do |message|
    messages << message
  end
  @buffer.each do |topic, partition, messages_for_partition|
    messages_for_partition.each do |message|
      messages << PendingMessage.new(
        message.value,
        message.key,
        topic,
        partition,
        nil,
        message.create_time
      )
    end
  end
  messages
end

def buffer_overflow(topic, message)

def buffer_overflow(topic, message)
  @instrumenter.instrument("buffer_overflow.producer", {
    topic: topic,
  })
  raise BufferOverflow, message
end

def buffer_size

Returns:
  • (Integer) - buffer size.
def buffer_size
  @pending_message_queue.size + @buffer.size
end

def clear_buffer

Returns:
  • (nil) -
def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end

def deliver_messages

Returns:
  • (nil) -

Raises:
  • (DeliveryFailed) - if not all messages could be successfully sent.
def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0
  @instrumenter.instrument("deliver_messages.producer") do |notification|
    message_count = buffer_size
    notification[:message_count] = message_count
    notification[:attempts] = 0
    begin
      deliver_messages_with_retries(notification)
    ensure
      notification[:delivered_message_count] = message_count - buffer_size
    end
  end
end

def deliver_messages_with_retries(notification)

def deliver_messages_with_retries(notification)
  attempt = 0
  @cluster.add_target_topics(@target_topics)
  operation = ProduceOperation.new(
    cluster: @cluster,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    compressor: @compressor,
    logger: @logger,
    instrumenter: @instrumenter,
  )
  loop do
    attempt += 1
    notification[:attempts] = attempt
    begin
      @cluster.refresh_metadata_if_necessary!
    rescue ConnectionError => e
      raise DeliveryFailed.new(e, buffer_messages)
    end
    assign_partitions!
    operation.execute
    if @required_acks.zero?
      # No response is returned by the brokers, so we can't know which messages
      # have been successfully written. Our only option is to assume that they all
      # have.
      @buffer.clear
    end
    if buffer_size.zero?
      break
    elsif attempt <= @max_retries
      @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
      sleep @retry_backoff
    else
      @logger.error "Failed to send all messages; keeping remaining messages in buffer"
      break
    end
  end
  unless @pending_message_queue.empty?
    # Mark the cluster as stale in order to force a cluster metadata refresh.
    @cluster.mark_as_stale!
    raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages)
  end
  unless @buffer.empty?
    partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
    raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages)
  end
end

def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)

def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor
  # The set of topics that are produced to.
  @target_topics = Set.new
  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new
  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end

def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)

Returns:
  • (nil) -

Raises:
  • (BufferOverflow) - if the maximum buffer size has been reached.

Parameters:
  • create_time (Time) -- the timestamp that should be set on the message.
  • partition_key (String) -- the key that should be used to assign a partition.
  • partition (Integer) -- the partition that the message should be written to.
  • topic (String) -- the topic that the message should be written to.
  • key (String) -- the message key.
  • value (String) -- the message data.
def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  message = PendingMessage.new(
    value && value.to_s,
    key && key.to_s,
    topic.to_s,
    partition && Integer(partition),
    partition_key && partition_key.to_s,
    create_time,
  )
  if buffer_size >= @max_buffer_size
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end
  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic,
      "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end
  @target_topics.add(topic)
  @pending_message_queue.write(message)
  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })
  nil
end

def shutdown

Returns:
  • (nil) -
def shutdown
  @cluster.disconnect
end