module Clacky::Agent::TimeMachine
def active_messages(force_reasoning_content_pad: false)
-
force_reasoning_content_pad(Boolean) -- forwarded to MessageHistory,
def active_messages(force_reasoning_content_pad: false) if @active_task_id == @current_task_id return @history.to_api(force_reasoning_content_pad: force_reasoning_content_pad) end stripped = @history.for_task(@active_task_id).map do |msg| msg.reject { |k, _| MessageHistory::INTERNAL_FIELDS.include?(k) } end # Apply the same reasoning_content padding rule used by to_api so # Time Machine replays satisfy thinking-mode providers after a # 400 retry. MessageHistory.pad_reasoning_content_if_needed(stripped, force: force_reasoning_content_pad) end
def extract_message_text(content)
def extract_message_text(content) tent.is_a?(String) ent content.is_a?(Array) _parts = content.select { |part| part[:type] == "text" } _parts.map { |part| part[:text] }.join(" ")
def get_child_tasks(task_id)
def get_child_tasks(task_id) @task_parents.select { |_, parent| parent == task_id }.keys end
def get_task_history(limit: 10)
-
(Array- Task history with metadata)
Parameters:
-
limit(Integer) -- Maximum number of recent tasks to return
def get_task_history(limit: 10) return [] if @current_task_id == 0 tasks = [] (1..@current_task_id).to_a.reverse.take(limit).reverse.each do |task_id| # Find first user message for this task first_user_msg = @history.to_a.find do |msg| msg[:task_id] == task_id && msg[:role] == "user" end summary = if first_user_msg content = extract_message_text(first_user_msg[:content]) # Truncate to 60 characters (including "...") content.length > 60 ? "#{content[0...57]}..." : content else "Task #{task_id}" end # Determine task status status = if task_id == @active_task_id :current elsif task_id < @active_task_id :past else :future end # Check if task has branches (multiple children) children = get_child_tasks(task_id) has_branches = children.length > 1 tasks << { task_id: task_id, summary: summary, status: status, has_branches: has_branches } end tasks end
def init_time_machine
def init_time_machine parents ||= {} # { task_id => parent_id } nt_task_id ||= 0 # Latest created task ID e_task_id ||= 0 # Current active task ID (for undo/redo)
def restore_to_task_state(task_id)
-
task_id(Integer) -- Target task ID
def restore_to_task_state(task_id) # Collect all modified files from task 1 to target task files_to_restore = {} (1..task_id).each do |tid| snapshot_dir = File.join( Dir.home, ".clacky", "snapshots", @session_id, "task-#{tid}" ) next unless Dir.exist?(snapshot_dir) Dir.glob(File.join(snapshot_dir, "**", "*")).each do |snapshot_file| next if File.directory?(snapshot_file) relative_path = snapshot_file.sub(snapshot_dir + "/", "") files_to_restore[relative_path] = snapshot_file end end # Restore files files_to_restore.each do |relative_path, snapshot_file| target_file = File.join(@working_dir, relative_path) FileUtils.mkdir_p(File.dirname(target_file)) FileUtils.cp(snapshot_file, target_file) end rescue StandardError => e # Silently handle errors in tests raise end
def save_modified_files_snapshot(modified_files)
-
modified_files(Array) -- List of file paths that were modified
def save_modified_files_snapshot(modified_files) return if modified_files.nil? || modified_files.empty? snapshot_dir = File.join( Dir.home, ".clacky", "snapshots", @session_id, "task-#{@current_task_id}" ) FileUtils.mkdir_p(snapshot_dir) modified_files.each do |file_path| next unless File.exist?(file_path) # Save file content to snapshot relative_path = file_path.start_with?(@working_dir) ? file_path.sub(@working_dir + "/", "") : File.basename(file_path) snapshot_file = File.join(snapshot_dir, relative_path) FileUtils.mkdir_p(File.dirname(snapshot_file)) FileUtils.cp(file_path, snapshot_file) end rescue StandardError => e # Silently handle errors in tests end
def start_new_task
Start a new task and establish parent relationship
def start_new_task parent_id = @active_task_id @current_task_id += 1 @active_task_id = @current_task_id @task_parents[@current_task_id] = parent_id @current_task_id end
def switch_to_task(target_task_id)
def switch_to_task(target_task_id) if target_task_id > @current_task_id || target_task_id < 1 return { success: false, message: "Invalid task ID: #{target_task_id}" } end restore_to_task_state(target_task_id) @active_task_id = target_task_id { success: true, message: "⏩ Switched to task #{target_task_id}", task_id: target_task_id } end
def undo_last_task
def undo_last_task parent_id = @task_parents[@active_task_id] return { success: false, message: "Already at root task" } if parent_id.nil? || parent_id == 0 restore_to_task_state(parent_id) @active_task_id = parent_id { success: true, message: "⏪ Undone to task #{parent_id}", task_id: parent_id } end