class ProcessExecuter::MonitoredPipe


@api public
File.read(“pipe_data.txt”) #=> “Hello World”
pipe_data_string.string #=> “Hello World”
end
pipe.close
ensure
pipe.write(“Hello World”)
pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
begin
pipe_data_file = File.open(“pipe_data.txt”, “w”)
pipe_data_string = StringIO.new
@example Collect pipe data into a string AND a file
pipe_data.string #=> “Hello World”
end
pipe.close
ensure
pipe.write(“Hello World”)
pipe = MonitoredPipe.new(pipe_data)
begin
pipe_data = StringIO.new
@example Collect pipe data into a string
killed.
read from the pipe and written to the writers, and (3) the monitoring thread is
`#close` must be called to ensure that (1) the pipe is closed, (2) all data is
pipe will be closed, and the exception will be saved in ‘#exception`.
If any of the writers raise an exception, the monitoring thread will exit, the
`#initialize`.
Data that is read from that pipe is written one or more writers passed to
a thread is created to read data written to the pipe.
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and
Stream data sent through a pipe to one or more writers

def close

Returns:
  • (void) -
def close
  return unless state == :open
  @state = :closing
  sleep 0.001 until state == :closed
end

def close_pipe

Other tags:
    Api: - private

Returns:
  • (void) -
def close_pipe
  # Close the write end of the pipe so no more data can be written to it
  pipe_writer.close
  # Read remaining data from pipe_reader (if any)
  # If an exception was already raised by the last call to #write, then don't try to read remaining data
  monitor_pipe while exception.nil? && !pipe_reader.eof?
  # Close the read end of the pipe
  pipe_reader.close
end

def file_descriptor?(writer) = writer.is_a?(Integer) || writer.is_a?(Symbol)

Other tags:
    Api: - private

Returns:
  • (Boolean) - true if the writer is a file descriptor

Parameters:
  • writer (#write) -- the writer to check
def file_descriptor?(writer) = writer.is_a?(Integer) || writer.is_a?(Symbol)

def fileno

Other tags:
    Api: - private

Returns:
  • (Integer) - the file descriptor for the write end of the pipe
def fileno
  pipe_writer.fileno
end

def initialize(*writers, chunk_size: 100_000)

Parameters:
  • chunk_size (Integer) -- the size of the chunks to read from the pipe
  • writers (Array<#write>) -- as data is read from the pipe, it is written to these writers
def initialize(*writers, chunk_size: 100_000)
  @writers = writers
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  @state = :open
  @thread = Thread.new do
    Thread.current.report_on_exception = false
    Thread.current.abort_on_exception = false
    monitor
  end
end

def monitor

Other tags:
    Api: - private

Returns:
  • (void) -
def monitor
  monitor_pipe until state == :closing
  close_pipe
  @state = :closed
end

def monitor_pipe

Other tags:
    Api: - private

Returns:
  • (void) -
def monitor_pipe
  new_data = pipe_reader.read_nonblock(chunk_size)
  write_data(new_data)
rescue IO::WaitReadable
  pipe_reader.wait_readable(0.001)
end

def to_io

Other tags:
    Api: - private

Returns:
  • (IO) - the write end of the pipe
def to_io
  pipe_writer
end

def write(data)

Other tags:
    Api: - private

Returns:
  • (Integer) - the number of bytes written to the pipe

Parameters:
  • data (String) -- the data to write to the pipe
def write(data)
  raise IOError, 'closed stream' unless state == :open
  pipe_writer.write(data)
end

def write_data(data)

Other tags:
    Api: - private

Returns:
  • (void) -

Parameters:
  • data (String) -- the data read from the pipe
def write_data(data)
  writers.each do |w|
    file_descriptor?(w) ? write_data_to_fd(w, data) : w.write(data)
  end
rescue StandardError => e
  @exception = e
  @state = :closing
end

def write_data_to_fd(file_descriptor, data)

Other tags:
    Api: - private

Returns:
  • (void) -

Parameters:
  • data (String) -- the data to write
  • file_descriptor (Integer, Symbol) -- the file descriptor to write to (either an integer or :out or :err)
def write_data_to_fd(file_descriptor, data)
  # The case line is not marked as not covered only when using TruffleRuby
  # :nocov:
  case file_descriptor
  # :nocov:
  when :out, 1
    $stdout.write data
  when :err, 2
    $stderr.write data
  else
    io = IO.open(file_descriptor, mode: 'a', autoclose: false)
    io.write(data)
    io.close
  end
end