lib/async/container/notify/pipe.rb
# frozen_string_literal: true # Released under the MIT License. # Copyright, 2020-2026, by Samuel Williams. # Copyright, 2020, by Juan Antonio MartÃn Lucas. require_relative "client" require "json" module Async module Container module Notify # Implements a process readiness protocol using an inherited pipe file descriptor. class Pipe < Client # The environment variable key which contains the pipe file descriptor. NOTIFY_PIPE = "NOTIFY_PIPE" # Open a notification client attached to the current {NOTIFY_PIPE} if possible. def self.open!(environment = ENV) if descriptor = environment.delete(NOTIFY_PIPE) self.new(::IO.for_fd(descriptor.to_i)) end rescue Errno::EBADF => error Console.error(self){error} return nil end # Initialize the notification client. # @parameter io [IO] An IO instance used for sending messages. def initialize(io) @io = io end # Inserts or duplicates the environment given an argument array. # Sets or clears it in a way that is suitable for {::Process.spawn}. def before_spawn(arguments, options) environment = environment_for(arguments) # Use `notify_pipe` option if specified: if notify_pipe = options.delete(:notify_pipe) options[notify_pipe] = @io environment[NOTIFY_PIPE] = notify_pipe.to_s # Use stdout if it's not redirected: # This can cause issues if the user expects stdout to be connected to a terminal. # elsif !options.key?(:out) # options[:out] = @io # environment[NOTIFY_PIPE] = "1" # Use fileno 3 if it's available: elsif !options.key?(3) options[3] = @io environment[NOTIFY_PIPE] = "3" # Otherwise, give up! else raise ArgumentError, "Please specify valid file descriptor for notify_pipe!" end end # Formats the message using JSON and sends it to the parent controller. # This is suitable for use with {Channel}. def send(**message) data = ::JSON.dump(message) << "\n" @io.write(data) @io.flush end private def environment_for(arguments) # Insert or duplicate the environment hash which is the first argument: if arguments.first.is_a?(Hash) environment = arguments[0] = arguments.first.dup else arguments.unshift(environment = Hash.new) end return environment end end end end end