class Kafka::Consumer

A client that consumes messages from a Kafka cluster in coordination with
other clients.

A Consumer subscribes to one or more Kafka topics; all consumers with the
same *group id* then agree on who should read from the individual topic
partitions. When group members join or leave, the group synchronizes,
making sure that all partitions are assigned to a single member, and that
all members have some partitions to read from.

## Example

A simple consumer that writes the messages it consumes to the console.

    require "kafka"

    kafka = Kafka.new(["kafka1:9092", "kafka2:9092"])

    # Create a new Consumer instance in the group `my-group`:
    consumer = kafka.consumer(group_id: "my-group")

    # Subscribe to a Kafka topic:
    consumer.subscribe("messages")

    # Loop forever, reading in messages from all topics that have been
    # subscribed to.
    consumer.each_message do |message|
      puts message.topic
      puts message.partition
      puts message.key
      puts message.value
      puts message.offset
    end
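To make the group coordination concrete, here is a minimal sketch (the broker address, group id, and topic name are placeholders): running this same script in two processes splits the topic's partitions between them, and a member joining or leaving triggers a rebalance.

    require "kafka"

    kafka = Kafka.new(["kafka1:9092"], client_id: "worker")
    consumer = kafka.consumer(group_id: "shared-group")
    consumer.subscribe("messages")

    consumer.each_message do |message|
      # Each partition of "messages" is read by exactly one live member
      # of "shared-group" at a time.
      puts "partition #{message.partition}: #{message.value}"
    end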
def commit_offsets
def commit_offsets
  @offset_manager.commit_offsets
end
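As a usage sketch (the `kafka` client, topic name, and `process` helper below are hypothetical), offsets can be committed explicitly when automatic marking is disabled:

    consumer = kafka.consumer(group_id: "my-group")
    consumer.subscribe("events")

    # With automatic marking off, only messages explicitly marked as
    # processed are included when offsets are committed.
    consumer.each_message(automatically_mark_as_processed: false) do |message|
      process(message) # hypothetical processing step
      consumer.mark_message_as_processed(message)

      # Commit explicitly every 100 messages instead of relying on the
      # automatic interval-based commits.
      consumer.commit_offsets if (message.offset % 100).zero?
    end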
def consumer_loop
def consumer_loop
  @running = true
  @fetcher.start

  while @running
    begin
      @instrumenter.instrument("loop.consumer") do
        yield
      end
    rescue HeartbeatError, OffsetCommitError
      join_group
    rescue RebalanceInProgress
      @logger.warn "Group rebalance in progress, re-joining..."
      join_group
    rescue FetchError, NotLeaderForPartition, UnknownTopicOrPartition
      @cluster.mark_as_stale!
    rescue LeaderNotAvailable => e
      @logger.error "Leader not available; waiting 1s before retrying"
      @cluster.mark_as_stale!
      sleep 1
    rescue ConnectionError => e
      @logger.error "Connection error #{e.class}: #{e.message}"
      @cluster.mark_as_stale!
    rescue SignalException => e
      @logger.warn "Received signal #{e.message}, shutting down"
      @running = false
    end
  end
ensure
  @fetcher.stop

  # In order to quickly have the consumer group re-balance itself, it's
  # important that members explicitly tell Kafka when they're leaving.
  make_final_offsets_commit!
  @group.leave rescue nil
  @running = false
end
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
Parameters:
- min_bytes (Integer) -- the minimum number of bytes to read before returning messages from each broker; if `max_wait_time` is reached, this is ignored.
- max_bytes (Integer) -- the maximum number of bytes to read before returning messages from each broker.
- max_wait_time (Integer, Float) -- the maximum duration of time to wait before returning messages from each broker, in seconds.
- automatically_mark_as_processed (Boolean) -- whether to automatically mark a batch's messages as successfully processed when the block returns without an exception.

Other tags:
- Yieldparam: batch - a message batch fetched from Kafka.

Returns:
- (nil)
def each_batch(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      unless batch.empty?
        notification = {
          topic: batch.topic,
          partition: batch.partition,
          last_offset: batch.last_offset,
          offset_lag: batch.offset_lag,
          highwater_mark_offset: batch.highwater_mark_offset,
          message_count: batch.messages.count,
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_batch.consumer", notification)

        @instrumenter.instrument("process_batch.consumer", notification) do
          begin
            yield batch
            @current_offsets[batch.topic][batch.partition] = batch.last_offset
          rescue => e
            offset_range = (batch.first_offset..batch.last_offset)
            location = "#{batch.topic}/#{batch.partition} in offset range #{offset_range}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(batch.topic, batch.partition, offset_range)
          end
        end

        mark_message_as_processed(batch.messages.last) if automatically_mark_as_processed

        # We've successfully processed a batch from the partition, so we can clear
        # the pause.
        pause_for(batch.topic, batch.partition).reset!
      end

      @offset_manager.commit_offsets_if_necessary

      @heartbeat.send_if_necessary

      return if !@running
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end
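A short usage sketch (the `kafka` client and topic name are placeholders); each yielded batch exposes its messages along with partition-level metadata:

    consumer = kafka.consumer(group_id: "batch-group")
    consumer.subscribe("page-views")

    # Wait for at least 1 KB of data, but no longer than 5 seconds per broker.
    consumer.each_batch(min_bytes: 1024, max_wait_time: 5) do |batch|
      puts "#{batch.topic}/#{batch.partition}: #{batch.messages.count} messages, lag #{batch.offset_lag}"

      batch.messages.each do |message|
        puts message.value
      end
    end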
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
Parameters:
- min_bytes (Integer) -- the minimum number of bytes to read before returning messages from each broker; if `max_wait_time` is reached, this is ignored.
- max_bytes (Integer) -- the maximum number of bytes to read before returning messages from each broker.
- max_wait_time (Integer, Float) -- the maximum duration of time to wait before returning messages from each broker, in seconds.
- automatically_mark_as_processed (Boolean) -- whether to automatically mark a message as successfully processed when the block returns without an exception.

Other tags:
- Yieldparam: message - a message fetched from Kafka.

Raises:
- (Kafka::ProcessingError) - if there was an error processing a message.

Returns:
- (nil)
def each_message(min_bytes: 1, max_bytes: 10485760, max_wait_time: 1, automatically_mark_as_processed: true)
  @fetcher.configure(
    min_bytes: min_bytes,
    max_bytes: max_bytes,
    max_wait_time: max_wait_time,
  )

  consumer_loop do
    batches = fetch_batches

    batches.each do |batch|
      batch.messages.each do |message|
        notification = {
          topic: message.topic,
          partition: message.partition,
          offset: message.offset,
          offset_lag: batch.highwater_mark_offset - message.offset - 1,
          create_time: message.create_time,
          key: message.key,
          value: message.value,
        }

        # Instrument an event immediately so that subscribers don't have to wait until
        # the block is completed.
        @instrumenter.instrument("start_process_message.consumer", notification)

        @instrumenter.instrument("process_message.consumer", notification) do
          begin
            yield message
            @current_offsets[message.topic][message.partition] = message.offset
          rescue => e
            location = "#{message.topic}/#{message.partition} at offset #{message.offset}"
            backtrace = e.backtrace.join("\n")

            @logger.error "Exception raised when processing #{location} -- #{e.class}: #{e}\n#{backtrace}"

            raise ProcessingError.new(message.topic, message.partition, message.offset)
          end
        end

        mark_message_as_processed(message) if automatically_mark_as_processed

        @offset_manager.commit_offsets_if_necessary

        @heartbeat.send_if_necessary

        return if !@running
      end

      # We've successfully processed a batch from the partition, so we can clear
      # the pause.
      pause_for(batch.topic, batch.partition).reset!
    end

    # We may not have received any messages, but it's still a good idea to
    # commit offsets if we've processed messages in the last set of batches.
    # This also ensures the offsets are retained if we haven't read any messages
    # since the offset retention period has elapsed.
    @offset_manager.commit_offsets_if_necessary
  end
end
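A consuming sketch showing the error contract (the client and topic are placeholders): when the block raises, the loop re-raises a Kafka::ProcessingError identifying the failing message, and the original exception is available via standard Ruby exception chaining with `#cause`:

    consumer = kafka.consumer(group_id: "order-processor")
    consumer.subscribe("orders")

    begin
      consumer.each_message do |message|
        raise "empty payload" if message.value.nil?
        puts "#{message.topic}/#{message.partition}@#{message.offset}"
      end
    rescue Kafka::ProcessingError => e
      # The error carries the topic, partition, and offset of the failing message.
      warn "failed at #{e.topic}/#{e.partition} offset #{e.offset}: #{e.cause.message}"
    end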
def fetch_batches
def fetch_batches
  # Return early if the consumer has been stopped.
  return [] if !@running

  join_group unless @group.member?

  @heartbeat.send_if_necessary

  resume_paused_partitions!

  if !@fetcher.data?
    @logger.debug "No batches to process"
    sleep 2
    []
  else
    tag, message = @fetcher.poll

    case tag
    when :batches
      message
    when :exception
      raise message
    end
  end
rescue OffsetOutOfRange => e
  @logger.error "Invalid offset #{e.offset} for #{e.topic}/#{e.partition}, resetting to default offset"

  @offset_manager.seek_to_default(e.topic, e.partition)

  retry
rescue ConnectionError => e
  @logger.error "Connection error while fetching messages: #{e}"

  raise FetchError, e
end
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
def initialize(cluster:, logger:, instrumenter:, group:, fetcher:, offset_manager:, session_timeout:, heartbeat:)
  @cluster = cluster
  @logger = logger
  @instrumenter = instrumenter
  @group = group
  @offset_manager = offset_manager
  @session_timeout = session_timeout
  @fetcher = fetcher
  @heartbeat = heartbeat

  @pauses = Hash.new {|h, k|
    h[k] = Hash.new {|h2, k2|
      h2[k2] = Pause.new
    }
  }

  # Whether or not the consumer is currently consuming messages.
  @running = false

  # Hash containing offsets for each topic and partition that has the
  # automatically_mark_as_processed feature disabled. The offset manager is
  # only active when everything is supposed to happen automatically;
  # otherwise we need to keep track of the offset manually in memory at all times.
  # The key is an array of topic and partition: [topic, partition].
  # The value is the offset of the last message we've received.
  # @note It is not updated when the user marks a message as processed, because
  #   if the user commits a message other than the last in a batch, updating it
  #   would make ruby-kafka refetch already consumed messages.
  @current_offsets = Hash.new { |h, k| h[k] = {} }
end
def join_group
def join_group
  old_generation_id = @group.generation_id

  @group.join

  if old_generation_id && @group.generation_id != old_generation_id + 1
    # We've been out of the group for at least an entire generation, no
    # sense in trying to hold on to offset data
    @offset_manager.clear_offsets
  else
    # After rejoining the group we may have been assigned a new set of
    # partitions. Keeping the old offset commits around forever would risk
    # having the consumer go back and reprocess messages if it's assigned
    # a partition it used to be assigned to way back. For that reason, we
    # only keep commits for the partitions that we're still assigned.
    @offset_manager.clear_offsets_excluding(@group.assigned_partitions)
  end

  @fetcher.reset

  @group.assigned_partitions.each do |topic, partitions|
    partitions.each do |partition|
      if paused?(topic, partition)
        @logger.warn "Not fetching from #{topic}/#{partition} due to pause"
      else
        seek_to_next(topic, partition)
      end
    end
  end
end
def make_final_offsets_commit!(attempts = 3)
def make_final_offsets_commit!(attempts = 3)
  @offset_manager.commit_offsets
rescue ConnectionError, OffsetCommitError, EOFError
  # It's important to make sure the final offsets commit succeeds, as
  # otherwise all messages processed since the last auto-commit would be
  # processed again, and that could be a huge number of messages.
  return if attempts.zero?

  @logger.error "Retrying to make final offsets commit (#{attempts} attempts left)"
  sleep(0.1)
  make_final_offsets_commit!(attempts - 1)
rescue Kafka::Error => e
  @logger.error "Encountered error while shutting down; #{e.class}: #{e.message}"
end
def mark_message_as_processed(message)
def mark_message_as_processed(message)
  @offset_manager.mark_as_processed(message.topic, message.partition, message.offset)
end
def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
Parameters:
- topic (String)
- partition (Integer)
- timeout (nil, Integer) -- the number of seconds to pause the partition for, or `nil` if the partition should not be automatically resumed.
- max_timeout (nil, Integer) -- the maximum number of seconds to pause for, or `nil` if no maximum should be enforced.
- exponential_backoff (Boolean) -- whether to enable exponential backoff.

Returns:
- (nil)
def pause(topic, partition, timeout: nil, max_timeout: nil, exponential_backoff: false)
  if max_timeout && !exponential_backoff
    raise ArgumentError, "`max_timeout` only makes sense when `exponential_backoff` is enabled"
  end

  pause_for(topic, partition).pause!(
    timeout: timeout,
    max_timeout: max_timeout,
    exponential_backoff: exponential_backoff,
  )
end
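A sketch of the common retry pattern (the `process` helper is hypothetical): pause only the failing partition so its messages are retried later with exponential backoff, while other partitions keep flowing:

    consumer.each_message(automatically_mark_as_processed: false) do |message|
      begin
        process(message) # hypothetical processing step
        consumer.mark_message_as_processed(message)
      rescue
        # Back off this partition only: paused for 2s, then 4s, 8s, ...
        # up to a 30s cap; it resumes automatically when the pause expires.
        consumer.pause(message.topic, message.partition,
                       timeout: 2, max_timeout: 30, exponential_backoff: true)
      end
    end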
def pause_for(topic, partition)
def pause_for(topic, partition)
  @pauses[topic][partition]
end
def paused?(topic, partition)
Parameters:
- topic (String)
- partition (Integer)

Other tags:
- See: #pause

Returns:
- (Boolean) - true if the partition is paused, false otherwise.
def paused?(topic, partition)
  pause_for(topic, partition).paused?
end
def resume(topic, partition)
Parameters:
- topic (String)
- partition (Integer)

Other tags:
- See: #pause

Returns:
- (nil)
def resume(topic, partition)
  pause_for(topic, partition).resume!
  seek_to_next(topic, partition)
end
def resume_paused_partitions!
def resume_paused_partitions!
  @pauses.each do |topic, partitions|
    partitions.each do |partition, pause|
      @instrumenter.instrument("pause_status.consumer", {
        topic: topic,
        partition: partition,
        duration: pause.pause_duration,
      })

      if pause.paused? && pause.expired?
        @logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
        resume(topic, partition)
      end
    end
  end
end
def seek(topic, partition, offset)
Parameters:
- topic (String)
- partition (Integer)
- offset (Integer) -- the offset that the consumer position will be set to.

Returns:
- (nil)
def seek(topic, partition, offset)
  @offset_manager.seek_to(topic, partition, offset)
end
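For example (a sketch; the topic, partition, and offset are placeholders), a consumer can be rewound to reprocess history from a known offset before entering the consume loop:

    consumer.subscribe("audit-log")

    # The next message fetched from partition 0 of "audit-log" will be
    # the one at offset 1200.
    consumer.seek("audit-log", 0, 1200)

    consumer.each_message do |message|
      puts message.offset
    end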
def seek_to_next(topic, partition)
def seek_to_next(topic, partition)
  # When automatic marking is off, the first poll needs to be based on the last committed
  # offset from Kafka, which is why we fall back in case of nil (it may not be 0).
  if @current_offsets[topic].key?(partition)
    offset = @current_offsets[topic][partition] + 1
  else
    offset = @offset_manager.next_offset_for(topic, partition)
  end

  @fetcher.seek(topic, partition, offset)
end
def send_heartbeat_if_necessary
def send_heartbeat_if_necessary
  @heartbeat.send_if_necessary
end
def stop
Returns:
- (nil)
def stop
  @running = false
  @cluster.disconnect
end
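Because `stop` simply clears the running flag and disconnects, it can be called from a signal handler to shut the consume loop down cleanly; a common sketch:

    consumer = kafka.consumer(group_id: "my-group")
    consumer.subscribe("messages")

    # Exit the consume loop on SIGTERM or SIGINT.
    trap("TERM") { consumer.stop }
    trap("INT") { consumer.stop }

    consumer.each_message do |message|
      puts message.value
    end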
def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
Parameters:
- topic (String) -- the name of the topic to subscribe to.
- default_offset (Symbol) -- whether to start from the beginning or the end of the topic's partitions. Deprecated.
- start_from_beginning (Boolean) -- whether to start from the beginning of the topic or just subscribe to new messages being produced; this only applies the first time a partition is consumed, since afterwards the consumer resumes from its last checkpoint.
- max_bytes_per_partition (Integer) -- the maximum amount of data fetched from a single partition at a time.

Returns:
- (nil)
def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_per_partition: 1048576)
  default_offset ||= start_from_beginning ? :earliest : :latest

  @group.subscribe(topic)
  @offset_manager.set_default_offset(topic, default_offset)
  @fetcher.subscribe(topic, max_bytes_per_partition: max_bytes_per_partition)

  nil
end
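A sketch of the subscription options (topic names are placeholders): a new group can be pointed at only newly produced messages, and the per-partition fetch size can be tuned for topics with large messages:

    # Only consume messages produced after the group first subscribes; once
    # offsets have been committed, the consumer resumes from its checkpoint.
    consumer.subscribe("clickstream", start_from_beginning: false)

    # Fetch at most 512 KB per partition per request.
    consumer.subscribe("images", max_bytes_per_partition: 524288)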