class Async::HTTP::Body::Pipe
def close
def close @reader&.stop @writer&.stop @tail.close end
def initialize(input, output = Writable.new, task: Task.current)
def initialize(input, output = Writable.new, task: Task.current) @input = input @output = output head, tail = IO::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM) @head = IO::Stream.new(head) @tail = tail @reader = nil @writer = nil task.async(&self.method(:reader)) task.async(&self.method(:writer)) end
def reader(task)
def reader(task) @reader = task task.annotate "#{self.class} reader." while chunk = @input.read @head.write(chunk) @head.flush end @head.close_write ensure @reader = nil @input.close($!) @head.close if @writer.nil? end
def to_io
def to_io @tail end
def writer(task)
Read from the head of the pipe and write to the @output stream.
def writer(task) @writer = task task.annotate "#{self.class} writer." while chunk = @head.read_partial @output.write(chunk) end ensure @writer = nil @output.close($!) @head.close if @reader.nil? end