# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative '../request'
require_relative '../response'
require_relative '../headers'
require_relative '../body/writable'

require 'async/notification'

require 'http/2'

module Async
  module HTTP
    module Protocol
      # An HTTP/2 connection driven by an `::HTTP2::Client` or `::HTTP2::Server`
      # controller on top of a stream. The same class is used for both roles:
      # `call` issues a request, `receive_requests` serves incoming ones.
      class HTTP2
        # Build a client-side connection over the given stream.
        def self.client(stream)
          self.new(::HTTP2::Client.new, stream)
        end

        # Build a server-side connection over the given stream.
        def self.server(stream)
          self.new(::HTTP2::Server.new, stream)
        end

        HTTPS = 'https'.freeze
        SCHEME = ':scheme'.freeze
        METHOD = ':method'.freeze
        PATH = ':path'.freeze
        AUTHORITY = ':authority'.freeze
        REASON = ':reason'.freeze
        STATUS = ':status'.freeze

        VERSION = 'HTTP/2.0'.freeze

        # @param controller the `::HTTP2::Client` or `::HTTP2::Server` instance
        #   that encodes and decodes frames.
        # @param stream the underlying IO-like stream; it must respond to
        #   `write`, `flush`, `read_partial` and `close`.
        def initialize(controller, stream)
          @controller = controller
          @stream = stream

          # Every frame the controller emits is written to the stream and
          # flushed immediately.
          @controller.on(:frame) do |data|
            @stream.write(data)
            @stream.flush
          end

          @controller.on(:frame_sent) do |frame|
            Async.logger.debug(self) {"Sent frame: #{frame.inspect}"}
          end

          @controller.on(:frame_received) do |frame|
            Async.logger.debug(self) {"Received frame: #{frame.inspect}"}
          end

          @count = 0
        end

        # The number of requests issued via `call` plus the number of streams
        # accepted via `receive_requests` on this connection.
        attr_reader :count

        # Multiple requests can be processed at the same time.
        # @return the remote peer's `settings_max_concurrent_streams` value.
        def multiplex
          @controller.remote_settings[:settings_max_concurrent_streams]
        end

        # Whether the background reader is still alive.
        # NOTE(review): this raises NoMethodError if the reader was never
        # started (`@reader` is nil); confirm whether callers can reach this
        # before `call`/`receive_requests`, and what the answer should be then.
        def reusable?
          @reader.alive?
        end

        def version
          VERSION
        end

        # Start the background reader once; later calls reuse the same task.
        def start_connection
          @reader ||= read_in_background
        end

        # Spawn a task which feeds everything read from the stream into the
        # controller until `read_partial` returns a falsy value.
        # @param task the parent task under which the reader is spawned.
        def read_in_background(task: Task.current)
          task.async do |nested_task|
            nested_task.annotate("#{version} reading data")

            while buffer = @stream.read_partial
              @controller << buffer
            end

            Async.logger.debug(self) {"Connection reset by peer!"}
          end
        end

        # Stop the background reader (if one was started) and close the stream.
        def close
          Async.logger.debug(self) {"Closing connection"}

          # The reader only exists after `start_connection`; guarding here
          # ensures the stream is still closed for a connection that was
          # never used.
          @reader.stop if @reader

          @stream.close
        end

        # Serve incoming requests: for each stream opened by the client, build
        # a request, yield it to the given block once the client half-closes
        # the stream, and send back the response the block returns. Returns
        # only after the background reader finishes.
        # @yield [request] the assembled request.
        # @yieldreturn the response; it must respond to `status`, `headers`
        #   and `body`.
        def receive_requests(task: Task.current, &block)
          # emits new streams opened by the client
          @controller.on(:stream) do |stream|
            @count += 1

            request = Request.new
            request.version = self.version
            request.headers = Headers.new
            body = Body::Writable.new
            request.body = body

            stream.on(:headers) do |headers|
              headers.each do |key, value|
                if key == METHOD
                  request.method = value
                elsif key == PATH
                  request.path = value
                elsif key == AUTHORITY
                  request.authority = value
                else
                  request.headers[key] = value
                end
              end
            end

            stream.on(:data) do |chunk|
              body.write(chunk.to_s) unless chunk.empty?
            end

            stream.on(:half_close) do
              # The client will send no more data on this stream, so mark the
              # request body complete *before* invoking the handler. Otherwise
              # a handler that reads the body to the end would presumably wait
              # for an end-of-body that is only signalled after it returns.
              body.finish

              response = yield request

              headers = {STATUS => response.status.to_s}
              headers.update(response.headers)

              write_message(stream, headers, response.body)
            end
          end

          start_connection
          @reader.wait
        end

        # Send a request on a new stream and return the response as soon as
        # its headers have arrived; the response body is filled in as data
        # frames are received and finished when the stream closes.
        # @param request an object responding to `method`, `path`,
        #   `authority`, `headers`, `body` and `version`/`version=`.
        # @return [Response]
        def call(request)
          @count += 1

          request.version ||= self.version

          stream = @controller.new_stream

          headers = {
            SCHEME => HTTPS,
            METHOD => request.method.to_s,
            PATH => request.path.to_s,
            AUTHORITY => request.authority.to_s,
          }.merge(request.headers)

          finished = Async::Notification.new

          response = Response.new
          response.version = self.version
          # NOTE(review): the server side uses `Headers.new` for request
          # headers while this uses a plain Hash; confirm whether that
          # asymmetry is intentional.
          response.headers = {}
          body = Body::Writable.new
          response.body = body

          stream.on(:headers) do |headers|
            headers.each do |key, value|
              if key == STATUS
                response.status = value.to_i
              elsif key == REASON
                response.reason = value
              else
                response.headers[key] = value
              end
            end

            # NOTE(review): this is the only place `finished` is signalled, so
            # a stream that closes without headers leaves the caller waiting.
            finished.signal
          end

          stream.on(:data) do |chunk|
            body.write(chunk.to_s) unless chunk.empty?
          end

          stream.on(:close) do
            body.finish
          end

          write_message(stream, headers, request.body)

          start_connection
          @stream.flush

          finished.wait

          return response
        end

        private

        # Write a header block followed by the body (if any) to the stream,
        # ending the stream afterwards.
        # @param stream the HTTP/2 stream to write to.
        # @param headers [Hash] the header block, pseudo-headers included.
        # @param body the message body, or nil; it must respond to `empty?`,
        #   `read` and `each`.
        def write_message(stream, headers, body)
          if body.nil? || body.empty?
            stream.headers(headers, end_stream: true)
            body.read if body
          else
            stream.headers(headers, end_stream: false)

            body.each do |chunk|
              stream.data(chunk, end_stream: false)
            end

            # An empty data frame carries the end-of-stream flag.
            stream.data("", end_stream: true)
          end
        end
      end
    end
  end
end