class Anthropic::Internal::Transport::PooledNetRequester
@api private
def build_request(request, &blk)
-
(Array(Net::HTTPGenericRequest, Proc))-
Other tags:
- Yieldparam: -
Parameters:
-
blk(Proc) -- -
request(Hash{Symbol=>Object}) -- .
Options Hash:
(**request)-
:headers(Hash{String=>String}) -- -
:url(URI::Generic) -- -
:method(Symbol) --
Other tags:
- Api: - private
# @api private
#
# Builds a `Net::HTTPGenericRequest` from a request hash and, when a body is
# present, attaches it to the request as a `ReadIOAdapter` stream.
#
# @param request [Hash{Symbol=>Object}] must contain `:method`, `:url`,
#   `:headers` and `:body`; `fetch_values` raises `KeyError` if one is missing.
#
# @option request [Symbol] :method
# @option request [URI::Generic] :url
# @option request [Hash{String=>String}] :headers
# @option request [Object] :body `nil`, `String`, `StringIO`, `Pathname`, `IO`
#   or `Enumerator`
#
# @param blk [Proc] forwarded as the block of `ReadIOAdapter.new`
#
# @raise [NoMatchingPatternError] when `:body` is none of the supported types
#
# @return [Array(Net::HTTPGenericRequest, Method, nil)] the request, plus the
#   bound `close` method of its body stream (`nil` when there is no body)
def build_request(request, &blk)
  verb, url, headers, body = request.fetch_values(:method, :url, :headers, :body)

  has_body = !body.nil?
  # HEAD is the only verb here declared as having no response body.
  req = Net::HTTPGenericRequest.new(verb.to_s.upcase, has_body, verb != :head, url.to_s)
  headers.each { |name, value| req[name] = value }

  if has_body
    # In-memory bodies have a known byte length; everything else is streamed.
    case body
    in String
      known_length = body.bytesize
    in StringIO
      known_length = body.size
    in Pathname | IO | Enumerator
      known_length = nil
    end

    # Never override a framing header the caller already supplied.
    if known_length
      req["content-length"] ||= known_length.to_s unless req["transfer-encoding"]
    else
      req["transfer-encoding"] ||= "chunked" unless req["content-length"]
    end

    req.body_stream = Anthropic::Internal::Util::ReadIOAdapter.new(body, &blk)
  end

  [req, req.body_stream&.method(:close)]
end
def calibrate_socket_timeout(conn, deadline)
-
deadline(Float) -- -
conn(Net::HTTP) --
Other tags:
- Api: - private
# @api private
#
# Sets every socket-level timeout on `conn` to the time left until `deadline`.
#
# @param conn [Net::HTTP]
# @param deadline [Float] compared against `Util.monotonic_secs`
#
# @return [Float] the remaining time that was applied (not clamped, so it is
#   negative once the deadline has passed)
def calibrate_socket_timeout(conn, deadline)
  remaining = deadline - Anthropic::Internal::Util.monotonic_secs

  conn.continue_timeout = remaining
  conn.write_timeout = remaining
  conn.read_timeout = remaining
  conn.open_timeout = remaining
end
def connect(url)
-
(Net::HTTP)-
Parameters:
-
url(URI::Generic) --
Other tags:
- Api: - private
# @api private
#
# Creates an unstarted `Net::HTTP` client for the host of `url`.
#
# @param url [URI::Generic]
#
# @raise [NoMatchingPatternError] when `url` has no port and its scheme is not
#   one of http, ws, https or wss
#
# @return [Net::HTTP] with TLS enabled for https/wss and `max_retries` set to 0
def connect(url)
  explicit_port = url.port
  scheme = url.scheme

  # An explicit port always wins; otherwise fall back to the scheme default.
  port =
    case [explicit_port, scheme]
    in [Integer, _] then explicit_port
    in [nil, "http" | "ws"] then Net::HTTP.http_default_port
    in [nil, "https" | "wss"] then Net::HTTP.https_default_port
    end

  conn = Net::HTTP.new(url.host, port)
  conn.use_ssl = %w[https wss].include?(scheme)
  conn.max_retries = 0
  conn
