class TP2::Server
# Builds the app for the :rack app type by handing opts[:app_location] to
# TP2::RackAdapter.load.
#
# @param opts [Hash] server options; :app_location must be truthy
# @return whatever TP2::RackAdapter.load returns for the given location
# @raise [RuntimeError] with 'Missing app location' when :app_location is
#   nil or false
def self.rack_app(opts)
  location = opts[:app_location]
  raise 'Missing app location' unless location

  TP2::RackAdapter.load(location)
end
# App factory for the :static app type. The method currently has no
# implementation: opts is ignored and nil is always returned.
#
# @param opts [Hash] server options (unused)
# @return [nil]
def self.static_app(opts)
  nil
end
# Resolves the app for the :tp2 app type.
#
# When opts[:app_location] is set, the file at that location is required and
# opts is then updated IN PLACE with TP2.config (presumably populated by the
# required file -- confirm against TP2.config). In every case the value
# stored under opts[:app] is returned.
#
# @param _machine unused
# @param opts [Hash] server options; mutated when :app_location is set
# @return the value of opts[:app] (nil when absent)
def self.tp2_app(_machine, opts)
  location = opts[:app_location]
  if location
    opts[:logger]&.info(message: 'Loading web app', location: location)
    require location
    opts.merge!(TP2.config)
  end

  opts[:app]
end
# Accept loop for one listening socket.
#
# For every fd yielded by @machine.accept_each, an HTTP1Connection is built
# and a fiber is spun that calls #run on the value handed to the spin block
# (presumably the connection itself -- confirm against the machine's spin
# semantics). The fiber is tracked in @connection_fiber_map while it is
# alive and removes itself from the map when it finishes.
#
# UM::Terminate ends the loop silently. Any other exception is printed
# together with its backtrace and the process exits.
#
# @param listen_fd the listening socket fd to accept connections on
def accept_incoming(listen_fd)
  @machine.accept_each(listen_fd) do |client_fd|
    connection = HTTP1Connection.new(@machine, client_fd, @opts, &@app)
    # `fiber` is referenced inside its own block so the fiber can deregister
    # itself; the local is assigned by the time the block body runs.
    fiber = @machine.spin(connection) do |c|
      c.run
    ensure
      @connection_fiber_map.delete(fiber)
    end
    @connection_fiber_map[fiber] = true
  end
rescue UM::Terminate
  # terminated
rescue Exception => e
  p e
  p e.backtrace
  exit
end
# Picks the app factory matching @opts[:app_type] and returns its result.
#
# nil and :tp2 select Server.tp2_app, :rack selects Server.rack_app and
# :static selects Server.static_app.
#
# @return the app produced by the selected factory
# @raise [RuntimeError] "Invalid app type ..." for any other value
def app_from_opts
  app_type = @opts[:app_type]
  case app_type
  when :rack then Server.rack_app(@opts)
  when :static then Server.static_app(@opts)
  when :tp2, nil then Server.tp2_app(@machine, @opts)
  else raise "Invalid app type #{app_type.inspect}"
  end
end
# Splits a "host:port" string into a [host, port] pair.
#
# Only the first two colon-separated fields are used. The port goes through
# #to_i, so a missing or non-numeric port becomes 0.
#
# @param bind_string [String] e.g. "127.0.0.1:8080"
# @return [Array] two-element array of host and integer port
def bind_info(bind_string)
  host, port = bind_string.split(':')
  [host, port.to_i]
end
# Calls @machine.close_async on every fd in @server_fds.
#
# @return [Array] @server_fds itself (the result of #each)
def close_all_server_fds
  @server_fds.each { |fd| @machine.close_async(fd) }
