class Protobuf::Rpc::Connectors::Zmq

def self.ping_port_responses


#
# Class Methods
#
# Class-level cache of ping-port check results, built lazily on first access.
# host_alive? reads and writes it through self.class, keyed by host, so every
# connector instance of the class shares the same entries.
def self.ping_port_responses
  return @ping_port_responses if @ping_port_responses

  @ping_port_responses = ::ThreadSafe::Cache.new
end

def self.zmq_context(reload = false)

# Returns the ZMQ context for the current process, creating one the first time
# a given pid asks for it. Contexts are kept in a pid => context table held in
# a class-level instance variable.
#
# reload - when truthy, the existing table is dropped first (no terminate call
#          is made here), so the calling process receives a brand new context.
def self.zmq_context(reload = false)
  contexts = reload ? nil : @zmq_contexts
  contexts ||= Hash.new { |table, pid| table[pid] = ZMQ::Context.new }
  @zmq_contexts = contexts
  contexts[Process.pid]
end

def check_available_rcv_timeout


#
# Private Instance methods
#
# Receive timeout, in milliseconds, applied to the socket while
# socket_to_available_server performs its availability check.
#
# Taken from PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT (converted with to_i)
# when that variable is set, otherwise 200. The value is memoized, so later
# changes to the environment are not picked up.
def check_available_rcv_timeout
  return @check_available_rcv_timeout if @check_available_rcv_timeout

  override = ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_RCV_TIMEOUT"]
  @check_available_rcv_timeout = override.nil? ? 200 : override.to_i
end

def check_available_snd_timeout

# Send timeout, in milliseconds, applied to the socket while
# socket_to_available_server performs its availability check.
#
# Taken from PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT (converted with to_i)
# when that variable is set, otherwise 200. The value is memoized, so later
# changes to the environment are not picked up.
def check_available_snd_timeout
  return @check_available_snd_timeout if @check_available_snd_timeout

  override = ENV["PB_ZMQ_CLIENT_CHECK_AVAILABLE_SND_TIMEOUT"]
  @check_available_snd_timeout = override.nil? ? 200 : override.to_i
end

def close_connection

# Intentionally a no-op: send_request_with_timeout closes its socket in an
# ensure clause after every request, so nothing is left open at this point.
#
# Returns nil.
def close_connection
  nil
end

def create_socket

# Create a socket connected to a server that can handle the current
# service. The LINGER is set to 0 so we can close immediately in
# the event of a timeout
# Builds a REQ socket from the process ZMQ context and connects it to the URI
# returned by lookup_server_uri, retrying until a usable socket is obtained or
# socket_creation_attempts tries have been made.
#
# A socket counts as unusable when it is nil or when its #socket attribute is
# nil (checked through `try`, presumably ActiveSupport's Object#try).
#
# Returns the connected socket.
# Raises RequestTimeout when no usable socket exists after the last attempt.
# Errors raised by lookup_server_uri or zmq_error_check are not rescued here.
def create_socket
  attempt_number = 0
  # begin/end-while: the body always runs at least once, even when
  # socket_creation_attempts is 0.
  begin
    attempt_number += 1
    socket = zmq_context.socket(::ZMQ::REQ)
    if socket # Make sure the context builds the socket
      server_uri = lookup_server_uri
      # LINGER 0 lets the socket be closed immediately after a timeout.
      socket.setsockopt(::ZMQ::LINGER, 0)
      zmq_error_check(socket.connect(server_uri), :socket_connect)
      # May return nil or an already-closed socket; both lead to another attempt.
      socket = socket_to_available_server(socket) if first_alive_load_balance?
    end
  end while socket.try(:socket).nil? && attempt_number < socket_creation_attempts
  raise RequestTimeout, "Cannot create new ZMQ client socket" if socket.try(:socket).nil?
  socket
end

def error?


# Method to determine error state, must be used with Connector API.
# Reports whether an error has been recorded on this connector.
#
# Returns true when @error holds a truthy value, false otherwise (always a
# strict boolean, never the error object itself).
def error?
  @error ? true : false
end

def host_alive?(host)

