lib/protobuf/rpc/servers/socket_server.rb



require 'protobuf/rpc/server'

module Protobuf
  module Rpc
    class SocketServer  
      include Protobuf::Rpc::Server
      include Protobuf::Logger::LogMethods

      class << self

        def cleanup? 
          # every 10 connections run a cleanup routine after closing the response
          @threads.size > (@thread_threshold - 1) && (@threads.size % @thread_threshold) == 0
        end

        def cleanup_threads
          log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - start"

          @threads = @threads.select do |t| 
            if t[:thread].alive? 
              true  
            else
              t[:thread].join
              @working.delete(t[:socket])
              false
            end
          end

          log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - complete"
        end

        def log_signature
          @log_signature ||= "server-#{self}"
        end

        def new_worker(socket)
          Thread.new(socket) do |sock|
            Protobuf::Rpc::SocketServer::Worker.new(sock) do |s|
              s.close
            end
          end
        end

        # TODO make listen/backlog part of config
        def run(host = "127.0.0.1", port = 9399, backlog = 100, thread_threshold = 100)
          log_debug "[#{log_signature}] Run"
          @running = true
          @threads = []
          @thread_threshold = thread_threshold
          @server = TCPServer.new(host, port)
          @server.listen(backlog)
          @working = []
          @listen_fds = [@server]

          while running?
            log_debug "[#{log_signature}] Waiting for connections"

            if ready_cnxns = IO.select(@listen_fds, [], [], 20)
              cnxns = ready_cnxns.first
              cnxns.each do |client|
                case 
                when !running? then
                  # no-op
                when client == @server then 
                  log_debug "[#{log_signature}] Accepted new connection"
                  client, sockaddr = @server.accept
                  @listen_fds << client
                else 
                  if !@working.include?(client)
                    @working << @listen_fds.delete(client)
                    log_debug "[#{log_signature}] Working" 
                    @threads << { :thread => new_worker(client), 
                                  :socket => client }

                    cleanup_threads if cleanup?
                  end
                end
              end
            else
              # Run a cleanup if select times out while waiting
              cleanup_threads if @threads.size > 1
            end
          end

        rescue 
          # Closing the server causes the loop to raise an exception here
          raise if running?
        end

        def running?
          @running
        end

        def stop 
          @running = false
          @server.close
        end

      end

      class Worker 
        include Protobuf::Rpc::Server
        include Protobuf::Logger::LogMethods

        def initialize(sock, &complete_cb)
          @did_response = false
          @socket = sock
          @request = Protobuf::Socketrpc::Request.new
          @response = Protobuf::Socketrpc::Response.new
          @buffer = Protobuf::Rpc::Buffer.new(:read)
          @stats = Protobuf::Rpc::Stat.new(:SERVER, true)
          @complete_cb = complete_cb
          log_debug "[#{log_signature}] Post init, new read buffer created"

          @stats.client = Socket.unpack_sockaddr_in(@socket.getpeername)
          @buffer << read_data 
          log_debug "[#{log_signature}] handling request"
          handle_client if @buffer.flushed?
        end

        def log_signature
          @log_signature ||= "server-#{self.class}-#{object_id}"
        end

        def read_data
          size_io = StringIO.new

          while((size_reader = @socket.getc) != "-")
            size_io << size_reader
          end
          str_size_io = size_io.string

          "#{str_size_io}-#{@socket.read(str_size_io.to_i)}"
        end

        def send_data(data)
          log_debug "[#{log_signature}] sending data : %s" % data
          @socket.write(data)
          @socket.flush
          @complete_cb.call(@socket)
        end
      end
    end
  end
end