# Kafka::Producer

Allows sending messages to a Kafka cluster.

Typically you won't instantiate this class yourself, but rather have {Kafka::Client}
do it for you, e.g.

    # Will instantiate Kafka::Client
    kafka = Kafka.new(...)

    # Will instantiate Kafka::Producer
    producer = kafka.get_producer

This is done in order to share a logger as well as a pool of broker connections across
different producers. This also means that you don't need to pass the `broker_pool` and
`logger` options to `#get_producer`. See {#initialize} for the list of other options
you can pass in.
## Buffering

The producer buffers pending messages until {#send_messages} is called. Note that there is
a maximum buffer size (default is 1,000 messages) and writing messages after the
buffer has reached this size will result in a BufferOverflow exception. Make sure
to periodically call {#send_messages} or set `max_buffer_size` to an appropriate value.

Buffering messages and sending them in batches greatly improves performance, so
try to avoid sending messages after every write. The tradeoff between throughput and
message delays depends on your use case.
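As a minimal sketch of this pattern (assuming a `producer` built as in the Example
section below; the `lines` enumerable, topic name, and threshold are illustrative),
a write loop can bound the buffer explicitly instead of relying on the
BufferOverflow exception:

    lines.each do |line|
      producer.produce(line, topic: "lines")

      # Flush once the buffer grows large; tune the threshold to your
      # throughput/latency needs.
      producer.send_messages if producer.buffer_size >= 100
    end

    # Flush whatever is left.
    producer.send_messages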
## Error Handling and Retries

The design of the error handling is based on having a {MessageBuffer} hold messages
for all topics/partitions. Whenever we want to send messages to the cluster, we
group the buffered messages by the broker they need to be sent to and fire off a
request to each broker. A request can be a partial success, so we go through the
response and inspect the error code for each partition that we wrote to. If the
write to a given partition was successful, we clear the corresponding messages
from the buffer; otherwise, we log the error and keep the messages in the buffer.

After this, we check if the buffer is empty. If it is, we're all done. If it's
not, we do another round of requests, this time with just the remaining messages.
We do this for as long as `max_retries` permits.
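When the retries are exhausted, {#send_messages} raises FailedToSendMessages while
keeping the undelivered messages in the buffer. A rough sketch of how a caller might
react (the backoff value is arbitrary):

    begin
      producer.send_messages
    rescue Kafka::FailedToSendMessages => e
      # The failed messages are still buffered, so we can back off and retry.
      logger.warn "Delivery failed: #{e.message}"
      sleep 30
      producer.send_messages
    end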
## Example

This is an example of an application which reads lines from stdin and writes them
to Kafka:

    require "kafka"
    require "logger"

    logger = Logger.new($stderr)
    brokers = ENV.fetch("KAFKA_BROKERS").split(",")

    # Make sure to create this topic in your Kafka cluster or configure the
    # cluster to auto-create topics.
    topic = "random-messages"

    kafka = Kafka.new(
      seed_brokers: brokers,
      client_id: "simple-producer",
      logger: logger,
    )

    producer = kafka.get_producer

    begin
      $stdin.each_with_index do |line, index|
        producer.produce(line, topic: topic)

        # Send messages for every 10 lines.
        producer.send_messages if index % 10 == 0
      end
    ensure
      # Make sure to send any remaining messages.
      producer.send_messages

      producer.shutdown
    end
## #buffer_size

Returns:

- (Integer) -- the buffer size.

Source:

    def buffer_size
      @buffer.size
    end
## #initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)

Parameters:

- broker_pool (BrokerPool) -- the broker pool representing the cluster.
- logger (Logger) -- the logger that should be used. Typically passed in by {Kafka::Client}.
- ack_timeout (Integer) -- the number of seconds a broker can wait for replicas to acknowledge a write before responding with a timeout.
- required_acks (Integer) -- the number of replicas that must acknowledge a write.
- max_retries (Integer) -- the number of retries that should be attempted before giving up.
- retry_backoff (Integer) -- the number of seconds to wait between retries.
- max_buffer_size (Integer) -- the number of messages allowed in the buffer before new writes raise BufferOverflow.

Source:

    def initialize(broker_pool:, logger:, ack_timeout: 5, required_acks: 1, max_retries: 2, retry_backoff: 1, max_buffer_size: 1000)
      @broker_pool = broker_pool
      @logger = logger
      @required_acks = required_acks
      @ack_timeout = ack_timeout
      @max_retries = max_retries
      @retry_backoff = retry_backoff
      @max_buffer_size = max_buffer_size
      @buffer = MessageBuffer.new
    end
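Since {Kafka::Client} supplies `broker_pool` and `logger` for you, the remaining
options are normally passed through `#get_producer`. A sketch of a more conservative
configuration (the specific values are illustrative, not recommendations):

    producer = kafka.get_producer(
      required_acks: -1,     # in the Kafka protocol, -1 means all in-sync replicas
      ack_timeout: 10,
      max_retries: 5,
      retry_backoff: 5,
      max_buffer_size: 10_000,
    )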
## #produce(value, key: nil, topic:, partition: nil, partition_key: nil)

Writes a message to the buffer; it will not be delivered until {#send_messages} is called.

Parameters:

- value (String) -- the message data.
- key (String) -- the message key.
- topic (String) -- the topic that the message should be written to.
- partition (Integer) -- the partition that the message should be written to.
- partition_key (String) -- the key that should be used to assign a partition.

Raises:

- (BufferOverflow) -- if the maximum buffer size has been reached.

Returns:

- (nil)

Source:

    def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
      unless buffer_size < @max_buffer_size
        raise BufferOverflow, "Max buffer size #{@max_buffer_size} exceeded"
      end

      if partition.nil?
        # If no explicit partition key is specified we use the message key instead.
        partition_key ||= key
        partitioner = Partitioner.new(@broker_pool.partitions_for(topic))
        partition = partitioner.partition_for_key(partition_key)
      end

      message = Protocol::Message.new(key: key, value: value)

      @buffer.write(message, topic: topic, partition: partition)

      partition
    end
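For instance, partitioning events by user id keeps each user's events ordered within
a single partition (a sketch; `event`, `user.id`, and the topic name are illustrative):

    # All messages with the same partition key go to the same partition.
    producer.produce(event.to_json, partition_key: user.id.to_s, topic: "events")

    # Or pin a message to an explicit partition.
    producer.produce("ping", topic: "events", partition: 0)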
## #send_messages

Sends all buffered messages to the Kafka brokers, retrying failed deliveries up to
`max_retries` times as described above.

Raises:

- (FailedToSendMessages) -- if not all messages could be successfully sent.

Returns:

- (nil)

Source:

    def send_messages
      attempt = 0

      transmission = Transmission.new(
        broker_pool: @broker_pool,
        buffer: @buffer,
        required_acks: @required_acks,
        ack_timeout: @ack_timeout,
        logger: @logger,
      )

      loop do
        @logger.info "Sending #{@buffer.size} messages"

        attempt += 1
        transmission.send_messages

        if @buffer.empty?
          @logger.info "Successfully transmitted all messages"
          break
        elsif attempt <= @max_retries
          @logger.warn "Failed to transmit all messages, retry #{attempt} of #{@max_retries}"
          @logger.info "Waiting #{@retry_backoff}s before retrying"
          sleep @retry_backoff
        else
          @logger.error "Failed to transmit all messages; keeping remaining messages in buffer"
          break
        end
      end

      if @required_acks == 0
        # No response is returned by the brokers, so we can't know which messages
        # have been successfully written. Our only option is to assume that they
        # all have.
        @buffer.clear
      end

      unless @buffer.empty?
        partitions = @buffer.map {|topic, partition, _| "#{topic}/#{partition}" }.join(", ")
        raise FailedToSendMessages, "Failed to send messages to #{partitions}"
      end
    end
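Note the special case of `required_acks: 0`: the brokers send no response, so the
buffer is cleared unconditionally and delivery failures go unnoticed. A sketch of
such a fire-and-forget producer (the topic name is illustrative):

    producer = kafka.get_producer(required_acks: 0)

    producer.produce("fast and lossy", topic: "metrics")
    producer.send_messages  # returns without waiting for acknowledgement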
## #shutdown

Shuts down the broker pool, closing all connections to the cluster.

Returns:

- (nil)

Source:

    def shutdown
      @broker_pool.shutdown
    end