# Decides whether +host+ should be treated as reachable.
#
# When ping-port checking is disabled every host is reported alive. Otherwise
# the class-level ping_port_responses cache is consulted: an entry no older
# than host_alive_check_interval seconds is reused, anything else triggers a
# fresh ping_port_open? probe whose result is stored with the current time.
#
# host - the address to check.
#
# Returns true unconditionally when checking is disabled, otherwise the cached
# or freshly probed ping_port_open? result.
def host_alive?(host)
  return true unless ping_port_enabled?

  cached = self.class.ping_port_responses[host]
  fresh = cached && (Time.now.to_i - cached[:at]) <= host_alive_check_interval
  return cached[:ping_port_open] if fresh

  ping_port_open?(host).tap do |port_open|
    self.class.ping_port_responses[host] = { :at => Time.now.to_i, :ping_port_open => port_open }
  end
end

def host_alive_check_interval

# Maximum age, in seconds, of a cached ping-port result before host_alive?
# probes the host again.
#
# Read from PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL (to_i, so unset or
# non-numeric values count as 0) and never lower than 1. Memoized.
def host_alive_check_interval
  return @host_alive_check_interval if @host_alive_check_interval

  configured = ENV["PB_ZMQ_CLIENT_HOST_ALIVE_CHECK_INTERVAL"].to_i
  @host_alive_check_interval = configured > 1 ? configured : 1
end

def log_signature


#
# Instance methods
#
# Prefix identifying this connector in log output, in the form
# "[client-<class name>]". Built once and then reused.
def log_signature
  return @_log_signature if @_log_signature

  @_log_signature = "[client-#{self.class}]"
end

def lookup_server_uri


# Lookup a server uri for the requested service in the service
# directory. If the service directory is not running, default
# to the host and port in the options
# Resolves the "tcp://host:port" URI to connect to for the current service.
#
# Up to server_lookup_attempts rounds are made. In each round:
#   1. the first service-directory listing whose address passes host_alive?
#      wins;
#   2. otherwise options[:host] / options[:port] are used if that host is
#      alive;
#   3. otherwise the method sleeps 10ms and tries again.
# The chosen endpoint is recorded on @stats.server as [port, host].
#
# Returns the URI String.
# Raises RuntimeError when every round fails to find a live host.
def lookup_server_uri
  server_lookup_attempts.times do
    listing = service_directory.all_listings_for(service).find do |candidate|
      host_alive?(candidate.try(:address))
    end

    if listing
      host = listing.try(:address)
      port = listing.try(:port)
    else
      host = options[:host]
      port = options[:port]
      unless host_alive?(host)
        sleep(1.0 / 100.0)
        next
      end
    end

    @stats.server = [port, host]
    return "tcp://#{host}:#{port}"
  end

  fail "Host not found for service #{service}"
end

def ping_port_open?(host)

# Probes +host+ on the configured ping port (ping_port, converted with to_i).
#
# Returns whatever Ping#online? reports for that host/port pair.
def ping_port_open?(host)
  probe = Ping.new(host, ping_port.to_i)
  probe.online?
end

def rcv_timeout

# Receive timeout, in milliseconds, applied to request sockets.
#
# Precedence: options[:timeout] when truthy, then PB_ZMQ_CLIENT_RCV_TIMEOUT
# (converted with to_i) when set, then 300_000 (300 seconds). Memoized.
def rcv_timeout
  return @rcv_timeout if @rcv_timeout

  @rcv_timeout =
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_RCV_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_RCV_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end

def send_request

# Other tags:
#   See: http://zguide.zeromq.org/php:chapter4#Client-side-Reliability-Lazy-Pirate-Pattern
# Entry point for sending the request: runs setup_connection, then hands off
# to send_request_with_lazy_pirate unless setup left the connector in an error
# state.
#
# Returns nil when error? is true, otherwise the result of
# send_request_with_lazy_pirate.
def send_request
  setup_connection
  return if error?

  send_request_with_lazy_pirate
end

def send_request_with_lazy_pirate


# Trying a number of times, attempt to get a response from the server.
# If we haven't received a legitimate response in the CLIENT_RETRIES number
# of retries, fail the request.
# Sends the request and parses the response, starting over from scratch each
# time a RequestTimeout is raised, for at most CLIENT_RETRIES attempts in
# total. Attempt numbers start at 1 and are passed to
# send_request_with_timeout.
#
# Returns the result of parse_response on success; once the attempts are
# exhausted, the result of failure(:RPC_FAILED, ...).
def send_request_with_lazy_pirate
  tries = 0
  begin
    tries += 1
    send_request_with_timeout(tries)
    parse_response
  rescue RequestTimeout
    if tries < CLIENT_RETRIES
      retry
    else
      failure(:RPC_FAILED, "The server repeatedly failed to respond within #{timeout} seconds")
    end
  end
