class Selenium::WebDriver::WebSocketConnection
def attach_socket_listener
def attach_socket_listener Thread.new do Thread.current.abort_on_exception = true Thread.current.report_on_exception = false until socket.eof? incoming_frame << socket.readpartial(1024) while (frame = incoming_frame.next) message = process_frame(frame) next unless message['method'] params = message['params'] callbacks[message['method']].each do |callback| @callback_threads.add(callback_thread(params, &callback)) end end end rescue *CONNECTION_ERRORS Thread.stop end end
def callback_thread(params)
def callback_thread(params) Thread.new do Thread.current.abort_on_exception = true # We might end up blocked forever when we have an error in event. # For example, if network interception event raises error, # the browser will keep waiting for the request to be proceeded # before returning back to the original thread. In this case, # we should at least print the error. Thread.current.report_on_exception = true yield params rescue *CONNECTION_ERRORS Thread.stop end end
def callbacks
def callbacks @callbacks ||= Hash.new { |callbacks, event| callbacks[event] = [] } end
def close
def close @callback_threads.list.each(&:exit) @socket_thread.exit socket.close end
def incoming_frame
def incoming_frame @incoming_frame ||= WebSocket::Frame::Incoming::Client.new(version: ws.version) end
def initialize(url:)
def initialize(url:) @callback_threads = ThreadGroup.new @session_id = nil @url = url process_handshake @socket_thread = attach_socket_listener end
def messages
because its keys are WebSocket message identifiers and they should be
We should be thread-safe to use the hash without synchronization
def messages @messages ||= {} end
def next_id
def next_id @id ||= 0 @id += 1 end
def process_frame(frame)
def process_frame(frame) message = frame.to_s # Firefox will periodically fail on unparsable empty frame return {} if message.empty? message = JSON.parse(message) messages[message['id']] = message WebDriver.logger.debug "WebSocket <- #{message}"[...MAX_LOG_MESSAGE_SIZE], id: :bidi message end
def process_handshake
def process_handshake socket.print(ws.to_s) ws << socket.readpartial(1024) end
def send_cmd(**payload)
def send_cmd(**payload) id = next_id data = payload.merge(id: id) WebDriver.logger.debug "WebSocket -> #{data}"[...MAX_LOG_MESSAGE_SIZE], id: :bidi data = JSON.generate(data) out_frame = WebSocket::Frame::Outgoing::Client.new(version: ws.version, data: data, type: 'text') socket.write(out_frame.to_s) wait.until { messages.delete(id) } end
def socket
def socket @socket ||= if URI(@url).scheme == 'wss' socket = TCPSocket.new(ws.host, ws.port) socket = OpenSSL::SSL::SSLSocket.new(socket, OpenSSL::SSL::SSLContext.new) socket.sync_close = true socket.connect socket else TCPSocket.new(ws.host, ws.port) end end
def wait
def wait @wait ||= Wait.new(timeout: RESPONSE_WAIT_TIMEOUT, interval: RESPONSE_WAIT_INTERVAL) end
def ws
def ws @ws ||= WebSocket::Handshake::Client.new(url: @url) end