class Selenium::WebDriver::DevTools::NetworkInterceptor
# Reports whether +network_id+ is currently recorded in cancelled_requests,
# removing it in the same step so a recorded cancellation is consumed once.
#
# @param network_id [Object] identifier to look up in cancelled_requests
# @return [Boolean] true when the identifier was present (and is now removed)
def cancelled?(network_id)
  # Array#delete returns the removed element or nil; !! turns that into a
  # strict boolean. The lookup and removal run while holding the lock.
  lock.synchronize { !!cancelled_requests.delete(network_id) }
end
# Lazily created list that track_cancelled_request appends request
# identifiers to and cancelled? removes them from.
#
# @return [Array] the same array instance on every call
def cancelled_requests
  @cancelled_requests ||= []
end
# Asks the Fetch domain to continue the paused request without modification.
#
# @param id [Object] identifier passed as +request_id+
# @return [Object] whatever devtools.fetch.continue_request returns
def continue_request(id)
  devtools.fetch.continue_request(request_id: id)
end
# Retrieves the body of a paused response through the Fetch domain.
#
# @param id [Object] identifier passed as +request_id+
# @return [Object, nil] the value at ['result']['body'] of the reply, or nil
#   when the call failed with an error whose message starts with
#   CANNOT_GET_BODY_ON_REDIRECT_ERROR_CODE
# @raise [Error::WebDriverError] any other WebDriverError is re-raised
def fetch_response_body(id)
  devtools.fetch.get_response_body(request_id: id).dig('result', 'body')
rescue Error::WebDriverError => e
  # Only the "cannot get body" failure is tolerated; the method then falls
  # through and returns nil.
  raise unless e.message.start_with?(CANNOT_GET_BODY_ON_REDIRECT_ERROR_CODE)
end
# Stores the devtools handle and creates the mutex used to guard the
# cancelled-request list.
#
# @param devtools [Object] object exposing +network+ and +fetch+ domains
def initialize(devtools)
  @devtools = devtools
  @lock = Mutex.new
end
# Wires up interception: subscribes to the events this class reacts to and
# then enables the Network and Fetch domains.
#
# @yield forwarded to request_paused for every :request_paused event
# @return [Object] whatever devtools.fetch.enable returns
def intercept(&block)
  # Handlers are registered before the domains are enabled.
  devtools.network.on(:loading_failed) { |params| track_cancelled_request(params) }
  devtools.fetch.on(:request_paused) { |params| request_paused(params, &block) }

  devtools.network.set_cache_disabled(cache_disabled: true)
  devtools.network.enable
  # Pause at both stages so requests and responses can each be inspected.
  devtools.fetch.enable(patterns: [{requestStage: 'Request'}, {requestStage: 'Response'}])
end
# Hands a paused request to the user's block and, once that block invokes
# the continuation it is given, either continues or rewrites the request.
#
# @param id [Object] identifier of the paused request
# @param params [Hash] event payload used to build the request objects
# @yieldparam request [DevTools::Request] copy the block may modify
# @yieldparam continue [Proc] continuation; the block passed to it (if any)
#   is stored in pending_response_requests under +id+
# @return [Object] the value returned by the user's block
def intercept_request(id, params, &block)
  # Two copies built from the same input: one stays pristine so changes
  # made by the block can be detected through ==.
  original = DevTools::Request.from(id, params)
  mutable = DevTools::Request.from(id, params)

  block.call(mutable) do |&continue|
    pending_response_requests[id] = continue

    if original == mutable
      continue_request(original.id)
    else
      mutate_request(mutable)
    end
  end
end
# Handles a paused response: without a block it is continued as-is;
# otherwise the block may modify a copy, and the response is fulfilled with
# the modified data only when something actually changed.
#
# @param id [Object] identifier of the paused request
# @param params [Hash] event payload used to build the response objects
# @yieldparam response [DevTools::Response] copy the block may modify
# @return [Object] result of continue_response or mutate_response
def intercept_response(id, params)
  return continue_response(id) unless block_given?

  body = fetch_response_body(id)
  # Two copies built from the same input: one stays pristine so changes
  # made by the block can be detected through ==.
  original = DevTools::Response.from(id, body, params)
  mutable = DevTools::Response.from(id, body, params)
  yield mutable

  if original == mutable
    continue_response(id)
  else
    mutate_response(mutable)
  end
