class Sidekiq::Web::CsrfProtection

Experimental RBS support (using type sampling data from the type_fusion project).

# sig/sidekiq/web/csrf_protection.rbs

class Sidekiq::Web::CsrfProtection
  def decode_token: (String token) -> untyped
end

def accept?(env)

def accept?(env)
  return true if safe?(env)
  giventoken = ::Rack::Request.new(env).params["authenticity_token"]
  valid_token?(env, giventoken)
end

def admit(env)

def admit(env)
  # On each successful request, we create a fresh masked token
  # which will be used in any forms rendered for this request.
  s = session(env)
  s[:csrf] ||= SecureRandom.base64(TOKEN_LENGTH)
  env[:csrf_token] = mask_token(s[:csrf])
  @app.call(env)
end

def call(env)

def call(env)
  accept?(env) ? admit(env) : deny(env)
end

def compare_with_real_token(token, local)

def compare_with_real_token(token, local)
  ::Rack::Utils.secure_compare(token.to_s, decode_token(local).to_s)
end

def decode_token(token)

Experimental RBS support (using type sampling data from the type_fusion project).

def decode_token: (String token) -> untyped

This signature was generated using 1 sample from 1 application.

def decode_token(token)
  Base64.urlsafe_decode64(token)
end

def deny(env)

def deny(env)
  logger(env).warn "attack prevented by #{self.class}"
  [403, {"Content-Type" => "text/plain"}, ["Forbidden"]]
end

def initialize(app, options = nil)

def initialize(app, options = nil)
  @app = app
end

def logger(env)

def logger(env)
  @logger ||= (env["rack.logger"] || ::Logger.new(env["rack.errors"]))
end

def mask_token(token)

like BREACH.
on each request. The masking is used to mitigate SSL attacks
Creates a masked version of the authenticity token that varies
def mask_token(token)
  token = decode_token(token)
  one_time_pad = SecureRandom.random_bytes(token.length)
  encrypted_token = xor_byte_strings(one_time_pad, token)
  masked_token = one_time_pad + encrypted_token
  Base64.urlsafe_encode64(masked_token)
end

def masked_token?(token)

def masked_token?(token)
  token.length == TOKEN_LENGTH * 2
end

def safe?(env)

def safe?(env)
  %w[GET HEAD OPTIONS TRACE].include? env["REQUEST_METHOD"]
end

def session(env)

def session(env)
  env["rack.session"] || fail(<<~EOM)
    Sidekiq::Web needs a valid Rack session for CSRF protection. If this is a Rails app,
    make sure you mount Sidekiq::Web *inside* your application routes:
    Rails.application.routes.draw do
      mount Sidekiq::Web => "/sidekiq"
      ....
    end
    If this is a Rails app in API mode, you need to enable sessions.
      https://guides.rubyonrails.org/api_app.html#using-session-middlewares
    If this is a bare Rack app, use a session middleware before Sidekiq::Web:
      # first, use IRB to create a shared secret key for sessions and commit it
      require 'securerandom'; File.open(".session.key", "w") {|f| f.write(SecureRandom.hex(32)) }
      # now use the secret with a session cookie middleware
      use Rack::Session::Cookie, secret: File.read(".session.key"), same_site: true, max_age: 86400
      run Sidekiq::Web
  EOM
end

def unmask_token(masked_token)

Essentially the inverse of +mask_token+.
def unmask_token(masked_token)
  # Split the token into the one-time pad and the encrypted
  # value and decrypt it
  token_length = masked_token.length / 2
  one_time_pad = masked_token[0...token_length]
  encrypted_token = masked_token[token_length..]
  xor_byte_strings(one_time_pad, encrypted_token)
end

def unmasked_token?(token)

def unmasked_token?(token)
  token.length == TOKEN_LENGTH
end

def valid_token?(env, giventoken)

the token stored in the session.
Checks that the token given to us as a parameter matches
def valid_token?(env, giventoken)
  return false if giventoken.nil? || giventoken.empty?
  begin
    token = decode_token(giventoken)
  rescue ArgumentError # client input is invalid
    return false
  end
  sess = session(env)
  localtoken = sess[:csrf]
  # Checks that Rack::Session::Cookie actualy contains the csrf toekn
  return false if localtoken.nil?
  # Rotate the session token after every use
  sess[:csrf] = SecureRandom.base64(TOKEN_LENGTH)
  # See if it's actually a masked token or not. We should be able
  # to handle any unmasked tokens that we've issued without error.
  if unmasked_token?(token)
    compare_with_real_token token, localtoken
  elsif masked_token?(token)
    unmasked = unmask_token(token)
    compare_with_real_token unmasked, localtoken
  else
    false # Token is malformed
  end
end

def xor_byte_strings(s1, s2)

def xor_byte_strings(s1, s2)
  s1.bytes.zip(s2.bytes).map { |(c1, c2)| c1 ^ c2 }.pack("c*")
end