class Kafka::Connection

def open

def open
  @logger.debug "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."
  if @ssl_context
    @socket = SSLSocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout, ssl_context: @ssl_context)
  else
    @socket = SocketWithTimeout.new(@host, @port, connect_timeout: @connect_timeout, timeout: @socket_timeout)
  end
  @encoder = Kafka::Protocol::Encoder.new(@socket)
  @decoder = Kafka::Protocol::Decoder.new(@socket)
  # Correlation id is initialized to zero and bumped for each request.
  @correlation_id = 0
  @last_request = nil
rescue Errno::ETIMEDOUT => e
  @logger.error "Timed out while trying to connect to #{self}: #{e}"
  raise ConnectionError, e
rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => e
  @logger.error "Failed to connect to #{self}: #{e}"
  raise ConnectionError, e
end