# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative '../request'
require_relative '../response'

require 'async/notification'

require 'http/2'

module Async
  module HTTP
    module Protocol
      # An HTTP/2 connection wrapping an ::HTTP2::Client or ::HTTP2::Server
      # controller. Frames emitted by the controller are written to the
      # underlying stream, and bytes read from the stream are fed back into the
      # controller by a background reader task.
      class HTTP2
        # Build the client side of a connection over the given stream.
        def self.client(stream)
          self.new(::HTTP2::Client.new, stream)
        end

        # Build the server side of a connection over the given stream.
        def self.server(stream)
          self.new(::HTTP2::Server.new, stream)
        end

        HTTPS = 'https'.freeze
        SCHEME = ':scheme'.freeze
        METHOD = ':method'.freeze
        PATH = ':path'.freeze
        AUTHORITY = ':authority'.freeze
        REASON = ':reason'.freeze
        STATUS = ':status'.freeze

        # Version string assigned to requests received by #receive_requests.
        VERSION = 'HTTP/2.0'.freeze

        # @param controller the ::HTTP2::Client or ::HTTP2::Server instance driving the connection.
        # @param stream the buffered stream that frames are written to and read from.
        def initialize(controller, stream)
          @controller = controller
          @stream = stream

          # Every encoded frame is written out and flushed immediately.
          @controller.on(:frame) do |data|
            @stream.write(data)
            @stream.flush
          end

          @controller.on(:frame_sent) do |frame|
            Async.logger.debug(self) {"Sent frame: #{frame.inspect}"}
          end

          @controller.on(:frame_received) do |frame|
            Async.logger.debug(self) {"Received frame: #{frame.inspect}"}
          end

          # Only the client side sends the connection preface.
          if @controller.is_a? ::HTTP2::Client
            @controller.send_connection_preface
          end

          @reader = read_in_background
        end

        # Multiple requests can be processed at the same time.
        # @return the remote peer's :settings_max_concurrent_streams setting.
        def multiplex
          @controller.remote_settings[:settings_max_concurrent_streams]
        end

        # @return whether the background reader task is still alive.
        def reusable?
          @reader.alive?
        end

        # Start a task which reads chunks of up to 8 KiB from the stream's IO
        # and feeds them into the controller until the read returns a falsy value.
        # @param task the parent task used to spawn the reader.
        # @return the spawned reader task.
        def read_in_background(task: Task.current)
          task.async do
            buffer = Async::IO::BinaryString.new

            while data = @stream.io.read(1024 * 8, buffer)
              @controller << data
            end

            Async.logger.debug(self) {"Connection reset by peer!"}
          end
        end

        # Stop the background reader and close the underlying stream.
        def close
          Async.logger.debug(self) {"Closing connection"}

          @reader.stop
          @stream.close
        end

        # Serve incoming streams. Each stream opened by the peer is turned into
        # a Request; once the peer half-closes the stream the request is yielded
        # and the returned value is written back as the response.
        #
        # The yielded block's result is indexed as [status, headers, body], and
        # body must respond to #each. This method returns once the reader task
        # has finished.
        def receive_requests(task: Task.current, &block)
          # emits new streams opened by the client
          @controller.on(:stream) do |stream|
            request = Request.new
            request.version = VERSION
            request.headers = {}
            request.body = Body.new

            stream.on(:headers) do |headers|
              headers.each do |key, value|
                # Pseudo-headers populate request attributes; everything else
                # is stored as a regular header.
                case key
                when METHOD
                  request.method = value
                when PATH
                  request.path = value
                when AUTHORITY
                  request.authority = value
                else
                  request.headers[key] = value
                end
              end
            end

            stream.on(:data) do |chunk|
              request.body.write(chunk.to_s) unless chunk.empty?
            end

            stream.on(:half_close) do
              response = yield request

              # NOTE(review): the body is only closed after the handler has
              # returned; if Body readers wait for close, a handler that reads
              # the whole body could stall here - confirm against Body.
              request.body.close

              # send response
              headers = {STATUS => response[0].to_s}
              headers.update(response[1])

              stream.headers(headers, end_stream: false)

              response[2].each do |chunk|
                stream.data(chunk, end_stream: false)
              end

              # An empty DATA frame terminates the response.
              stream.data("", end_stream: true)
            end
          end

          @reader.wait
        end

        # Version string assigned to responses returned by #send_request.
        RESPONSE_VERSION = 'HTTP/2'.freeze

        # Send a request on a new stream and wait for the response headers.
        #
        # @param authority value of the :authority pseudo-header.
        # @param method value of the :method pseudo-header.
        # @param path value of the :path pseudo-header.
        # @param headers additional headers, merged over the pseudo-headers.
        # @param body optional object responding to #each which yields chunks to send.
        # @return the Response, returned once its headers have arrived; body
        #   chunks are written to response.body as they are received and the
        #   body is closed when the stream closes.
        def send_request(authority, method, path, headers = {}, body = nil)
          stream = @controller.new_stream

          internal_headers = {
            SCHEME => HTTPS,
            METHOD => method,
            PATH => path,
            AUTHORITY => authority,
          }.merge(headers)

          finished = Async::Notification.new

          response = Response.new
          response.version = RESPONSE_VERSION
          response.headers = {}
          response.body = Body.new

          # Listeners are attached before any request frame is written. Each
          # frame is flushed to @stream, and the reader task feeds incoming
          # frames to the controller, so if the reader runs during those writes
          # (presumably the flush can suspend this task) a reply arriving early
          # would otherwise find no :headers listener and the wait below would
          # never be signalled.
          stream.on(:headers) do |response_headers|
            # Async.logger.debug(self) {"Stream headers: #{response_headers.inspect}"}

            response_headers.each do |key, value|
              case key
              when STATUS
                response.status = value.to_i
              when REASON
                response.reason = value
              else
                response.headers[key] = value
              end
            end

            finished.signal
          end

          stream.on(:data) do |chunk|
            Async.logger.debug(self) {"Stream data: #{chunk.inspect}"}
            response.body.write(chunk.to_s) unless chunk.empty?
          end

          stream.on(:half_close) do
            Async.logger.debug(self) {"Stream half-closed."}
          end

          stream.on(:close) do
            Async.logger.debug(self) {"Stream closed, sending signal."}
            response.body.close
          end

          stream.headers(internal_headers, end_stream: body.nil?)

          if body
            body.each do |chunk|
              stream.data(chunk, end_stream: false)
            end

            # An empty DATA frame terminates the request body.
            stream.data("", end_stream: true)
          end

          @stream.flush

          # Async.logger.debug(self) {"Stream flushed, waiting for signal."}
          finished.wait

          # Async.logger.debug(self) {"Stream finished: #{response.inspect}"}
          response
        end
      end
    end
  end
end