class ProcessExecuter::MonitoredPipe


@api public
stdout_buffer.string #=> "Hello World\n"
end
stdout_pipe.close
ensure
_waited_pid, status = Process.wait2(pid)
pid = Process.spawn('echo Hello World', out: stdout_pipe)
stdout_pipe = ProcessExecuter::MonitoredPipe.new(stdout_buffer)
begin
stdout_buffer = StringIO.new
@example Using a MonitoredPipe with Process.spawn
File.read("pipe_data.txt") #=> "Hello World"
pipe_data_file.close
# It is your responsibility to close the file you opened
pipe_data_string.string #=> "Hello World"
end
pipe.close
ensure
pipe.write("Hello World")
pipe = ProcessExecuter::MonitoredPipe.new([:tee, pipe_data_string, pipe_data_file])
begin
pipe_data_file = File.open("pipe_data.txt", "w")
pipe_data_string = StringIO.new
@example Collect pipe data into a string AND a file
pipe_data.string #=> "Hello World"
end
pipe.close
ensure
pipe.write("Hello World")
pipe = ProcessExecuter::MonitoredPipe.new(pipe_data)
begin
pipe_data = StringIO.new
@example Collect pipe data into a StringIO object
killed.
read from the pipe and written to the destination, and (3) the monitoring thread is
> `#close` must be called to ensure that (1) the pipe is closed, (2) all data is
>
> **⚠️ WARNING**
pipe will be closed, and the exception will be saved in `#exception`.
If the destination raises an exception, the monitoring thread will exit, the
it is written to the destination provided in the MonitoredPipe initializer.
a thread is created to read data written to the pipe. As data is read from the pipe,
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and
another tee or MonitoredPipe).
`destination` can be any value that `MonitoredPipe` itself supports (including
destination in the form `[:tee, destination1, destination2, …]`, where each
destinations simultaneously. This is achieved by providing a redirection
MonitoredPipe supports duplicating (or “teeing”) output to multiple
- **Multiple Destinations**
the command is running
- processing with a streaming parser to parse and process command output as
descriptor
- sending command output to custom logging objects that do not have a file
- capturing command output in in-memory buffers like `StringIO`
`#write` method. This is particularly useful for:
You can redirect subprocess output to any Ruby object that implements the
- **Arbitrary Writers**
supports these additional types of destinations:
In addition to the standard redirection destinations, {ProcessExecuter::MonitoredPipe} also
`Process.spawn`](https://docs.ruby-lang.org/en/3.4/Process.html#module-Process-label-File+Redirection+-28File+Descriptor-29).
Redirection section of
[Process.spawn](https://docs.ruby-lang.org/en/3.4/Process.html#method-c-spawn) (this is the `value` part of the file redirection option described in [the File
This class’s initializer accepts any redirection destination supported by
[Process.spawn](https://docs.ruby-lang.org/en/3.4/Process.html#method-c-spawn) and methods derived from it within the `ProcessExecuter` module.
options for
{ProcessExecuter::MonitoredPipe} was created to expand the output redirection
Acts as a pipe that writes the data written to it to one or more destinations

def assert_destination_is_compatible_with_monitored_pipe

Other tags:
    Api: - private

Raises:
  • (ArgumentError) - if the destination is not compatible with MonitoredPipe

Returns:
  • (void) -
# Raise unless the destination reports that it can be used with a MonitoredPipe
#
# @return [void]
#
# @raise [ArgumentError] if the destination is not compatible with MonitoredPipe
#
# @api private
def assert_destination_is_compatible_with_monitored_pipe
  unless destination.compatible_with_monitored_pipe?
    raise ArgumentError, "Destination #{destination.destination} is not compatible with MonitoredPipe"
  end
end

def close

Returns:
  • (void) -
# Close the pipe, wait for the monitoring thread to finish, and close the destination
#
# If the pipe is still open, the monitoring thread is asked to stop and this
# method blocks until that thread reports that it is done. Afterwards the
# thread is joined, the destination is closed, and this instance is removed
# from the class's list of open instances.
#
# @return [void]
def close
  mutex.synchronize do
    next unless state == :open

    # Request shutdown, then sleep on the condition variable until the
    # monitoring thread has switched the state to :closed.
    @state = :closing
    condition_variable.wait(mutex) until @state == :closed
  end

  thread.join
  destination.close
  self.class.remove_open_instance(self)
end

def close_pipe

Other tags:
    Api: - private

Returns:
  • (void) -
# Close both ends of the pipe, forwarding any unread data first
#
# @return [void]
#
# @api private
def close_pipe
  # No more data may be written once the write end is closed
  pipe_writer.close

  # Drain whatever is still in the pipe. Skip draining entirely when an
  # earlier write to the destination already failed (an exception is recorded).
  monitor_pipe until !exception.nil? || pipe_reader.eof?

  # Everything has been read (or abandoned): release the read end too
  pipe_reader.close
end

def fileno

Returns:
  • (Integer) - the file descriptor for the write end of the pipe
# The file descriptor of the write end of the pipe
#
# Delegates to the pipe's write end.
#
# @return [Integer] the file descriptor for the write end of the pipe
def fileno
  pipe_writer.fileno
end

def initialize(redirection_destination, chunk_size: 100_000)

Parameters:
  • chunk_size (Integer) -- the size of the chunks to read from the pipe
  • redirection_destination (Object) -- as data is read from the pipe, it is written to this destination
# Create a new monitored pipe
#
# Wraps the given destination, creates a pipe (via IO.pipe), starts the thread
# that monitors the pipe, and registers this instance with the class as open.
#
# @param redirection_destination [Object] as data is read from the pipe, it is
#   written to this destination
# @param chunk_size [Integer] the size of the chunks to read from the pipe
#
# @raise [ArgumentError] if the destination is not compatible with MonitoredPipe
def initialize(redirection_destination, chunk_size: 100_000)
  # Wrap the raw destination, then reject it before any pipe or thread is created
  @destination = Destinations.factory(redirection_destination)
  assert_destination_is_compatible_with_monitored_pipe
  # The mutex guards @state; the condition variable lets #close wait for :closed
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @chunk_size = chunk_size
  @pipe_reader, @pipe_writer = IO.pipe
  # Set the encoding of the pipe reader to ASCII_8BIT. This is not strictly
  # necessary since read_nonblock always returns a String where encoding is
  # Encoding::ASCII_8BIT, but it is a good practice to explicitly set the
  # encoding.
  pipe_reader.set_encoding(Encoding::ASCII_8BIT)
  # The state must be :open before the monitoring thread starts looping on it
  @state = :open
  @thread = start_monitoring_thread
  self.class.add_open_instance(self)
end

def monitor

Other tags:
    Api: - private

Returns:
  • (void) -
# Forward data from the pipe to the destination until asked to stop
#
# Keeps calling {#monitor_pipe} until the state becomes `:closing` (set by
# `#close`, or by `#write_data` when the destination raises). On the way out,
# whether normally or because of an exception, the pipe is closed, the state
# is set to `:closed`, and the condition variable is signalled.
#
# @return [void]
#
# @api private
def monitor
  monitor_pipe until state == :closing
ensure
  begin
    close_pipe
  ensure
    # Always publish :closed, even when close_pipe itself raises. Otherwise a
    # thread blocked in #close would wait on the condition variable forever.
    mutex.synchronize do
      @state = :closed
      condition_variable.signal
    end
  end
end

def monitor_pipe

Other tags:
    Api: - private

Returns:
  • (void) -
# Read one chunk from the pipe and hand it to the destination
#
# When the pipe has nothing to read yet, waits briefly for it to become
# readable instead of raising.
#
# @return [void]
#
# @api private
def monitor_pipe
  # read_nonblock always returns a String whose encoding is Encoding::ASCII_8BIT
  write_data(pipe_reader.read_nonblock(chunk_size))
rescue IO::WaitReadable
  # No data available right now: wait up to 1ms for the pipe to become readable
  pipe_reader.wait_readable(0.001)
end

def start_monitoring_thread

Other tags:
    Api: - private

Returns:
  • (Thread) - the thread that monitors the pipe
# Start the thread that monitors the pipe
#
# @return [Thread] the newly started thread, which runs {#monitor}
#
# @api private
def start_monitoring_thread
  Thread.new do
    # An exception in this thread should neither be reported on stderr nor
    # abort the whole process.
    monitoring_thread = Thread.current
    monitoring_thread.report_on_exception = false
    monitoring_thread.abort_on_exception = false
    monitor
  end
end

def to_io

Returns:
  • (IO) - the write end of the pipe
# The write end of the pipe
#
# @return [IO] the write end of the pipe
def to_io
  pipe_writer
end

def write(data)

Raises:
  • (IOError) - if the pipe is not open

Returns:
  • (Integer) - the number of bytes written to the pipe

Parameters:
  • data (String) -- the data to write to the pipe
# Write data to the pipe
#
# The state check and the write both happen while holding the mutex.
#
# @param data [String] the data to write to the pipe
#
# @return [Integer] the number of bytes written to the pipe
#
# @raise [IOError] if the pipe is not open
def write(data)
  mutex.synchronize do
    raise IOError, 'closed stream' if state != :open

    pipe_writer.write(data)
  end
end

def write_data(data)

Other tags:
    Api: - private

Returns:
  • (void) -

Parameters:
  • data (String) -- the data read from the pipe
# Write data read from the pipe to the destination
#
# A StandardError raised by the destination is not propagated. It is stored
# in `@exception` and the state is switched to `:closing`.
#
# @param data [String] the data read from the pipe
#
# @return [void]
#
# @api private
def write_data(data)
  destination.write(data)
rescue StandardError => error
  # Record the failure and request shutdown under the lock that guards @state
  mutex.synchronize do
    @exception = error
    @state = :closing
  end
end