end
# Normalizes @opts[:bind] into a list of [host, port] pairs.
#
# An Array is mapped entry by entry through bind_info, a String yields a
# single pair, and anything else (including nil) falls back to the default
# of 0.0.0.0 port 1234.
#
# @return [Array<Array>] list of [host, port] pairs
def get_bind_entries
  case (bind = @opts[:bind])
  when Array then bind.map { |spec| bind_info(spec) }
  when String then [bind_info(bind)]
  else [['0.0.0.0', 1234]] # default
  end
end
# Shuts the server down in stages:
#
# 1. close all listening fds and snooze once;
# 2. if connection fibers are still registered, sleep for
#    PENDING_REQUESTS_GRACE_PERIOD to let them finish;
# 3. if any are still registered, schedule a UM::Terminate into each one and
#    join them under a PENDING_REQUESTS_TIMEOUT_PERIOD timeout.
#
# Returns early as soon as @connection_fiber_map is empty.
def graceful_shutdown
  @opts[:logger]&.info(message: 'Shutting down gracefully...')

  # stop listening
  close_all_server_fds
  @machine.snooze
  return if @connection_fiber_map.empty?

  # sleep for a bit, let requests finish
  @machine.sleep(PENDING_REQUESTS_GRACE_PERIOD)
  return if @connection_fiber_map.empty?

  # terminate pending fibers; iterate over a snapshot of the keys because
  # fibers remove themselves from the map as they finish
  pending = @connection_fiber_map.keys
  signal = UM::Terminate.new
  pending.each { |fiber| @machine.schedule(fiber, signal) }

  @machine.timeout(PENDING_REQUESTS_TIMEOUT_PERIOD, UM::Terminate) do
    @machine.join(*@connection_fiber_map.keys)
  rescue UM::Terminate
    # timeout on waiting for adapters to finish running, do nothing
  end
end
# Stores the machine and options and resolves the app to serve.
#
# @param machine the machine object used for all I/O and fiber operations
# @param opts [Hash] server options
# @param app [Proc] optional app block; when omitted the app is derived
#   from opts via app_from_opts
def initialize(machine, opts, &app)
  @server_fds = []
  @accept_fibers = []

  # app_from_opts reads @machine and @opts, so both must be assigned first.
  @machine = machine
  @opts = opts
  @app = app || app_from_opts
end
# Runs the server: sets up the listening sockets and accept fibers, then
# joins the accept fibers. A UM::Terminate raised during either step
# triggers graceful_shutdown.
def run
  setup
  @machine.join(*@accept_fibers)
rescue UM::Terminate
  graceful_shutdown
end
# Opens a listening socket for every bind entry, spins one accept fiber per
# socket and logs the addresses being listened on.
#
# The connection fiber map is created FIRST: accept_incoming and
# graceful_shutdown both dereference @connection_fiber_map, so it must exist
# before any accept fiber is spun and before any point at which this method
# could be interrupted.
#
# @return [Hash] the (initially empty) connection fiber map
def setup
  # map fibers
  @connection_fiber_map = {}

  entries = get_bind_entries
  entries.each do |(host, port)|
    fd = setup_server_socket(host, port)
    @server_fds << fd
    @accept_fibers << @machine.spin { accept_incoming(fd) }
  end

  bind_string = entries.map { |entry| entry.join(':') }.join(', ')
  @opts[:logger]&.info(message: "Listening on #{bind_string}")

  @connection_fiber_map
end
# Creates a listening AF_INET stream socket bound to host:port.
#
# SO_REUSEADDR and SO_REUSEPORT are both enabled before binding, and the
# listen backlog is UM::SOMAXCONN.
#
# @param host [String] address to bind to
# @param port [Integer] port to bind to
# @return the fd returned by @machine.socket
def setup_server_socket(host, port)
  sock = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)

  [UM::SO_REUSEADDR, UM::SO_REUSEPORT].each do |option|
    @machine.setsockopt(sock, UM::SOL_SOCKET, option, true)
  end

  @machine.bind(sock, host, port)
  @machine.listen(sock, UM::SOMAXCONN)
  sock
end