class Protobuf::Rpc::SocketServer
# Decides whether the periodic thread cleanup should run now.
#
# For a positive @thread_threshold (the value handed to #run, 100 by
# default) this is true each time the number of tracked worker threads is a
# non-zero multiple of that threshold.
#
# @return [Boolean] false when the threshold is zero, so that case disables
#   the periodic cleanup instead of raising
def cleanup?
  # A zero threshold would make the modulo below raise ZeroDivisionError
  return false if @thread_threshold.zero?

  @threads.size > (@thread_threshold - 1) && (@threads.size % @thread_threshold).zero?
end
# Prunes finished worker threads from the bookkeeping lists.
#
# Each entry in @threads is a hash with :thread and :socket keys. Entries
# whose thread is still alive are kept; for every other entry the thread is
# joined, its socket is removed from @working and the entry is dropped.
# The thread count is logged before and after the pass.
def cleanup_threads
  log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - start"

  @threads = @threads.reject do |entry|
    worker = entry[:thread]
    next false if worker.alive?

    # Thread#join re-raises any exception the worker terminated with
    worker.join
    @working.delete(entry[:socket])
    true
  end

  log_debug "[#{log_signature}] Thread cleanup - #{@threads.size} - complete"
end
# Returns the tag used to prefix this server's log lines.
#
# The string is "server-" followed by this object's string form. It is
# built on the first call and cached in @log_signature afterwards.
#
# @return [String]
def log_signature
  return @log_signature if @log_signature

  @log_signature = "server-#{self}"
end
# Starts a thread that serves one client connection.
#
# The thread builds a Protobuf::Rpc::SocketServer::Worker for the socket and
# hands it a block that closes whatever socket the worker passes to it.
#
# @param socket the client connection given to the worker
# @return [Thread] the thread that was started
def new_worker(socket)
  Thread.new(socket) do |connection|
    Protobuf::Rpc::SocketServer::Worker.new(connection) { |io| io.close }
  end
end
# Binds a TCP server and runs the accept/dispatch loop until #running?
# turns false.
#
# Newly accepted sockets are added to @listen_fds. Once such a socket is
# reported ready by IO.select it is moved from @listen_fds to @working and
# handed to a worker thread from #new_worker, which is recorded in @threads.
#
# @param host [String] address to bind to
# @param port [Integer] port to bind to
# @param backlog [Integer] value passed to TCPServer#listen
# @param thread_threshold [Integer] stored in @thread_threshold, which
#   #cleanup? uses to decide when finished threads are pruned
# @raise [StandardError] any error raised while the server is still
#   running; errors raised after @running was cleared are swallowed
def run(host = "127.0.0.1", port = 9399, backlog = 100, thread_threshold = 100)
  log_debug "[#{log_signature}] Run"
  @running = true
  @threads = []
  @thread_threshold = thread_threshold
  @server = TCPServer.new(host, port)
  @server.listen(backlog)
  @working = []
  @listen_fds = [@server]

  while running?
    log_debug "[#{log_signature}] Waiting for connections"
    # IO.select returns nil when the 20 second timeout expires
    ready_cnxns = IO.select(@listen_fds, [], [], 20)

    if ready_cnxns.nil?
      # Run a cleanup if select times out while waiting
      # NOTE(review): a single tracked thread is not cleaned up here
      # because of the `> 1` -- confirm that is intended
      cleanup_threads if @threads.size > 1
      next
    end

    ready_cnxns.first.each do |client|
      # Skip the remaining ready sockets once a stop has been requested
      next unless running?

      if client == @server
        log_debug "[#{log_signature}] Accepted new connection"
        # TCPServer#accept returns the connected socket itself
        @listen_fds << @server.accept
      elsif !@working.include?(client)
        # Stop selecting on the socket while a worker owns it
        @working << @listen_fds.delete(client)
        log_debug "[#{log_signature}] Working"
        @threads << { :thread => new_worker(client), :socket => client }
        cleanup_threads if cleanup?
      end
    end
  end
rescue
  # Closing the server causes the loop to raise an exception here
  raise if running?
end
# Reports the current value of the @running flag.
#
# #run sets the flag to true and #stop sets it to false; the value is
# returned as stored, so it is nil if neither has been called yet.
def running?
  @running
end
# Clears the @running flag and closes the listening socket.
#
# The flag is cleared before the socket is closed so that the error raised
# inside #run by the close is treated as a shutdown, not a failure.
# Safe to call before #run has created @server and safe to call repeatedly;
# in those cases only the flag is cleared.
def stop
  @running = false
  @server.close if @server && !@server.closed?
end