require "set"
require "tmpdir"
require "einhorn/command/interface"
require "einhorn/prctl"
module Einhorn
module Command
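# Reap any children that have exited. The wait is non-blocking
# (WNOHANG), so the loop stops as soon as there is no exited child to
# collect; Errno::ECHILD means we have no children at all.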
def self.reap
loop do
Einhorn.log_debug("Going to reap a child process")
pid = Process.wait(-1, Process::WNOHANG)
return unless pid
cleanup(pid)
Einhorn::Event.break_loop
end
rescue Errno::ECHILD
end
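# Remove a reaped child from the state table and log its exit. A worker
# that dies before ACKing bumps the consecutive-death counter, which
# drives the spinup backoff in replenish_gradually.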
def self.cleanup(pid)
unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find any config for exited child #{pid.inspect}! This probably indicates a bug in Einhorn.")
return
end
Einhorn::State.children.delete(pid)
# Unacked worker
if spec[:type] == :worker && !spec[:acked]
Einhorn::State.consecutive_deaths_before_ack += 1
extra = " before it was ACKed"
else
extra = nil
end
case type = spec[:type]
when :worker
Einhorn.log_info("===> Exited worker #{pid.inspect}#{extra}", :upgrade)
when :state_passer
Einhorn.log_debug("===> Exited state passing process #{pid.inspect}", :upgrade)
else
Einhorn.log_error("===> Exited process #{pid.inspect} has unrecgonized type #{type.inspect}: #{spec.inspect}", :upgrade)
end
end
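# Record the timestamp and request id of the most recent ping received
# from a worker.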
def self.register_ping(pid, request_id)
unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
return
end
spec[:pinged_at] = Time.now
spec[:pinged_request_id] = request_id
end
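# ACK handling: manual ACKs arrive from workers over the command
# socket, while timer ACKs fire once a worker has stayed up for the
# configured number of seconds. Both paths funnel into register_ack.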
def self.register_manual_ack(pid)
ack_mode = Einhorn::State.ack_mode
unless ack_mode[:type] == :manual
Einhorn.log_error("Received a manual ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
return
end
Einhorn.log_info("Received a manual ACK from #{pid.inspect}")
register_ack(pid)
end
def self.register_timer_ack(time, pid)
ack_mode = Einhorn::State.ack_mode
unless ack_mode[:type] == :timer
Einhorn.log_error("Received a timer ACK for #{pid.inspect}, but ack_mode is #{ack_mode.inspect}. Ignoring ACK.")
return
end
unless Einhorn::State.children[pid]
# TODO: Maybe cancel pending ACK timers upon death?
Einhorn.log_debug("Worker #{pid.inspect} died before its timer ACK happened.")
return
end
Einhorn.log_info("Worker #{pid.inspect} has been up for #{time}s, so we are considering it alive.")
register_ack(pid)
end
def self.register_ack(pid)
unless (spec = Einhorn::State.children[pid])
Einhorn.log_error("Could not find state for PID #{pid.inspect}; ignoring ACK.")
return
end
if spec[:acked]
Einhorn.log_error("Pid #{pid.inspect} already ACKed; ignoring new ACK.")
return
end
extra = if Einhorn::State.consecutive_deaths_before_ack > 0
", breaking the streak of #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked workers dying"
end
Einhorn::State.consecutive_deaths_before_ack = 0
spec[:acked] = true
Einhorn.log_info("Up to #{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} #{Einhorn::State.ack_mode[:type]} ACKs#{extra}")
# Could call cull here directly instead, I believe.
Einhorn::Event.break_loop
end
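# Send `signal` to each PID in `children` (defaulting to the current
# worker pool), recording what was sent. If a signal_timeout is
# configured and `record` is true, a timer later escalates to SIGKILL
# for any child that is still the same process and still alive.
#
# Example (hypothetical PIDs):
#   signal_all("USR2", [1234, 5678])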
def self.signal_all(signal, children = nil, record = true)
children ||= Einhorn::WorkerPool.workers
signaled = {}
Einhorn.log_info("Sending #{signal} to #{children.inspect}", :upgrade)
children.each do |child|
unless (spec = Einhorn::State.children[child])
Einhorn.log_error("Trying to send #{signal} to dead child #{child.inspect}. The fact we tried this probably indicates a bug in Einhorn.", :upgrade)
next
end
if record
if spec[:signaled].include?(signal)
Einhorn.log_error("Re-sending #{signal} to already-signaled child #{child.inspect}. It may be slow to spin down, or it may be swallowing #{signal}s.", :upgrade)
end
spec[:signaled].add(signal)
spec[:last_signaled_at] = Time.now
end
begin
Process.kill(signal, child)
rescue Errno::ESRCH
Einhorn.log_debug("Attempted to #{signal} child #{child.inspect} but the process does not exist", :upgrade)
else
signaled[child] = spec
end
end
if Einhorn::State.signal_timeout && record
Einhorn::Event::Timer.open(Einhorn::State.signal_timeout) do
children.each do |child|
spec = Einhorn::State.children[child]
next unless spec # Process is already dead and removed by cleanup
signaled_spec = signaled[child]
next unless signaled_spec # We got ESRCH when trying to signal
if spec[:spinup_time] != signaled_spec[:spinup_time]
Einhorn.log_info("Different spinup time recorded for #{child} after #{Einhorn::State.signal_timeout}s. This probably indicates a PID rollover.", :upgrade)
next
end
Einhorn.log_info("Child #{child.inspect} is still active after #{Einhorn::State.signal_timeout}s. Sending SIGKILL.")
begin
Process.kill("KILL", child)
rescue Errno::ESRCH
end
spec[:signaled].add("KILL")
end
end
Einhorn.log_info("Successfully sent #{signal}s to #{signaled.length} processes: #{signaled.keys}")
end
end
def self.increment
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
new = (Einhorn::State.config[:number] += 1)
output = "Incrementing number of workers from #{old} -> #{new}"
warn(output)
output
end
def self.decrement
if Einhorn::State.config[:number] <= 1
output = "Can't decrease number of workers (already at #{Einhorn::State.config[:number]}). Run kill #{$$} if you really want to kill einhorn."
warn(output)
return output
end
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
new = (Einhorn::State.config[:number] -= 1)
output = "Decrementing number of workers from #{old} -> #{new}"
warn(output)
output
end
def self.set_workers(new)
if new == Einhorn::State.config[:number]
return ""
end
Einhorn::Event.break_loop
old = Einhorn::State.config[:number]
Einhorn::State.config[:number] = new
output = "Altering worker count, #{old} -> #{new}. Will "
output << if old < new
"spin up additional workers."
else
"gracefully terminate workers."
end
warn(output)
output
end
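# Snapshot of master state that is handed to the next generation on
# reload: the serializable globals from Einhorn::State plus the state
# of any persistent descriptors (e.g. bound sockets).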
def self.dumpable_state
global_state = Einhorn::State.dumpable_state
descriptor_state = Einhorn::Event.persistent_descriptors.map do |descriptor|
descriptor.to_state
end
{
state: global_state,
persistent_descriptors: descriptor_state
}
end
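# Re-exec the einhorn master in place. A short-lived :state_passer
# child writes the current state as YAML into a pipe, and the re-exec'd
# master is pointed at the read end via --with-state-fd.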
def self.reload
unless Einhorn::State.respawn
Einhorn.log_info("Not reloading einhorn because we're exiting")
return
end
Einhorn.log_info("Reloading einhorn master (#{Einhorn::TransientState.script_name})...", :reload)
# In case there's anything lurking
$stdout.flush
# Spawn a child to pass the state through the pipe
read, write = Einhorn::Compat.pipe
fork do
Einhorn::TransientState.whatami = :state_passer
Einhorn::State.children[Process.pid] = {type: :state_passer}
Einhorn::State.generation += 1
read.close
begin
write.write(YAML.dump(dumpable_state))
rescue Errno::EPIPE => e
e.message << " (state worker could not write state, which likely means the parent process has died)"
raise e
end
write.close
exit(0)
end
write.close
unless Einhorn.can_safely_reload?
Einhorn.log_error("Can not initiate einhorn master reload safely, aborting", :reload)
Einhorn::State.reloading_for_upgrade = false
read.close
return
end
begin
Einhorn.initialize_reload_environment
respawn_commandline = Einhorn.upgrade_commandline(["--with-state-fd", read.fileno.to_s])
respawn_commandline << {close_others: false}
Einhorn.log_info("About to re-exec einhorn master as #{respawn_commandline.inspect}", :reload)
Einhorn::Compat.exec(*respawn_commandline)
rescue SystemCallError => e
Einhorn.log_error("Could not reload! Attempting to continue. Error was: #{e}", :reload)
Einhorn::State.reloading_for_upgrade = false
read.close
end
end
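# Pick the smallest worker index not currently in use. Scanning
# 0..all_indexes.length is sufficient: with N indexes taken, at least
# one value in that range must be free.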
def self.next_index
all_indexes = Set.new(Einhorn::State.children.map { |k, st| st[:index] })
0.upto(all_indexes.length) do |i|
return i unless all_indexes.include?(i)
end
end
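# Fork a new worker (exec'ing the command unless Einhorn is running
# preloaded), register it in the state table, and set up its ACK
# (timer-based or manual, per ack_mode).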
def self.spinup(cmd = nil)
cmd ||= Einhorn::State.cmd
index = next_index
expected_ppid = Process.pid
pid = if Einhorn::State.preloaded
fork do
Einhorn::TransientState.whatami = :worker
prepare_child_process
Einhorn.log_info("About to tear down Einhorn state and run einhorn_main")
Einhorn::Command::Interface.uninit
Einhorn::Event.close_all_for_worker
Einhorn.set_argv(cmd, true)
reseed_random
setup_parent_watch(expected_ppid)
prepare_child_environment(index)
einhorn_main
end
else
fork do
Einhorn::TransientState.whatami = :worker
prepare_child_process
Einhorn.log_info("About to exec #{cmd.inspect}")
Einhorn::Command::Interface.uninit
# Here's the only case where cloexec would help. Since we
# have to track and manually close FDs for other cases, we
# may as well just reuse close_all rather than also set
# cloexec on everything.
#
# Note that Ruby 1.9's close_others option is useful here.
Einhorn::Event.close_all_for_worker
setup_parent_watch(expected_ppid)
prepare_child_environment(index)
Einhorn::Compat.exec(cmd[0], cmd[1..-1], close_others: false)
end
end
Einhorn.log_info("===> Launched #{pid} (index: #{index})", :upgrade)
Einhorn::State.last_spinup = Time.now
Einhorn::State.children[pid] = {
type: :worker,
version: Einhorn::State.version,
acked: false,
signaled: Set.new,
last_signaled_at: nil,
index: index,
spinup_time: Einhorn::State.last_spinup
}
# Set up whatever's needed for ACKing
ack_mode = Einhorn::State.ack_mode
case type = ack_mode[:type]
when :timer
Einhorn::Event::ACKTimer.open(ack_mode[:timeout], pid)
when :manual
# nothing to do
else
Einhorn.log_error("Unrecognized ACK mode #{type.inspect}")
end
end
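# Environment contract a worker sees (set just below):
#   EINHORN_MASTER_PID  - PID of the einhorn master
#   EINHORN_SOCK_PATH   - path to the command socket
#   EINHORN_SOCK_FD     - fd of a pre-opened command socket (optional)
#   EINHORN_FD_COUNT    - number of inherited bind fds
#   EINHORN_FD_0 ...    - the inherited bind fds themselves
#   EINHORN_CHILD_INDEX - index of this worker slot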
def self.prepare_child_environment(index)
# This is run from the child
ENV["EINHORN_MASTER_PID"] = Process.ppid.to_s
ENV["EINHORN_SOCK_PATH"] = Einhorn::Command::Interface.socket_path
if Einhorn::State.command_socket_as_fd
socket = UNIXSocket.open(Einhorn::Command::Interface.socket_path)
Einhorn::TransientState.socket_handles << socket
ENV["EINHORN_SOCK_FD"] = socket.fileno.to_s
end
ENV["EINHORN_FD_COUNT"] = Einhorn::State.bind_fds.length.to_s
Einhorn::State.bind_fds.each_with_index { |fd, i| ENV["EINHORN_FD_#{i}"] = fd.to_s }
ENV["EINHORN_CHILD_INDEX"] = index.to_s
end
# Reseed common ruby random number generators.
#
# OpenSSL::Random uses the PID to reseed after fork, which means that if a
# long-lived master process over its lifetime spawns two workers with the
# same PID, those workers will start with the same OpenSSL seed.
#
# Ruby >= 1.9 has a guard against this in SecureRandom, but any direct
# users of OpenSSL::Random will still be affected.
#
# Ruby 1.8 didn't even reseed the default random number generator used by
# Kernel#rand in certain releases.
#
# https://bugs.ruby-lang.org/issues/4579
#
def self.reseed_random
# reseed Kernel#rand
srand
# reseed OpenSSL::Random if it's loaded
if defined?(OpenSSL::Random)
seed = if defined?(Random)
Random.new_seed
else
# Ruby 1.8
rand
end
OpenSSL::Random.seed(seed.to_s)
end
end
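# Run in the child immediately after fork: move into a fresh process
# group and apply any renice setting.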
def self.prepare_child_process
Process.setpgrp
Einhorn.renice_self
end
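# When kill_children_on_exit is enabled and prctl is available (Linux),
# arrange for the kernel to send this worker USR2 (Einhorn's graceful
# shutdown signal) if the master dies, so orphaned workers shut
# themselves down. On other platforms this is a silent no-op.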
def self.setup_parent_watch(expected_ppid)
if Einhorn::State.kill_children_on_exit
begin
# NB: Having the USR2 signal handler set to terminate (the default) at
# this point is required. If it's set to a ruby handler, there are
# race conditions that could cause the worker to leak.
Einhorn::Prctl.set_pdeathsig("USR2")
if Process.ppid != expected_ppid
Einhorn.log_error("Parent process died before we set pdeathsig; cowardly refusing to exec child process.")
exit(1)
end
rescue NotImplementedError
# Unsupported OS; silently continue.
end
end
end
# @param options [Hash]
#
# @option options [Boolean] :smooth (false) Whether to perform a smooth or
# fleet upgrade. In a smooth upgrade, bring up new workers and cull old
# workers one by one as soon as there is a replacement. In a fleet
# upgrade, bring up all the new workers and don't cull any old workers
# until they're all up.
#
def self.full_upgrade(options = {})
options = {smooth: false}.merge(options)
Einhorn::State.smooth_upgrade = options.fetch(:smooth)
reload_for_upgrade
end
def self.full_upgrade_smooth
full_upgrade(smooth: true)
end
def self.full_upgrade_fleet
full_upgrade(smooth: false)
end
def self.reload_for_upgrade
Einhorn::State.reloading_for_upgrade = true
reload
end
def self.upgrade_workers
if Einhorn::State.upgrading
Einhorn.log_info("Currently upgrading (#{Einhorn::WorkerPool.ack_count} / #{Einhorn::WorkerPool.ack_target} ACKs; bumping version and starting over)...", :upgrade)
else
Einhorn::State.upgrading = true
u_type = Einhorn::State.smooth_upgrade ? "smooth" : "fleet"
Einhorn.log_info("Starting #{u_type} upgrade from version" \
" #{Einhorn::State.version}...", :upgrade)
end
# Reset this, since we've just upgraded to a new universe (I'm
# not positive this is the right behavior, but it's not
# obviously wrong.)
Einhorn::State.consecutive_deaths_before_ack = 0
Einhorn::State.last_upgraded = Time.now
Einhorn::State.version += 1
if Einhorn::State.smooth_upgrade
replenish_gradually
else
replenish_immediately
end
end
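# Bring the pool back toward its target: mark an upgrade complete once
# enough new workers have ACKed, send USR2 to old-generation workers,
# trim any excess current-generation workers, and escalate to SIGKILL
# for workers that have outlived the signal timeout.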
def self.cull
acked = Einhorn::WorkerPool.ack_count
unsignaled = Einhorn::WorkerPool.unsignaled_count
target = Einhorn::WorkerPool.ack_target
if Einhorn::State.upgrading && acked >= target
Einhorn::State.upgrading = false
Einhorn.log_info("Upgraded successfully to version #{Einhorn::State.version} (Einhorn #{Einhorn::VERSION}).", :upgrade)
Einhorn.send_tagged_message(:upgrade, "Upgrade done", true)
end
old_workers = Einhorn::WorkerPool.old_workers
Einhorn.log_debug("#{acked} acked, #{unsignaled} unsignaled, #{target} target, #{old_workers.length} old workers")
if !Einhorn::State.upgrading && old_workers.length > 0
Einhorn.log_info("Killing off #{old_workers.length} old workers.", :upgrade)
signal_all("USR2", old_workers)
elsif Einhorn::State.upgrading && Einhorn::State.smooth_upgrade
# In a smooth upgrade, kill off old workers one by one when we have
# sufficiently many new workers.
excess = (old_workers.length + acked) - target
if excess > 0
Einhorn.log_info("Smooth upgrade: killing off #{excess} old workers.", :upgrade)
signal_all("USR2", old_workers.take(excess))
else
Einhorn.log_debug("Not killing old workers, as excess is #{excess}.")
end
end
if unsignaled > target
excess = Einhorn::WorkerPool.unsignaled_modern_workers_with_priority[0...(unsignaled - target)]
Einhorn.log_info("Have too many workers at the current version, so killing off #{excess.length} of them.")
signal_all("USR2", excess)
end
# Ensure all signaled workers that have outlived signal_timeout get killed.
kill_expired_signaled_workers if Einhorn::State.signal_timeout
end
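# SIGKILL any worker that was sent USR2 more than signal_timeout
# seconds ago and still has not exited.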
def self.kill_expired_signaled_workers
now = Time.now
children = Einhorn::State.children.select do |_, c|
# Only interested in USR2 signaled workers
next unless c[:signaled] && c[:signaled].length > 0
next unless c[:signaled].include?("USR2")
# Ignore processes that have received KILL since it can't be trapped.
next if c[:signaled].include?("KILL")
# Filter out those children that have not reached signal_timeout yet.
next unless c[:last_signaled_at]
expires_at = c[:last_signaled_at] + Einhorn::State.signal_timeout
next unless now >= expires_at
true
end
Einhorn.log_info("#{children.size} expired signaled workers found.") if children.size > 0
children.each do |pid, child|
Einhorn.log_info("Child #{pid.inspect} was signaled #{(child[:last_signaled_at] - now).abs.to_i}s ago. Sending SIGKILL as it is still active after #{Einhorn::State.signal_timeout}s timeout.", :upgrade)
begin
Process.kill("KILL", pid)
rescue Errno::ESRCH
Einhorn.log_debug("Attempted to SIGKILL child #{pid.inspect} but the process does not exist.")
end
child[:signaled].add("KILL")
child[:last_signaled_at] = Time.now
end
end
def self.stop_respawning
Einhorn::State.respawn = false
Einhorn::Event.break_loop
end
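# Top the pool back up to the configured worker count. The very first
# spinup happens immediately; after that, workers are spun up gradually
# so repeated failures back off rather than fork-bombing.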
def self.replenish
return unless Einhorn::State.respawn
if !Einhorn::State.last_spinup
replenish_immediately
else
replenish_gradually
end
end
def self.replenish_immediately
missing = Einhorn::WorkerPool.missing_worker_count
if missing <= 0
Einhorn.log_error("Missing is currently #{missing.inspect}, but should always be > 0 when replenish_immediately is called. This probably indicates a bug in Einhorn.")
return
end
Einhorn.log_info("Launching #{missing} new workers")
missing.times { spinup }
end
# Unbounded exponential backoff is not a thing: we run into problems if
# e.g., each of our hundred workers simultaneously fail to boot for the same
# ephemeral reason. Instead cap backoff by some reasonable maximum, so we
# don't wait until the heat death of the universe to spin up new capacity.
MAX_SPINUP_INTERVAL = 30.0
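# Spin up at most one worker per invocation, subject to the spinup
# interval (with exponential backoff on consecutive unacked deaths,
# capped at MAX_SPINUP_INTERVAL) and to max_unacked, which defaults to
# the number of processors.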
def self.replenish_gradually(max_unacked = nil)
return if Einhorn::TransientState.has_outstanding_spinup_timer
return unless Einhorn::WorkerPool.missing_worker_count > 0
max_unacked ||= Einhorn::State.config[:max_unacked]
# default to spinning up at most NCPU workers at once
unless max_unacked
begin
@processor_count ||= Einhorn::Compat.processor_count
rescue => err
Einhorn.log_error(err.inspect)
@processor_count = 1
end
max_unacked = @processor_count
end
if max_unacked <= 0
raise ArgumentError.new("max_unacked must be positive")
end
# Exponentially backoff automated spinup if we're just having
# things die before ACKing
spinup_interval = Einhorn::State.config[:seconds] * (1.5**Einhorn::State.consecutive_deaths_before_ack)
spinup_interval = [spinup_interval, MAX_SPINUP_INTERVAL].min
seconds_ago = (Time.now - Einhorn::State.last_spinup).to_f
if seconds_ago > spinup_interval
if trigger_spinup?(max_unacked)
msg = "Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so spinning up a new process."
if Einhorn::State.consecutive_deaths_before_ack > 0
Einhorn.log_info("#{msg} (there have been #{Einhorn::State.consecutive_deaths_before_ack} consecutive unacked worker deaths)", :upgrade)
else
Einhorn.log_debug(msg)
end
spinup
end
else
Einhorn.log_debug("Last spinup was #{seconds_ago}s ago, and spinup_interval is #{spinup_interval}s, so not spinning up a new process.")
end
Einhorn::TransientState.has_outstanding_spinup_timer = true
Einhorn::Event::Timer.open(spinup_interval) do
Einhorn::TransientState.has_outstanding_spinup_timer = false
replenish
end
end
def self.quieter(log = true)
Einhorn::State.verbosity += 1 if Einhorn::State.verbosity < 2
output = "Verbosity set to #{Einhorn::State.verbosity}"
Einhorn.log_info(output) if log
output
end
def self.louder(log = true)
Einhorn::State.verbosity -= 1 if Einhorn::State.verbosity > 0
output = "Verbosity set to #{Einhorn::State.verbosity}"
Einhorn.log_info(output) if log
output
end
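# Decide whether another spinup is allowed right now: not if too many
# new workers are still unacked, and not if an upgrade would push the
# total past number + max_upgrade_additional.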
def self.trigger_spinup?(max_unacked)
unacked = Einhorn::WorkerPool.unacked_unsignaled_modern_workers.length
if unacked >= max_unacked
Einhorn.log_info("There are #{unacked} unacked new workers, and max_unacked is #{max_unacked}, so not spinning up a new process.")
return false
elsif Einhorn::State.config[:max_upgrade_additional]
capacity_exceeded = (Einhorn::State.config[:number] + Einhorn::State.config[:max_upgrade_additional]) - Einhorn::WorkerPool.workers_with_state.length
if capacity_exceeded < 0
Einhorn.log_info("Over worker capacity by #{capacity_exceeded.abs} during upgrade, #{Einhorn::WorkerPool.modern_workers.length} new workers of #{Einhorn::WorkerPool.workers_with_state.length} total. Waiting for old workers to exit before spinning up a process.")
return false
end
end
true
end
end
end