class Anthropic::Internal::Transport::PooledNetRequester

@api private

def build_request(request, &blk)

Returns:
  • (Array(Net::HTTPGenericRequest, Proc)) -

Other tags:
    Yieldparam: -

Parameters:
  • blk (Proc) --
  • request (Hash{Symbol=>Object}) -- request options; must include :method, :url, :headers and :body.

Options Hash: (**request)
  • :body (Object) --
  • :headers (Hash{String=>String}) --
  • :url (URI::Generic) --
  • :method (Symbol) --

Other tags:
    Api: - private
# Translates a request hash into a Net::HTTP request object.
#
# @param request [Hash{Symbol=>Object}] must contain :method, :url, :headers
#   and :body (a missing key raises KeyError).
# @param blk [Proc] forwarded unchanged to the body stream adapter.
# @return [Array] the request, followed by the bound `close` method of its
#   body stream, or nil when the request carries no body.
# @raise [NoMatchingPatternError] when :body is not nil, String, StringIO,
#   Pathname, IO or Enumerator.
def build_request(request, &blk)
  verb, target, header_fields, payload = request.fetch_values(:method, :url, :headers, :body)

  http_verb = verb.to_s.upcase
  sends_body = !payload.nil?
  expects_body = verb != :head
  # Round-trip through a string so the URI gets the class matching its scheme.
  uri = URI(target.to_s)

  req = Net::HTTPGenericRequest.new(http_verb, sends_body, expects_body, uri)
  header_fields.each { |name, value| req[name] = value }

  # Choose the framing header first; caller-supplied framing headers always win.
  case payload
  in nil
    nil
  in String | StringIO
    unless req["transfer-encoding"]
      req["content-length"] ||= (payload.is_a?(String) ? payload.bytesize : payload.size).to_s
    end
  in Pathname | IO | Enumerator
    req["transfer-encoding"] ||= "chunked" unless req["content-length"]
  end

  # Every non-nil payload is streamed through the same adapter.
  req.body_stream = Anthropic::Internal::Util::ReadIOAdapter.new(payload, &blk) unless payload.nil?

  [req, req.body_stream&.method(:close)]
end

def calibrate_socket_timeout(conn, deadline)

Parameters:
  • deadline (Float) --
  • conn (Net::HTTP) --

Other tags:
    Api: - private
# Resets every socket-level timeout on `conn` to the time left before
# `deadline`.
#
# @param conn [Net::HTTP]
# @param deadline [Float] compared against Util.monotonic_secs, so presumably
#   a reading from that same clock -- confirm against the caller.
# @return [Float] the remaining time that was applied (not clamped; it can be
#   zero or negative if the deadline has already passed).
def calibrate_socket_timeout(conn, deadline)
  remaining = deadline - Anthropic::Internal::Util.monotonic_secs

  conn.continue_timeout = remaining
  conn.write_timeout = remaining
  conn.read_timeout = remaining
  conn.open_timeout = remaining

  remaining
end

def connect(url)

Returns:
  • (Net::HTTP) -

Parameters:
  • url (URI::Generic) --

Other tags:
    Api: - private
# Builds an unstarted Net::HTTP client for the host named in `url`.
#
# An explicit integer port is used as-is; otherwise the port is derived from
# the scheme (http/ws or https/wss). TLS is enabled for https and wss, and
# Net::HTTP's automatic retries are switched off.
#
# @param url [URI::Generic]
# @return [Net::HTTP]
# @raise [NoMatchingPatternError] when the port is absent and the scheme is
#   not one of http, ws, https or wss.
def connect(url)
  scheme = url.scheme
  port = url.port

  unless port.is_a?(Integer)
    port =
      case [port, scheme]
      in [nil, "http" | "ws"] then Net::HTTP.http_default_port
      in [nil, "https" | "wss"] then Net::HTTP.https_default_port
      end
  end

  client = Net::HTTP.new(url.host, port)
  client.use_ssl = %w[https wss].include?(scheme)
  client.max_retries = 0
  client
end

def execute(request)

Returns:
  • (Array(Integer, Net::HTTPResponse, Enumerable)) -

Options Hash: (**request)
  • :deadline (Float) --
  • :body (Object) --
  • :headers (Hash{String=>String}) --
  • :url (URI::Generic) --
  • :method (Symbol) --

Parameters:
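  • request (Hash{Symbol=>Object}) -- request options; :url and :deadline are read directly and the whole hash is passed on to build_request.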

Other tags:
    Api: - private
# Sends `request` over a pooled connection and returns the status code, the
# response object, and an enumerable over the response body.
#
# The request runs inside an Enumerator that is driven externally: its first
# element is `[conn, req, rsp]`, and every later element is one chunk of the
# response body.
#
# @param request [Hash{Symbol=>Object}] :url and :deadline are read here; the
#   whole hash is also handed to build_request, which reads :method, :url,
#   :headers and :body.
# @return [Array(Integer, Net::HTTPResponse, Enumerable)]
# @raise [Anthropic::Errors::APITimeoutError] when Timeout::Error is raised
#   while the request is in flight.
# @raise [Anthropic::Errors::APIConnectionError] when any other StandardError
#   is raised while the request is in flight.
def execute(request)
  url, deadline = request.fetch_values(:url, :deadline)
  # State shared between the producer (enumerator block) and the cleanup
  # block passed to fused_enum further down.
  req = nil
  eof = false # set to true only once read_body has run to completion
  finished = false # set by the cleanup block to tell the producer to stop
  closing = nil # callable that closes the request body stream, if there is one
  # rubocop:disable Metrics/BlockLength
  enum = Enumerator.new do |y|
    with_pool(url, deadline: deadline) do |conn|
      # The consumer gave up before the request was ever sent.
      next if finished
      # build_request hands this block to the body stream adapter.
      # NOTE(review): presumably invoked as the request body is read, so the
      # timeouts keep tracking the deadline during upload -- confirm in
      # ReadIOAdapter.
      req, closing = self.class.build_request(request) do
        self.class.calibrate_socket_timeout(conn, deadline)
      end
      # Timeouts are recalculated before each blocking step (connect, request)
      # from the time remaining until `deadline`.
      self.class.calibrate_socket_timeout(conn, deadline)
      unless conn.started?
        conn.keep_alive_timeout = self.class::KEEP_ALIVE_TIMEOUT
        conn.start
      end
      self.class.calibrate_socket_timeout(conn, deadline)
      conn.request(req) do |rsp|
        # First element: hand the connection, request and response to the caller.
        y << [conn, req, rsp]
        # `y <<` suspends here until the consumer asks for more; if the consumer
        # flagged completion in the meantime, stop without reading the body.
        break if finished
        rsp.read_body do |bytes|
          # Subsequent elements: body chunks, tagged as binary.
          y << bytes.force_encoding(Encoding::BINARY)
          break if finished
          self.class.calibrate_socket_timeout(conn, deadline)
        end
        eof = true
      end
    end
  rescue Timeout::Error
    raise Anthropic::Errors::APITimeoutError.new(url: url, request: req)
  rescue StandardError
    raise Anthropic::Errors::APIConnectionError.new(url: url, request: req)
  end
  # rubocop:enable Metrics/BlockLength
  # Pull the first element, which runs the producer up to the point where the
  # response object is available (or raises one of the errors above).
  conn, _, response = enum.next
  # NOTE(review): fused_enum is defined elsewhere; presumably it runs this
  # block once when the body is exhausted or closed early -- confirm against
  # Util.fused_enum.
  body = Anthropic::Internal::Util.fused_enum(enum, external: true) do
    finished = true
    # Resume the producer once more so it can see `finished` and break out;
    # StopIteration just means it had already run to completion.
    tap do
      enum.next
    rescue StopIteration
      nil
    end
  ensure
    # Finish the connection when the body was not read to the end.
    # NOTE(review): presumably so a half-read connection is not reused from
    # the pool -- confirm.
    conn.finish if !eof && conn&.started?
    closing&.call
  end
  [Integer(response.code), response, body]
end

def initialize(size: self.class::DEFAULT_MAX_CONNECTIONS)

Parameters:
  • size (Integer) --

Other tags:
    Api: - private
# Sets up an empty requester: no pools yet, a mutex, and the configured size.
#
# @param size [Integer] stored as @size; defaults to the class's
#   DEFAULT_MAX_CONNECTIONS constant.
def initialize(size: self.class::DEFAULT_MAX_CONNECTIONS)
  @size = size
  @pools = {}
  @mutex = Mutex.new
end

def with_pool(url, deadline:, &blk)

Other tags:
    Yieldparam: -

Raises:
  • (Timeout::Error) -

Parameters:
  • blk (Proc) --
  • deadline (Float) --
  • url (URI::Generic) --

Other tags:
    Api: - private
def with_pool(url, deadline:, &blk)
 = Anthropic::Internal::Util.uri_origin(url)
t = deadline - Anthropic::Internal::Util.monotonic_secs

ex.synchronize do
ools[origin] ||= ConnectionPool.new(size: @size) do
self.class.connect(url)
d
ith(timeout: timeout, &blk)