# frozen_string_literal: truebeginrequire'http/2'rescueLoadError;endrequire'securerandom'moduleSeahorsemoduleClient# @api privatemoduleH2NETWORK_ERRORS=[SocketError,EOFError,IOError,Timeout::Error,Errno::ECONNABORTED,Errno::ECONNRESET,Errno::EPIPE,Errno::EINVAL,Errno::ETIMEDOUT,OpenSSL::SSL::SSLError,Errno::EHOSTUNREACH,Errno::ECONNREFUSED,# OpenSSL::SSL::SSLErrorWaitReadable]# @api privateDNS_ERROR_MESSAGES=['getaddrinfo: nodename nor servname provided, or not known',# MacOS'getaddrinfo: Name or service not known'# GNU]classHandler<Client::Handlerdefcall(context)span_wrapper(context){_call(context)}endprivatedef_call(context)stream=nilbeginconn=context.client.connectionstream=conn.new_streamstream_mutex=Mutex.newclose_condition=ConditionVariable.newsync_queue=Queue.newconn.connect(context.http_request.endpoint)_register_callbacks(context.http_response,stream,stream_mutex,close_condition,sync_queue)conn.debug_output("sending initial request ...")ifinput_emitter=context[:input_event_emitter]_send_initial_headers(context.http_request,stream)# prepare for sending events laterinput_emitter.stream=stream# request sigv4 serves as the initial #prior_signatureinput_emitter.encoder.prior_signature=context.http_request.headers['authorization'].split('Signature=').lastinput_emitter.validate_event=context.config.validate_paramselse_send_initial_headers(context.http_request,stream)_send_initial_data(context.http_request,stream)endconn.start(stream)rescue*NETWORK_ERRORS=>errorerror=NetworkingError.new(error,error_message(context.http_request,error))context.http_response.signal_error(error)rescue=>errorconn.debug_output(error.inspect)# not retryablecontext.http_response.signal_error(error)endAsyncResponse.new(context: context,stream: stream,stream_mutex: stream_mutex,close_condition: close_condition,sync_queue: sync_queue)enddef_register_callbacks(resp,stream,stream_mutex,close_condition,sync_queue)stream.on(:headers)do|headers|resp.signal_headers(headers)endstream.on(:data)do|data|resp.signal_data(data)endstream.on(:close)doresp.signal_done# block until #wait is ready for signal# else deadlock may happen because #signal happened# eariler than #wait (see AsyncResponse#wait)sync_queue.popstream_mutex.synchronize{close_condition.signal}endenddef_send_initial_headers(req,stream)beginheaders=_h2_headers(req)stream.headers(headers,end_stream: false)rescue=>eraiseHttp2InitialRequestError.new(e)endenddef_send_initial_data(req,stream)begindata=req.body.readstream.data(data,end_stream: true)rescue=>eraiseHttp2InitialRequestError.new(e)enddataend# H2 pseudo headers# https://http2.github.io/http2-spec/#rfc.section.8.1.2.3def_h2_headers(req)headers={}headers[':authority']=req.endpoint.hostheaders[':method']=req.http_method.upcaseheaders[':scheme']=req.endpoint.schemeheaders[':path']=req.endpoint.path.empty??'/':req.endpoint.pathifreq.endpoint.query&&!req.endpoint.query.empty?headers[':path']+="?#{req.endpoint.query}"endreq.headers.each{|k,v|headers[k.downcase]=v}headersenddeferror_message(req,error)iferror.is_a?(SocketError)&&DNS_ERROR_MESSAGES.include?(error.message)host=req.endpoint.host"unable to connect to `#{host}`; SocketError: #{error.message}"elseerror.messageendenddefspan_wrapper(context,&block)context.tracer.in_span('Handler.H2',attributes: Aws::Telemetry.http_request_attrs(context),&block)endendendendend