end
# Continues a paused request using the (possibly modified) fields of
# +request+ instead of the original ones.
#
# @param request [#id, #url, #method, #post_data, #headers] request data;
#   +headers+ must respond to +map+ yielding name/value pairs
# @return [Object] whatever devtools.fetch.continue_request returns
def mutate_request(request)
  devtools.fetch.continue_request(
    request_id: request.id,
    url: request.url,
    method: request.method,
    # Base64-encoded when present; nil is passed through when there is no body.
    post_data: (Base64.strict_encode64(request.post_data) if request.post_data),
    # Each header pair becomes a {name:, value:} entry.
    headers: request.headers.map { |name, value| {name: name, value: value} }
  )
end
# Fulfills a paused request with the (possibly modified) fields of
# +response+ instead of the original response.
#
# @param response [#id, #body, #code, #headers] response data; +headers+
#   must respond to +map+ yielding name/value pairs
# @return [Object] whatever devtools.fetch.fulfill_request returns
def mutate_response(response)
  devtools.fetch.fulfill_request(
    request_id: response.id,
    # Base64-encoded when present; nil is passed through when there is no body.
    body: (Base64.strict_encode64(response.body) if response.body),
    response_code: response.code,
    # Each header pair becomes a {name:, value:} entry.
    response_headers: response.headers.map { |name, value| {name: name, value: value} }
  )
end
# Lazily created hash in which intercept_request stores, per paused request
# id, the block to run at the response stage; request_paused removes the
# entry when the response arrives.
#
# The hash is used without synchronization. Per the original author's note
# this is considered thread-safe because its keys are interception job
# identifiers.
# NOTE(review): the original comment is cut off after "they should be";
# presumably it continued with "unique" — confirm against upstream.
#
# @return [Hash] the same hash instance on every call
def pending_response_requests
  @pending_response_requests ||= {}
end
# Entry point for a :request_paused event: routes the event to the request
# or response handler, inside with_cancellable_request.
#
# @param data [Hash] event payload; reads 'requestId' and 'networkId'
# @yield the user's interception block, used at the request stage
# @return [Object] whatever with_cancellable_request returns
def request_paused(data, &block)
  id = data['requestId']
  network_id = data['networkId']

  with_cancellable_request(network_id) do
    if response?(data)
      # At the response stage the user's block is replaced by the handler
      # stored for this request (nil when none was stored); the entry is
      # removed from the hash at the same time.
      handler = pending_response_requests.delete(id)
      intercept_response(id, data, &handler)
    else
      intercept_request(id, data, &block)
    end
  end
end
# Tells whether a requestPaused payload is treated as the response stage,
# which is the case when either response field is present.
#
# @see https://chromedevtools.github.io/devtools-protocol/tot/Fetch/#event-requestPaused
# @param params [Hash] event payload
# @return [Boolean] true when 'responseStatusCode' or 'responseErrorReason'
#   is a key of +params+
def response?(params)
  params.key?('responseStatusCode') || params.key?('responseErrorReason')
end
# Records the request id of a :loading_failed event in cancelled_requests,
# but only when the payload is flagged as canceled.
#
# @param data [Hash] event payload; reads 'canceled' and 'requestId'
# @return [Array, nil] the cancelled list after appending, or nil when the
#   event was not a cancellation
def track_cancelled_request(data)
  return unless data['canceled']

  lock.synchronize { cancelled_requests << data['requestId'] }
end
# Runs the given block, tolerating WebDriverErrors raised for a request
# that has been recorded as cancelled.
#
# @param network_id [Object] identifier checked against cancelled? on failure
# @yield the work to perform
# @return [Object, nil] the block's value, or nil when an error was swallowed
# @raise [Error::WebDriverError] only when the message starts with
#   INVALID_INTERCEPTION_ID_ERROR_CODE and the request was not cancelled
def with_cancellable_request(network_id)
  yield
rescue Error::WebDriverError => e
  # NOTE(review): a WebDriverError whose message does not start with the
  # code is swallowed here as well — confirm this is intended.
  raise if e.message.start_with?(INVALID_INTERCEPTION_ID_ERROR_CODE) && !cancelled?(network_id)
end