lib/async/http/protocol/http2/stream.rb



# Copyright, 2018, by Samuel G. D. Williams. <http://www.codeotaku.com>
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.

require 'protocol/http2/stream'
require_relative '../../body/writable'

module Async
	module HTTP
		module Protocol
			module HTTP2
				class Stream < ::Protocol::HTTP2::Stream
					class Input < Body::Writable
						def initialize(stream, length)
							super(length)
							
							@stream = stream
						end
						
						def read
							if chunk = super
								# If we read a chunk fron the stream, we want to extend the window if required so more data will be provided.
								@stream.request_window_update
							end
							
							return chunk
						end
					end
					
					class Output
						def initialize(stream, body, task: Task.current)
							@stream = stream
							
							@body = body
							@remainder = nil
							
							@window_updated = Async::Condition.new
							
							@task = task.async(&self.method(:passthrough))
						end
						
						# Reads chunks from the given body and writes them to the stream as fast as possible.
						def passthrough(task)
							while chunk = self.read
								maximum_size = @stream.available_frame_size
								
								while maximum_size <= 0
									@window_updated.wait
									
									maximum_size = @stream.available_frame_size
								end
								
								self.send_data(chunk, maximum_size)
							end
							
							self.end_stream
						rescue Async::Stop
							# Ignore.
						ensure
							@body&.close($!)
							@body = nil
						end
						
						def read
							if @remainder
								remainder = @remainder
								@remainder = nil
								
								return remainder
							else
								@body&.read
							end
						end
						
						def push(chunk)
							@remainder = chunk
						end
						
						# Send `maximum_size` bytes of data using the specified `stream`. If the buffer has no more chunks, `END_STREAM` will be sent on the final chunk.
						# @param maximum_size [Integer] send up to this many bytes of data.
						# @param stream [Stream] the stream to use for sending data frames.
						def send_data(chunk, maximum_size)
							if chunk.bytesize <= maximum_size
								@stream.send_data(chunk, maximum_size: maximum_size)
							else
								@stream.send_data(chunk.byteslice(0, maximum_size), maximum_size: maximum_size)
								
								# The window was not big enough to send all the data, so we save it for next time:
								self.push(
									chunk.byteslice(maximum_size, chunk.bytesize - maximum_size)
								)
							end
						end
						
						def end_stream
							@stream.send_data(nil, ::Protocol::HTTP2::END_STREAM)
						end
						
						def window_updated(size)
							@window_updated.signal
						end
						
						def close(error)
							if @body
								@body.close(error)
								@body = nil
							end
							
							@task&.stop
						end
					end
					
					def initialize(*)
						super
						
						@headers = nil
						@trailers = nil
						
						# Input buffer, reading request body, or response body (receive_data):
						@length = nil
						@input = nil
						
						# Output buffer, writing request body or response body (window_updated):
						@output = nil
					end
					
					attr_accessor :headers
					
					def add_header(key, value)
						if key == CONNECTION
							raise ::Protocol::HTTP2::HeaderError, "Connection header is not allowed!"
						elsif key.start_with? ':'
							raise ::Protocol::HTTP2::HeaderError, "Invalid pseudo-header #{key}!"
						elsif key =~ /[A-Z]/
							raise ::Protocol::HTTP2::HeaderError, "Invalid upper-case characters in header #{key}!"
						else
							@headers.add(key, value)
						end
					end
					
					def add_trailer(key, value)
						if @trailers.include(key)
							add_header(key, value)
						else
							raise ::Protocol::HTTP2::HeaderError, "Cannot add trailer #{key} as it was not specified in trailers!"
						end
					end
					
					def receive_trailing_headers(headers, end_stream)
						headers.each do |key, value|
							add_trailer(key, value)
						end
					end
					
					def receive_headers(frame)
						if @headers.nil?
							@headers = ::Protocol::HTTP::Headers.new
							self.receive_initial_headers(super, frame.end_stream?)
							@trailers = @headers[TRAILERS]
						elsif @trailers and frame.end_stream?
							self.receive_trailing_headers(super, frame.end_stream?)
						else
							raise ::Protocol::HTTP2::HeaderError, "Unable to process headers!"
						end
					rescue ::Protocol::HTTP2::HeaderError => error
						Async.logger.error(self, error)
						
						send_reset_stream(error.code)
					end
					
					def prepare_input(length)
						if @input.nil?
							@input = Input.new(self, length)
						else
							raise ArgumentError, "Input body already prepared!"
						end
					end
					
					def update_local_window(frame)
						consume_local_window(frame)
						
						# This is done on demand.
						# request_window_update
					end
					
					def process_data(frame)
						data = frame.unpack
						
						if @input
							unless data.empty?
								@input.write(data)
							end
							
							if frame.end_stream?
								@input.close
								@input = nil
							end
						end
						
						return data
					rescue ::Protocol::HTTP2::ProtocolError
						raise
					rescue # Anything else...
						send_reset_stream(::Protocol::HTTP2::Error::INTERNAL_ERROR)
					end
					
					# Set the body and begin sending it.
					def send_body(body)
						@output = Output.new(self, body)
					end
					
					def window_updated(size)
						super
						
						@output&.window_updated(size)
					end
					
					def close(error = nil)
						super
						
						if @input
							@input.close(error)
							@input = nil
						end
						
						if @output
							@output.close(error)
							@output = nil
						end
					end
				end
			end
		end
	end
end