end
def execute(request)
-
(Array(Integer, Net::HTTPResponse, Enumerable<String>))
Options Hash:
(**request)-
:deadline(Float) -- -
:body(Object) -- -
:headers(Hash{String=>String}) -- -
:url(URI::Generic) -- -
:method(Symbol) --
Parameters:
-
request(Hash{Symbol=>Object}) -- .
Other tags:
- Api: - private
# @api private
#
# Sends `request` over a pooled connection and returns the status code, the
# response object, and an enumerator over the response body chunks.
#
# The request runs inside an `Enumerator` driven with `#next`. The first
# element it yields is `[conn, req, rsp]`; each later element is one chunk
# handed over by `Net::HTTPResponse#read_body`.
#
# @param request [Hash{Symbol=>Object}] must contain `:url` and `:deadline`
#   here; the whole hash is also passed to `build_request`.
#
# @option request [Symbol] :method
# @option request [URI::Generic] :url
# @option request [Hash{String=>String}] :headers
# @option request [Object] :body
# @option request [Float] :deadline
#
# @raise [Anthropic::Errors::APITimeoutError] when a `Timeout::Error` is raised
#   while the request runs
# @raise [Anthropic::Errors::APIConnectionError] for any other `StandardError`
#   raised while the request runs
#
# @return [Array(Integer, Net::HTTPResponse, Enumerable)] the body enumerable
#   is also assigned to `response.body`
def execute(request)
  url, deadline = request.fetch_values(:url, :deadline)
  klass = self.class

  req = nil
  release_body = nil
  body_read = false
  cancelled = false

  # rubocop:disable Metrics/BlockLength
  chunks = Enumerator.new do |yielder|
    with_pool(url, deadline: deadline) do |conn|
      # The consumer gave up before a connection was checked out.
      next if cancelled

      # The block is handed to the body adapter; it refreshes the socket
      # timeouts whenever the adapter invokes it.
      req, release_body = klass.build_request(request) do
        klass.calibrate_socket_timeout(conn, deadline)
      end

      klass.calibrate_socket_timeout(conn, deadline)
      unless conn.started?
        conn.keep_alive_timeout = klass::KEEP_ALIVE_TIMEOUT
        conn.start
      end

      klass.calibrate_socket_timeout(conn, deadline)
      conn.request(req) do |rsp|
        yielder << [conn, req, rsp]
        break if cancelled

        rsp.read_body do |bytes|
          yielder << bytes
          break if cancelled

          klass.calibrate_socket_timeout(conn, deadline)
        end
        # Reached whenever `read_body` returns, including via the `break` above.
        body_read = true
      end
    end
  rescue Timeout::Error
    raise Anthropic::Errors::APITimeoutError.new(url: url, request: req)
  rescue StandardError
    raise Anthropic::Errors::APIConnectionError.new(url: url, request: req)
  end
  # rubocop:enable Metrics/BlockLength

  # Runs the request up to the point where the response object is available.
  pooled_conn, _sent, response = chunks.next

  # NOTE(review): the block below is presumably run by `fused_enum` once the
  # body is exhausted or closed -- confirm against `fused_enum`.
  body = Anthropic::Internal::Util.fused_enum(chunks, external: true) do
    cancelled = true
    # Resume the producer once more so it can observe `cancelled` and unwind.
    tap do
      chunks.next
    rescue StopIteration
      nil
    end
  ensure
    pooled_conn.finish if !body_read && pooled_conn&.started?
    release_body&.call
  end

  status = Integer(response.code)
  response.body = body
  [status, response, body]
end
def initialize(size: Etc.nprocessors)
-
size(Integer) --
Other tags:
- Api: - private
# @api private
#
# Sets up an empty pool registry, the mutex stored alongside it, and the pool
# size.
#
# @param size [Integer] stored as `@size`; defaults to `Etc.nprocessors`
def initialize(size: Etc.nprocessors)
  @size = size
  @mutex = Mutex.new
  # NOTE(review): appears to be filled lazily, keyed by URI origin, in
  # `with_pool` -- confirm.
  @pools = {}
end
def with_pool(url, deadline:, &blk)
- Yieldparam: -
Raises:
-
(Timeout::Error)-
Parameters:
-
blk(Proc) -- -
deadline(Float) -- -
url(URI::Generic) --
Other tags:
- Api: - private
def with_pool(url, deadline:, &blk) = Anthropic::Internal::Util.uri_origin(url) t = deadline - Anthropic::Internal::Util.monotonic_secs ex.synchronize do ools[origin] ||= ConnectionPool.new(size: @size) do self.class.connect(url) d ith(timeout: timeout, &blk)