lib/mutant/parallel/connection.rb
# frozen_string_literal: true module Mutant module Parallel class Connection include Anima.new(:marshal, :reader, :writer) Error = Class.new(RuntimeError) HEADER_FORMAT = 'N' HEADER_SIZE = 4 MAX_BYTES = (2**32).pred class Reader include Anima.new(:deadline, :io, :marshal, :response_reader, :log_reader) private(*anima.attribute_names) private_class_method :new attr_reader :log def error Util.max_one(@errors) end def result Util.max_one(@results) end def initialize(*) super @buffer = +'' @log = +'' # Array of size max 1 as surrogate for # terrible default nil ivars. @errors = [] @lengths = [] @results = [] end def self.read_response(job:, **attributes) reader = new(**attributes).read_till_final Response.new( error: reader.error, job:, log: reader.log, result: reader.result ) end # rubocop:disable Metrics/MethodLength def read_till_final readers = [response_reader, log_reader] until !@results.empty? || error status = deadline.status break timeout unless status.ok? reads, _others = io.select(readers, nil, nil, status.time_left) break timeout unless reads reads.each do |ready| if ready.equal?(response_reader) advance_result else advance_log end end end self end # rubocop:enable Metrics/MethodLength private def timeout @errors << Timeout::Error end def advance_result if length if read_buffer(length) @results << marshal.load(@buffer) end elsif read_buffer(HEADER_SIZE) @lengths << Util.one(@buffer.unpack(HEADER_FORMAT)) @buffer = +'' end end def length Util.max_one(@lengths) end def advance_log with_nonblock_read(io: log_reader, max_bytes: 4096, &log.public_method(:<<)) end def read_buffer(max_bytes) with_nonblock_read( io: response_reader, max_bytes: max_bytes - @buffer.bytesize ) do |chunk| @buffer << chunk @buffer.bytesize.equal?(max_bytes) end end # rubocop:disable Metrics/MethodLength def with_nonblock_read(io:, max_bytes:) io.binmode chunk = io.read_nonblock(max_bytes, exception: false) case chunk when nil @errors << EOFError false when String yield chunk else fail "Unexpected nonblocking read return: #{chunk.inspect}" end end # rubocop:enable Metrics/MethodLength end class Frame include Anima.new(:io) def receive_value read(Util.one(read(HEADER_SIZE).unpack(HEADER_FORMAT))) end def send_value(body) bytesize = body.bytesize fail Error, 'message to big' if bytesize > MAX_BYTES io.binmode io.write([bytesize].pack(HEADER_FORMAT)) io.write(body) end private def read(bytes) io.binmode io.read(bytes) or fail Error, 'Unexpected EOF' end end def receive_value marshal.load(reader.receive_value) end def send_value(value) writer.send_value(marshal.dump(value)) self end def self.from_pipes(marshal:, reader:, writer:) new( marshal:, reader: Frame.new(io: reader.to_reader), writer: Frame.new(io: writer.to_writer) ) end end # Connection end # Parallel end # Mutant