class Kafka::Producer
Allows sending messages to a Kafka cluster.

Typically you won't instantiate this class yourself, but rather have {Kafka::Client}
do it for you, e.g.

    # Will instantiate Kafka::Client
    kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])

    # Will instantiate Kafka::Producer
    producer = kafka.producer

This is done in order to share a logger as well as a pool of broker connections across
different producers. This also means that you don't need to pass the `cluster` and
`logger` options to `#producer`. See {#initialize} for the list of other options
you can pass in.

## Buffering

The producer buffers pending messages until {#deliver_messages} is called. Note that there is
a maximum buffer size (default is 1,000 messages) and writing messages after the
buffer has reached this size will result in a BufferOverflow exception. Make sure
to periodically call {#deliver_messages} or set `max_buffer_size` to an appropriate value.

Buffering messages and sending them in batches greatly improves performance, so
try to avoid sending messages after every write. The tradeoff between throughput and
message delays depends on your use case.

## Error Handling and Retries

The design of the error handling is based on having a {MessageBuffer} hold messages
for all topics/partitions. Whenever we want to send messages to the cluster, we
group the buffered messages by the broker they need to be sent to and fire off a
request to each broker. A request can be a partial success, so we go through the
response and inspect the error code for each partition that we wrote to. If the
write to a given partition was successful, we clear the corresponding messages
from the buffer – otherwise, we log the error and keep the messages in the buffer.

After this, we check if the buffer is empty. If it is, we're all done. If it's
not, we do another round of requests, this time with just the remaining messages.
We do this for as long as `max_retries` permits.

## Compression

Depending on what kind of data you produce, enabling compression may yield improved
bandwidth and space usage. Compression in Kafka is done on entire message sets
rather than on individual messages. This improves the compression rate and generally
means that compression works better the larger your buffers get, since the message
sets will be larger by the time they're compressed.

Since many workloads have variations in throughput and distribution across partitions,
it's possible to configure a threshold for when to enable compression by setting
`compression_threshold`. Only if the defined number of messages are buffered for a
partition will the messages be compressed.

Compression is enabled by passing the `compression_codec` parameter with the
name of one of the algorithms allowed by Kafka:

* `:snappy` for [Snappy](https://google.github.io/snappy/) compression.
* `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.

By default, all message sets will be compressed if you specify a compression
codec. To increase the compression threshold, set `compression_threshold` to
an integer value higher than one.

## Instrumentation

Whenever {#produce} is called, the notification `produce_message.producer.kafka`
will be emitted with the following payload:

* `value` – the message value.
* `key` – the message key.
* `topic` – the topic that was produced to.
* `buffer_size` – the buffer size after adding the message.
* `max_buffer_size` – the maximum allowed buffer size for the producer.

After {#deliver_messages} completes, the notification
`deliver_messages.producer.kafka` will be emitted with the following payload:

* `message_count` – the total number of messages that the producer tried to
  deliver. Note that not all messages may get delivered.
* `delivered_message_count` – the number of messages that were successfully
  delivered.
* `attempts` – the number of attempts made to deliver the messages.

## Example

This is an example of an application which reads lines from stdin and writes them
to Kafka:

    require "kafka"

    logger = Logger.new($stderr)
    brokers = ENV.fetch("KAFKA_BROKERS").split(",")

    # Make sure to create this topic in your Kafka cluster or configure the
    # cluster to auto-create topics.
    topic = "random-messages"

    kafka = Kafka.new(brokers, client_id: "simple-producer", logger: logger)
    producer = kafka.producer

    begin
      $stdin.each_with_index do |line, index|
        producer.produce(line, topic: topic)

        # Send messages for every 10 lines.
        producer.deliver_messages if index % 10 == 0
      end
    ensure
      # Make sure to send any remaining messages.
      producer.deliver_messages

      producer.shutdown
    end
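If you would rather react to a full buffer than flush on a fixed schedule, the overflow described in the Buffering section can also be handled at the call site. This is a minimal sketch, not part of the library's own example; it assumes a `producer`, `topic`, and `line` set up as in the example above:

    begin
      producer.produce(line, topic: topic)
    rescue Kafka::BufferOverflow
      # The buffer has reached `max_buffer_size`; flush it and retry the write.
      # Note that #deliver_messages can itself raise Kafka::DeliveryFailed.
      producer.deliver_messages
      retry
    end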
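The settings described in the Compression section are supplied when the producer is created. A minimal sketch, assuming the producer is built through {Kafka::Client#producer} and that the snappy gem is installed:

    producer = kafka.producer(
      compression_codec: :snappy,
      # Only compress message sets once at least 10 messages are buffered for a partition.
      compression_threshold: 10,
    )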
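The notifications listed under Instrumentation are emitted through `ActiveSupport::Notifications` when that library is available, so they can be observed with an ordinary subscriber. A minimal sketch; the log line is purely illustrative and reuses the `logger` from the example above:

    # Load this before creating the Kafka client so the producer's instrumenter
    # picks up ActiveSupport::Notifications.
    require "active_support/notifications"

    ActiveSupport::Notifications.subscribe("deliver_messages.producer.kafka") do |*args|
      event = ActiveSupport::Notifications::Event.new(*args)
      payload = event.payload

      logger.info "Delivered #{payload[:delivered_message_count]} of " \
        "#{payload[:message_count]} messages in #{payload[:attempts]} attempt(s)"
    end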
def assign_partitions!
def assign_partitions!
  failed_messages = []
  topics_with_failures = Set.new

  @pending_message_queue.each do |message|
    partition = message.partition

    begin
      # If a message for a topic fails to receive a partition all subsequent
      # messages for the topic should be retried to preserve ordering
      if topics_with_failures.include?(message.topic)
        failed_messages << message
        next
      end

      if partition.nil?
        partition_count = @cluster.partitions_for(message.topic).count
        partition = Partitioner.partition_for_key(partition_count, message)
      end

      @buffer.write(
        value: message.value,
        key: message.key,
        topic: message.topic,
        partition: partition,
        create_time: message.create_time,
      )
    rescue Kafka::Error => e
      @instrumenter.instrument("topic_error.producer", {
        topic: message.topic,
        exception: [e.class.to_s, e.message],
      })

      topics_with_failures << message.topic
      failed_messages << message
    end
  end

  if failed_messages.any?
    failed_messages.group_by(&:topic).each do |topic, messages|
      @logger.error "Failed to assign partitions to #{messages.count} messages in #{topic}"
    end

    @cluster.mark_as_stale!
  end

  @pending_message_queue.replace(failed_messages)
end
def buffer_bytesize
def buffer_bytesize
  @pending_message_queue.bytesize + @buffer.bytesize
end
def buffer_messages
def buffer_messages
  messages = []

  @pending_message_queue.each do |message|
    messages << message
  end

  @buffer.each do |topic, partition, messages_for_partition|
    messages_for_partition.each do |message|
      messages << PendingMessage.new(
        message.value,
        message.key,
        topic,
        partition,
        nil,
        message.create_time
      )
    end
  end

  messages
end
def buffer_overflow(topic, message)
def buffer_overflow(topic, message)
  @instrumenter.instrument("buffer_overflow.producer", {
    topic: topic,
  })

  raise BufferOverflow, message
end
def buffer_size
Returns:
- (Integer) -- buffer size.

def buffer_size
  @pending_message_queue.size + @buffer.size
end
def clear_buffer
Returns:
- (nil)

def clear_buffer
  @buffer.clear
  @pending_message_queue.clear
end
def deliver_messages
Returns:
- (nil)

Raises:
- (DeliveryFailed) -- if not all messages could be successfully sent.

def deliver_messages
  # There's no need to do anything if the buffer is empty.
  return if buffer_size == 0

  @instrumenter.instrument("deliver_messages.producer") do |notification|
    message_count = buffer_size

    notification[:message_count] = message_count
    notification[:attempts] = 0

    begin
      deliver_messages_with_retries(notification)
    ensure
      notification[:delivered_message_count] = message_count - buffer_size
    end
  end
end
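Because delivery can still fail once `max_retries` is exhausted, callers usually wrap this method. A minimal sketch, using the `Kafka::DeliveryFailed` error named in the Raises annotation above and assuming a `producer` and `logger` as in the class-level example:

    begin
      producer.deliver_messages
    rescue Kafka::DeliveryFailed => e
      # Messages that could not be delivered stay in the buffer, so a later
      # call to #deliver_messages will retry them.
      logger.error "Delivery failed: #{e.message}"
    end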
def deliver_messages_with_retries(notification)
def deliver_messages_with_retries(notification)
  attempt = 0

  @cluster.add_target_topics(@target_topics)

  operation = ProduceOperation.new(
    cluster: @cluster,
    buffer: @buffer,
    required_acks: @required_acks,
    ack_timeout: @ack_timeout,
    compressor: @compressor,
    logger: @logger,
    instrumenter: @instrumenter,
  )

  loop do
    attempt += 1

    notification[:attempts] = attempt

    begin
      @cluster.refresh_metadata_if_necessary!
    rescue ConnectionError => e
      raise DeliveryFailed.new(e, buffer_messages)
    end

    assign_partitions!
    operation.execute

    if @required_acks.zero?
      # No response is returned by the brokers, so we can't know which messages
      # have been successfully written. Our only option is to assume that they all
      # have.
      @buffer.clear
    end

    if buffer_size.zero?
      break
    elsif attempt <= @max_retries
      @logger.warn "Failed to send all messages; attempting retry #{attempt} of #{@max_retries} after #{@retry_backoff}s"
      sleep @retry_backoff
    else
      @logger.error "Failed to send all messages; keeping remaining messages in buffer"
      break
    end
  end

  unless @pending_message_queue.empty?
    # Mark the cluster as stale in order to force a cluster metadata refresh.
    @cluster.mark_as_stale!
    raise DeliveryFailed.new("Failed to assign partitions to #{@pending_message_queue.size} messages", buffer_messages)
  end

  unless @buffer.empty?
    partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
    raise DeliveryFailed.new("Failed to send messages to #{partitions}", buffer_messages)
  end
end
def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @required_acks = required_acks == :all ? -1 : required_acks
  @ack_timeout = ack_timeout
  @max_retries = max_retries
  @retry_backoff = retry_backoff
  @max_buffer_size = max_buffer_size
  @max_buffer_bytesize = max_buffer_bytesize
  @compressor = compressor

  # The set of topics that are produced to.
  @target_topics = Set.new

  # A buffer organized by topic/partition.
  @buffer = MessageBuffer.new

  # Messages added by `#produce` but not yet assigned a partition.
  @pending_message_queue = PendingMessageQueue.new
end
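These options are normally not passed to `#initialize` directly; they are supplied through {Kafka::Client#producer}, as described in the class-level docs. A minimal sketch of the delivery-related settings, with illustrative values (check {#initialize} and your ruby-kafka version for the exact option names):

    producer = kafka.producer(
      # Wait for all in-sync replicas to acknowledge each write.
      required_acks: :all,
      ack_timeout: 5,
      # Retry a failed delivery up to 3 times, sleeping 1 second between attempts.
      max_retries: 3,
      retry_backoff: 1,
      max_buffer_size: 10_000,
    )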
def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
Returns:
- (nil)

Raises:
- (BufferOverflow) -- if the maximum buffer size has been reached.

Parameters:
- value (String) -- the message data.
- key (String) -- the message key.
- topic (String) -- the topic that the message should be written to.
- partition (Integer) -- the partition that the message should be written to.
- partition_key (String) -- the key that should be used to assign a partition.
- create_time (Time) -- the timestamp that should be set on the message.

def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
  message = PendingMessage.new(
    value && value.to_s,
    key && key.to_s,
    topic.to_s,
    partition && Integer(partition),
    partition_key && partition_key.to_s,
    create_time,
  )

  if buffer_size >= @max_buffer_size
    buffer_overflow topic, "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
  end

  if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
    buffer_overflow topic, "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
  end

  @target_topics.add(topic)
  @pending_message_queue.write(message)

  @instrumenter.instrument("produce_message.producer", {
    value: value,
    key: key,
    topic: topic,
    create_time: create_time,
    message_size: message.bytesize,
    buffer_size: buffer_size,
    max_buffer_size: @max_buffer_size,
  })

  nil
end
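A short usage sketch for the parameters above; the topic, key, and partition values are only examples:

    # Write to an explicitly chosen partition...
    producer.produce("hello", topic: "greetings", partition: 0)

    # ...or let the partitioner derive the partition from a key, so that all
    # messages with the same partition key end up in the same partition.
    producer.produce("hello", key: "user-42", topic: "greetings", partition_key: "user-42")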
def shutdown
Returns:
- (nil)

def shutdown
  @cluster.disconnect
end