lib/protobuf/rpc/servers/socket/server.rb
require 'socket'
require 'protobuf/rpc/server'
require 'protobuf/rpc/servers/socket/worker'

module Protobuf
  module Rpc
    module Socket
      # Thread-per-connection TCP server.
      #
      # #run binds a TCPServer, multiplexes the listening socket and all
      # accepted-but-idle client sockets through IO.select, and hands every
      # client socket that becomes readable to a worker thread (see
      # #new_worker). Finished worker threads are reaped by #cleanup_threads.
      #
      # Options read from the hash given to #initialize: :host, :port,
      # :backlog and :threshold.
      class Server
        include ::Protobuf::Rpc::Server
        include ::Protobuf::Logger::LogMethods

        # Seconds IO.select waits before giving up; a timeout triggers a
        # cleanup pass over the worker threads.
        AUTO_COLLECT_TIMEOUT = 5 # seconds

        # @param options [Hash] server options (:host, :port, :backlog, :threshold)
        def initialize(options)
          @options = options
        end

        # Whether a thread cleanup pass is due.
        #
        # True whenever the number of tracked worker threads is a non-zero
        # multiple of @threshold (the :threshold option captured by #run).
        #
        # @return [Boolean]
        def cleanup?
          tracked = @threads.size
          tracked >= @threshold && (tracked % @threshold).zero?
        end

        # Drops every finished worker thread from @threads, joining it and
        # removing its socket from @working.
        #
        # NOTE(review): Thread#join re-raises an exception that terminated the
        # thread, so a crashed worker surfaces here (and therefore in #run).
        def cleanup_threads
          log_debug { sign_message("Thread cleanup - #{@threads.size} - start") }

          @threads = @threads.reject do |entry|
            worker = entry[:thread]
            next false if worker.alive?

            worker.join
            @working.delete(entry[:socket])
            true
          end

          log_debug { sign_message("Thread cleanup - #{@threads.size} - complete") }
        end

        # @return [String] memoized prefix used when signing log messages
        def log_signature
          @_log_signature ||= "server-#{self.class.name}"
        end

        # Starts a thread that runs a Worker on +socket+; the block handed to
        # the worker closes the socket it is given.
        #
        # @param socket [TCPSocket] accepted client connection
        # @return [Thread]
        def new_worker(socket)
          Thread.new(socket) do |connection|
            ::Protobuf::Rpc::Socket::Worker.new(connection) { |finished| finished.close }
          end
        end

        # Binds the listening socket and serves connections until #stop is
        # called. Blocks the calling thread.
        #
        # Errors from binding (e.g. Errno::EADDRINUSE) and from the serving
        # loop propagate to the caller unchanged.
        #
        # @raise [RuntimeError] if the server socket is closed right after creation
        def run
          log_debug { sign_message("Run") }
          @threshold = @options[:threshold]
          @threads = []

          @server = ::TCPServer.new(@options[:host], @options[:port])
          raise "The server was unable to start properly." if @server.closed?

          @server.listen(@options[:backlog])
          @working = []
          @listen_fds = [@server]
          @running = true

          while running?
            log_debug { sign_message("Waiting for connections") }
            ready = ready_sockets

            if ready
              ready.each { |socket| handle_ready_socket(socket) }
            elsif !@threads.empty?
              # Select timed out (or failed): reap whatever has finished,
              # including a lone finished worker.
              cleanup_threads
            end
          end
        end

        # @return [Boolean] true between the start of #run's loop and #stop
        def running?
          !!@running
        end

        # Asks the #run loop to exit and closes the listening socket.
        # Safe to call before #run or more than once.
        def stop
          @running = false
          @server.close if @server && !@server.closed?
        end

        private

        # Waits up to AUTO_COLLECT_TIMEOUT seconds for a readable socket.
        #
        # Best-effort: any StandardError from IO.select (for example because
        # #stop closed the listening socket) is logged at debug level and
        # treated like a timeout.
        #
        # @return [Array<IO>, nil] readable sockets, or nil on timeout/error
        def ready_sockets
          ready = IO.select(@listen_fds, [], [], AUTO_COLLECT_TIMEOUT)
          ready && ready.first
        rescue StandardError => e
          log_debug { sign_message("IO.select failed - #{e.class}: #{e.message}") }
          nil
        end

        # Accepts a new connection when +socket+ is the listening socket;
        # otherwise moves the client socket from the select set to @working
        # and starts a worker thread for it. Does nothing once the server
        # has been stopped.
        def handle_ready_socket(socket)
          return unless running?

          if socket == @server
            log_debug { sign_message("Accepted new connection") }
            # TCPServer#accept returns just the connected socket.
            @listen_fds << @server.accept
          elsif !@working.include?(socket)
            @working << @listen_fds.delete(socket)
            log_debug { sign_message("Working") }
            @threads << { :thread => new_worker(socket), :socket => socket }

            cleanup_threads if cleanup?
          end
        end
      end
    end
  end
end