lib/httpx/pool.rb



# frozen_string_literal: true

require "forwardable"
require "httpx/selector"
require "httpx/connection"
require "httpx/resolver"

module HTTPX
  class Pool
    using ArrayExtensions::FilterMap
    extend Forwardable

    def_delegator :@timers, :after

    def initialize
      @resolvers = {}
      @timers = Timers.new
      @selector = Selector.new
      @connections = []
      @connected_connections = 0
    end

    def empty?
      @connections.empty?
    end

    def next_tick
      catch(:jump_tick) do
        timeout = next_timeout
        if timeout && timeout.negative?
          @timers.fire
          throw(:jump_tick)
        end

        begin
          @selector.select(timeout, &:call)
          @timers.fire
        rescue TimeoutError => e
          @timers.fire(e)
        end
      end
    rescue StandardError => e
      @connections.each do |connection|
        connection.emit(:error, e)
      end
    rescue Exception # rubocop:disable Lint/RescueException
      @connections.each(&:reset)
      raise
    end

    def close(connections = @connections)
      return if connections.empty?

      @timers.cancel
      connections = connections.reject(&:inflight?)
      connections.each(&:close)
      next_tick until connections.none? { |c| c.state != :idle && @connections.include?(c) }

      # close resolvers
      outstanding_connections = @connections
      resolver_connections = @resolvers.each_value.flat_map(&:connections).compact
      outstanding_connections -= resolver_connections

      return unless outstanding_connections.empty?

      @resolvers.each_value do |resolver|
        resolver.close unless resolver.closed?
      end
      # for https resolver
      resolver_connections.each(&:close)
      next_tick until resolver_connections.none? { |c| c.state != :idle && @connections.include?(c) }
    end

    def init_connection(connection, _options)
      resolve_connection(connection) unless connection.family
      connection.timers = @timers
      connection.on(:open) do
        @connected_connections += 1
      end
      connection.on(:activate) do
        select_connection(connection)
      end
      connection.on(:close) do
        unregister_connection(connection)
      end
    end

    def deactivate(connections)
      connections.each do |connection|
        connection.deactivate
        deselect_connection(connection) if connection.state == :inactive
      end
    end

    # opens a connection to the IP reachable through +uri+.
    # Many hostnames are reachable through the same IP, so we try to
    # maximize pipelining by opening as few connections as possible.
    #
    def find_connection(uri, options)
      @connections.find do |connection|
        connection.match?(uri, options)
      end
    end

    private

    def resolve_connection(connection)
      @connections << connection unless @connections.include?(connection)

      if connection.addresses || connection.open?
        #
        # there are two cases in which we want to activate initialization of
        # connection immediately:
        #
        # 1. when the connection already has addresses, i.e. it doesn't need to
        #    resolve a name (not the same as name being an IP, yet)
        # 2. when the connection is initialized with an external already open IO.
        #
        connection.once(:connect_error, &connection.method(:handle_error))
        on_resolver_connection(connection)
        return
      end

      find_resolver_for(connection) do |resolver|
        resolver << try_clone_connection(connection, resolver.family)
        next if resolver.empty?

        select_connection(resolver)
      end
    end

    def try_clone_connection(connection, family)
      connection.family ||= family

      return connection if connection.family == family

      new_connection = connection.class.new(connection.type, connection.origin, connection.options)
      new_connection.family = family

      connection.once(:tcp_open) { new_connection.force_reset }
      connection.once(:connect_error) do |err|
        if new_connection.connecting?
          new_connection.merge(connection)
          connection.force_reset
        else
          connection.__send__(:handle_error, err)
        end
      end

      new_connection.once(:tcp_open) do |new_conn|
        if new_conn != connection
          new_conn.merge(connection)
          connection.force_reset
        end
      end
      new_connection.once(:connect_error) do |err|
        if connection.connecting?
          # main connection has the requests
          connection.merge(new_connection)
          new_connection.force_reset
        else
          new_connection.__send__(:handle_error, err)
        end
      end

      init_connection(new_connection, connection.options)
      new_connection
    end

    def on_resolver_connection(connection)
      @connections << connection unless @connections.include?(connection)
      found_connection = @connections.find do |ch|
        ch != connection && ch.mergeable?(connection)
      end
      return register_connection(connection) unless found_connection

      if found_connection.open?
        coalesce_connections(found_connection, connection)
        throw(:coalesced, found_connection) unless @connections.include?(connection)
      else
        found_connection.once(:open) do
          coalesce_connections(found_connection, connection)
        end
      end
    end

    def on_resolver_error(connection, error)
      return connection.emit(:connect_error, error) if connection.connecting? && connection.callbacks_for?(:connect_error)

      connection.emit(:error, error)
    end

    def on_resolver_close(resolver)
      resolver_type = resolver.class
      return if resolver.closed?

      @resolvers.delete(resolver_type)

      deselect_connection(resolver)
      resolver.close unless resolver.closed?
    end

    def register_connection(connection)
      if connection.state == :open
        # if open, an IO was passed upstream, therefore
        # consider it connected already.
        @connected_connections += 1
      end
      select_connection(connection)
      connection.on(:close) do
        unregister_connection(connection)
      end
    end

    def unregister_connection(connection)
      @connections.delete(connection)
      @connected_connections -= 1 if deselect_connection(connection)
    end

    def select_connection(connection)
      @selector.register(connection)
    end

    def deselect_connection(connection)
      @selector.deregister(connection)
    end

    def coalesce_connections(conn1, conn2)
      return register_connection(conn2) unless conn1.coalescable?(conn2)

      conn2.emit(:tcp_open, conn1)
      conn1.merge(conn2)
      @connections.delete(conn2)
    end

    def next_timeout
      [
        @timers.wait_interval,
        *@resolvers.values.reject(&:closed?).filter_map(&:timeout),
        *@connections.filter_map(&:timeout),
      ].compact.min
    end

    def find_resolver_for(connection)
      connection_options = connection.options
      resolver_type = connection_options.resolver_class
      resolver_type = Resolver.resolver_for(resolver_type)

      @resolvers[resolver_type] ||= begin
        resolver_manager = if resolver_type.multi?
          Resolver::Multi.new(resolver_type, connection_options)
        else
          resolver_type.new(connection_options)
        end
        resolver_manager.on(:resolve, &method(:on_resolver_connection))
        resolver_manager.on(:error, &method(:on_resolver_error))
        resolver_manager.on(:close, &method(:on_resolver_close))
        resolver_manager
      end

      manager = @resolvers[resolver_type]

      (manager.is_a?(Resolver::Multi) && manager.early_resolve(connection)) || manager.resolvers.each do |resolver|
        resolver.pool = self
        yield resolver
      end

      manager
    end
  end
end