end

def send_request_with_timeout(attempt = 0)

# Performs a single request/response exchange on a freshly created socket.
#
# The socket gets rcv_timeout / snd_timeout applied, @request_data is sent and
# the reply is read into @response_data (reset to a new empty string first).
#
# attempt - attempt number, used only in the debug log messages.
#
# Raises RequestTimeout when the send or receive fails with ZmqEagainError.
# Other errors from create_socket or the error checks propagate unchanged.
# The socket is closed on every exit path.
def send_request_with_timeout(attempt = 0)
  socket = create_socket
  socket.setsockopt(::ZMQ::RCVTIMEO, rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, snd_timeout)
  logger.debug { sign_message("Sending Request (attempt #{attempt}, #{socket})") }
  zmq_eagain_error_check(socket.send_string(@request_data), :socket_send_string)
  logger.debug { sign_message("Waiting #{rcv_timeout}ms for response (attempt #{attempt}, #{socket})") }
  # recv_string fills the buffer it is handed, so @response_data is assigned a
  # fresh string in the same expression.
  zmq_eagain_error_check(socket.recv_string(@response_data = ""), :socket_recv_string)
  logger.debug { sign_message("Response received (attempt #{attempt}, #{socket})") }
rescue ZmqEagainError
  # Translate the ZMQ-level timeout into the error the retry loop rescues.
  logger.debug { sign_message("Timed out waiting for response (attempt #{attempt}, #{socket})") }
  raise RequestTimeout
ensure
  logger.debug { sign_message("Closing Socket") }
  # socket is nil here when create_socket itself raised.
  zmq_error_check(socket.close, :socket_close) if socket
  logger.debug { sign_message("Socket closed") }
end

def server_lookup_attempts

# Number of rounds lookup_server_uri makes before giving up.
#
# Read from PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS (to_i, so unset or
# non-numeric values count as 0) and never lower than 5. Memoized.
def server_lookup_attempts
  return @server_lookup_attempts if @server_lookup_attempts

  configured = ENV["PB_ZMQ_CLIENT_SERVER_LOOKUP_ATTEMPTS"].to_i
  @server_lookup_attempts = configured > 5 ? configured : 5
end

def service


# The service we're attempting to connect to
# The service this connector is trying to reach, read from options[:service]
# on every call (nil when the option is absent, assuming options is a plain
# Hash — confirm against the enclosing class).
def service
  options[:service]
end

def service_directory

# Alias for ::Protobuf::Rpc.service_directory (documented as ::Protobuf::Rpc::ServiceDirectory.instance)
# Shortcut for ::Protobuf::Rpc.service_directory; lookup_server_uri queries the
# returned object for listings of the requested service.
def service_directory
  ::Protobuf::Rpc.service_directory
end

def snd_timeout

# Send timeout, in milliseconds, applied to request sockets.
#
# Precedence: options[:timeout] when truthy, then PB_ZMQ_CLIENT_SND_TIMEOUT
# (converted with to_i) when set, then 300_000 (300 seconds). Memoized.
def snd_timeout
  return @snd_timeout if @snd_timeout

  @snd_timeout =
    if options[:timeout]
      options[:timeout]
    elsif ENV.key?("PB_ZMQ_CLIENT_SND_TIMEOUT")
      ENV["PB_ZMQ_CLIENT_SND_TIMEOUT"].to_i
    else
      300_000 # 300 seconds
    end
end

def socket_creation_attempts

# Upper bound on the number of tries create_socket makes.
#
# Read from PB_ZMQ_CLIENT_SOCKET_CREATION_ATTEMPTS (converted with to_i),
# defaulting to 5 when the variable is unset. No lower bound is enforced.
# Memoized.
def socket_creation_attempts
  return @socket_creation_attempts if @socket_creation_attempts

  @socket_creation_attempts = ENV.fetch("PB_ZMQ_CLIENT_SOCKET_CREATION_ATTEMPTS", 5).to_i
end

def socket_to_available_server(socket)

