class Anthropic::Internal::Transport::PooledNetRequester
@api private
def build_request(request, &blk)
-
(Array(Net::HTTPGenericRequest, Proc))
-
Other tags:
- Yieldparam: -
Parameters:
-
blk
(Proc
) -- -
request
(Hash{Symbol=>Object}
) -- .
Options Hash:
(**request)
-
:headers
(Hash{String=>String}
) -- -
:url
(URI::Generic
) -- -
:method
(Symbol
) --
Other tags:
- Api: - private
def build_request(request, &blk) method, url, headers, body = request.fetch_values(:method, :url, :headers, :body) req = Net::HTTPGenericRequest.new( method.to_s.upcase, !body.nil?, method != :head, URI(url.to_s) # ensure we construct a URI class of the right scheme ) headers.each { req[_1] = _2 } case body in nil nil in String req["content-length"] ||= body.bytesize.to_s unless req["transfer-encoding"] req.body_stream = Anthropic::Internal::Util::ReadIOAdapter.new(body, &blk) in StringIO req["content-length"] ||= body.size.to_s unless req["transfer-encoding"] req.body_stream = Anthropic::Internal::Util::ReadIOAdapter.new(body, &blk) in Pathname | IO | Enumerator req["transfer-encoding"] ||= "chunked" unless req["content-length"] req.body_stream = Anthropic::Internal::Util::ReadIOAdapter.new(body, &blk) end [req, req.body_stream&.method(:close)] end
def calibrate_socket_timeout(conn, deadline)
-
deadline
(Float
) -- -
conn
(Net::HTTP
) --
Other tags:
- Api: - private
def calibrate_socket_timeout(conn, deadline) timeout = deadline - Anthropic::Internal::Util.monotonic_secs conn.open_timeout = conn.read_timeout = conn.write_timeout = conn.continue_timeout = timeout end
def connect(url)
-
(Net::HTTP)
-
Parameters:
-
url
(URI::Generic
) --
Other tags:
- Api: - private
def connect(url) port = case [url.port, url.scheme] in [Integer, _] url.port in [nil, "http" | "ws"] Net::HTTP.http_default_port in [nil, "https" | "wss"] Net::HTTP.https_default_port end Net::HTTP.new(url.host, port).tap do _1.use_ssl = %w[https wss].include?(url.scheme) _1.max_retries = 0 end end
def execute(request)
-
(Array(Integer, Net::HTTPResponse, Enumerable
-))
Options Hash:
(**request)
-
:deadline
(Float
) -- -
:body
(Object
) -- -
:headers
(Hash{String=>String}
) -- -
:url
(URI::Generic
) -- -
:method
(Symbol
) --
Parameters:
-
request
(Hash{Symbol=>Object}
) -- .
Other tags:
- Api: - private
def execute(request) url, deadline = request.fetch_values(:url, :deadline) req = nil eof = false finished = false closing = nil # rubocop:disable Metrics/BlockLength enum = Enumerator.new do |y| with_pool(url, deadline: deadline) do |conn| next if finished req, closing = self.class.build_request(request) do self.class.calibrate_socket_timeout(conn, deadline) end self.class.calibrate_socket_timeout(conn, deadline) unless conn.started? conn.keep_alive_timeout = self.class::KEEP_ALIVE_TIMEOUT conn.start end self.class.calibrate_socket_timeout(conn, deadline) conn.request(req) do |rsp| y << [conn, req, rsp] break if finished rsp.read_body do |bytes| y << bytes.force_encoding(Encoding::BINARY) break if finished self.class.calibrate_socket_timeout(conn, deadline) end eof = true end end rescue Timeout::Error raise Anthropic::Errors::APITimeoutError.new(url: url, request: req) rescue StandardError raise Anthropic::Errors::APIConnectionError.new(url: url, request: req) end # rubocop:enable Metrics/BlockLength conn, _, response = enum.next body = Anthropic::Internal::Util.fused_enum(enum, external: true) do finished = true tap do enum.next rescue StopIteration nil end ensure conn.finish if !eof && conn&.started? closing&.call end [Integer(response.code), response, body] end
def initialize(size: self.class::DEFAULT_MAX_CONNECTIONS)
-
size
(Integer
) --
Other tags:
- Api: - private
def initialize(size: self.class::DEFAULT_MAX_CONNECTIONS) @mutex = Mutex.new @size = size @pools = {} end
def with_pool(url, deadline:, &blk)
- Yieldparam: -
Raises:
-
(Timeout::Error)
-
Parameters:
-
blk
(Proc
) -- -
deadline
(Float
) -- -
url
(URI::Generic
) --
Other tags:
- Api: - private
def with_pool(url, deadline:, &blk) = Anthropic::Internal::Util.uri_origin(url) t = deadline - Anthropic::Internal::Util.monotonic_secs ex.synchronize do ools[origin] ||= ConnectionPool.new(size: @size) do self.class.connect(url) d ith(timeout: timeout, &blk)