# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative '../request'
require_relative '../response'
require_relative '../headers'
require_relative '../body/writable'

require 'async/notification'

require 'http/2'

module Async
  module HTTP
    module Protocol
      # An HTTP/2 connection layered over `stream`. Framing is delegated to a
      # controller (`::HTTP2::Client` or `::HTTP2::Server`); this class moves
      # bytes between the controller and the stream and maps HTTP/2 stream
      # events onto Request/Response objects.
      class HTTP2
        # Build a client-side connection over `stream`.
        def self.client(stream)
          new(::HTTP2::Client.new, stream)
        end

        # Build a server-side connection over `stream`.
        def self.server(stream)
          new(::HTTP2::Server.new, stream)
        end

        HTTPS = 'https'.freeze
        SCHEME = ':scheme'.freeze
        METHOD = ':method'.freeze
        PATH = ':path'.freeze
        AUTHORITY = ':authority'.freeze
        REASON = ':reason'.freeze
        STATUS = ':status'.freeze
        VERSION = 'HTTP/2.0'.freeze

        # @param controller the framing controller; must respond to `on`, `<<`,
        #   `new_stream` and `remote_settings` as used below.
        # @param stream the underlying IO-like object frames are written to and read from.
        def initialize(controller, stream)
          @controller = controller
          @stream = stream

          # Every frame the controller emits is written to the wire and flushed immediately.
          @controller.on(:frame) do |data|
            @stream.write(data)
            @stream.flush
          end

          @controller.on(:frame_sent) do |frame|
            Async.logger.debug(self) { "Sent frame: #{frame.inspect}" }
          end

          @controller.on(:frame_received) do |frame|
            Async.logger.debug(self) { "Received frame: #{frame.inspect}" }
          end

          @controller.on(:goaway) do |payload|
            Async.logger.error(self) { "goaway: #{payload.inspect}" }

            # @reader is only assigned by #start_connection, so guard against
            # nil in the same way #close does.
            @reader.stop if @reader
            @stream.close
          end

          # Number of requests issued through #call on this connection.
          @count = 0
        end

        attr_reader :count

        # Multiple requests can be processed at the same time.
        # @return the remote peer's `settings_max_concurrent_streams` value.
        def multiplex
          @controller.remote_settings[:settings_max_concurrent_streams]
        end

        # Can we use this connection to make requests?
        def good?
          @stream.connected?
        end

        # The connection may be reused as long as the stream has not been closed.
        def reusable?
          !@stream.closed?
        end

        # @return [String] the protocol version string reported on requests/responses.
        def version
          VERSION
        end

        # Start the background reader once; subsequent calls reuse the same task.
        def start_connection
          @reader ||= read_in_background
        end

        # Spawn a child task that feeds incoming bytes into the controller until
        # `read_partial` returns a falsy value.
        # @param task the parent task used to spawn the reader.
        # @return the spawned reader task.
        def read_in_background(task: Task.current)
          task.async do |nested_task|
            nested_task.annotate("#{version} reading data")

            while buffer = @stream.read_partial
              @controller << buffer
            end

            Async.logger.debug(self) { "Connection reset by peer!" }
          end
        end

        # Stop the background reader (if it was started) and close the stream.
        def close
          Async.logger.debug(self) { "Closing connection" }

          @reader.stop if @reader
          @stream.close
        end

        # Serve requests: for each stream opened by the peer, assemble a Request,
        # yield it to the block once the peer half-closes, and send back the
        # response the block returns. Blocks until the reader task finishes.
        #
        # @param task accepted for interface compatibility; not used in the body.
        # @yield [request] the fully received request.
        # @yieldreturn a response responding to `status`, `headers` and `body`.
        def receive_requests(task: Task.current, &block)
          # emits new streams opened by the client
          @controller.on(:stream) do |stream|
            request = Request.new
            request.version = self.version
            request.headers = Headers.new
            body = Body::Writable.new
            request.body = body

            stream.on(:headers) do |headers|
              headers.each do |key, value|
                # Pseudo-headers populate request attributes; everything else is a regular header.
                if key == METHOD
                  request.method = value
                elsif key == PATH
                  request.path = value
                elsif key == AUTHORITY
                  request.authority = value
                else
                  request.headers[key] = value
                end
              end
            end

            stream.on(:data) do |chunk|
              body.write(chunk.to_s) unless chunk.empty?
            end

            stream.on(:close) do |error|
              # Only an abnormal close is signalled to the body reader.
              body.stop(EOFError.new(error)) if error
            end

            stream.on(:half_close) do
              # The requirements for this to be in lock-step with other operations is minimal.
              # TODO consider putting this in its own async task.
              begin
                # We are no longer receiving any more data frames:
                body.finish

                # Generate the response:
                response = yield request

                headers = {STATUS => response.status.to_s}
                headers.update(response.headers)

                if response.body.nil? || response.body.empty?
                  stream.headers(headers, end_stream: true)
                  response.body.read if response.body
                else
                  stream.headers(headers, end_stream: false)

                  response.body.each do |chunk|
                    stream.data(chunk, end_stream: false)
                  end

                  # An empty data frame terminates the response.
                  stream.data("", end_stream: true)
                end
              rescue => error
                Async.logger.error(self) { error }

                # Generating the response failed.
                stream.close(:internal_error)
              end
            end
          end

          start_connection
          @reader.wait
        end

        # Send `request` on a new HTTP/2 stream and wait for the response headers.
        #
        # @param request an object responding to `version`, `method`, `path`,
        #   `authority`, `headers` and `body`.
        # @return [Response] the response; its body is filled in as data frames arrive.
        # @raise [EOFError] if the stream is closed with an error.
        # @raise [RequestFailed] if the request headers could not be sent ahead of a body.
        def call(request)
          request.version ||= self.version

          stream = @controller.new_stream
          @count += 1

          headers = {
            SCHEME => HTTPS,
            METHOD => request.method.to_s,
            PATH => request.path.to_s,
            AUTHORITY => request.authority.to_s,
          }.merge(request.headers)

          finished = Async::Notification.new

          exception = nil
          response = Response.new
          response.version = self.version
          response.headers = {}
          body = Body::Writable.new
          response.body = body

          stream.on(:headers) do |headers|
            headers.each do |key, value|
              if key == STATUS
                response.status = value.to_i
              elsif key == REASON
                response.reason = value
              else
                response.headers[key] = value
              end
            end

            # At this point, we are now expecting two events: data and close.
            stream.on(:close) do |error|
              # If we receive close after this point, it's not a request error, but a failure we need to signal to the body.
              if error
                body.stop(EOFError.new(error))
              else
                body.finish
              end
            end

            finished.signal
          end

          stream.on(:data) do |chunk|
            body.write(chunk.to_s) unless chunk.empty?
          end

          stream.on(:close) do |error|
            # The remote server has closed the connection while we were sending the request.
            if error
              exception = EOFError.new(error)
              finished.signal
            end
          end

          if request.body.nil? || request.body.empty?
            stream.headers(headers, end_stream: true)
            request.body.read if request.body
          else
            begin
              stream.headers(headers, end_stream: false)
            rescue
              raise RequestFailed
            end

            request.body.each do |chunk|
              stream.data(chunk, end_stream: false)
            end

            # An empty data frame terminates the request.
            stream.data("", end_stream: true)
          end

          start_connection
          @stream.flush

          Async.logger.debug(self) { "Stream flushed, waiting for signal." }
          finished.wait

          raise exception if exception

          Async.logger.debug(self) { "Stream finished: #{response.inspect}" }
          return response
        end
      end
    end
  end
end