lib/tp2/server.rb
# frozen_string_literal: true

require 'tp2/http1_connection'
require 'tp2/request_extensions'
require 'tp2/rack_adapter'

module TP2
  # Binds one or more listening sockets, accepts incoming connections on each
  # of them, and runs an HTTP1Connection in its own fiber per accepted socket.
  #
  # All I/O and fiber scheduling goes through the `machine` object passed to
  # the constructor (the code uses constants from the `UM` namespace with it).
  class Server
    # Passed to `@machine.sleep` during shutdown so in-flight connections get a
    # chance to finish on their own (presumably seconds - confirm against UM).
    PENDING_REQUESTS_GRACE_PERIOD = 0.1

    # Passed to `@machine.timeout` as the upper bound on waiting for terminated
    # connection fibers to finish (presumably seconds - confirm against UM).
    PENDING_REQUESTS_TIMEOUT_PERIOD = 5

    # Loads a Rack app through TP2::RackAdapter.
    #
    # @param opts [Hash] server options; `:app_location` is required
    # @return [Object] whatever `TP2::RackAdapter.load` returns
    # @raise [RuntimeError] if `opts[:app_location]` is missing
    def self.rack_app(opts)
      raise 'Missing app location' unless opts[:app_location]

      TP2::RackAdapter.load(opts[:app_location])
    end

    # Resolves a native TP2 app. When `:app_location` is given, the file is
    # required and `TP2.config` is merged into `opts` in place, so the required
    # file can supply `:app` (and other options) through `TP2.config`.
    #
    # @param _machine [Object] unused
    # @param opts [Hash] server options (mutated when `:app_location` is set)
    # @return [Object, nil] the value of `opts[:app]`
    def self.tp2_app(_machine, opts)
      if opts[:app_location]
        opts[:logger]&.call("Loading app at #{opts[:app_location]}")
        require opts[:app_location]

        opts.merge!(TP2.config)
      end
      opts[:app]
    end

    # Placeholder for the static app type; currently has no body and
    # returns nil.
    def self.static_app(opts); end

    # @param machine [Object] the machine used for sockets and fibers
    # @param opts [Hash] server options (`:bind`, `:app_type`,
    #   `:app_location`, `:app`, `:logger` are read in this class)
    # @param app [Proc] optional app block; when omitted the app is resolved
    #   from `opts` via #app_from_opts
    def initialize(machine, opts, &app)
      @machine = machine
      @opts = opts
      @app = app || app_from_opts
      @server_fds = []
      @accept_fibers = []
      # Created here rather than at the end of #setup so that accept fibers and
      # #graceful_shutdown never observe a nil map, whatever point #setup has
      # reached when they run.
      @connection_fiber_map = {}
    end

    # Picks the app according to `opts[:app_type]` (nil is treated as :tp2).
    #
    # @return [Object, nil] the resolved app
    # @raise [RuntimeError] on an unrecognised app type
    def app_from_opts
      case @opts[:app_type]
      when nil, :tp2
        Server.tp2_app(@machine, @opts)
      when :rack
        Server.rack_app(@opts)
      when :static
        Server.static_app(@opts)
      else
        raise "Invalid app type #{@opts[:app_type].inspect}"
      end
    end

    # Sets up the listening sockets, then waits on the accept fibers. When
    # UM::Terminate is raised in the calling fiber, performs a graceful
    # shutdown instead of propagating it.
    def run
      setup
      @machine.join(*@accept_fibers)
    rescue UM::Terminate
      graceful_shutdown
    end

    private

    # Opens one listening socket per bind entry, spins an accept fiber for
    # each, and logs the addresses being listened on.
    def setup
      entries = get_bind_entries
      entries.each do |(host, port)|
        fd = setup_server_socket(host, port)
        @server_fds << fd
        @accept_fibers << @machine.spin { accept_incoming(fd) }
      end
      bind_string = entries.map { it.join(':') }.join(', ')
      @opts[:logger]&.call("Listening on #{bind_string}")
    end

    # Normalises `opts[:bind]` into a list of [host, port] pairs.
    #
    # @return [Array<Array(String, Integer)>] bind entries; defaults to
    #   0.0.0.0:1234 when `:bind` is neither an Array nor a String
    def get_bind_entries
      bind = @opts[:bind]
      case bind
      when Array
        bind.map { bind_info(it) }
      when String
        [bind_info(bind)]
      else
        # default
        [['0.0.0.0', 1234]]
      end
    end

    # Splits a "host:port" string into its parts.
    #
    # @param bind_string [String] e.g. "127.0.0.1:8080"
    # @return [Array(String, Integer)] host and port; the port is 0 when the
    #   string has no ":port" part, because nil.to_i is 0
    def bind_info(bind_string)
      parts = bind_string.split(':')
      [parts[0], parts[1].to_i]
    end

    # Creates an AF_INET stream socket with SO_REUSEADDR and SO_REUSEPORT set,
    # binds it to host/port and starts listening.
    #
    # @param host [String]
    # @param port [Integer]
    # @return [Object] the value returned by `@machine.socket` (used as the fd)
    def setup_server_socket(host, port)
      fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
      @machine.setsockopt(fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
      @machine.setsockopt(fd, UM::SOL_SOCKET, UM::SO_REUSEPORT, true)
      @machine.bind(fd, host, port)
      @machine.listen(fd, UM::SOMAXCONN)
      fd
    end

    # Accept loop for one listening socket: wraps every accepted fd in an
    # HTTP1Connection, runs it in a new fiber, and tracks that fiber in
    # @connection_fiber_map until the connection's `run` returns or raises.
    #
    # @param listen_fd [Object] a listening socket from #setup_server_socket
    def accept_incoming(listen_fd)
      @machine.accept_each(listen_fd) do |fd|
        conn = HTTP1Connection.new(@machine, fd, @opts, &@app)
        # `f` is assigned after `spin` returns; the ensure clause reads it
        # later, when the fiber body finishes.
        f = @machine.spin(conn) do
          it.run
        ensure
          @connection_fiber_map.delete(f)
        end
        @connection_fiber_map[f] = true
      end
    rescue UM::Terminate
      # terminated
    rescue Exception => e
      # NOTE(review): any other failure of the accept loop is printed and the
      # process exits via a bare `exit` (status 0). Kept as-is; confirm whether
      # a non-zero status and the configured logger would be preferable.
      p e
      p e.backtrace
      exit
    end

    # Requests an asynchronous close of every listening socket.
    def close_all_server_fds
      @server_fds.each { @machine.close_async(it) }
    end

    # Shutdown sequence: stop listening, give open connections a short grace
    # period, then terminate whatever is still running and wait a bounded time
    # for those fibers to finish.
    def graceful_shutdown
      @opts[:logger]&.call('Shutting down gracefully...')

      # stop listening
      close_all_server_fds
      @machine.snooze
      return if @connection_fiber_map.empty?

      # sleep for a bit, let requests finish
      @machine.sleep(PENDING_REQUESTS_GRACE_PERIOD)
      return if @connection_fiber_map.empty?

      # terminate pending fibers
      pending = @connection_fiber_map.keys
      signal = UM::Terminate.new
      pending.each { @machine.schedule(it, signal) }

      @machine.timeout(PENDING_REQUESTS_TIMEOUT_PERIOD, UM::Terminate) do
        @machine.join(*@connection_fiber_map.keys)
      rescue UM::Terminate
        # timeout on waiting for adapters to finish running, do nothing
      end
    end
  end
end