class Kafka::Connection
def open
# Opens the connection to the broker at @host:@port.
#
# A socket is created (SSL-wrapped when @ssl_context is set, plain
# otherwise), a protocol encoder and decoder are attached to it, and the
# per-connection request bookkeeping is reset.
#
# @raise [ConnectionError] when the connection attempt times out
#   (Errno::ETIMEDOUT) or fails with SocketError, Errno::ECONNREFUSED or
#   Errno::EHOSTUNREACH.
# @return [nil]
def open
  @logger.debug "Opening connection to #{@host}:#{@port} with client id #{@client_id}..."

  # Only the socket class and the extra ssl_context option differ between
  # the two branches; both receive the same connect and socket timeouts.
  @socket =
    if @ssl_context
      SSLSocketWithTimeout.new(
        @host,
        @port,
        connect_timeout: @connect_timeout,
        timeout: @socket_timeout,
        ssl_context: @ssl_context
      )
    else
      SocketWithTimeout.new(
        @host,
        @port,
        connect_timeout: @connect_timeout,
        timeout: @socket_timeout
      )
    end

  # The encoder and decoder both operate on the freshly opened socket.
  @encoder = Kafka::Protocol::Encoder.new(@socket)
  @decoder = Kafka::Protocol::Decoder.new(@socket)

  # Correlation ids start from zero on every new connection and are bumped
  # for each request.
  @correlation_id = 0

  # Kept as the final statement so the method's return value stays nil.
  @last_request = nil
rescue Errno::ETIMEDOUT => timeout_error
  @logger.error "Timed out while trying to connect to #{self}: #{timeout_error}"
  raise ConnectionError, timeout_error
rescue SocketError, Errno::ECONNREFUSED, Errno::EHOSTUNREACH => connect_error
  @logger.error "Failed to connect to #{self}: #{connect_error}"
  raise ConnectionError, connect_error
end