class Kafka::SocketWithTimeout

def initialize(host, port, connect_timeout: nil, timeout: nil)

Raises:
  • (Errno::ETIMEDOUT) - if the timeout is exceeded.

Parameters:
  • timeout (Integer) -- the read and write timeout, in seconds.
  • connect_timeout (Integer) -- the connection timeout, in seconds.
  • port (Integer) --
  • host (String) --
def initialize(host, port, connect_timeout: nil, timeout: nil)
  addr = Socket.getaddrinfo(host, nil)
  sockaddr = Socket.pack_sockaddr_in(port, addr[0][3])
  @timeout = timeout
  @socket = Socket.new(Socket.const_get(addr[0][0]), Socket::SOCK_STREAM, 0)
  @socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1)
  begin
    # Initiate the socket connection in the background. If it doesn't fail
    # immediately it will raise an IO::WaitWritable (Errno::EINPROGRESS)
    # indicating the connection is in progress.
    @socket.connect_nonblock(sockaddr)
  rescue IO::WaitWritable
    # IO.select will block until the socket is writable or the timeout
    # is exceeded, whichever comes first.
    unless IO.select(nil, [@socket], nil, connect_timeout)
      # IO.select returns nil when the socket is not ready before timeout
      # seconds have elapsed
      @socket.close
      raise Errno::ETIMEDOUT
    end
    begin
      # Verify there is now a good connection.
      @socket.connect_nonblock(sockaddr)
    rescue Errno::EISCONN
      # The socket is connected, we're good!
    end
  end
end