lib/protobuf/rpc/connectors/eventmachine.rb
require 'protobuf/rpc/connectors/base' require 'protobuf/rpc/connectors/em_client' module Protobuf module Rpc module Connectors class EventMachine < Base def send_request ensure_em_running do f = Fiber.current ::EM.next_tick do log_debug { sign_message('Scheduling EventMachine client request to be created on next tick') } cnxn = EMClient.connect(options, &ensure_cb) cnxn.on_success(&success_cb) if success_cb cnxn.on_failure(&ensure_cb) cnxn.on_complete { resume_fiber(f) } cnxn.setup_connection cnxn.send_data log_debug { sign_message('Connection scheduled') } end set_timeout_and_validate_fiber end end # Returns a callable that ensures any errors will be returned to the client # # If a failure callback was set, just use that as a direct assignment # otherwise implement one here that simply throws an exception, since we # don't want to swallow the black holes. # def ensure_cb @ensure_cb ||= (@failure_cb || lambda { |error| raise "#{error.code.name}: #{error.message}" }) end def log_signature @_log_signature ||= "client-#{self.class}" end private def ensure_em_running(&blk) if ::EM.reactor_running? @using_global_reactor = true yield else @using_global_reactor = false ::EM.fiber_run do blk.call ::EM.stop end end end def resume_fiber(fib) ::EM::cancel_timer(@timeout_timer) fib.resume(true) rescue => ex log_exception(ex) message = "Synchronous client failed: #{ex.message}" error_stop_reactor(message) end def set_timeout_and_validate_fiber @timeout_timer = ::EM::add_timer(@options[:timeout]) do message = "Client timeout of #{@options[:timeout]} seconds expired" error_stop_reactor(message) end Fiber.yield rescue FiberError => ex log_exception(ex) message = "Synchronous calls must be in 'EM.fiber_run' block" error_stop_reactor(message) end def error_stop_reactor(message) err = Protobuf::Rpc::ClientError.new(Protobuf::Socketrpc::ErrorReason::RPC_ERROR, message) ensure_cb.call(err) ::EM.stop unless @using_global_reactor end end end end end