class Protobuf::Rpc::Service
def call_rpc(method, pb_request)
server is setup to handle synchronous and asynchronous responses.
by calling self.send_response without any arguments. The rpc
the responsibility of the service method to send the response,
by explicitly setting self.async_responder = true. It is then
Async behavior of responding can be achieved in the rpc method
a normal (http-based) controller method would be able to.
as there is no way to reliably determine the response like
that response should be manipulated during the rpc method,
that request and response are implicitly available, and
Implementing rpc methods should be aware
and response instances.
before and after behavior, most notably setting up the request
call_rpc allows us to wrap the normal method call with
Call the rpc method that was previously privatized.
def call_rpc(method, pb_request) @current_method = method # Allows the service to set whether or not # it would like to asynchronously respond to the connected client(s) @async_responder = false # Setup the request @request = rpcs[method].request_type.new @request.parse_from_string(pb_request.request_proto) rescue exc = BadRequestProto.new 'Unable to parse request: %s' % $!.message log_error exc.message log_error $!.backtrace.join("\n") raise exc else # when no Exception was thrown # Setup the response @response = rpcs[method].response_type.new log_debug "[#{log_signature}] calling service method %s#%s" % [self.class, method] # Call the aliased rpc method (e.g. :rpc_find for :find) __send__("rpc_#{method}".to_sym) log_debug "[#{log_signature}] completed service method %s#%s" % [self.class, method] # Pass the populated response back to the server # Note this will only get called if the rpc method didn't explode (by design) if @async_responder log_debug "[#{log_signature}] async request, not sending response (yet)" else log_debug "[#{log_signature}] trigger server send_response" send_response end end
def client(options={})
for all available options.
See Client#initialize and ClientConnection::DEFAULT_OPTIONS
Create a new client for the given service.
def client(options={}) configure Client.new({ :service => self, :async => false, :host => self.host, :port => self.port }.merge(options)) end
def configure(config={})
will not have to configure the location each time.
so that any Clients using the Service.client sugar
Useful for system-startup configuration of a service
Allows service-level configuration of location.
def configure(config={}) locations[self] ||= {} locations[self][:host] = config[:host] if config.key?(:host) locations[self][:port] = config[:port] if config.key?(:port) end
def host
def host configure locations[self][:host] || DEFAULT_LOCATION[:host] end
def located_at(location)
e.g. localhost:0
e.g. 127.0.0.1:9933
Shorthand call to configure, passing a string formatted as hostname:port
def located_at(location) return if location.nil? or location.downcase.strip !~ /[a-z0-9.]+:\d+/ host, port = location.downcase.strip.split ':' configure :host => host, :port => port.to_i end
def locations
def locations @locations ||= {} end
def log_and_raise_error(error_message)
def log_and_raise_error(error_message) log_error(error_message) raise error_message end
def log_signature
def log_signature @log_signature ||= "service-#{self.class}" end
def method_added(old)
We want to remap the method such that we can wrap it in before and after behavior,
or it isn't in the reserved method list (NON_RPC_METHODS),
If the method isn't already a private instance method, or it doesn't start with rpc_,
Override methods being added to the class
def method_added(old) new_method = :"rpc_#{old}" return if private_instance_methods.include?(new_method) or old =~ /^rpc_/ or NON_RPC_METHODS.include?(old.to_s) alias_method new_method, old private new_method define_method(old) do |pb_request| call_rpc(old.to_sym, pb_request) end rescue ArgumentError => e # Wrap a known issue where an instance method was defined in the class without # it being ignored with NON_RPC_METHODS. raise ArgumentError, "#{e.message} (Note: This could mean that you need to add the method #{old} to the NON_RPC_METHODS list)" end
def method_missing m, *params
functionality, so throw an appropriate error, otherwise go to super
stub has been created, but no implementing method provides the
is defined in the rpcs method list, we know that the rpc
If a method comes through that hasn't been found, and it
def method_missing m, *params if rpcs.key?(m) exc = MethodNotFound.new "#{self}##{m} was defined as a valid rpc method, but was not implemented." log_error exc.message raise exc else log_error "-------------- [#{log_signature}] %s#%s not rpc method, passing to super" % [self.class.name, m.to_s] super m, params end end
def on_rpc_failed(&rpc_failure_cb)
Callback register for the server when a service
def on_rpc_failed(&rpc_failure_cb) @rpc_failure_cb = rpc_failure_cb end
def on_send_response(&responder)
Used in conjunciton with Service#send_response.
when it is appropriate to generate a response to the client.
Callback register for the server to be notified
def on_send_response(&responder) @responder = responder end
def port
def port configure locations[self][:port] || DEFAULT_LOCATION[:port] end
def rpc(method, request_type, response_type)
Generated service classes should call this method on themselves to add rpc methods
def rpc(method, request_type, response_type) rpcs[self] ||= {} rpcs[self][method] = RpcMethod.new self, method, request_type, response_type end
def rpc_failed(message="RPC Failed while executing service method #{@current_method}")
not any way to get around this currently (and I'm not sure you should want to).
NOTE: This shortcuts the @async_responder paradigm. There is
Automatically fail a service method.
def rpc_failed(message="RPC Failed while executing service method #{@current_method}") error_message = 'Unable to invoke rpc_failed, no failure callback is setup.' log_and_raise_error(error_message) if @rpc_failure_cb.nil? error = message.is_a?(String) ? RpcFailed.new(message) : message log_warn "[#{log_signature}] RPC Failed: %s" % error.message @rpc_failure_cb.call(error) end
def rpcs
def rpcs @rpcs ||= {} end
def rpcs
def rpcs self.class.rpcs[self.class] end
def send_response
will timeout since no data will be sent.
the implementing service method, otherwise the connection
NOTE: If @async_responder is set to true, this MUST be called by
Tell the server to generate response and send it to the client.
def send_response error_message = "Unable to send response, responder is nil. It appears you aren't inside of an RPC request/response cycle." log_and_raise_error(error_message) if @responder.nil? @responder.call(@response) end