lib/protobuf/rpc/servers/zmq/broker.rb



require 'resolv'
require 'protobuf/rpc/servers/zmq/util'

module Protobuf
  module Rpc
    module Zmq
      class Broker
        include ::Protobuf::Rpc::Zmq::Util
        attr_reader :frontend, :backend, :poller, :context

        ##
        # Constructor
        #
        def initialize(options = {})
          @context = ::ZMQ::Context.new
          @frontend = setup_frontend(options)
          @backend = setup_backend(options)
          @poller = setup_poller
        end

        ##
        # Instance Methods
        #
        def poll
          poller.poll(1000)
          poller.readables.each do |socket|
            case socket
            when frontend then
              move_to(backend, socket)
            when backend then
              move_to(frontend, socket)
            end
          end
        end

        def teardown
          frontend.close
          backend.close
          context.terminate
        end

        private

          def move_to(frontend_or_backend, socket)
            more_data = true

            while more_data do
              socket.recv_string(data = "")
              more_data = socket.more_parts?
              more_data_flag = (more_data ? ::ZMQ::SNDMORE : 0)
              frontend_or_backend.send_string(data, more_data_flag)
            end
          end

          def setup_backend(options = {})
            dealer_options = options.merge(:port => options[:port] + 1)
            host = dealer_options[:host]
            port = dealer_options[:port]

            zmq_backend = context.socket(::ZMQ::DEALER)
            zmq_error_check(zmq_backend.bind(bind_address(host, port)))
            zmq_backend
          end

          def setup_frontend(options = {})
            host = options[:host]
            port = options[:port]

            zmq_frontend = context.socket(::ZMQ::ROUTER)
            zmq_error_check(zmq_frontend.bind(bind_address(host, port)))
            zmq_frontend
          end

          def bind_address(host, port)
            "tcp://#{resolve_ip(host)}:#{port}"
          end

          def setup_poller
            zmq_poller = ::ZMQ::Poller.new
            zmq_poller.register(frontend, ::ZMQ::POLLIN)
            zmq_poller.register(backend, ::ZMQ::POLLIN)
            zmq_poller
          end
      end
    end
  end
end