# frozen_string_literal: true

# Released under the MIT License.
# Copyright, 2017-2024, by Samuel Williams.
# Copyright, 2022, by Ian Ker-Seymer.

require "io/endpoint"

require "async/pool/controller"

require "protocol/http/body/completable"
require "protocol/http/methods"

require "traces/provider"

require_relative "protocol"

module Async
  module HTTP
    # Default value of the `retries:` option accepted by {Client#initialize}.
    DEFAULT_RETRIES = 3

    # Provides a robust interface to a server.
    # * If there are no connections, it will create one.
    # * If there are already connections, it will reuse it.
    # * If a request fails, it will retry it up to N times if it was idempotent.
    # The client object will never become unusable. It internally manages persistent connections (or non-persistent connections if that's required).
    class Client < ::Protocol::HTTP::Methods
      # @param endpoint [Endpoint] the endpoint to connect to.
      # @param protocol [Protocol::HTTP1 | Protocol::HTTP2 | Protocol::HTTPS] the protocol to use.
      # @param scheme [String] The default scheme to set to requests.
      # @param authority [String] The default authority to set to requests.
      # @param retries [Integer] The maximum number of attempts made per request. The first attempt counts, so a value of 1 means the request is never retried.
      # @param options [Hash] Remaining options are forwarded to the connection pool (see {#make_pool}).
      def initialize(endpoint, protocol: endpoint.protocol, scheme: endpoint.scheme, authority: endpoint.authority, retries: DEFAULT_RETRIES, **options)
        @endpoint = endpoint
        @protocol = protocol

        @retries = retries
        @pool = make_pool(**options)

        @scheme = scheme
        @authority = authority
      end

      # A hash representation of the client configuration. Any arguments are accepted and ignored.
      # @return [Hash] with the keys `endpoint`, `protocol`, `retries`, `scheme` and `authority`.
      def as_json(...)
        {
          endpoint: @endpoint.to_s,
          protocol: @protocol,
          retries: @retries,
          scheme: @scheme,
          authority: @authority,
        }
      end

      # Serialize {#as_json} by forwarding all arguments to the resulting hash's `to_json`.
      def to_json(...)
        as_json.to_json(...)
      end

      attr :endpoint
      attr :protocol

      attr :retries
      attr :pool

      attr :scheme
      attr :authority

      # @return whatever the endpoint reports from its own `secure?`.
      def secure?
        @endpoint.secure?
      end

      # Create a client, optionally scoping its lifetime to a block.
      #
      # Without a block, the new client is returned and the caller is responsible for closing it.
      # With a block, the client is yielded and then closed (even if the block raises), and the block's value is returned.
      def self.open(*arguments, **options, &block)
        client = self.new(*arguments, **options)

        return client unless block_given?

        begin
          yield client
        ensure
          client.close
        end
      end

      # Close the connection pool, first waiting (and logging a warning on each iteration) for as long as the pool reports that it is busy.
      def close
        while @pool.busy?
          Console.logger.warn(self) {"Waiting for #{@protocol} pool to drain: #{@pool}"}

          @pool.wait
        end

        @pool.close
      end

      # Send the request using a connection acquired from the pool.
      #
      # The request's scheme and authority are filled in from the client's defaults when they are not already set.
      # A `Protocol::RequestFailed` error is retried regardless of the request method; the listed network errors are retried only if the request is idempotent. In both cases at most `@retries` attempts are made, after which the error is re-raised.
      #
      # @param request The request to send.
      # @return The response produced by {#make_response}.
      def call(request)
        request.scheme ||= self.scheme
        request.authority ||= self.authority

        attempt = 0

        # We may retry the request if it is possible to do so. https://tools.ietf.org/html/draft-nottingham-httpbis-retry-01 is a good guide for how retrying requests should work.
        begin
          attempt += 1

          # As we cache pool, it's possible these pool go bad (e.g. closed by remote host). In this case, we need to try again. It's up to the caller to impose a timeout on this. If this is the last attempt, we force a new connection.
          connection = @pool.acquire

          response = make_response(request, connection, attempt)

          # This signals that the ensure block below should not try to release the connection, because it's bound into the response which will be returned:
          connection = nil
          return response
        rescue Protocol::RequestFailed
          # This is a specific case where the entire request wasn't sent before a failure occurred. So, we can even resend non-idempotent requests.
          if connection
            @pool.release(connection)
            # Cleared so the connection is not released a second time by the ensure block.
            connection = nil
          end

          if attempt < @retries
            retry
          else
            raise
          end
        rescue SocketError, IOError, EOFError, Errno::ECONNRESET, Errno::EPIPE
          if connection
            @pool.release(connection)
            # Cleared so the connection is not released a second time by the ensure block.
            connection = nil
          end

          if request.idempotent? && attempt < @retries
            retry
          else
            raise
          end
        ensure
          # Only reached with a connection still held when an error that is not rescued above propagates.
          if connection
            @pool.release(connection)
          end
        end
      end

      # @return [String] a short description containing the class name and the default authority.
      def inspect
        "#<#{self.class} authority=#{@authority.inspect}>"
      end

      protected

      # Perform the request on the given connection and attach the pool to the response.
      #
      # @param request The request to perform.
      # @param connection The pooled connection to perform it on.
      # @param attempt [Integer] The 1-based attempt number. It is not used here; the traced override below records it.
      # @return The response, with its `pool` set to this client's pool.
      def make_response(request, connection, attempt)
        response = request.call(connection)

        response.pool = @pool

        return response
      end

      # Populate the given tags hash (in place) with the endpoint and protocol of this client.
      def assign_default_tags(tags)
        tags[:endpoint] = @endpoint.to_s
        tags[:protocol] = @protocol.to_s
      end

      # Build the connection pool.
      #
      # The deprecated `connection_limit:` option is translated to `limit:` (with a warning), default tags are merged into `options[:tags]`, and everything else is passed through to `Async::Pool::Controller.wrap`.
      # The block given to the pool connects to the endpoint and wraps the result with the protocol's client.
      def make_pool(**options)
        if connection_limit = options.delete(:connection_limit)
          warn "The connection_limit: option is deprecated, please use limit: instead.", uplevel: 2
          options[:limit] = connection_limit
        end

        self.assign_default_tags(options[:tags] ||= {})

        Async::Pool::Controller.wrap(**options) do
          Console.logger.debug(self) {"Making connection to #{@endpoint.inspect}"}

          @protocol.client(@endpoint.connect)
        end
      end

      # The definitions in this block wrap the methods above (each calls `super`) in `Traces.trace` spans.
      Traces::Provider(self) do
        # Trace the whole request, recording request attributes up front and response attributes once available.
        def call(request)
          attributes = {
            'http.method': request.method,
            'http.authority': request.authority || self.authority,
            'http.scheme': request.scheme || self.scheme,
            'http.path': request.path,
          }

          if protocol = request.protocol
            attributes["http.protocol"] = protocol
          end

          if length = request.body&.length
            attributes["http.request.length"] = length
          end

          Traces.trace("async.http.client.call", attributes: attributes) do |span|
            if context = Traces.trace_context
              request.headers["traceparent"] = context.to_s
              # request.headers['tracestate'] = context.state
            end

            super.tap do |response|
              if version = response&.version
                span["http.version"] = version
              end

              if status = response&.status
                span["http.status_code"] = status
              end

              # Use safe navigation on the response too, consistent with the two checks above, so a nil response cannot raise from inside the tracing wrapper.
              if length = response&.body&.length
                span["http.response.length"] = length
              end
            end
          end
        end

        # Trace a single attempt, recording the attempt number as a span attribute.
        def make_response(request, connection, attempt)
          attributes = {
            attempt: attempt,
          }

          Traces.trace("async.http.client.make_response", attributes: attributes) do
            super
          end
        end
      end
    end
  end
end