class JSON::JWE
def as_json(options = {})
def as_json(options = {}) case options[:syntax] when :general { protected: Base64.urlsafe_encode64(header.to_json, padding: false), recipients: [{ encrypted_key: Base64.urlsafe_encode64(jwe_encrypted_key, padding: false) }], iv: Base64.urlsafe_encode64(iv, padding: false), ciphertext: Base64.urlsafe_encode64(cipher_text, padding: false), tag: Base64.urlsafe_encode64(authentication_tag, padding: false) } else { protected: Base64.urlsafe_encode64(header.to_json, padding: false), encrypted_key: Base64.urlsafe_encode64(jwe_encrypted_key, padding: false), iv: Base64.urlsafe_encode64(iv, padding: false), ciphertext: Base64.urlsafe_encode64(cipher_text, padding: false), tag: Base64.urlsafe_encode64(authentication_tag, padding: false) } end end
def authentication_tag
def authentication_tag @authentication_tag ||= case when gcm? cipher.auth_tag when cbc? secured_input = [ auth_data, iv, cipher_text, BinData::Uint64be.new(auth_data.length * 8).to_binary_s ].join OpenSSL::HMAC.digest( sha_digest, mac_key, secured_input )[0, sha_size / 2 / 8] end end
def cbc?
def cbc? [:'A128CBC-HS256', :'A256CBC-HS512'].include? encryption_method&.to_sym end
def cipher
def cipher raise "#{cipher_name} isn't supported" unless OpenSSL::Cipher.ciphers.include?(cipher_name) @cipher ||= OpenSSL::Cipher.new cipher_name end
def cipher_name
def cipher_name case encryption_method&.to_sym when :A128GCM 'aes-128-gcm' when :A256GCM 'aes-256-gcm' when :'A128CBC-HS256' 'aes-128-cbc' when :'A256CBC-HS512' 'aes-256-cbc' else raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm') end end
def decode_compact_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false)
def decode_compact_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false) unless input.count('.') + 1 == NUM_OF_SEGMENTS raise InvalidFormat.new("Invalid JWE Format. JWE should include #{NUM_OF_SEGMENTS} segments.") end jwe = new _header_json_, jwe.jwe_encrypted_key, jwe.iv, jwe.cipher_text, jwe.authentication_tag = input.split('.', NUM_OF_SEGMENTS).collect do |segment| begin Base64.urlsafe_decode64 segment rescue ArgumentError raise DecryptionFailed end end jwe.auth_data = input.split('.').first jwe.header = JSON.parse(_header_json_).with_indifferent_access unless private_key_or_secret == :skip_decryption jwe.decrypt! private_key_or_secret, algorithms, encryption_methods end jwe end
def decode_json_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false)
def decode_json_serialized(input, private_key_or_secret, algorithms = nil, encryption_methods = nil, _allow_blank_payload = false) input = input.with_indifferent_access jwe_encrypted_key = if input[:recipients].present? input[:recipients].first[:encrypted_key] else input[:encrypted_key] end compact_serialized = [ input[:protected], jwe_encrypted_key, input[:iv], input[:ciphertext], input[:tag] ].join('.') decode_compact_serialized compact_serialized, private_key_or_secret, algorithms, encryption_methods end
def decrypt!(private_key_or_secret, algorithms = nil, encryption_methods = nil)
def decrypt!(private_key_or_secret, algorithms = nil, encryption_methods = nil) raise UnexpectedAlgorithm.new('Unexpected alg header') unless algorithms.blank? || Array(algorithms).include?(alg) raise UnexpectedAlgorithm.new('Unexpected enc header') unless encryption_methods.blank? || Array(encryption_methods).include?(enc) self.private_key_or_secret = with_jwk_support private_key_or_secret self.content_encryption_key = decrypt_content_encryption_key self.mac_key, self.encryption_key = derive_encryption_and_mac_keys verify_cbc_authentication_tag! if cbc? cipher.decrypt cipher.key = encryption_key cipher.iv = iv # NOTE: 'iv' has to be set after 'key' for GCM if gcm? # https://github.com/ruby/openssl/issues/63 raise DecryptionFailed.new('Invalid authentication tag') if authentication_tag.length < 16 cipher.auth_tag = authentication_tag cipher.auth_data = auth_data end begin self.plain_text = cipher.update(cipher_text) + cipher.final rescue OpenSSL::OpenSSLError # Ensure that the same error is raised for invalid PKCS7 padding # as for invalid signatures. This prevents padding-oracle attacks. raise DecryptionFailed end self end
def decrypt_content_encryption_key
def decrypt_content_encryption_key fake_content_encryption_key = generate_content_encryption_key # NOTE: do this always not to make timing difference case alg&.to_sym when :RSA1_5 private_key_or_secret.private_decrypt jwe_encrypted_key when :'RSA-OAEP' private_key_or_secret.private_decrypt jwe_encrypted_key, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING when :A128KW, :A256KW AESKeyWrap.unwrap jwe_encrypted_key, private_key_or_secret when :dir private_key_or_secret when :'ECDH-ES' raise NotImplementedError.new('ECDH-ES not supported yet') when :'ECDH-ES+A128KW' raise NotImplementedError.new('ECDH-ES+A128KW not supported yet') when :'ECDH-ES+A256KW' raise NotImplementedError.new('ECDH-ES+A256KW not supported yet') else raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm') end rescue OpenSSL::PKey::PKeyError fake_content_encryption_key end
def derive_encryption_and_mac_keys
def derive_encryption_and_mac_keys case when gcm? [:wont_be_used, content_encryption_key] when cbc? content_encryption_key.unpack( "a#{content_encryption_key.length / 2}" * 2 ) end end
def dir?
def dir? :dir == alg&.to_sym end
def encrypt!(public_key_or_secret)
def encrypt!(public_key_or_secret) self.public_key_or_secret = with_jwk_support public_key_or_secret cipher.encrypt self.content_encryption_key = generate_content_encryption_key self.mac_key, self.encryption_key = derive_encryption_and_mac_keys cipher.key = encryption_key self.iv = cipher.random_iv # NOTE: 'iv' has to be set after 'key' for GCM self.auth_data = Base64.urlsafe_encode64 header.to_json, padding: false cipher.auth_data = auth_data if gcm? self.cipher_text = cipher.update(plain_text) + cipher.final self end
def gcm?
def gcm? [:A128GCM, :A256GCM].include? encryption_method&.to_sym end
def generate_content_encryption_key
def generate_content_encryption_key case when dir? public_key_or_secret when gcm? cipher.random_key when cbc? SecureRandom.random_bytes sha_size / 8 end end
def initialize(input = nil)
def initialize(input = nil) self.plain_text = input.to_s end
def jwe_encrypted_key
def jwe_encrypted_key @jwe_encrypted_key ||= case alg&.to_sym when :RSA1_5 public_key_or_secret.public_encrypt content_encryption_key when :'RSA-OAEP' public_key_or_secret.public_encrypt content_encryption_key, OpenSSL::PKey::RSA::PKCS1_OAEP_PADDING when :A128KW, :A256KW AESKeyWrap.wrap content_encryption_key, public_key_or_secret when :dir '' when :'ECDH-ES' raise NotImplementedError.new('ECDH-ES not supported yet') when :'ECDH-ES+A128KW' raise NotImplementedError.new('ECDH-ES+A128KW not supported yet') when :'ECDH-ES+A256KW' raise NotImplementedError.new('ECDH-ES+A256KW not supported yet') else raise UnexpectedAlgorithm.new('Unknown Encryption Algorithm') end end
def sha_digest
def sha_digest OpenSSL::Digest.new "SHA#{sha_size}" end
def sha_size
def sha_size case encryption_method&.to_sym when :'A128CBC-HS256' 256 when :'A256CBC-HS512' 512 else raise UnexpectedAlgorithm.new('Unknown Hash Size') end end
def to_s
def to_s [ header.to_json, jwe_encrypted_key, iv, cipher_text, authentication_tag ].collect do |segment| Base64.urlsafe_encode64 segment.to_s, padding: false end.join('.') end
def verify_cbc_authentication_tag!
def verify_cbc_authentication_tag! secured_input = [ auth_data, iv, cipher_text, BinData::Uint64be.new(auth_data.length * 8).to_binary_s ].join expected_authentication_tag = OpenSSL::HMAC.digest( sha_digest, mac_key, secured_input )[0, sha_size / 2 / 8] unless secure_compare(authentication_tag, expected_authentication_tag) raise DecryptionFailed end end