class Async::HTTP::Body::Pipe
def close
def close @reader&.stop @writer&.stop @tail.close end
def close_head
def close_head @head.close # Both tasks are done, don't keep references: @reader = nil @writer = nil end
def initialize(input, output = Writable.new, task: Task.current)
def initialize(input, output = Writable.new, task: Task.current) @input = input @output = output head, tail = ::Socket.pair(Socket::AF_UNIX, Socket::SOCK_STREAM) @head = ::IO::Stream(head) @tail = tail @reader = nil @writer = nil task.async(transient: true, &self.method(:reader)) task.async(transient: true, &self.method(:writer)) end
def reader(task)
def reader(task) @reader = task task.annotate "#{self.class} reader." while chunk = @input.read @head.write(chunk) @head.flush end @head.close_write rescue => error raise ensure @input.close(error) close_head if @writer&.finished? end
def to_io
def to_io @tail end
def writer(task)
Read from the head of the pipe and write to the @output stream.
def writer(task) @writer = task task.annotate "#{self.class} writer." while chunk = @head.read_partial @output.write(chunk) end rescue => error raise ensure @output.close_write(error) close_head if @reader&.finished? end