lib/process_executer/monitored_pipe.rb



# frozen_string_literal: true

require 'stringio'
require 'io/wait'

module ProcessExecuter
  # Stream data sent through a pipe to one or more writers
  #
  # When a new MonitoredPipe is created, a pipe is created (via IO.pipe) and
  # a thread is created to read data written to the pipe.
  #
  # Data that is read from that pipe is written to one or more writers passed to
  # `#initialize`.
  #
  # If any of the writers raise an exception, the monitoring thread will exit, the
  # pipe will be closed, and the exception will be saved in `#exception`.
  #
  # `#close` must be called to ensure that (1) the pipe is closed, (2) all data is
  # read from the pipe and written to the writers, and (3) the monitoring thread is
  # killed.
  #
  # @example Collect pipe data into a string
  #   pipe_data = StringIO.new
  #   begin
  #     pipe = MonitoredPipe.new(pipe_data)
  #     pipe.write("Hello World")
  #   ensure
  #     pipe.close
  #   end
  #   pipe_data.string #=> "Hello World"
  #
  # @example Collect pipe data into a string AND a file
  #   pipe_data_string = StringIO.new
  #   pipe_data_file = File.open("pipe_data.txt", "w")
  #   begin
  #     pipe = MonitoredPipe.new(pipe_data_string, pipe_data_file)
  #     pipe.write("Hello World")
  #   ensure
  #     pipe.close
  #   end
  #   pipe_data_string.string #=> "Hello World"
  #   File.read("pipe_data.txt") #=> "Hello World"
  #
  # @api public
  #
  class MonitoredPipe
    # Create a new monitored pipe
    #
    # Creates a IO.pipe and starts a monitoring thread to read data written to the pipe.
    #
    # @example
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #
    # @param writers [Array<#write>] as data is read from the pipe, it is written to these writers
    # @param chunk_size [Integer] the size of the chunks to read from the pipe
    #
    def initialize(*writers, chunk_size: 100_000)
      @writers = writers
      @chunk_size = chunk_size
      @pipe_reader, @pipe_writer = IO.pipe
      @state = :open
      @exception = nil
      # Guards every transition of @state, which is changed by both the thread
      # calling #close and the monitoring thread.
      @state_mutex = Mutex.new
      @thread = Thread.new do
        Thread.current.report_on_exception = false
        Thread.current.abort_on_exception = false
        monitor
      end
    end

    # Set the state to `:closing` and wait for the monitoring thread to finish
    #
    # The monitoring thread will see that the state has changed, read any data
    # still in the pipe, close the pipe, and set the state to `:closed`.
    #
    # Calling `#close` more than once is safe. It also waits when the monitoring
    # thread has already started closing on its own (for example, because a
    # writer raised), so the pipe is always fully closed when this method returns.
    #
    # If the monitoring thread was terminated by an exception that is not a
    # `StandardError`, `Thread#join` re-raises that exception here.
    #
    # @example
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.state #=> :open
    #   pipe.write('Hello World')
    #   pipe.close
    #   pipe.state #=> :closed
    #   data_collector.string #=> "Hello World"
    #
    # @return [void]
    #
    def close
      begin_closing
      # Joining (instead of polling the state) cannot hang: the monitoring thread
      # always leaves its read loop once the state is no longer `:open`.
      thread.join
      nil
    end

    # Return the write end of the pipe so that data can be written to it
    #
    # Data written to this end of the pipe will be read by the monitor thread and
    # written to the writers passed to `#initialize`.
    #
    # This is so we can provide a MonitoredPipe to Process.spawn as a FD
    #
    # @example
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.to_io.write('Hello World')
    #   pipe.close
    #   data_collector.string #=> "Hello World"
    #
    # @return [IO] the write end of the pipe
    #
    # @api private
    #
    def to_io
      pipe_writer
    end

    # @!attribute [r] fileno
    #
    # The file descriptor for the write end of the pipe
    #
    # @example
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.fileno == pipe.to_io.fileno #=> true
    #
    # @return [Integer] the file descriptor for the write end of the pipe
    #
    # @api private
    #
    def fileno
      pipe_writer.fileno
    end

    # Writes data to the pipe so that it can be read by the monitor thread
    #
    # Primarily used for testing.
    #
    # @example
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.write('Hello World')
    #   pipe.close
    #   data_collector.string #=> "Hello World"
    #
    # @param data [String] the data to write to the pipe
    #
    # @return [Integer] the number of bytes written to the pipe
    #
    # @raise [IOError] if the pipe is no longer open
    #
    # @api private
    #
    def write(data)
      raise IOError, 'closed stream' unless state == :open

      pipe_writer.write(data)
    end

    # @!attribute [r]
    #
    # The size of the chunks to read from the pipe
    #
    # @example
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.chunk_size #=> 100_000
    #
    # @return [Integer] the size of the chunks to read from the pipe
    #
    attr_reader :chunk_size

    # @!attribute [r]
    #
    # An array of writers to write data that is read from the pipe
    #
    # @example with one writer
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.writers #=> [data_collector]
    #
    # @example with an array of writers
    #   require 'stringio'
    #   data_collector1 = StringIO.new
    #   data_collector2 = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector1, data_collector2)
    #   pipe.writers #=> [data_collector1, data_collector2]
    #
    # @return [Array<#write>]
    #
    attr_reader :writers

    # @!attribute [r]
    #
    # The thread that monitors the pipe
    #
    # @example
    #   require 'stringio'
    #   data_collector = StringIO.new
    #   pipe = ProcessExecuter::MonitoredPipe.new(data_collector)
    #   pipe.thread #=> #<Thread:0x00007f8b1a0b0e00>
    #
    # @return [Thread]
    attr_reader :thread

    # @!attribute [r]
    #
    # The read end of the pipe
    #
    # @example
    #   pipe = ProcessExecuter::MonitoredPipe.new($stdout)
    #   pipe.pipe_reader #=> #<IO:fd 11>
    #
    # @return [IO]
    attr_reader :pipe_reader

    # @!attribute [r]
    #
    # The write end of the pipe
    #
    # @example
    #   pipe = ProcessExecuter::MonitoredPipe.new($stdout)
    #   pipe.pipe_writer #=> #<IO:fd 12>
    #
    # @return [IO] the write end of the pipe
    attr_reader :pipe_writer

    # @!attribute [r]
    #
    # The state of the pipe
    #
    # Must be either `:open`, `:closing`, or `:closed`
    #
    # * `:open` - the pipe is open and data can be written to it
    # * `:closing` - the pipe is being closed and data can no longer be written to it
    # * `:closed` - the pipe is closed and data can no longer be written to it
    #
    # @example
    #   pipe = ProcessExecuter::MonitoredPipe.new($stdout)
    #   pipe.state #=> :open
    #   pipe.close
    #   pipe.state #=> :closed
    #
    # @return [Symbol] the state of the pipe
    #
    attr_reader :state

    # @!attribute [r]
    #
    # The exception that stopped the monitoring thread
    #
    # If a writer raises an exception, it is stored here. An unexpected error
    # raised while reading from or closing the pipe is stored here as well.
    # Otherwise, it is `nil`.
    #
    # @example
    #   pipe.exception #=> nil
    #
    # @return [Exception, nil] the exception that stopped monitoring or `nil` if there was none
    #
    attr_reader :exception

    private

    # Move the state from `:open` to `:closing`
    #
    # Does nothing when the pipe is already `:closing` or `:closed`, so a
    # finished pipe can never be moved back to `:closing`.
    #
    # @return [void]
    # @api private
    def begin_closing
      @state_mutex.synchronize { @state = :closing if @state == :open }
    end

    # Read data from the pipe until `#state` is changed to `:closing`
    #
    # The state is changed to `:closing` by calling `#close`, by a writer raising
    # an exception, or by the write end of the pipe reaching end of file.
    #
    # Before this method returns, both ends of the pipe are closed and state is
    # set to `:closed` -- even if reading fails unexpectedly -- so `#close` can
    # never wait forever.
    #
    # @return [void]
    # @api private
    def monitor
      monitor_pipe until state == :closing
      close_pipe
    rescue StandardError => e
      # e.g. IOError because an end of the pipe was closed behind our back.
      # Keep a writer exception if one was already recorded.
      @exception ||= e
    ensure
      pipe_writer.close unless pipe_writer.closed?
      pipe_reader.close unless pipe_reader.closed?
      @state_mutex.synchronize { @state = :closed }
    end

    # Read one chunk of data from the pipe (if any is available)
    #
    # Data read from the pipe is written to the writers given to the constructor.
    #
    # @return [void]
    # @api private
    def monitor_pipe
      write_to_writers(pipe_reader.read_nonblock(chunk_size))
    rescue IO::WaitReadable
      pipe_reader.wait_readable(0.001)
    rescue EOFError
      # Every write end of the pipe has been closed (e.g. via `to_io.close`), so
      # no more data can arrive: start shutting down instead of dying silently.
      begin_closing
    end

    # Write data to every writer given to the constructor
    #
    # If a writer raises, the exception is saved in `#exception` and the pipe
    # starts closing. Writers after the failing one do not receive the data.
    #
    # @param data [String] the data read from the pipe
    #
    # @return [void]
    # @api private
    def write_to_writers(data)
      writers.each { |w| w.write(data) }
    rescue StandardError => e
      @exception = e
      begin_closing
    end

    # Read any remaining data from the pipe and close it
    #
    # @return [void]
    # @api private
    def close_pipe
      # Close the write end of the pipe so no more data can be written to it
      pipe_writer.close

      # Read remaining data from pipe_reader (if any)
      # If an exception was already raised by the last call to #write, then don't try to read remaining data
      monitor_pipe while exception.nil? && !pipe_reader.eof?

      # Close the read end of the pipe
      pipe_reader.close
    end
  end
end