# lib/tp2/server.rb



# frozen_string_literal: true

require 'tp2/http1_connection'
require 'tp2/request_extensions'
require 'tp2/rack_adapter'

module TP2
  # TCP server that accepts connections on one or more listening sockets and
  # hands each accepted connection to an HTTP1Connection running in its own
  # fiber. The application is either the block passed to #initialize or is
  # resolved from the options hash (see #app_from_opts).
  class Server
    # Seconds to let in-flight connections finish on their own during a
    # graceful shutdown before their fibers are terminated.
    PENDING_REQUESTS_GRACE_PERIOD = 0.1
    # Maximum seconds to wait for terminated connection fibers to finish.
    PENDING_REQUESTS_TIMEOUT_PERIOD = 5

    # Address and port used when opts[:bind] is neither a String nor an Array.
    DEFAULT_HOST = '0.0.0.0'
    DEFAULT_PORT = 1234

    # Loads a Rack application through TP2::RackAdapter.
    #
    # @param opts [Hash] server options; :app_location is required
    # @return whatever TP2::RackAdapter.load returns for the given location
    # @raise [RuntimeError] if opts[:app_location] is missing
    def self.rack_app(opts)
      raise 'Missing app location' unless opts[:app_location]

      TP2::RackAdapter.load(opts[:app_location])
    end

    # Resolves a native TP2 application. When :app_location is given, the file
    # is required and TP2.config is merged into opts (mutating the caller's
    # hash); the app is then read from opts[:app].
    #
    # @param _machine [Object] unused
    # @param opts [Hash] server options (:app_location, :logger, :app)
    # @return [Object, nil] the value of opts[:app]
    def self.tp2_app(_machine, opts)
      location = opts[:app_location]
      if location
        opts[:logger]&.call("Loading app at #{location}")
        require location

        opts.merge!(TP2.config)
      end
      opts[:app]
    end

    # Placeholder for a static file app; currently has no body and returns nil.
    #
    # @param opts [Hash] server options (unused)
    # @return [nil]
    def self.static_app(opts); end

    # @param machine [Object] the machine used for all socket and fiber
    #   operations (socket, bind, listen, accept_each, spin, join, ...)
    # @param opts [Hash] server options (:bind, :logger, :app_type, ...)
    # @param app [Proc] optional application block; when omitted the app is
    #   resolved from opts
    def initialize(machine, opts, &app)
      @machine = machine
      @opts = opts
      @app = app || app_from_opts
      @server_fds = []
      @accept_fibers = []
      # Tracks live connection fibers (fiber => true). Created here rather
      # than at the end of #setup so that it exists before any accept fiber
      # can run and before a shutdown that interrupts #setup.
      @connection_fiber_map = {}
    end

    # Resolves the application according to opts[:app_type].
    #
    # @return [Object, nil] the resolved application
    # @raise [RuntimeError] if the app type is not nil, :tp2, :rack or :static
    def app_from_opts
      case @opts[:app_type]
      when nil, :tp2
        Server.tp2_app(@machine, @opts)
      when :rack
        Server.rack_app(@opts)
      when :static
        Server.static_app(@opts)
      else
        raise "Invalid app type #{@opts[:app_type].inspect}"
      end
    end

    # Binds the listening sockets, then blocks joining the accept fibers.
    # A UM::Terminate raised while running triggers a graceful shutdown.
    def run
      setup
      @machine.join(*@accept_fibers)
    rescue UM::Terminate
      graceful_shutdown
    end

    private

    # Opens one listening socket per bind entry, spins an accept fiber for
    # each, and logs the bound addresses.
    def setup
      entries = get_bind_entries
      entries.each do |(host, port)|
        fd = setup_server_socket(host, port)
        @server_fds << fd
        @accept_fibers << @machine.spin { accept_incoming(fd) }
      end
      bind_string = entries.map { |entry| entry.join(':') }.join(', ')
      @opts[:logger]&.call("Listening on #{bind_string}")
    end

    # Normalizes opts[:bind] into a list of [host, port] pairs.
    #
    # @return [Array<Array(String, Integer)>] an Array of bind strings yields
    #   one pair per entry, a single String yields one pair, and anything else
    #   yields the default address
    def get_bind_entries
      bind = @opts[:bind]
      case bind
      when Array
        bind.map { |entry| bind_info(entry) }
      when String
        [bind_info(bind)]
      else
        [[DEFAULT_HOST, DEFAULT_PORT]]
      end
    end

    # Splits a "host:port" string into [host, port].
    #
    # @param bind_string [String]
    # @return [Array(String, Integer)] a missing or non-numeric port becomes 0
    def bind_info(bind_string)
      host, port = bind_string.split(':')
      [host, port.to_i]
    end

    # Creates an IPv4 stream socket with SO_REUSEADDR and SO_REUSEPORT set,
    # binds it to host/port and starts listening.
    #
    # @return the socket file descriptor returned by the machine
    def setup_server_socket(host, port)
      fd = @machine.socket(UM::AF_INET, UM::SOCK_STREAM, 0, 0)
      @machine.setsockopt(fd, UM::SOL_SOCKET, UM::SO_REUSEADDR, true)
      @machine.setsockopt(fd, UM::SOL_SOCKET, UM::SO_REUSEPORT, true)
      @machine.bind(fd, host, port)
      @machine.listen(fd, UM::SOMAXCONN)
      fd
    end

    # Accept loop for one listening socket: every accepted fd gets an
    # HTTP1Connection that runs in its own fiber. The fiber is registered in
    # @connection_fiber_map and removes itself when it finishes.
    def accept_incoming(listen_fd)
      @machine.accept_each(listen_fd) do |fd|
        conn = HTTP1Connection.new(@machine, fd, @opts, &@app)
        # `fiber` is captured by the block below; it is assigned by the time
        # the ensure clause runs, provided spin returns before the fiber
        # finishes.
        fiber = @machine.spin(conn) do |c|
          c.run
        ensure
          @connection_fiber_map.delete(fiber)
        end
        @connection_fiber_map[fiber] = true
      end
    rescue UM::Terminate
      # terminated
    rescue Exception => e
      # Any other failure of the accept loop is treated as fatal: dump the
      # error and its backtrace, then exit the process.
      p e
      p e.backtrace
      exit
    end

    # Requests an asynchronous close of every listening socket.
    def close_all_server_fds
      @server_fds.each { |fd| @machine.close_async(fd) }
    end

    # Stops listening, gives open connections a short grace period, then
    # terminates whatever is still running and waits (bounded by
    # PENDING_REQUESTS_TIMEOUT_PERIOD) for those fibers to finish.
    def graceful_shutdown
      @opts[:logger]&.call('Shutting down gracefully...')

      # stop listening
      close_all_server_fds
      @machine.snooze

      return if @connection_fiber_map.empty?

      # sleep for a bit, let requests finish
      @machine.sleep(PENDING_REQUESTS_GRACE_PERIOD)
      return if @connection_fiber_map.empty?

      # terminate pending fibers
      pending = @connection_fiber_map.keys
      signal = UM::Terminate.new
      pending.each { |fiber| @machine.schedule(fiber, signal) }

      @machine.timeout(PENDING_REQUESTS_TIMEOUT_PERIOD, UM::Terminate) do
        @machine.join(*@connection_fiber_map.keys)
      rescue UM::Terminate
        # timed out waiting for connection fibers to finish, do nothing
      end
    end
  end
end