class Kafka::FetchOperation

def execute

# Fetches messages for every topic/partition registered in +@topics+.
#
# The topic names are handed to the cluster and its metadata is refreshed if
# necessary. Partitions are then grouped by the broker that
# +@cluster.get_leader+ returns for them; for each broker the offsets are
# resolved via +resolve_offsets+ and a single fetch request is issued.
#
# Returns a flat Array containing one FetchedBatch per partition present in
# the brokers' responses.
#
# Raises NoPartitionsToFetchFrom when no topic has any partitions.
# Raises Kafka::OffsetOutOfRange, annotated with the topic, partition and the
# requested fetch offset, when Protocol.handle_error raises it for a
# partition's error code. Any other Kafka::Error is logged and re-raised.
# If Kafka::ConnectionError, Kafka::LeaderNotAvailable or
# Kafka::NotLeaderForPartition escapes, the cluster is marked as stale before
# the error propagates.
def execute
  @cluster.add_target_topics(@topics.keys)
  @cluster.refresh_metadata_if_necessary!

  raise NoPartitionsToFetchFrom if @topics.none? { |_topic, partitions| partitions.any? }

  # broker => { topic => { partition => options } }
  partitions_by_leader = @topics.each_with_object({}) do |(topic, partitions), grouped|
    partitions.each do |partition, options|
      leader = @cluster.get_leader(topic, partition)
      per_topic = (grouped[leader] ||= {})
      (per_topic[topic] ||= {})[partition] = options
    end
  end

  partitions_by_leader.flat_map do |broker, broker_topics|
    resolve_offsets(broker, broker_topics)

    response = broker.fetch_messages(
      max_wait_time: @max_wait_time * 1000, # Kafka expects ms, not secs
      min_bytes: @min_bytes,
      max_bytes: @max_bytes,
      topics: broker_topics,
    )

    response.topics.flat_map do |fetched_topic|
      fetched_topic.partitions.map do |fetched_partition|
        begin
          Protocol.handle_error(fetched_partition.error_code)
        rescue Kafka::OffsetOutOfRange => e
          # Attach enough context for the caller to know which offset was rejected.
          e.topic = fetched_topic.name
          e.partition = fetched_partition.partition
          requested = broker_topics.fetch(e.topic).fetch(e.partition)
          e.offset = requested.fetch(:fetch_offset)
          raise
        rescue Kafka::Error => e
          topic, partition = fetched_topic.name, fetched_partition.partition
          @logger.error("Failed to fetch from #{topic}/#{partition}: #{e.message}")
          raise
        end

        wrapped_messages = fetched_partition.messages.map do |message|
          FetchedMessage.new(
            message: message,
            topic: fetched_topic.name,
            partition: fetched_partition.partition,
          )
        end

        FetchedBatch.new(
          topic: fetched_topic.name,
          partition: fetched_partition.partition,
          highwater_mark_offset: fetched_partition.highwater_mark_offset,
          messages: wrapped_messages,
        )
      end
    end
  end
rescue Kafka::ConnectionError, Kafka::LeaderNotAvailable, Kafka::NotLeaderForPartition
  # Leadership/connection problems mean the cached metadata can no longer be trusted.
  @cluster.mark_as_stale!
  raise
end