class Parallel::Worker
# Closes both pipe ends held by this worker (`read` and `write`).
#
# Might be passed to started_processes and simultaneously closed by another
# thread, so each end is checked with `closed?` first and is only closed
# when it is still open.
def close_pipes
  read.close unless read.closed?
  write.close unless write.closed?
end
# Stores the two pipe ends and the process id for this worker.
#
# @param read  the object later handed to Marshal.load in #work
# @param write the object later handed to Marshal.dump in #work
# @param pid   the process id later handed to Process.wait in #wait
def initialize(read, write, pid)
  @read = read
  @write = write
  @pid = pid
end
# Shuts the worker down: closes its pipes first, then waits for the process.
def stop
  close_pipes
  # if it goes zombie, rather wait here to be able to debug
  wait
end
# Blocks in Process.wait until the process identified by `pid` has exited.
#
# An Interrupt raised while waiting is swallowed and the method returns nil;
# any other error from Process.wait propagates to the caller.
def wait
  Process.wait(pid)
rescue Interrupt
  # process died
end
# Sends one unit of work through the pipes and returns the reply.
#
# `data` is serialized with Marshal onto `write`; the reply is then
# deserialized with Marshal from `read`.
#
# @param data the object to serialize and send
# @return the deserialized reply
# @raise [DeadWorker] when writing fails with Errno::EPIPE or reading hits
#   EOFError
# @raise the wrapped exception when the reply is an ExceptionWrapper
#
# NOTE(review): Marshal.load trusts whatever arrives on `read`; presumably
# the other end is a process this library started itself — confirm.
def work(data)
  begin
    Marshal.dump(data, write)
  rescue Errno::EPIPE
    raise DeadWorker
  end

  result = begin
    Marshal.load(read)
  rescue EOFError
    raise DeadWorker
  end
  # A wrapped exception is re-raised here instead of being returned.
  raise result.exception if result.is_a?(ExceptionWrapper)

  result
end