class ActiveSupport::EncryptedFile
# Returns the length, in characters, of a key produced by +generate_key+.
# The value is computed from one generated key on first use and then cached
# on the receiver.
def self.expected_key_length # :nodoc:
  @expected_key_length ||= generate_key.length
end
# Returns a freshly generated random key as a hexadecimal string.
#
# The number of random bytes is whatever ActiveSupport::MessageEncryptor
# reports as the key length for CIPHER; hex encoding makes the returned
# string twice that many characters long.
def self.generate_key
  SecureRandom.hex(ActiveSupport::MessageEncryptor.key_len(CIPHER))
end
# Decrypts the current content with +read+ and hands it, together with the
# given block, to +writing+. Returns whatever +writing+ returns.
def change(&block)
  writing read, &block
end
# Raises InvalidKeyLengthError unless +key+ has exactly the length reported
# by the class-level +expected_key_length+. A +nil+ key also raises, because
# its (nil) length never equals the expected length.
def check_key_length
  raise InvalidKeyLengthError if key&.length != self.class.expected_key_length
end
# Passes +contents+ to the memoized encryptor's +decrypt_and_verify+ and
# returns its result.
def decrypt(contents)
  encryptor.decrypt_and_verify contents
end
# Validates the key length, then returns +contents+ encrypted and signed by
# the encryptor.
#
# The length check runs first as its own statement, so an invalid key raises
# before the encryptor is touched.
def encrypt(contents)
  check_key_length
  encryptor.encrypt_and_sign contents
end
# Returns the memoized ActiveSupport::MessageEncryptor for this file.
#
# The hex-encoded +key+ is converted to raw bytes with <tt>pack("H*")</tt>
# and used as the secret; the cipher is CIPHER.
def encryptor
  @encryptor ||= ActiveSupport::MessageEncryptor.new([ key ].pack("H*"), cipher: CIPHER)
end
# Last step of the key lookup: raises MissingKeyError (built from +key_path+
# and +env_key+) when +raise_if_missing_key+ is truthy, otherwise returns
# +nil+.
def handle_missing_key
  return unless raise_if_missing_key

  raise MissingKeyError.new(key_path: key_path, env_key: env_key)
end
# Stores the locations and options for an encrypted file.
#
# +content_path+:: path of the encrypted file; when it is a symlink it is
#                  replaced by its resolved real path.
# +key_path+:: path of the file that may hold the key.
# +env_key+:: name of the environment variable that may hold the key.
# +raise_if_missing_key+:: whether a missing key raises MissingKeyError.
def initialize(content_path:, key_path:, env_key:, raise_if_missing_key:)
  path = Pathname.new(content_path)
  # Resolve symlinks up front so later operations act on the real file.
  @content_path = path.symlink? ? path.realpath : path
  @key_path = Pathname.new(key_path)
  @env_key = env_key
  @raise_if_missing_key = raise_if_missing_key
end
# Returns the encryption key, first trying the environment variable
# specified by +env_key+, then trying the key file specified by +key_path+.
# If +raise_if_missing_key+ is true, raises MissingKeyError if the
# key cannot be found in either place; otherwise returns +nil+ in that case.
def key
  read_env_key || read_key_file || handle_missing_key
end
# Reads the file and returns the decrypted content.
#
# Raises:
# - MissingKeyError if the key is missing and +raise_if_missing_key+ is true.
# - MissingContentError if the encrypted file does not exist or otherwise
#   if the key is missing.
# - ActiveSupport::MessageEncryptor::InvalidMessage if the content cannot be
#   decrypted or verified.
def read
  # +key+ is evaluated first, so a MissingKeyError (when enabled) takes
  # precedence over MissingContentError.
  raise MissingContentError, content_path if key.nil? || !content_path.exist?

  decrypt content_path.binread
end
# Returns the value of the environment variable named by +env_key+, or
# +nil+ when it is unset or blank (via +presence+).
def read_env_key
  ENV[env_key].presence
end
# Returns the stripped contents of the file at +key_path+, or +nil+ when the
# file does not exist.
#
# The result is memoized with <tt>defined?</tt> rather than <tt>||=</tt> so
# that a +nil+ result (missing file) is cached too and the file system is
# consulted only once.
def read_key_file
  return @key_file_contents if defined?(@key_file_contents)

  @key_file_contents = (key_path.binread.strip if key_path.exist?)
end
# Encrypts +contents+ and stores the result at +content_path+.
#
# The ciphertext is first written to "<content_path>.tmp" and then moved
# over +content_path+, so the final file is replaced in one step rather
# than being rewritten in place.
def write(contents)
  staging_path = "#{content_path}.tmp"
  IO.binwrite staging_path, encrypt(contents)
  FileUtils.mv staging_path, content_path
end
# Writes +contents+ to a temporary file, yields that file's Pathname so the
# caller can modify it, and then passes the file's new contents to +write+
# if they differ from +contents+.
#
# The temporary file lives in Dir.tmpdir and is named
# "<pid>.<content file basename without a trailing .enc>". It is removed in
# the +ensure+ clause, so it is cleaned up even when the block raises.
def writing(contents)
  tmp_file = "#{Process.pid}.#{content_path.basename.to_s.chomp('.enc')}"
  tmp_path = Pathname.new File.join(Dir.tmpdir, tmp_file)
  tmp_path.binwrite contents

  yield tmp_path

  updated_contents = tmp_path.binread

  write(updated_contents) if updated_contents != contents
ensure
  # tmp_path is nil if an error occurred before it was assigned.
  FileUtils.rm(tmp_path) if tmp_path&.exist?
end