class SimpleXChat::ClientAgent
def connect
# Opens a TCP connection to @uri, performs the WebSocket client handshake and
# starts a background listener thread (stored in @listener_thread).
#
# The listener parses every incoming frame as JSON. When the message's
# "corrId" has a matching entry in @command_waiters, its "resp" value is
# pushed to that waiter's queue; otherwise "resp" is pushed onto
# @message_queue.
#
# If the listener hits an unexpected error it logs it, closes @message_queue
# and re-raises inside the listener thread.
def connect
  @logger.debug("Connecting to: '#{@uri}'...")
  @socket = TCPSocket.new @uri.host, @uri.port
  @handshake = WebSocket::Handshake::Client.new(url: @uri.to_s)

  # Do websocket handshake
  @logger.debug("Doing handshake with: '#{@uri}'...")
  @socket.write @handshake.to_s
  # Read the server's HTTP response off the socket before frame parsing starts.
  # NOTE(review): the response is not inspected (status code / accept key) —
  # confirm whether a failed handshake should be detected here.
  HTTPResponse.read_new(Net::BufferedIO.new(@socket))

  @listener_thread = Thread.new do
    frame = WebSocket::Frame::Incoming::Client.new(version: @handshake.version)

    loop do
      begin
        frame << @socket.read_nonblock(4096)

        # A single read may contain several complete frames. Drain all of them
        # now; otherwise the extra frames would sit in the buffer until more
        # bytes arrive on the socket.
        # NOTE(review): every frame is parsed as JSON regardless of its type —
        # confirm control frames (ping/close) cannot reach this point.
        while (obj = frame.next)
          @logger.debug("New message (raw): #{obj}")
          msg = JSON.parse obj.to_s
          # @logger.debug("New message: #{msg}")
          # @logger.debug("Command waiters: #{@command_waiters}")

          corr_id = msg["corrId"]
          resp = msg["resp"]
          single_use_queue = @command_waiters[corr_id]

          if !corr_id.nil? && single_use_queue
            # Someone is waiting for the response to this specific command
            single_use_queue.push(resp)
            @logger.debug("Message sent to waiter with corrId '#{corr_id}'")
          else
            @message_queue.push resp
            @logger.debug("Message put on message queue (number of messages in queue: #{@message_queue.size})")
          end
        end
      rescue IO::WaitReadable
        # Nothing to read yet: block until the socket is readable, then read again
        IO.select([@socket])
        retry
      rescue IO::WaitWritable
        IO.select(nil, [@socket])
        retry
      rescue => e
        # TODO: Verify if this way of stopping the execution
        #       is graceful enough after implementing reconnects
        @logger.error "Unhandled exception caught: #{e}"
        @message_queue.close
        raise
      end
    end
  end

  @logger.info("Successfully connected ClientAgent to: #{@uri}")
end