class Protobuf::Rpc::Connectors::EventMachine

def ensure_cb


Returns a callable that ensures any errors will be returned to the client.

If a failure callback was set, just use that as a direct assignment;
otherwise implement one here that simply raises an exception, since we
don't want errors to be swallowed into a black hole.
# Memoized error callback.
#
# Uses @failure_cb when one has been set; otherwise falls back to a lambda
# that raises a RuntimeError built from the error's code name and message,
# so that failures are never silently dropped.
#
# @return [#call] a callable taking a single error argument
def ensure_cb
  return @ensure_cb if @ensure_cb

  @ensure_cb = @failure_cb || ->(error) { raise "#{error.code.name}: #{error.message}" }
end

def ensure_em_running(&blk)

# Runs the given block with an EventMachine reactor available.
#
# Records in @using_global_reactor whether a reactor was already running.
# When one is, the block is simply yielded to. Otherwise the block is
# wrapped in ::EM.fiber_run and ::EM.stop is called once the block returns.
#
# @yield the work to perform
def ensure_em_running(&blk)
  @using_global_reactor = ::EM.reactor_running? ? true : false
  return yield if @using_global_reactor

  ::EM.fiber_run do
    blk.call
    ::EM.stop
  end
end

def error_stop_reactor(message)

# Reports a client-side RPC error and, when this connector owns the
# reactor, stops it.
#
# Builds a Protobuf::Rpc::ClientError with the RPC_ERROR reason and the
# given message and passes it to the ensure_cb callback. ::EM.stop is only
# called when @using_global_reactor is falsy.
#
# @param message [String] text describing the failure
def error_stop_reactor(message)
  reason = Protobuf::Socketrpc::ErrorReason::RPC_ERROR
  client_error = Protobuf::Rpc::ClientError.new(reason, message)
  ensure_cb.call(client_error)
  return if @using_global_reactor

  ::EM.stop
end

def log_signature

# Memoized label for this connector: "client-" followed by the receiver's
# class name.
#
# @return [String]
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "client-#{self.class}"
end

def resume_fiber(fib)

# Cancels the pending timeout timer and resumes the given fiber with +true+.
#
# Any StandardError raised while doing so is logged and then reported
# through error_stop_reactor with a "Synchronous client failed" message.
#
# @param fib [Fiber] the fiber to resume
def resume_fiber(fib)
  begin
    ::EM.cancel_timer(@timeout_timer)
    fib.resume(true)
  rescue StandardError => failure
    log_exception(failure)
    error_stop_reactor("Synchronous client failed: #{failure.message}")
  end
end

def send_request

# Sends the request through an EMClient connection.
#
# Inside ensure_em_running, the current fiber is captured and the
# connection work is queued with ::EM.next_tick: connect, register the
# success (if any), failure and completion callbacks, set up the
# connection and send the data. The completion callback resumes the
# captured fiber. After queueing, set_timeout_and_validate_fiber is called.
def send_request
  ensure_em_running do
    calling_fiber = Fiber.current

    ::EM.next_tick do
      log_debug { sign_message('Scheduling EventMachine client request to be created on next tick') }

      connection = EMClient.connect(options, &ensure_cb)
      if success_cb
        connection.on_success(&success_cb)
      end
      connection.on_failure(&ensure_cb)
      connection.on_complete { resume_fiber(calling_fiber) }
      connection.setup_connection
      connection.send_data

      log_debug { sign_message('Connection scheduled') }
    end

    set_timeout_and_validate_fiber
  end
end

def set_timeout_and_validate_fiber

# Arms the client timeout timer and suspends the current fiber.
#
# The timer is stored in @timeout_timer; when it fires it reports a
# timeout through error_stop_reactor. Fiber.yield then hands control back
# to whoever resumed this fiber. If yielding raises FiberError, the error
# is logged and reported through error_stop_reactor instead.
def set_timeout_and_validate_fiber
  begin
    timeout_seconds = @options[:timeout]
    @timeout_timer = ::EM.add_timer(timeout_seconds) do
      error_stop_reactor("Client timeout of #{@options[:timeout]} seconds expired")
    end
    Fiber.yield
  rescue FiberError => fiber_error
    log_exception(fiber_error)
    error_stop_reactor("Synchronous calls must be in 'EM.fiber_run' block")
  end
end