# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'protocol/http2/stream'

module Async
  module HTTP
    module Protocol
      module HTTP2
        # An HTTP/2 stream which accumulates received headers, forwards received
        # data into an optional input, and sends an outgoing body via {Buffer}.
        class Stream < ::Protocol::HTTP2::Stream
          # Reads chunks from a body and writes them to a stream as data frames
          # from within its own task, waiting whenever the stream reports that no
          # frame size is currently available.
          class Buffer
            # @param stream [Stream] the stream used for sending data frames.
            # @param body [#read, #close] the source of chunks to send.
            # @param task [Async::Task] the parent task under which the sending
            #   task is started.
            def initialize(stream, body, task: Task.current)
              @stream = stream
              @body = body
              # A partially sent chunk, returned by the next call to `read`:
              @remainder = nil

              @window_updated = Async::Condition.new

              @task = task.async(&self.method(:passthrough))
            end

            # Send every chunk from the body, then end the stream. The body is
            # always closed on exit, with the current exception (if any).
            # @param task [Async::Task] the task running this method (unused).
            def passthrough(task)
              while (chunk = self.read)
                maximum_size = @stream.available_frame_size

                # Nothing can be sent right now; wait until `window_updated`
                # signals, then check again:
                while maximum_size <= 0
                  @window_updated.wait

                  maximum_size = @stream.available_frame_size
                end

                self.send_data(chunk, maximum_size)
              end

              self.end_stream
            rescue Async::Stop
              # Ignore.
            ensure
              # `$!` is the exception currently propagating, or nil on a clean exit.
              @body&.close($!)
              @body = nil
            end

            # @return [String, nil] the saved remainder if there is one, otherwise
            #   the next chunk read from the body (nil if the body is gone).
            def read
              if @remainder
                remainder = @remainder
                @remainder = nil

                return remainder
              else
                @body&.read
              end
            end

            # Save a chunk so that the next call to `read` returns it.
            # @param chunk [String] the unsent data.
            def push(chunk)
              @remainder = chunk
            end

            # Send up to `maximum_size` bytes of `chunk` on the stream. Any bytes
            # which do not fit are saved and returned by the next call to `read`.
            # @param chunk [String] the data to send.
            # @param maximum_size [Integer] send up to this many bytes of data.
            def send_data(chunk, maximum_size)
              if chunk.bytesize <= maximum_size
                @stream.send_data(chunk, maximum_size: maximum_size)
              else
                @stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)

                # The window was not big enough to send all the data, so we save it for next time:
                self.push(
                  chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
                )
              end
            end

            # Send a data frame with no payload and the `END_STREAM` flag.
            def end_stream
              @stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
            end

            # Wake the sending task if it is waiting for frame size to become available.
            # @param size [Integer] unused here.
            def window_updated(size)
              @window_updated.signal
            end

            # Close the body (if it is still open) and stop the sending task.
            # @param error [Exception, nil] passed through to the body's `close`.
            def close(error)
              if @body
                @body.close(error)
                @body = nil
              end

              @task&.stop
            end
          end

          def initialize(*)
            super

            @headers = nil
            @trailers = nil

            # Input buffer (receive_data):
            @length = nil
            @input = nil

            # Output buffer (window_updated):
            @output = nil
          end

          attr_accessor :headers

          # Validate and add a received header to `@headers`.
          # @param key [String] the header name.
          # @param value [String] the header value.
          # @raise [::Protocol::HTTP2::HeaderError] if the key is the connection
          #   header, begins with ':' or contains upper-case ASCII letters.
          def add_header(key, value)
            if key == CONNECTION
              raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
            elsif key.start_with? ':'
              raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
            elsif key =~ /[A-Z]/
              raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
            else
              @headers.add(key, value)
            end
          end

          # Add a trailing header, but only if it was listed in `@trailers`.
          # @param key [String] the header name.
          # @param value [String] the header value.
          # @raise [::Protocol::HTTP2::HeaderError] if the key was not declared, or
          #   if `add_header` rejects it.
          def add_trailer(key, value)
            # `include?` is the membership predicate; the previous `include` call
            # raised NoMethodError, which `receive_headers` does not rescue.
            # NOTE(review): presumably @trailers is an Array-like or String header
            # value - confirm against ::Protocol::HTTP::Headers#[].
            if @trailers&.include?(key)
              add_header(key, value)
            else
              raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!"
            end
          end

          # Add each of the given headers as a trailer.
          # @param headers [Enumerable] yields key/value pairs.
          # @param end_stream [Boolean] unused here.
          def receive_trailing_headers(headers, end_stream)
            headers.each do |key, value|
              add_trailer(key, value)
            end
          end

          # Process a headers frame. The first frame is handled as the initial
          # headers; a later frame is only accepted as trailers, and only if
          # trailers were declared and the frame ends the stream. A HeaderError is
          # logged and the stream is reset with the error's code.
          # @param frame the received headers frame.
          def receive_headers(frame)
            if @headers.nil?
              @headers = ::Protocol::HTTP::Headers.new
              # `receive_initial_headers` is not defined in this file; it is
              # expected to be provided elsewhere (e.g. by a subclass).
              self.receive_initial_headers(super, frame.end_stream?)
              @trailers = @headers[TRAILERS]
            elsif @trailers && frame.end_stream?
              self.receive_trailing_headers(super, frame.end_stream?)
            else
              raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!"
            end
          rescue ::Protocol::HTTP2::HeaderError => error
            Async.logger.error(self, error)

            send_reset_stream(error.code)
          end

          # Unpack a data frame and write its payload to `@input`, if there is one.
          # The input is closed when the frame ends the stream.
          # @param frame the received data frame.
          # @return [String] the unpacked data.
          def process_data(frame)
            data = frame.unpack

            if @input
              unless data.empty?
                @input.write(data)
              end

              if frame.end_stream?
                @input.close
                @input = nil
              end
            end

            return data
          rescue ::Protocol::HTTP2::ProtocolError
            raise
          rescue # Anything else...
            send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
          end

          # Set the body and begin sending it.
          # @param body [#read, #close] the body to send.
          def send_body(body)
            @output = Buffer.new(self, body)
          end

          # Forward the window update to the output buffer, if there is one.
          # @param size [Integer] passed through to `super` and the output buffer.
          def window_updated(size)
            super

            @output&.window_updated(size)
          end

          # Close the stream, then close and release the input and output (if any).
          # @param error [Exception, nil] passed through to `super`, the input and the output.
          def close(error = nil)
            super

            if @input
              @input.close(error)
              @input = nil
            end

            if @output
              @output.close(error)
              @output = nil
            end
          end
        end
      end
    end
  end
end