lib/temporalio/testing/activity_environment.rb



# frozen_string_literal: true

require 'temporalio/activity'
require 'temporalio/cancellation'
require 'temporalio/converters/payload_converter'
require 'temporalio/worker/activity_executor'

module Temporalio
  module Testing
    # Test environment for testing activities.
    #
    # Users can create this environment and then use {run} to execute activities on it. Often, since mutable things like
    # cancellation can be set, users create this for each activity that is run. There is no real performance penalty for
    # creating an environment for every run.
    class ActivityEnvironment
      # @return [Activity::Info] The activity info used by default. This is frozen, but can be dup'd and mutated to pass
      #   in to {initialize}.
      def self.default_info
        @default_info ||= Activity::Info.new(
          activity_id: 'test',
          activity_type: 'unknown',
          attempt: 1,
          current_attempt_scheduled_time: Time.at(0),
          heartbeat_details: [],
          heartbeat_timeout: nil,
          local?: false,
          schedule_to_close_timeout: 1.0,
          scheduled_time: Time.at(0),
          start_to_close_timeout: 1.0,
          started_time: Time.at(0),
          task_queue: 'test',
          task_token: String.new('test', encoding: Encoding::ASCII_8BIT),
          workflow_id: 'test',
          workflow_namespace: 'default',
          workflow_run_id: 'test-run',
          workflow_type: 'test'
        ).freeze
      end

      # Create a test environment for activities.
      #
      # @param info [Activity::Info] Value for {Activity::Context#info}.
      # @param on_heartbeat [Proc(Array), nil] Proc that is called with all heartbeat details when
      #   {Activity::Context#heartbeat} is called.
      # @param cancellation [Cancellation] Value for {Activity::Context#cancellation}.
      # @param worker_shutdown_cancellation [Cancellation] Value for {Activity::Context#worker_shutdown_cancellation}.
      # @param payload_converter [Converters::PayloadConverter] Value for {Activity::Context#payload_converter}.
      # @param logger [Logger] Value for {Activity::Context#logger}.
      # @param activity_executors [Hash<Symbol, Worker::ActivityExecutor>] Executors that activities can run within.
      def initialize(
        info: ActivityEnvironment.default_info,
        on_heartbeat: nil,
        cancellation: Cancellation.new,
        worker_shutdown_cancellation: Cancellation.new,
        payload_converter: Converters::PayloadConverter.default,
        logger: Logger.new(nil),
        activity_executors: Worker::ActivityExecutor.defaults
      )
        @info = info
        @on_heartbeat = on_heartbeat
        @cancellation = cancellation
        @worker_shutdown_cancellation = worker_shutdown_cancellation
        @payload_converter = payload_converter
        @logger = logger
        @activity_executors = activity_executors
      end

      # Run an activity and returns its result or raises its exception.
      #
      # @param activity [Activity::Definition, Class<Activity::Definition>, Activity::Definition::Info] Activity to run.
      # @param args [Array<Object>] Arguments to the activity.
      # @return Activity result.
      def run(activity, *args)
        defn = Activity::Definition::Info.from_activity(activity)
        executor = @activity_executors[defn.executor]
        raise ArgumentError, "Unknown executor: #{defn.executor}" if executor.nil?

        queue = Queue.new
        executor.execute_activity(defn) do
          Activity::Context._current_executor = executor
          executor.set_activity_context(defn, Context.new(
                                                info: @info.dup,
                                                instance:
                                                  defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance,
                                                on_heartbeat: @on_heartbeat,
                                                cancellation: @cancellation,
                                                worker_shutdown_cancellation: @worker_shutdown_cancellation,
                                                payload_converter: @payload_converter,
                                                logger: @logger
                                              ))
          queue.push([defn.proc.call(*args), nil])
        rescue Exception => e # rubocop:disable Lint/RescueException Intentionally capturing all exceptions
          queue.push([nil, e])
        ensure
          executor.set_activity_context(defn, nil)
          Activity::Context._current_executor = nil
        end

        result, err = queue.pop
        raise err unless err.nil?

        result
      end

      # @!visibility private
      class Context < Activity::Context
        attr_reader :info, :instance, :cancellation, :worker_shutdown_cancellation, :payload_converter, :logger

        def initialize( # rubocop:disable Lint/MissingSuper
          info:,
          instance:,
          on_heartbeat:,
          cancellation:,
          worker_shutdown_cancellation:,
          payload_converter:,
          logger:
        )
          @info = info
          @instance = instance
          @on_heartbeat = on_heartbeat
          @cancellation = cancellation
          @worker_shutdown_cancellation = worker_shutdown_cancellation
          @payload_converter = payload_converter
          @logger = logger
        end

        # @!visibility private
        def heartbeat(*details)
          @on_heartbeat&.call(details)
        end
      end

      private_constant :Context
    end
  end
end