class Protobuf::Rpc::Connectors::EventMachine
def ensure_cb
Returns a callable that ensures any errors will be returned to the client.
If a failure callback was set, just use that as a direct assignment;
otherwise implement one here that simply throws an exception, since we
don't want to swallow the black holes.
# Returns the callable used to report errors back to the client.
#
# The result is memoised in @ensure_cb. When a failure callback
# (@failure_cb) has been set it is reused as-is; otherwise a lambda is
# created that raises a RuntimeError built from the error's code name and
# message, so errors are never silently swallowed.
def ensure_cb
  @ensure_cb ||=
    if @failure_cb
      @failure_cb
    else
      lambda { |error| raise "#{error.code.name}: #{error.message}" }
    end
end
def ensure_em_running(&blk)
# Runs the given block with an EventMachine reactor available.
#
# If a reactor is already running, the block is yielded to directly and
# @using_global_reactor is set to true. Otherwise @using_global_reactor is
# set to false, the block is run inside ::EM.fiber_run, and the reactor is
# stopped once the block returns.
def ensure_em_running(&blk)
  if ::EM.reactor_running?
    @using_global_reactor = true
    return yield
  end

  @using_global_reactor = false
  ::EM.fiber_run do
    blk.call
    ::EM.stop
  end
end
def error_stop_reactor(message)
# Reports +message+ to the client as an RPC_ERROR and stops the reactor
# when it is not the shared/global one.
#
# message - text passed as the second argument to the ClientError.
#
# The error is handed to the callable returned by ensure_cb. ::EM.stop is
# only invoked when @using_global_reactor is falsy.
def error_stop_reactor(message)
  client_error = Protobuf::Rpc::ClientError.new(
    Protobuf::Socketrpc::ErrorReason::RPC_ERROR,
    message
  )
  ensure_cb.call(client_error)

  return if @using_global_reactor

  ::EM.stop
end
def log_signature
# Returns the signature string used for this client's log entries:
# "client-" followed by the receiver's class. The value is memoised in
# @_log_signature.
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "client-#{self.class}"
end
def resume_fiber(fib)
# Cancels the pending timeout timer and resumes the suspended fiber.
#
# fib - the fiber to resume; it is resumed with the value true.
#
# Any StandardError raised while cancelling or resuming is logged via
# log_exception and reported through error_stop_reactor.
def resume_fiber(fib)
  ::EM.cancel_timer(@timeout_timer)
  fib.resume(true)
rescue StandardError => failure
  log_exception(failure)
  message = "Synchronous client failed: #{failure.message}"
  error_stop_reactor(message)
end
def send_request
# Sends the request through an EMClient connection and then waits in
# set_timeout_and_validate_fiber.
#
# The work runs inside ensure_em_running. The connection is created on the
# reactor's next tick, wired to the success/failure callbacks, and set to
# call resume_fiber with the calling fiber on completion.
def send_request
  ensure_em_running do
    calling_fiber = Fiber.current

    ::EM.next_tick do
      log_debug { sign_message('Scheduling EventMachine client request to be created on next tick') }

      connection = EMClient.connect(options, &ensure_cb)
      connection.on_success(&success_cb) if success_cb
      connection.on_failure(&ensure_cb)
      connection.on_complete { resume_fiber(calling_fiber) }
      connection.setup_connection
      connection.send_data

      log_debug { sign_message('Connection scheduled') }
    end

    # Arms the timeout timer and calls Fiber.yield; resume_fiber above
    # resumes the captured fiber when the connection completes.
    set_timeout_and_validate_fiber
  end
end
def set_timeout_and_validate_fiber
# Arms the request timeout and suspends the current fiber.
#
# A timer for @options[:timeout] seconds is stored in @timeout_timer; when
# it fires, a "Client timeout" error is reported via error_stop_reactor.
# The method then calls Fiber.yield and returns whatever value the fiber is
# resumed with.
#
# If Fiber.yield raises FiberError (the call was not made from inside a
# resumable fiber), the exception is logged and reported through
# error_stop_reactor, whose result is returned.
def set_timeout_and_validate_fiber
  @timeout_timer = ::EM.add_timer(@options[:timeout]) do
    message = "Client timeout of #{@options[:timeout]} seconds expired"
    error_stop_reactor(message)
  end

  Fiber.yield
rescue FiberError => ex
  # The timer armed above is still pending. Cancel it first, so a spurious
  # "Client timeout" error is not reported later on top of this one.
  ::EM.cancel_timer(@timeout_timer) if @timeout_timer

  log_exception(ex)
  message = "Synchronous calls must be in 'EM.fiber_run' block"
  error_stop_reactor(message)
end