class Kafka::FetchOperation
  def execute
    @cluster.add_target_topics(@topics.keys)
    @cluster.refresh_metadata_if_necessary!

    topics_by_broker = {}

    if @topics.none? {|topic, partitions| partitions.any? }
      raise NoPartitionsToFetchFrom
    end

    # Group the requested partitions by the broker that currently leads them.
    @topics.each do |topic, partitions|
      partitions.each do |partition, options|
        broker = @cluster.get_leader(topic, partition)

        topics_by_broker[broker] ||= {}
        topics_by_broker[broker][topic] ||= {}
        topics_by_broker[broker][topic][partition] = options
      end
    end

    # Issue one fetch request per broker and flatten the responses into a
    # list of fetched batches.
    topics_by_broker.flat_map {|broker, topics|
      resolve_offsets(broker, topics)

      options = {
        max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs.
        min_bytes: @min_bytes,
        max_bytes: @max_bytes,
        topics: topics,
      }

      response = broker.fetch_messages(**options)

      response.topics.flat_map {|fetched_topic|
        fetched_topic.partitions.map {|fetched_partition|
          begin
            Protocol.handle_error(fetched_partition.error_code)
          rescue Kafka::OffsetOutOfRange => e
            e.topic = fetched_topic.name
            e.partition = fetched_partition.partition
            e.offset = topics.fetch(e.topic).fetch(e.partition).fetch(:fetch_offset)

            raise e
          rescue Kafka::Error => e
            topic = fetched_topic.name
            partition = fetched_partition.partition
            @logger.error "Failed to fetch from #{topic}/#{partition}: #{e.message}"
            raise e
          end

          messages = fetched_partition.messages.map {|message|
            FetchedMessage.new(
              message: message,
              topic: fetched_topic.name,
              partition: fetched_partition.partition,
            )
          }

          FetchedBatch.new(
            topic: fetched_topic.name,
            partition: fetched_partition.partition,
            highwater_mark_offset: fetched_partition.highwater_mark_offset,
            messages: messages,
          )
        }
      }
    }
  rescue Kafka::ConnectionError, Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
    # The cluster metadata is likely stale; mark it so the next attempt
    # refreshes it before retrying.
    @cluster.mark_as_stale!
    raise
  end
end
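
# A minimal usage sketch. The constructor keywords and the #fetch_from_partition
# call below are assumptions inferred from how #execute uses @cluster, @logger,
# @min_bytes, @max_bytes, @max_wait_time, and @topics; neither is shown in this
# excerpt.
#
#   operation = Kafka::FetchOperation.new(
#     cluster: cluster,
#     logger: logger,
#     min_bytes: 1,
#     max_bytes: 10_485_760,
#     max_wait_time: 5,
#   )
#
#   # Schedule a fetch from a topic/partition starting at the given offset.
#   operation.fetch_from_partition("greetings", 0, offset: :latest, max_bytes: 1_048_576)
#
#   # Expected to return an array of Kafka::FetchedBatch instances.
#   batches = operation.execute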