class ProcessExecuter::MonitoredPipe
@api public
File.read("pipe_data.txt") #=> "Hello World"
pipe_data_string.string #=> "Hello World"
end
pipe.close
ensure
pipe.write("Hello World")
pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
begin
pipe_data_file = File.open("pipe_data.txt", "w")
pipe_data_string = StringIO.new
@example Collect pipe data into a string AND a file
pipe_data.string #=> "Hello World"
end
pipe.close
ensure
pipe.write("Hello World")
pipe = MonitoredPipe.new(pipe_data)
begin
pipe_data = StringIO.new
@example Collect pipe data into a string
killed.
read from the pipe and written to the destination, and (3) the monitoring thread is
`#close` must be called to ensure that (1) the pipe is closed, (2) all data is
pipe will be closed, and the exception will be saved in `#exception`.
If the destination raises an exception, the monitoring thread will exit, the
a thread is created to read data written to the pipe.
When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and
Write data sent through a pipe to a destination
def assert_destination_is_compatible_with_monitored_pipe
- Api: - private
Raises:
-
(ArgumentError)- if the destination is not compatible with MonitoredPipe
Returns:
-
(void)-
# Verify that the wrapped destination can be used with a MonitoredPipe
#
# @raise [ArgumentError] if the destination is not compatible with MonitoredPipe
# @return [void]
# @api private
def assert_destination_is_compatible_with_monitored_pipe
  unless destination.compatible_with_monitored_pipe?
    raise ArgumentError, "Destination #{destination.destination} is not compatible with MonitoredPipe"
  end
end
def close
-
(void)-
# Close the pipe, wait for the monitoring thread to finish, and close the destination
#
# Does nothing (and returns immediately) when the pipe is not in the `:open` state.
#
# @return [void]
def close
  mutex.synchronize do
    return unless state == :open

    # Ask the monitoring thread to stop, then sleep until it reports `:closed`.
    # ConditionVariable#wait releases the mutex while sleeping, so the monitoring
    # thread can take the lock to update the state and signal.
    @state = :closing
    condition_variable.wait(mutex) until @state == :closed
  end

  thread.join
  destination.close
end
def close_pipe
- Api: - private
Returns:
-
(void)-
# Close both ends of the pipe, draining any data still buffered in it
#
# @return [void]
# @api private
def close_pipe
  # With the write end closed, no further data can enter the pipe
  pipe_writer.close

  # Forward whatever is still buffered in the pipe; stop as soon as an exception
  # has been recorded, since there is no point in reading more data after that
  while exception.nil? && !pipe_reader.eof?
    monitor_pipe
  end

  # Nothing left to read, so release the read end too
  pipe_reader.close
end
def fileno
- Api: - private
Returns:
-
(Integer)- the file descriptor for the write end of the pipe
# The file descriptor of the write end of the pipe
#
# @return [Integer] the file descriptor for the write end of the pipe
# @api private
def fileno
  pipe_writer.fileno
end
def initialize(redirection_destination, chunk_size: 100_000)
-
chunk_size(Integer) -- the size of the chunks to read from the pipe -
redirection_destination(Array<#write>) -- as data is read from the pipe, it is written to this destination
# Create a new monitored pipe and start the thread that reads from it
#
# @param redirection_destination [Object] passed to `Destinations.factory` to build
#   the destination that receives the data read from the pipe
# @param chunk_size [Integer] the size of the chunks to read from the pipe
# @raise [ArgumentError] if the destination is not compatible with MonitoredPipe
def initialize(redirection_destination, chunk_size: 100_000)
  # Validate the destination before any pipe or thread is created
  @destination = Destinations.factory(redirection_destination)
  assert_destination_is_compatible_with_monitored_pipe

  @chunk_size = chunk_size
  @state = :open
  @mutex = Mutex.new
  @condition_variable = ConditionVariable.new
  @pipe_reader, @pipe_writer = IO.pipe

  # Started last so that everything the monitoring thread reads is already set
  @thread = start_monitoring_thread
end
def monitor
- Api: - private
Returns:
-
(void)-
# Read data from the pipe until the state becomes `:closing`, then shut the pipe down
#
# On the way out (normally or because of an exception) the pipe is closed, the
# state is set to `:closed`, and the condition variable is signaled so that a
# thread waiting in `#close` can continue.
#
# @return [void]
# @api private
def monitor
  monitor_pipe until state == :closing
ensure
  begin
    close_pipe
  ensure
    # This must run even if close_pipe raises: otherwise the state never becomes
    # :closed and a caller blocked in #close would wait on the condition variable
    # forever instead of reaching thread.join.
    mutex.synchronize do
      @state = :closed
      condition_variable.signal
    end
  end
end
def monitor_pipe
- Api: - private
Returns:
-
(void)-
# Read one chunk from the pipe (if any is available) and pass it to the destination
#
# When no data is available yet, waits up to one millisecond for the pipe to
# become readable and then returns.
#
# @return [void]
# @api private
def monitor_pipe
  write_data(pipe_reader.read_nonblock(chunk_size))
rescue IO::WaitReadable
  # Nothing to read right now; pause briefly rather than spinning
  pipe_reader.wait_readable(0.001)
end
def start_monitoring_thread
- Api: - private
Returns:
-
(Thread)- the thread running the monitoring loop
# Start the thread that runs the monitoring loop
#
# Exception reporting and abort-on-exception are both turned off for the new thread.
#
# @return [Thread] the newly started thread
# @api private
def start_monitoring_thread
  Thread.new do
    monitoring_thread = Thread.current
    monitoring_thread.report_on_exception = false
    monitoring_thread.abort_on_exception = false

    monitor
  end
end
def to_io
- Api: - private
Returns:
-
(IO)- the write end of the pipe
# The IO object for the write end of the pipe
#
# @return [IO] the write end of the pipe
# @api private
def to_io
  pipe_writer
end
def write(data)
- Api: - private
Returns:
-
(Integer)- the number of bytes written to the pipe
Parameters:
-
data(String) -- the data to write to the pipe
# Write data to the write end of the pipe
#
# @param data [String] the data to write to the pipe
# @return [Integer] the number of bytes written to the pipe
# @raise [IOError] if the pipe is not in the `:open` state
# @api private
def write(data)
  mutex.synchronize do
    # The state check and the write happen under the same lock
    raise IOError, 'closed stream' if state != :open

    pipe_writer.write(data)
  end
end
def write_data(data)
- Api: - private
Returns:
-
(void)-
Parameters:
-
data(String) -- the data read from the pipe
# Pass data read from the pipe on to the destination
#
# If the destination raises a StandardError, the error is stored in `@exception`
# and the state is set to `:closing` instead of letting the error propagate.
#
# @param data [String] the data read from the pipe
# @return [void]
# @api private
def write_data(data)
  begin
    destination.write(data)
  rescue StandardError => error
    mutex.synchronize do
      @exception = error
      @state = :closing
    end
  end
end