# Asks the server behind +socket+ whether it has a worker available, using the
# short check_available_* timeouts for the exchange.
#
# socket - a connected REQ socket, as built by create_socket.
#
# Returns one of:
#   * the same socket with RCVTIMEO/SNDTIMEO reset to -1, when the server
#     answered anything other than NO_WORKERS_AVAILABLE;
#   * the same socket, already closed, when the server answered
#     NO_WORKERS_AVAILABLE (create_socket treats a socket whose #socket is nil
#     as unusable and retries — presumably closing clears that attribute);
#   * nil when the send, receive or close failed with a recoverable error; the
#     socket is closed before returning so it is not left open until GC.
def socket_to_available_server(socket)
  check_available_response = ""
  socket.setsockopt(::ZMQ::RCVTIMEO, check_available_rcv_timeout)
  socket.setsockopt(::ZMQ::SNDTIMEO, check_available_snd_timeout)
  zmq_recoverable_error_check(socket.send_string(::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE), :socket_send_string)
  zmq_recoverable_error_check(socket.recv_string(check_available_response), :socket_recv_string)

  if check_available_response == ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE
    zmq_recoverable_error_check(socket.close, :socket_close)
    # Nothing more to configure on a closed socket.
    return socket
  end

  # The availability check is done; drop its short timeouts again.
  socket.setsockopt(::ZMQ::RCVTIMEO, -1)
  socket.setsockopt(::ZMQ::SNDTIMEO, -1)
  socket
rescue ZmqRecoverableError
  # The caller discards its only reference when it receives nil, so release the
  # socket here. Best effort: the close result code is deliberately ignored.
  socket.close
  nil # couldn't make a connection and need to try again
end

def zmq_context(reload = false)


# Return the ZMQ Context to use for this process.
# If the context does not exist, create it, then register
# an exit block to ensure the context is terminated correctly.
# Instance-level shortcut for the class-level zmq_context.
#
# reload - passed straight through to self.class.zmq_context.
#
# Returns whatever the class-level method returns.
def zmq_context(reload = false)
  self.class.zmq_context(reload)
end

def zmq_eagain_error_check(return_code, source)

# Validates the result code of a ZMQ call, distinguishing EAGAIN from every
# other failure.
#
# return_code - result of the ZMQ call; nil is treated as -1 (failure).
# source      - label of the call being checked, used in the error message.
#
# Returns nil when the result code is OK.
# Raises ZmqEagainError when ::ZMQ::Util.errno is EAGAIN, RuntimeError for any
# other errno. The message carries the ZMQ error string and the caller's
# backtrace.
def zmq_eagain_error_check(return_code, source)
  return if ::ZMQ::Util.resultcode_ok?(return_code || -1)

  error_class = ::ZMQ::Util.errno == ::ZMQ::EAGAIN ? ZmqEagainError : RuntimeError
  raise error_class, <<-ERROR
    Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
    #{caller(1).join($INPUT_RECORD_SEPARATOR)}
    ERROR
end

def zmq_error_check(return_code, source)

# Validates the result code of a ZMQ call and treats any failure as fatal.
#
# return_code - result of the ZMQ call; nil is treated as -1 (failure).
# source      - label of the call being checked, used in the error message.
#
# Returns nil when the result code is OK.
# Raises RuntimeError otherwise, with the ZMQ error string and the caller's
# backtrace in the message.
def zmq_error_check(return_code, source)
  result_code = return_code || -1
  return if ::ZMQ::Util.resultcode_ok?(result_code)

  details = <<-ERROR
  Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
  #{caller(1).join($INPUT_RECORD_SEPARATOR)}
  ERROR
  raise RuntimeError, details
end

def zmq_recoverable_error_check(return_code, source)

# Validates the result code of a ZMQ call whose failure the caller is prepared
# to recover from (socket_to_available_server rescues the error raised here).
#
# return_code - result of the ZMQ call; nil is treated as -1 (failure).
# source      - label of the call being checked, used in the error message.
#
# Returns nil when the result code is OK.
# Raises ZmqRecoverableError otherwise, with the ZMQ error string and the
# caller's backtrace in the message.
def zmq_recoverable_error_check(return_code, source)
  result_code = return_code || -1
  return if ::ZMQ::Util.resultcode_ok?(result_code)

  raise ZmqRecoverableError, <<-ERROR
  Last ZMQ API call to #{source} failed with "#{::ZMQ::Util.error_string}".
  #{caller(1).join($INPUT_RECORD_SEPARATOR)}
  ERROR
end