class Puma::Cluster
def run
# Entry point of the cluster master process.
#
# Logs the startup banner, then either preloads the application or only
# parses the configured binds. Afterwards it creates the pipes shared with
# the workers, spawns the workers, and services the single-byte messages
# arriving on the master pipe for as long as @status is :run. The check,
# suicide and master/wakeup pipes are closed when the loop is left.
def run
  @status = :run

  output_header "cluster"

  # This is aligned with the output from Runner, see Runner#output_header
  log "* Workers: #{@options[:workers]}"

  if preload?
    # Threads explicitly flagged as fork safe are excluded from the check
    # below. Rails relies on this, but anyone may use it.
    marked_fork_safe = ->(thread) { thread.thread_variable_get(:fork_safe) }

    threads_before = Thread.list.reject(&marked_fork_safe)

    log "* Restarts: (\u2714) hot (\u2716) phased (#{@options[:fork_worker] ? "\u2714" : "\u2716"}) refork"
    log "* Preloading application"
    load_and_bind

    threads_after = Thread.list.reject(&marked_fork_safe)

    # Report any thread that appeared while the application was loading.
    if threads_after.size > threads_before.size
      new_threads = threads_after - threads_before
      started = threads_after.size - threads_before.size

      if new_threads.first.respond_to? :backtrace
        log "! WARNING: Detected #{started} Thread(s) started in app boot:"
        new_threads.each do |thread|
          log "! #{thread.inspect} - #{thread.backtrace ? thread.backtrace.first : ''}"
        end
      else
        log "! WARNING: Detected #{started} Thread(s) started in app boot"
      end
    end
  else
    log "* Restarts: (\u2714) hot (\u2714) phased (#{@options[:fork_worker] ? "\u2714" : "\u2716"}) refork"

    unless @config.app_configured?
      error "No application configured, nothing to run"
      exit 1
    end

    @launcher.binder.parse @options[:binds]
  end

  master_pipe, @wakeup = Puma::Util.pipe

  setup_signals

  # Lets the workers notice that the master process died: once select
  # reports @check_pipe as ready, the master has exited and
  # @suicide_pipe has been closed automatically.
  @check_pipe, @suicide_pipe = Puma::Util.pipe

  # Dedicated pipe over which worker 0 receives commands to fork new
  # worker processes.
  @fork_pipe, @fork_writer = Puma::Util.pipe

  log "Use Ctrl-C to stop"

  warn_ruby_mn_threads
  single_worker_warning
  redirect_io
  Plugins.fire_background
  @launcher.write_state
  start_control

  @master_read = master_pipe
  @worker_write = @wakeup
  @options[:worker_write] = @worker_write

  @config.run_hooks(:before_fork, nil, @log_writer)

  spawn_workers

  Signal.trap("SIGINT") { stop }

  begin
    booted = false
    in_phased_restart = false
    workers_not_booted = @options[:workers]

    while @status == :run
      begin
        if @options[:idle_timeout] && all_workers_idle_timed_out?
          log "- All workers reached idle timeout"
          break
        end

        if @pending_phased_restart
          start_phased_restart(@pending_phased_restart == :refork)

          in_phased_restart = @pending_phased_restart
          @pending_phased_restart = false

          workers_not_booted = @options[:workers]
          # on a refork, worker 0 is not restarted
          workers_not_booted -= 1 if in_phased_restart == :refork
        end

        check_workers(in_phased_restart == :refork)

        # Wait for a message until the next scheduled check, never a negative time.
        wait_time = [0, @next_check - Time.now].max

        if master_pipe.wait_readable(wait_time)
          message = master_pipe.read_nonblock(1)
          next unless message

          if message == PIPE_WAKEUP
            @next_check = Time.now
            next
          end

          payload = master_pipe.gets
          pid = payload.to_i

          # Boot and fork messages carry "pid:index"; record the pid on the
          # worker at that index when it has none yet.
          if message == PIPE_BOOT || message == PIPE_FORK
            pid, idx = payload.split(':').map(&:to_i)
            worker = worker_at idx
            worker.pid = pid if worker.pid.nil?
          end

          worker = @workers.find { |candidate| candidate.pid == pid }

          if worker
            case message
            when PIPE_BOOT
              worker.boot!
              log "- Worker #{worker.index} (PID: #{pid}) booted in #{worker.uptime.round(2)}s, phase: #{worker.phase}"
              @next_check = Time.now
              workers_not_booted -= 1
            when PIPE_EXTERNAL_TERM
              # externally triggered term, see worker method, Signal.trap "SIGTERM"
              worker.term!
            when PIPE_TERM
              worker.term unless worker.term?
            when PIPE_PING
              status = payload.sub(/^\d+/,'').chomp
              worker.ping!(status)
              @events.fire(:ping!, worker)

              # While a phased restart with fork_worker still has workers
              # left to boot, the same status is also applied to worker 0.
              if in_phased_restart && @options[:fork_worker] && workers_not_booted.positive?
                worker_zero = worker_at(0)
                if worker_zero
                  worker_zero.ping!(status)
                  @events.fire(:ping!, worker_zero)
                end
              end

              if !booted && @workers.none? { |member| member.last_status.empty? }
                @events.fire_after_booted!
                debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
                booted = true
              end
            when PIPE_IDLE
              # Toggle the idle flag recorded for this pid.
              if idle_workers[pid]
                idle_workers.delete pid
              else
                idle_workers[pid] = true
              end
            end
          else
            log "! Out-of-sync worker list, no #{pid} worker"
          end
        end

        if in_phased_restart && workers_not_booted.zero?
          @events.fire_after_booted!
          debug_loaded_extensions("Loaded Extensions - master:") if @log_writer.debug?
          in_phased_restart = false
        end
      rescue Interrupt
        @status = :stop
      end
    end

    stop_workers unless @status == :halt
  ensure
    @check_pipe.close
    @suicide_pipe.close
    master_pipe.close
    @wakeup.close
  end
end