# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require_relative 'request'
require_relative 'response'

require 'async/notification'

require 'http/2'

module Async
  module HTTP
    module Protocol
      # HTTP/2 protocol adapter. Wraps an ::HTTP2 client or server connection
      # object (the "controller") together with the stream it reads from and
      # writes to, and exposes request/response handling on top of it.
      class HTTP2
        # Build a client-side protocol instance on top of the given stream.
        def self.client(stream)
          new(::HTTP2::Client.new, stream)
        end

        # Build a server-side protocol instance on top of the given stream.
        def self.server(stream)
          new(::HTTP2::Server.new, stream)
        end

        # @param controller the ::HTTP2::Client or ::HTTP2::Server driving the connection.
        # @param stream the underlying stream; must respond to write, flush, close and io.
        def initialize(controller, stream)
          @controller = controller
          @stream = stream

          # Every frame the controller emits is written out and flushed immediately.
          @controller.on(:frame) do |data|
            @stream.write(data)
            @stream.flush
          end

          # Only a client sends the connection preface and starts a background
          # reader here; a server-side instance reads inside #receive_requests,
          # so @reader stays nil for servers.
          if @controller.is_a? ::HTTP2::Client
            @controller.send_connection_preface
            @reader = read_in_background
          end
        end

        # Multiple requests can be processed at the same time.
        # @return the peer's advertised :settings_max_concurrent_streams value.
        def multiplex
          @controller.remote_settings[:settings_max_concurrent_streams]
        end

        # Whether the connection can be used for further requests, i.e. whether
        # the background reader is still alive. An instance without a background
        # reader (server side) reports false instead of raising on nil.
        def reusable?
          @reader ? @reader.alive? : false
        end

        # Spawn a child task which feeds incoming bytes into the controller
        # until the underlying IO stops returning data.
        # @param task the parent task used to spawn the reader.
        # @return the spawned task.
        def read_in_background(task: Task.current)
          task.async do
            # NOTE(review): 10-byte reads look like a debugging leftover
            # (#receive_requests reads 1024 bytes at a time) - confirm the
            # read semantics of the underlying IO before changing the size.
            while data = @stream.io.read(10)
              @controller << data
            end

            Async.logger.debug(self) {"Connection reset by peer!"}
          end
        end

        # Stop the background reader (if one was started) and close the stream.
        def close
          Async.logger.debug(self) {"Closing connection"}

          # @reader is only assigned for client controllers.
          @reader.stop if @reader
          @stream.close
        end

        # Serve requests arriving on this connection. For every stream opened
        # by the peer a Request is assembled and, once the peer half-closes the
        # stream, yielded to the given block. The block must return a triple
        # whose elements are read as [status, headers, body], where body
        # responds to each and yields chunks. This method keeps reading from
        # the underlying IO until it returns no more data.
        def receive_requests(&block)
          # Emits new streams opened by the client.
          @controller.on(:stream) do |stream|
            request = Request.new
            # NOTE(review): #send_request labels responses "HTTP/2" while this
            # uses "HTTP/2.0"; left as-is because callers may compare the value.
            request.version = "HTTP/2.0"
            request.headers = {}

            stream.on(:headers) do |headers|
              headers.each do |key, value|
                case key
                when ':method' then request.method = value
                when ':path' then request.path = value
                else request.headers[key] = value
                end
              end
            end

            stream.on(:data) do |chunk|
              # A body may arrive as several DATA frames: append instead of
              # overwriting, otherwise only the final chunk would survive.
              request.body = request.body ? request.body + chunk : chunk
            end

            stream.on(:half_close) do
              response = yield request

              # Send the status together with the regular headers as a single
              # header block; a second HEADERS frame without END_STREAM after
              # the final status makes the response malformed (RFC 7540, 8.1).
              stream.headers({':status' => response[0].to_s}.merge(response[1].to_h))

              response[2].each do |chunk|
                stream.data(chunk, end_stream: false)
              end

              stream.data("", end_stream: true)
            end
          end

          while data = @stream.io.read(1024)
            @controller << data
          end
        end

        # Issue a request on a new stream and wait until that stream is closed.
        # @param method the value sent as the :method pseudo-header.
        # @param path the value sent as the :path pseudo-header.
        # @param headers extra headers, merged over the pseudo-headers.
        # @param body currently NOT transmitted: the request is sent as headers
        #   only with END_STREAM set. TODO: stream the body as DATA frames.
        # @return the Response populated from the frames received on the stream.
        def send_request(method, path, headers = {}, body = nil)
          stream = @controller.new_stream

          response = Response.new
          response.version = "HTTP/2"
          response.headers = {}
          response.body = Async::IO::BinaryString.new

          finished = Async::Notification.new

          # Register every handler before anything is sent: emitting frames
          # writes to and flushes the underlying stream, which may let the
          # background reader run before this method continues.
          stream.on(:headers) do |response_headers|
            response_headers.each do |key, value|
              case key
              when ':status' then response.status = value.to_i
              when ':reason' then response.reason = value
              else response.headers[key] = value
              end
            end
          end

          stream.on(:data) do |chunk|
            response.body << chunk
          end

          stream.on(:close) do
            finished.signal
          end

          internal_headers = {
            ':scheme' => 'https',
            ':method' => method,
            ':path' => path,
          }.merge(headers)

          stream.headers(internal_headers, end_stream: true)

          @stream.flush

          # Resumed by the :close handler above.
          finished.wait

          response
        end
      end
    end
  end
end