class Clacky::Tools::Security::Replacer
Extracted almost verbatim from the old SafeShell::CommandSafetyReplacer.
Internal class that owns per-project state (trash dir, log dir, …).
def allow_dev_null_redirect(command)
def allow_dev_null_redirect(command) command end
def block_sudo_command(_command)
def block_sudo_command(_command) raise SecurityError, "sudo commands are not allowed for security reasons" end
def initialize(project_root)
def initialize(project_root) @project_root = File.expand_path(project_root) trash_directory = Clacky::TrashDirectory.new(@project_root) @backup_dir = trash_directory.backup_dir @project_hash = trash_directory.generate_project_hash(@project_root) @safety_log_dir = File.join(Dir.home, ".clacky", "safety_logs", @project_hash) FileUtils.mkdir_p(@safety_log_dir) unless Dir.exist?(@safety_log_dir) @safety_log_file = File.join(@safety_log_dir, "safety.log") end
def log_replacement(original, replacement, reason)
def log_replacement(original, replacement, reason) write_log( action: 'command_replacement', original_command: original, safe_replacement: replacement, reason: reason ) end
def log_warning(message)
def log_warning(message) write_log(action: 'warning', message: message) end
def make_command_safe(command)
def make_command_safe(command) command = command.strip # Use a UTF-8-scrubbed copy ONLY for regex checks. The original # bytes are returned unchanged so the shell receives exact paths # (e.g. GBK-encoded Chinese filenames in zip archives). @safe_check_command = Clacky::Utils::Encoding.safe_check(command) case @safe_check_command # Block attempts to terminate the clacky server process. # IMPORTANT: each verb is anchored with \b so substrings like # "Skill" (contains "kill") or "Bill Killalina" don't trigger # false positives. We also require `clacky` to appear as a whole # word AND within a reasonable distance (same logical command, # not hundreds of chars later in an unrelated echo string). when /\bpkill\b[^\n;|&]{0,80}\bclacky\b|\bkillall\b[^\n;|&]{0,80}\bclacky\b|\bkill\s+(?:-\S+\s+)*[^\n;|&]{0,40}\bclacky\b/i raise SecurityError, "Killing the clacky server process is not allowed. To restart, use: #{restart_hint}" when /\bclacky\s+server\b/ raise SecurityError, "Managing the clacky server from within a session is not allowed. To restart, use: #{restart_hint}" when /^chmod\s+x/ replace_chmod_command(command) when /^curl.*\|\s*(sh|bash)/ replace_curl_pipe_command(command) when /^sudo\s+/ block_sudo_command(command) when />\s*\/dev\/null\s*$/ allow_dev_null_redirect(command) when /^(mv|cp|mkdir|touch|echo)\s+/ validate_and_allow(command) else validate_general_command(@safe_check_command) command end end
def replace_chmod_command(command)
def replace_chmod_command(command) begin parts = Shellwords.split(command) rescue ArgumentError parts = command.split(/\s+/) end files = parts[2..-1] || [] files.each { |file| validate_file_path(file) unless file.start_with?('-') } log_replacement("chmod", command, "chmod +x is allowed - file permissions will be modified") command end
def replace_curl_pipe_command(command)
def replace_curl_pipe_command(command) if command.match(/curl\s+(.*?)\s*\|\s*(sh|bash)/) url = $1 shell_type = $2 timestamp = Time.now.strftime("%Y%m%d_%H%M%S") safe_file = File.join(@backup_dir, "downloaded_script_#{timestamp}.sh") result = "curl #{url} -o #{Shellwords.escape(safe_file)} && echo '🔒 Script downloaded to #{safe_file} for manual review. Run: cat #{safe_file}'" log_replacement("curl | #{shell_type}", result, "Script saved for manual review instead of automatic execution") result else command end end
def restart_hint
immediately actionable. When the variable isn't set (e.g. one-shot
convention) AND append the resolved PID in parentheses so it's
variable name in the hint (so the AI / user learns the standard
injected by ServerMaster (see server_master.rb). We keep the
When running inside a clacky server worker, `CLACKY_MASTER_PID` is
Build a copy-pasteable "how to restart clacky server" hint.
def restart_hint pid = ENV["CLACKY_MASTER_PID"].to_s if pid =~ /\A\d+\z/ "kill -USR1 $CLACKY_MASTER_PID (current master PID: #{pid})" else "kill -USR1 $CLACKY_MASTER_PID" end end
def validate_and_allow(command)
true credential directories (.ssh, .aws) and .env files. Everything
~/.clacky/skills/... into the project. We now only block writes to
which broke legitimate workflows like copying skill templates from
Historical behavior was to forbid any path outside @project_root,
Relaxed validator for mv / cp / mkdir / touch / echo.
def validate_and_allow(command) begin parts = Shellwords.split(command) rescue ArgumentError parts = command.split(/\s+/) end cmd = parts.first args = parts[1..-1] || [] case cmd when 'mv', 'cp' # For mv/cp only the DESTINATION (last non-flag arg) is a write # target; earlier args are sources and are read-only to the FS. write_targets = args.reject { |a| a.start_with?('-') } dest = write_targets.last validate_secret_write(dest) if dest when 'mkdir', 'touch' args.each { |path| validate_secret_write(path) unless path.start_with?('-') } when 'echo' # `echo foo > path` — best-effort: block only if redirecting to a # secret path. The redirect target will also be caught by # validate_general_command for /etc /usr /bin; here we add .env, # .ssh/, .aws/. if command =~ />\s*([^\s|&;]+)/ validate_secret_write(Regexp.last_match(1)) end end command end
def validate_file_path(path)
Alias retained for readability — chmod handler validates that
def validate_file_path(path) validate_secret_write(path) end
def validate_general_command(command)
def validate_general_command(command) cmd_without_quotes = command.gsub(/'[^']*'|"[^"]*"/, '') dangerous_patterns = [ /eval\s*\(/, /exec\s*\(/, /system\s*\(/, /`[^`]+`/, /\$\([^)]+\)/, /\|\s*sh\s*$/, /\|\s*bash\s*$/, />\s*\/etc\//, />\s*\/usr\//, />\s*\/bin\// ] dangerous_patterns.each do |pattern| if cmd_without_quotes.match?(pattern) raise SecurityError, "Dangerous command pattern detected: #{pattern.source}" end end command end
def validate_secret_write(path)
def validate_secret_write(path) return if path.nil? || path.empty? || path.start_with?('-') expanded_path = File.expand_path(path) SECRET_WRITE_PATTERNS.each do |pattern| if expanded_path.match?(pattern) raise SecurityError, "Write to credential/secret path blocked: #{path} " \ "(matched protected pattern). If intentional, edit the " \ "file manually outside the agent." end end end
def write_log(**fields)
def write_log(**fields) log_entry = { timestamp: Time.now.iso8601 }.merge(fields) File.open(@safety_log_file, 'a') { |f| f.puts JSON.generate(log_entry) } rescue StandardError # Logging must never break main functionality. end