require'protobuf/common/logger'require'protobuf/rpc/client'require'protobuf/rpc/error'moduleProtobufmoduleRpc# Object to encapsulate the request/response types for a given service method# RpcMethod=Struct.new("RpcMethod",:service,:method,:request_type,:response_type)classServiceincludeProtobuf::Logger::LogMethodsattr_reader:requestattr_accessor:response,:async_responderprivate:request,:response,:response=DEFAULT_LOCATION={:host=>'localhost',:port=>9939}# Class methods are intended for use on the client-side.#class<<self# You MUST add the method name to this list if you are adding# instance methods below, otherwise stuff will definitely breakNON_RPC_METHODS=%w( rpcs call_rpc on_rpc_failed rpc_failed request response method_missing async_responder on_send_response send_response log_signature )# Override methods being added to the class# If the method isn't already a private instance method, or it doesn't start with rpc_, # or it isn't in the reserved method list (NON_RPC_METHODS),# We want to remap the method such that we can wrap it in before and after behavior,# most notably calling call_rpc against the method. See call_rpc for more info.defmethod_added(old)new_method=:"rpc_#{old}"returnifprivate_instance_methods.include?(new_method)orold=~/^rpc_/orNON_RPC_METHODS.include?(old.to_s)alias_methodnew_method,oldprivatenew_methoddefine_method(old)do|pb_request|call_rpc(old.to_sym,pb_request)endrescueArgumentError=>e# Wrap a known issue where an instance method was defined in the class without# it being ignored with NON_RPC_METHODS. raiseArgumentError,"#{e.message} (Note: This could mean that you need to add the method #{old} to the NON_RPC_METHODS list)"end# Generated service classes should call this method on themselves to add rpc methods# to the stack with a given request and response typedefrpc(method,request_type,response_type)rpcs[self]||={}rpcs[self][method]=RpcMethod.newself,method,request_type,response_typeend# Shorthand for @rpcs class instance vardefrpcs@rpcs||={}end# Create a new client for the given service.# See Client#initialize and ClientConnection::DEFAULT_OPTIONS# for all available options.#defclient(options={})configureClient.new({:service=>self,:async=>false,:host=>self.host,:port=>self.port}.merge(options))end# Allows service-level configuration of location.# Useful for system-startup configuration of a service# so that any Clients using the Service.client sugar# will not have to configure the location each time.# defconfigure(config={})locations[self]||={}locations[self][:host]=config[:host]ifconfig.key?(:host)locations[self][:port]=config[:port]ifconfig.key?(:port)end# Shorthand call to configure, passing a string formatted as hostname:port# e.g. 127.0.0.1:9933# e.g. localhost:0#deflocated_at(location)returniflocation.nil?orlocation.downcase.strip!~/[a-z0-9.]+:\d+/host,port=location.downcase.strip.split':'configure:host=>host,:port=>port.to_iend# The host location of the servicedefhostconfigurelocations[self][:host]||DEFAULT_LOCATION[:host]end# The port of the service on the destination serverdefportconfigurelocations[self][:port]||DEFAULT_LOCATION[:port]end# Shorthand for @locations class instance vardeflocations@locations||={}endenddeflog_signature@log_signature||="service-#{self.class}"end# If a method comes through that hasn't been found, and it# is defined in the rpcs method list, we know that the rpc# stub has been created, but no implementing method provides the# functionality, so throw an appropriate error, otherwise go to super# defmethod_missingm,*paramsifrpcs.key?(m)exc=MethodNotFound.new"#{self}##{m} was defined as a valid rpc method, but was not implemented."log_errorexc.messageraiseexcelselog_error"-------------- [#{log_signature}] %s#%s not rpc method, passing to super"%[self.class.name,m.to_s]superm,paramsendend# Convenience wrapper around the rpc method list for a given classdefrpcsself.class.rpcs[self.class]end# Callback register for the server when a service# method calls rpc_failed. Called by Service#rpc_failed.defon_rpc_failed(&rpc_failure_cb)@rpc_failure_cb=rpc_failure_cbend# Automatically fail a service method.# NOTE: This shortcuts the @async_responder paradigm. There is# not any way to get around this currently (and I'm not sure you should want to).#defrpc_failed(message="RPC Failed while executing service method #{@current_method}")error_message='Unable to invoke rpc_failed, no failure callback is setup.'log_and_raise_error(error_message)if@rpc_failure_cb.nil?error=message.is_a?(String)?RpcFailed.new(message):messagelog_warn"[#{log_signature}] RPC Failed: %s"%error.message@rpc_failure_cb.call(error)end# Callback register for the server to be notified# when it is appropriate to generate a response to the client.# Used in conjunciton with Service#send_response.# defon_send_response(&responder)@responder=responderend# Tell the server to generate response and send it to the client.## NOTE: If @async_responder is set to true, this MUST be called by# the implementing service method, otherwise the connection# will timeout since no data will be sent.#defsend_responseerror_message="Unable to send response, responder is nil. It appears you aren't inside of an RPC request/response cycle."log_and_raise_error(error_message)if@responder.nil?@responder.call(@response)endprivatedeflog_and_raise_error(error_message)log_error(error_message)raiseerror_messageend# Call the rpc method that was previously privatized.# call_rpc allows us to wrap the normal method call with # before and after behavior, most notably setting up the request# and response instances.# # Implementing rpc methods should be aware# that request and response are implicitly available, and# that response should be manipulated during the rpc method,# as there is no way to reliably determine the response like# a normal (http-based) controller method would be able to.## Async behavior of responding can be achieved in the rpc method# by explicitly setting self.async_responder = true. It is then# the responsibility of the service method to send the response,# by calling self.send_response without any arguments. The rpc# server is setup to handle synchronous and asynchronous responses.#defcall_rpc(method,pb_request)@current_method=method# Allows the service to set whether or not# it would like to asynchronously respond to the connected client(s)@async_responder=false# Setup the request@request=rpcs[method].request_type.new@request.parse_from_string(pb_request.request_proto)rescueexc=BadRequestProto.new'Unable to parse request: %s'%$!.messagelog_errorexc.messagelog_error$!.backtrace.join("\n")raiseexcelse# when no Exception was thrown# Setup the response@response=rpcs[method].response_type.newlog_debug"[#{log_signature}] calling service method %s#%s"%[self.class,method]# Call the aliased rpc method (e.g. :rpc_find for :find)__send__("rpc_#{method}".to_sym)log_debug"[#{log_signature}] completed service method %s#%s"%[self.class,method]# Pass the populated response back to the server# Note this will only get called if the rpc method didn't explode (by design)if@async_responderlog_debug"[#{log_signature}] async request, not sending response (yet)"elselog_debug"[#{log_signature}] trigger server send_response"send_responseendendendendend