lib/protobuf/rpc/servers/zmq/worker.rb



require 'protobuf/rpc/server'
require 'protobuf/rpc/servers/zmq/util'

module Protobuf
  module Rpc
    module Zmq
      class Worker
        include ::Protobuf::Rpc::Server
        include ::Protobuf::Rpc::Zmq::Util

        ##
        # Constructor
        #
        def initialize(server)
          @server = server

          init_zmq_context
          init_backend_socket
        rescue
          teardown
          raise
        end

        ##
        # Instance Methods
        #
        def process_request
          @client_address, _, @request_data = read_from_backend

          unless @request_data.nil?
            log_debug { sign_message("handling request") }
            handle_client
          end
        end

        def run
          poller = ::ZMQ::Poller.new
          poller.register_readable(@backend_socket)
          poller.register_readable(@shutdown_socket)

          # Send request to broker telling it we are ready
          write_to_backend([::Protobuf::Rpc::Zmq::WORKER_READY_MESSAGE])

          loop do
            rc = poller.poll(500)

            # The server was shutdown and no requests are pending
            break if rc == 0 && !running?

            # Something went wrong
            break if rc == -1

            if rc > 0
              initialize_request!
              process_request
            end
          end
        ensure
          teardown
        end

        def running?
          @server.running?
        end

        def send_data
          data = @response.encode

          @stats.response_size = data.size

          write_to_backend([@client_address, "", data])
        end

        private

        def init_zmq_context
          @zmq_context = ZMQ::Context.new
        end

        def init_backend_socket
          @backend_socket = @zmq_context.socket(ZMQ::REQ)
          zmq_error_check(@backend_socket.connect(@server.backend_uri))
        end

        def read_from_backend
          frames = []
          zmq_error_check(@backend_socket.recv_strings(frames))
          frames
        end

        def teardown
          @backend_socket.try(:close)
          @zmq_context.try(:terminate)
        end

        def write_to_backend(frames)
          zmq_error_check(@backend_socket.send_strings(frames))
        end
      end
    end
  end
end