class Async::Container::Notify::Pipe
Implements a process readiness protocol using an inherited pipe file descriptor.
def self.open!(environment = ENV)
def self.open!(environment = ENV) if descriptor = environment.delete(NOTIFY_PIPE) self.new(::IO.for_fd(descriptor.to_i)) end rescue Errno::EBADF => error Console.error(self) {error} return nil end
def before_spawn(arguments, options)
Inserts or duplicates the environment given an argument array.
def before_spawn(arguments, options) environment = environment_for(arguments) # Use `notify_pipe` option if specified: if notify_pipe = options.delete(:notify_pipe) options[notify_pipe] = @io environment[NOTIFY_PIPE] = notify_pipe.to_s # Use stdout if it's not redirected: # This can cause issues if the user expects stdout to be connected to a terminal. # elsif !options.key?(:out) # options[:out] = @io # environment[NOTIFY_PIPE] = "1" # Use fileno 3 if it's available: elsif !options.key?(3) options[3] = @io environment[NOTIFY_PIPE] = "3" # Otherwise, give up! else raise ArgumentError, "Please specify valid file descriptor for notify_pipe!" end end
def environment_for(arguments)
def environment_for(arguments) # Insert or duplicate the environment hash which is the first argument: if arguments.first.is_a?(Hash) environment = arguments[0] = arguments.first.dup else arguments.unshift(environment = Hash.new) end return environment end
def initialize(io)
Initialize the notification client.
def initialize(io) @io = io end
def send(**message)
Formats the message using JSON and sends it to the parent controller.
def send(**message) data = ::JSON.dump(message) @io.puts(data) @io.flush end