lib/kafka/fetch_operation.rb
# frozen_string_literal: true

require "kafka/fetched_batch"

module Kafka

  # Fetches messages from one or more partitions.
  #
  #     operation = Kafka::FetchOperation.new(
  #       cluster: cluster,
  #       logger: logger,
  #       min_bytes: 1,
  #       max_wait_time: 10,
  #     )
  #
  #     # These calls will schedule fetches from the specified topics/partitions.
  #     operation.fetch_from_partition("greetings", 42, offset: :latest, max_bytes: 100000)
  #     operation.fetch_from_partition("goodbyes", 13, offset: :latest, max_bytes: 100000)
  #
  #     operation.execute
  #
  class FetchOperation
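    # Initializes the fetch operation.
    #
    # `min_bytes` is the minimum number of bytes the broker should accumulate
    # before answering, `max_bytes` caps the size of the entire response, and
    # `max_wait_time` is the number of seconds to wait for `min_bytes` to become
    # available; it is converted to milliseconds before being sent to Kafka.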
    def initialize(cluster:, logger:, min_bytes: 1, max_bytes: 10485760, max_wait_time: 5)
      @cluster = cluster
      @logger = logger
      @min_bytes = min_bytes
      @max_bytes = max_bytes
      @max_wait_time = max_wait_time
      @topics = {}
    end
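
    # Schedules a fetch from a single partition. `offset` can be an absolute
    # offset or one of the symbols `:earliest` and `:latest`, which are mapped
    # to the special Kafka offsets -2 and -1 and resolved to concrete offsets
    # before fetching. `max_bytes` limits how much data is returned for this
    # particular partition.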
    def fetch_from_partition(topic, partition, offset: :latest, max_bytes: 1048576)
      if offset == :earliest
        offset = -2
      elsif offset == :latest
        offset = -1
      end

      @topics[topic] ||= {}
      @topics[topic][partition] = {
        fetch_offset: offset,
        max_bytes: max_bytes,
      }
    end
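
    # Executes the scheduled fetches. Partitions are grouped by their leader
    # broker, symbolic offsets are resolved, and a single fetch request is sent
    # to each broker. Returns an array of FetchedBatch instances, one per
    # fetched partition, and raises NoPartitionsToFetchFrom if nothing has been
    # scheduled.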
    def execute
      @cluster.add_target_topics(@topics.keys)
      @cluster.refresh_metadata_if_necessary!

      topics_by_broker = {}

      if @topics.none? {|topic, partitions| partitions.any? }
        raise NoPartitionsToFetchFrom
      end
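
      # Group the scheduled partitions by the broker that currently leads them,
      # so that a single fetch request can be sent to each broker.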
      @topics.each do |topic, partitions|
        partitions.each do |partition, options|
          broker = @cluster.get_leader(topic, partition)

          topics_by_broker[broker] ||= {}
          topics_by_broker[broker][topic] ||= {}
          topics_by_broker[broker][topic][partition] = options
        end
      end
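
      # For each broker: resolve any :earliest/:latest sentinels to concrete
      # offsets, then issue the fetch request and turn the response into
      # fetched batches.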
      topics_by_broker.flat_map {|broker, topics|
        resolve_offsets(broker, topics)

        options = {
          max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs
          min_bytes: @min_bytes,
          max_bytes: @max_bytes,
          topics: topics,
        }

        response = broker.fetch_messages(**options)
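
        # Each fetched partition is converted into a FetchedBatch, raising if
        # the partition carries an error code.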
        response.topics.flat_map {|fetched_topic|
          fetched_topic.partitions.map {|fetched_partition|
            begin
              Protocol.handle_error(fetched_partition.error_code)
            rescue Kafka::OffsetOutOfRange => e
              e.topic = fetched_topic.name
              e.partition = fetched_partition.partition
              e.offset = topics.fetch(e.topic).fetch(e.partition).fetch(:fetch_offset)

              raise e
            rescue Kafka::Error => e
              topic = fetched_topic.name
              partition = fetched_partition.partition

              @logger.error "Failed to fetch from #{topic}/#{partition}: #{e.message}"

              raise e
            end
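
            # Wrap each raw protocol message so that it knows which topic and
            # partition it was fetched from.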
            messages = fetched_partition.messages.map {|message|
              FetchedMessage.new(
                message: message,
                topic: fetched_topic.name,
                partition: fetched_partition.partition,
              )
            }

            FetchedBatch.new(
              topic: fetched_topic.name,
              partition: fetched_partition.partition,
              highwater_mark_offset: fetched_partition.highwater_mark_offset,
              messages: messages,
            )
          }
        }
      }
    rescue Kafka::ConnectionError, Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
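      # Connection and leadership errors usually mean the cached cluster
      # metadata is outdated, so mark it as stale before re-raising.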
      @cluster.mark_as_stale!

      raise
    end

    private
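
    # Resolves the special offset sentinels (-2 for :earliest, -1 for :latest)
    # to concrete offsets by issuing a list offsets request to the broker, then
    # writes the resolved offsets back into the per-partition fetch options.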
    def resolve_offsets(broker, topics)
      pending_topics = {}

      topics.each do |topic, partitions|
        partitions.each do |partition, options|
          offset = options.fetch(:fetch_offset)
          next if offset >= 0

          @logger.debug "Resolving offset `#{offset}` for #{topic}/#{partition}..."

          pending_topics[topic] ||= []
          pending_topics[topic] << {
            partition: partition,
            time: offset,
            max_offsets: 1,
          }
        end
      end

      return topics if pending_topics.empty?
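
      # Ask the broker which actual offsets the sentinels correspond to, and
      # record them as the offsets to fetch from.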
      response = broker.list_offsets(topics: pending_topics)

      pending_topics.each do |topic, partitions|
        partitions.each do |options|
          partition = options.fetch(:partition)
          resolved_offset = response.offset_for(topic, partition)

          @logger.debug "Offset for #{topic}/#{partition} is #{resolved_offset.inspect}"

          topics[topic][partition][:fetch_offset] = resolved_offset || 0
        end
      end
    end
  end
end