class DSPy::ReAct
ReAct Agent using Sorbet signatures
def create_enhanced_output_struct(signature_class)
def create_enhanced_output_struct(signature_class) input_props = signature_class.input_struct_class.props output_props = signature_class.output_struct_class.props build_enhanced_struct( { input: input_props, output: output_props }, { history: [T::Array[T::Hash[Symbol, T.untyped]], "ReAct execution history"], iterations: [Integer, "Number of iterations executed"], tools_used: [T::Array[String], "List of tools used during execution"] } ) end
def create_enhanced_result(input_kwargs, reasoning_result)
def create_enhanced_result(input_kwargs, reasoning_result) output_field_name = @original_signature_class.output_struct_class.props.keys.first output_data = input_kwargs.merge({ history: reasoning_result[:history].map(&:to_h), iterations: reasoning_result[:iterations], tools_used: reasoning_result[:tools_used] }) output_data[output_field_name] = reasoning_result[:final_answer] @enhanced_output_struct.new(**output_data) end
def create_history_entry(step, thought, action, action_input, observation)
def create_history_entry(step, thought, action, action_input, observation) HistoryEntry.new( step: step, thought: thought, action: action, action_input: action_input, observation: observation ) end
def default_no_answer_message
def default_no_answer_message "No answer reached within #{@max_iterations} iterations" end
def emit_iteration_complete_event(iteration, thought, action, action_input, observation, tools_used)
def emit_iteration_complete_event(iteration, thought, action, action_input, observation, tools_used) Instrumentation.emit('dspy.react.iteration_complete', { iteration: iteration, thought: thought, action: action, action_input: action_input, observation: observation, tools_used: tools_used.uniq }) end
def execute_action(action, action_input)
def execute_action(action, action_input) tool_name = action.downcase tool = @tools[tool_name] return "Tool '#{action}' not found. Available tools: #{@tools.keys.join(', ')}" unless tool begin result = if action_input.nil? || (action_input.is_a?(String) && action_input.strip.empty?) # No input provided tool.dynamic_call({}) else # Pass the action_input directly to dynamic_call, which can handle # either a Hash or a JSON string tool.dynamic_call(action_input) end result.to_s rescue => e "Error executing tool '#{action}': #{e.message}" end end
def execute_react_reasoning_loop(question)
def execute_react_reasoning_loop(question) history = T.let([], T::Array[HistoryEntry]) available_tools_desc = @tools.map { |name, tool| JSON.parse(tool.schema) } final_answer = T.let(nil, T.nilable(String)) iterations_count = 0 last_observation = T.let(nil, T.nilable(String)) tools_used = [] while should_continue_iteration?(iterations_count, final_answer) iterations_count += 1 iteration_result = execute_single_iteration( question, history, available_tools_desc, iterations_count, tools_used, last_observation ) if iteration_result[:should_finish] final_answer = iteration_result[:final_answer] break end history = iteration_result[:history] tools_used = iteration_result[:tools_used] last_observation = iteration_result[:last_observation] end handle_max_iterations_if_needed(iterations_count, final_answer, tools_used, history) { history: history, iterations: iterations_count, tools_used: tools_used.uniq, final_answer: final_answer || default_no_answer_message } end
def execute_single_iteration(question, history, available_tools_desc, iteration, tools_used, last_observation)
def execute_single_iteration(question, history, available_tools_desc, iteration, tools_used, last_observation) # Instrument each iteration Instrumentation.instrument('dspy.react.iteration', { iteration: iteration, max_iterations: @max_iterations, history_length: history.length, tools_used_so_far: tools_used.uniq }) do # Generate thought and action thought_obj = @thought_generator.forward( question: question, history: history, available_tools: available_tools_desc ) # Process thought result if finish_action?(thought_obj.action) final_answer = handle_finish_action( thought_obj.action_input, last_observation, iteration, thought_obj.thought, thought_obj.action, history ) return { should_finish: true, final_answer: final_answer } end # Execute tool action observation = execute_tool_with_instrumentation( thought_obj.action, thought_obj.action_input, iteration ) # Track tools used tools_used << thought_obj.action.downcase if valid_tool?(thought_obj.action) # Add to history history << create_history_entry( iteration, thought_obj.thought, thought_obj.action, thought_obj.action_input, observation ) # Process observation and decide next step observation_decision = process_observation_and_decide_next_step( question, history, observation, available_tools_desc, iteration ) if observation_decision[:should_finish] return { should_finish: true, final_answer: observation_decision[:final_answer] } end emit_iteration_complete_event( iteration, thought_obj.thought, thought_obj.action, thought_obj.action_input, observation, tools_used ) { should_finish: false, history: history, tools_used: tools_used, last_observation: observation } end end
def execute_tool_with_instrumentation(action, action_input, iteration)
def execute_tool_with_instrumentation(action, action_input, iteration) if action && @tools[action.downcase] Instrumentation.instrument('dspy.react.tool_call', { iteration: iteration, tool_name: action.downcase, tool_input: action_input }) do execute_action(action, action_input) end else "Unknown action: #{action}. Available actions: #{@tools.keys.join(', ')}, finish" end end
def finish_action?(action)
def finish_action?(action) action&.downcase == FINISH_ACTION end
def forward(**kwargs)
def forward(**kwargs) lm = config.lm || DSPy.config.lm available_tools = @tools.keys # Instrument the entire ReAct agent lifecycle result = instrument_prediction('dspy.react', @original_signature_class, kwargs, { max_iterations: @max_iterations, available_tools: available_tools }) do # Validate input and extract question input_struct = @original_signature_class.input_struct_class.new(**kwargs) question = T.cast(input_struct.serialize.values.first, String) # Execute ReAct reasoning loop reasoning_result = execute_react_reasoning_loop(question) # Create enhanced output with all ReAct data create_enhanced_result(kwargs, reasoning_result) end result end
def generate_example_output
def generate_example_output example = super example[:history] = [ { step: 1, thought: "I need to think about this question...", action: "some_tool", action_input: "input for tool", observation: "result from tool" } ] example[:iterations] = 1 example[:tools_used] = ["some_tool"] example end
def generate_forced_final_answer(question, history, available_tools_desc, observation_result, iteration)
def generate_forced_final_answer(question, history, available_tools_desc, observation_result, iteration) final_thought = @thought_generator.forward( question: question, history: history, available_tools: available_tools_desc ) if final_thought.action&.downcase != FINISH_ACTION forced_answer = if observation_result.interpretation && !observation_result.interpretation.empty? observation_result.interpretation else history.last&.observation || "No answer available" end handle_finish_action(forced_answer, history.last&.observation, iteration + 1, final_thought.thought, FINISH_ACTION, history) else handle_finish_action(final_thought.action_input, history.last&.observation, iteration + 1, final_thought.thought, final_thought.action, history) end end
def handle_finish_action(action_input, last_observation, step, thought, action, history)
def handle_finish_action(action_input, last_observation, step, thought, action, history) final_answer = action_input.to_s # If final_answer is empty but we have a last observation, use it if (final_answer.nil? || final_answer.empty?) && last_observation final_answer = last_observation end # Always add the finish action to history history << HistoryEntry.new( step: step, thought: thought, action: action, action_input: final_answer, observation: nil # No observation for finish action ) final_answer end
def handle_max_iterations_if_needed(iterations_count, final_answer, tools_used, history)
def handle_max_iterations_if_needed(iterations_count, final_answer, tools_used, history) if iterations_count >= @max_iterations && final_answer.nil? Instrumentation.emit('dspy.react.max_iterations', { iteration_count: iterations_count, max_iterations: @max_iterations, tools_used: tools_used.uniq, final_history_length: history.length }) end end
def initialize(signature_class, tools: [], max_iterations: 5)
def initialize(signature_class, tools: [], max_iterations: 5) @original_signature_class = signature_class @tools = T.let({}, T::Hash[String, T.untyped]) tools.each { |tool| @tools[tool.name.downcase] = tool } @max_iterations = max_iterations # Create thought generator using Predict to preserve field descriptions @thought_generator = T.let(DSPy::Predict.new(Thought), DSPy::Predict) # Create observation processor using Predict to preserve field descriptions @observation_processor = T.let(DSPy::Predict.new(ReActObservation), DSPy::Predict) # Create enhanced output struct with ReAct fields @enhanced_output_struct = create_enhanced_output_struct(signature_class) enhanced_output_struct = @enhanced_output_struct # Create enhanced signature class enhanced_signature = Class.new(DSPy::Signature) do # Set the description description signature_class.description # Use the same input struct @input_struct_class = signature_class.input_struct_class # Use the enhanced output struct with ReAct fields @output_struct_class = enhanced_output_struct class << self attr_reader :input_struct_class, :output_struct_class end end # Call parent constructor with enhanced signature super(enhanced_signature) end
def process_observation_and_decide_next_step(question, history, observation, available_tools_desc, iteration)
def process_observation_and_decide_next_step(question, history, observation, available_tools_desc, iteration) return { should_finish: false } if observation.include?("Unknown action") observation_result = @observation_processor.forward( question: question, history: history, observation: observation ) return { should_finish: false } unless observation_result.next_step == NextStep::Finish final_answer = generate_forced_final_answer( question, history, available_tools_desc, observation_result, iteration ) { should_finish: true, final_answer: final_answer } end
def should_continue_iteration?(iterations_count, final_answer)
def should_continue_iteration?(iterations_count, final_answer) final_answer.nil? && (@max_iterations.nil? || iterations_count < @max_iterations) end
def valid_tool?(action)
def valid_tool?(action) !!(action && @tools[action.downcase]) end
def validate_output_schema!(output)
def validate_output_schema!(output) # Validate that output is an instance of the enhanced output struct unless output.is_a?(@enhanced_output_struct) raise "Output must be an instance of #{@enhanced_output_struct}, got #{output.class}" end # Validate original signature output fields are present @original_signature_class.output_struct_class.props.each do |field_name, _prop| unless output.respond_to?(field_name) raise "Missing required field: #{field_name}" end end # Validate ReAct-specific fields unless output.respond_to?(:history) && output.history.is_a?(Array) raise "Missing or invalid history field" end unless output.respond_to?(:iterations) && output.iterations.is_a?(Integer) raise "Missing or invalid iterations field" end unless output.respond_to?(:tools_used) && output.tools_used.is_a?(Array) raise "Missing or invalid tools_used field" end end