lib/temporalio/internal/worker/activity_worker.rb



# frozen_string_literal: true

require 'temporalio/activity'
require 'temporalio/activity/definition'
require 'temporalio/cancellation'
require 'temporalio/converters/raw_value'
require 'temporalio/internal/bridge/api'
require 'temporalio/internal/proto_utils'
require 'temporalio/scoped_logger'
require 'temporalio/worker/interceptor'

module Temporalio
  module Internal
    module Worker
      # Worker for handling activity tasks. Upon overarching worker shutdown, {wait_all_complete} should be used to wait
      # for the activities to complete.
      class ActivityWorker
        LOG_TASKS = false

        attr_reader :worker, :bridge_worker

        # Creates the activity worker and indexes the configured activities by name.
        #
        # @param worker Owning worker; its options supply the client, logger, activities and activity executors.
        # @param bridge_worker Core worker used to complete activity tasks and record heartbeats.
        # @raise [ArgumentError] If two activities share a name, more than one dynamic (nil-named) activity is given,
        #   or an activity references an executor that is not configured.
        def initialize(worker:, bridge_worker:)
          @worker = worker
          @bridge_worker = bridge_worker
          @runtime_metric_meter = worker.options.client.connection.options.runtime.metric_meter

          # A single logger for all activities; scoped values come from whichever activity context is current
          @scoped_logger = ScopedLogger.new(@worker.options.logger)
          @scoped_logger.scoped_values_getter = proc { Activity::Context.current_or_nil&._scoped_logger_info }

          # Definitions keyed by activity name; the nil key holds the dynamic activity
          registry = {}
          worker.options.activities.each do |activity|
            # Normalizes whatever form the activity was given in (class, instance, or definition)
            definition = Activity::Definition::Info.from_activity(activity)

            if registry.key?(definition.name)
              raise ArgumentError, 'Only one dynamic activity allowed' unless definition.name

              raise ArgumentError, "Multiple activities named #{definition.name}"
            end

            executor = worker.options.activity_executors[definition.executor]
            raise ArgumentError, "Unknown executor '#{definition.executor}'" if executor.nil?

            # Let the executor prepare for this activity before it is registered
            executor.initialize_activity(definition)
            registry[definition.name] = definition
          end
          @activities = registry

          # In-flight activity tracking; every access goes through this mutex
          @running_activities_mutex = Mutex.new
          @running_activities = {}
          @running_activities_empty_condvar = ConditionVariable.new
        end

        # Records (or replaces) the running activity stored for a task token.
        #
        # @param task_token Token identifying the activity task.
        # @param activity [RunningActivity, nil] The activity context, or nil when the task has been accepted but its
        #   context has not been built yet.
        # @return [RunningActivity, nil] The stored value.
        def set_running_activity(task_token, activity)
          @running_activities_mutex.synchronize { @running_activities.store(task_token, activity) }
        end

        # Looks up the running activity stored for a task token.
        #
        # @param task_token Token identifying the activity task.
        # @return [RunningActivity, nil] The activity, or nil if the token is unknown or its context is not built yet.
        def get_running_activity(task_token)
          @running_activities_mutex.synchronize { @running_activities.fetch(task_token, nil) }
        end

        # Forgets the running activity for a task token and wakes any {wait_all_complete} callers once none remain.
        #
        # @param task_token Token identifying the activity task; unknown tokens are ignored.
        def remove_running_activity(task_token)
          @running_activities_mutex.synchronize do
            @running_activities.delete(task_token)
            next unless @running_activities.empty?

            @running_activities_empty_condvar.broadcast
          end
        end

        # Blocks the calling thread until no activities are registered as running.
        #
        # @return [nil]
        def wait_all_complete
          @running_activities_mutex.synchronize do
            loop do
              break if @running_activities.empty?

              @running_activities_empty_condvar.wait(@running_activities_mutex)
            end
          end
        end

        # Dispatches an activity task from core to the start or cancel handler.
        #
        # @param task Activity task carrying a task token and either a +start+ or a +cancel+ variant.
        # @return Whatever the selected handler returns.
        # @raise [RuntimeError] If neither variant is set.
        def handle_task(task)
          @scoped_logger.debug("Received activity task: #{task}") if LOG_TASKS
          return handle_start_task(task.task_token, task.start) unless task.start.nil?
          return handle_cancel_task(task.task_token, task.cancel) unless task.cancel.nil?

          raise "Unrecognized activity task: #{task}"
        end

        # Begins processing a start task: resolves the activity definition and hands execution to its executor.
        #
        # Any exception raised synchronously here, including the NotFoundError for an unknown activity type, is
        # logged and reported to core as a failed completion. That completion is sent in the background because this
        # runs on the polling hot path.
        #
        # @param task_token Token identifying the activity task.
        # @param start Start portion of the activity task.
        def handle_start_task(task_token, start)
          # Register the token immediately; execute_activity fills in the RunningActivity later
          set_running_activity(task_token, nil)

          # Find activity definition, falling back to dynamic if not found and not reserved name
          defn = @activities[start.activity_type]
          defn = @activities[nil] if !defn && !Internal::ProtoUtils.reserved_name?(start.activity_type)

          if defn.nil?
            # The dynamic activity is stored under a nil key. Drop it before sorting: sorting nil together with
            # String names raises ArgumentError, which would surface instead of this NotFoundError.
            available = @activities.keys.compact.sort.join(', ')
            raise Error::ApplicationError.new(
              "Activity #{start.activity_type} for workflow #{start.workflow_execution.workflow_id} " \
              "is not registered on this worker, available activities: #{available}",
              type: 'NotFoundError'
            )
          end

          # Run everything else in the executor
          executor = @worker.options.activity_executors[defn.executor]
          executor.execute_activity(defn) do
            # Expose the executor while the activity runs; execute_activity uses it to set the activity context
            Activity::Context._current_executor = executor
            # Execute with error handling
            execute_activity(task_token, defn, start)
          ensure
            # Unset at the end
            Activity::Context._current_executor = nil
          end
        rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
          remove_running_activity(task_token)
          @scoped_logger.warn("Failed starting activity #{start.activity_type}")
          @scoped_logger.warn(e)

          # We need to complete the activity task as failed, but this is on the
          # hot path for polling, so we want to complete it in the background
          begin
            @bridge_worker.complete_activity_task_in_background(
              Bridge::Api::CoreInterface::ActivityTaskCompletion.new(
                task_token:,
                result: Bridge::Api::ActivityResult::ActivityExecutionResult.new(
                  failed: Bridge::Api::ActivityResult::Failure.new(
                    # TODO(cretz): If failure conversion does slow failure
                    # encoding, it can gum up the system
                    failure: @worker.options.client.data_converter.to_failure(e)
                  )
                )
              )
            )
          rescue StandardError => e_inner
            @scoped_logger.error("Failed building start failure to return for #{start.activity_type}")
            @scoped_logger.error(e_inner)
          end
        end

        # Forwards a cancel task from core to the matching running activity.
        #
        # Failures while cancelling are logged and not re-raised.
        #
        # @param task_token Token identifying the activity task.
        # @param cancel Cancel portion of the activity task; its +reason+ and +details+ flags are passed to
        #   {RunningActivity#_cancel}.
        def handle_cancel_task(task_token, cancel)
          activity = get_running_activity(task_token)
          if activity.nil?
            # NOTE(review): handle_start_task registers the token with a nil activity before execute_activity builds
            # the RunningActivity, so a cancel arriving in that window is also dropped here.
            @scoped_logger.warn("Cannot find activity to cancel for token #{task_token}")
            return
          end
          begin
            activity._cancel(
              reason: cancel.reason.to_s,
              details: Activity::CancellationDetails.new(
                gone_from_server: cancel.details.is_not_found,
                cancel_requested: cancel.details.is_cancelled,
                timed_out: cancel.details.is_timed_out,
                worker_shutdown: cancel.details.is_worker_shutdown,
                paused: cancel.details.is_paused,
                reset: cancel.details.is_reset
              )
            )
          rescue StandardError => e
            # Adjacent literals join cleanly. A backslash-newline inside a single literal would embed the next line's
            # source indentation in the log message.
            @scoped_logger.warn(
              "Failed cancelling activity #{activity.info.activity_type} " \
              "with ID #{activity.info.activity_id}"
            )
            @scoped_logger.warn(e)
          end
        end

        # Builds the activity info, context and interceptor input for a start task, then runs it via run_activity.
        # Called by handle_start_task from inside the activity's executor.
        #
        # If anything here raises (building the activity, or run_activity failing to send its completion), the task
        # is completed as failed with that error. On exit, the executor's activity context is cleared and the task
        # token is removed from the running activities.
        #
        # @param task_token Token identifying the activity task.
        # @param defn [Activity::Definition::Info] Resolved activity definition.
        # @param start Start portion of the activity task.
        def execute_activity(task_token, defn, start)
          # Codec-decodes payloads (when a codec is configured) and wraps each one as a RawValue without running
          # payload conversion. Shared by heartbeat details and raw-args input.
          decode_raw = lambda do |payloads|
            codec = @worker.options.client.data_converter.payload_codec
            payloads = codec.decode(payloads) if codec
            payloads.map { |p| Temporalio::Converters::RawValue.new(p) }
          end

          # Build info
          info = Activity::Info.new(
            activity_id: start.activity_id,
            activity_type: start.activity_type,
            attempt: start.attempt,
            current_attempt_scheduled_time: Internal::ProtoUtils.timestamp_to_time(
              start.current_attempt_scheduled_time
            ) || raise, # Never nil
            heartbeat_timeout: Internal::ProtoUtils.duration_to_seconds(start.heartbeat_timeout),
            local?: start.is_local,
            priority: Priority._from_proto(start.priority),
            raw_heartbeat_details: decode_raw.call(start.heartbeat_details.to_ary),
            schedule_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.schedule_to_close_timeout),
            scheduled_time: Internal::ProtoUtils.timestamp_to_time(start.scheduled_time) || raise, # Never nil
            start_to_close_timeout: Internal::ProtoUtils.duration_to_seconds(start.start_to_close_timeout),
            started_time: Internal::ProtoUtils.timestamp_to_time(start.started_time) || raise, # Never nil
            task_queue: @worker.options.task_queue,
            task_token:,
            workflow_id: start.workflow_execution.workflow_id,
            workflow_namespace: start.workflow_namespace,
            workflow_run_id: start.workflow_execution.run_id,
            workflow_type: start.workflow_type
          )

          # Build input
          input = Temporalio::Worker::Interceptor::Activity::ExecuteInput.new(
            proc: defn.proc,
            # Activities that want raw args get decoded payloads only; no payload conversion is applied
            args: if defn.raw_args
                    decode_raw.call(start.input.to_ary)
                  else
                    ProtoUtils.convert_from_payload_array(
                      @worker.options.client.data_converter,
                      start.input.to_ary,
                      hints: defn.arg_hints
                    )
                  end,
            result_hint: defn.result_hint,
            headers: ProtoUtils.headers_from_proto_map(start.header_fields, @worker.options.client.data_converter) || {}
          )

          # Run; the context is registered under the token first so handle_cancel_task can find it
          activity = RunningActivity.new(
            worker: @worker,
            info:,
            cancellation: Cancellation.new,
            worker_shutdown_cancellation: @worker._worker_shutdown_cancellation,
            payload_converter: @worker.options.client.data_converter.payload_converter,
            logger: @scoped_logger,
            runtime_metric_meter: @runtime_metric_meter
          )
          Activity::Context._current_executor&.set_activity_context(defn, activity)
          set_running_activity(task_token, activity)
          run_activity(defn, activity, input)
        rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
          @scoped_logger.warn("Failed starting or sending completion for activity #{start.activity_type}")
          @scoped_logger.warn(e)
          # This means that the activity couldn't start or send completion (run
          # handles its own errors).
          begin
            @bridge_worker.complete_activity_task(
              Bridge::Api::CoreInterface::ActivityTaskCompletion.new(
                task_token:,
                result: Bridge::Api::ActivityResult::ActivityExecutionResult.new(
                  failed: Bridge::Api::ActivityResult::Failure.new(
                    failure: @worker.options.client.data_converter.to_failure(e)
                  )
                )
              )
            )
          rescue StandardError => e_inner
            @scoped_logger.error("Failed sending failure for activity #{start.activity_type}")
            @scoped_logger.error(e_inner)
          end
        ensure
          Activity::Context._current_executor&.set_activity_context(defn, nil)
          remove_running_activity(task_token)
        end

        # Runs one activity attempt through the interceptor chain and sends the resulting completion to core.
        #
        # The outcome maps to exactly one completion result:
        # * normal return: completed, with the return value converted using +input.result_hint+
        # * Activity::CompleteAsyncError: will complete asynchronously
        # * Error::CanceledError when the cancel was a pause or reset: failed, with an ActivityPause/ActivityReset
        #   application error whose cause is the cancellation error
        # * Error::CanceledError after any other server-requested cancel: cancelled
        # * anything else: failed (logged at DEBUG for benign application errors, WARN otherwise)
        #
        # Exceptions raised while sending the completion are not rescued here.
        #
        # @param defn [Activity::Definition::Info] Definition supplying the activity instance, or a proc creating it.
        # @param activity [RunningActivity] Context for this attempt.
        # @param input [Temporalio::Worker::Interceptor::Activity::ExecuteInput] Input given to the inbound chain.
        def run_activity(defn, activity, input)
          outcome = begin
            # Instantiate first so the instance is already available to interceptors
            activity.instance = defn.instance.is_a?(Proc) ? defn.instance.call : defn.instance # steep:ignore

            # Wrap the base implementation from the last interceptor to the first, leaving the first outermost
            # @type var impl: Temporalio::Worker::Interceptor::Activity::Inbound
            impl = InboundImplementation.new(self)
            @worker._activity_interceptors.reverse_each do |interceptor|
              impl = interceptor.intercept_activity(impl)
            end
            impl.init(OutboundImplementation.new(self, activity.info.task_token))

            returned = impl.execute(input)
            payload = @worker.options.client.data_converter.to_payload(returned, hint: input.result_hint)
            Bridge::Api::ActivityResult::ActivityExecutionResult.new(
              completed: Bridge::Api::ActivityResult::Success.new(result: payload)
            )
          rescue Exception => e # rubocop:disable Lint/RescueException -- We are intending to catch everything here
            # Builds a "failed" execution result for the given error
            as_failure = lambda do |error|
              Bridge::Api::ActivityResult::ActivityExecutionResult.new(
                failed: Bridge::Api::ActivityResult::Failure.new(
                  failure: @worker.options.client.data_converter.to_failure(error)
                )
              )
            end
            canceled = e.is_a?(Error::CanceledError)

            if e.is_a?(Activity::CompleteAsyncError)
              @scoped_logger.debug('Completing activity asynchronously')
              Bridge::Api::ActivityResult::ActivityExecutionResult.new(
                will_complete_async: Bridge::Api::ActivityResult::WillCompleteAsync.new
              )
            elsif canceled && activity.cancellation_details&.paused?
              # A pause is reported as a failure rather than a cancellation
              @scoped_logger.debug('Completing activity as failed due to exception caused by pause')
              as_failure.call(
                Error._with_backtrace_and_cause(
                  Error::ApplicationError.new('Activity paused', type: 'ActivityPause'), backtrace: nil, cause: e
                )
              )
            elsif canceled && activity.cancellation_details&.reset?
              # A reset is likewise reported as a failure
              @scoped_logger.debug('Completing activity as failed due to exception caused by reset')
              as_failure.call(
                Error._with_backtrace_and_cause(
                  Error::ApplicationError.new('Activity reset', type: 'ActivityReset'), backtrace: nil, cause: e
                )
              )
            elsif canceled && activity._server_requested_cancel
              @scoped_logger.debug('Completing activity as canceled')
              Bridge::Api::ActivityResult::ActivityExecutionResult.new(
                cancelled: Bridge::Api::ActivityResult::Cancellation.new(
                  failure: @worker.options.client.data_converter.to_failure(e)
                )
              )
            else
              benign = e.is_a?(Error::ApplicationError) && e.category == Error::ApplicationError::Category::BENIGN
              log_level = benign ? Logger::DEBUG : Logger::WARN
              @scoped_logger.add(log_level, 'Completing activity as failed')
              @scoped_logger.add(log_level, e)
              as_failure.call(e)
            end
          end

          @scoped_logger.debug("Sending activity completion: #{outcome}") if LOG_TASKS
          @bridge_worker.complete_activity_task(
            Bridge::Api::CoreInterface::ActivityTaskCompletion.new(
              task_token: activity.info.task_token,
              result: outcome
            )
          )
        end

        # Checks that this worker can run the given activity type. The type is accepted if it has an exact
        # registration, or if a dynamic activity is registered and the name is not reserved.
        #
        # @param activity [String] Activity type name.
        # @return [nil] When the activity can be handled.
        # @raise [ArgumentError] If neither a registered nor the dynamic activity would handle it.
        def assert_valid_activity(activity)
          defn = @activities[activity]
          defn = @activities[nil] if !defn && !Internal::ProtoUtils.reserved_name?(activity)

          return unless defn.nil?

          # The dynamic activity is stored under a nil key. Drop it before sorting: sorting nil together with String
          # names raises ArgumentError with a comparison message instead of this one.
          available = @activities.keys.compact.sort.join(', ')
          raise ArgumentError,
                "Activity #{activity} " \
                "is not registered on this worker, available activities: #{available}"
        end

        # Activity context for a single running activity attempt. Exposes the info, cancellation, heartbeating,
        # metrics and client to activity code.
        class RunningActivity < Activity::Context
          attr_reader :info, :cancellation, :cancellation_details, :worker_shutdown_cancellation,
                      :payload_converter, :logger, :_server_requested_cancel
          attr_accessor :instance, :_outbound_impl

          # @param worker Owning worker; used here only to reach its client.
          # @param info [Activity::Info] Info for this attempt.
          # @param cancellation [Cancellation] Cancellation triggered by {#_cancel}.
          # @param worker_shutdown_cancellation [Cancellation] Cancellation for worker shutdown.
          # @param payload_converter Payload converter from the client's data converter.
          # @param logger Scoped logger shared by the activity worker.
          # @param runtime_metric_meter Runtime meter that {#metric_meter} extends with activity attributes.
          def initialize( # rubocop:disable Lint/MissingSuper
            worker:,
            info:,
            cancellation:,
            worker_shutdown_cancellation:,
            payload_converter:,
            logger:,
            runtime_metric_meter:
          )
            @worker = worker
            @info = info
            @cancellation = cancellation
            # Populated by _cancel when core cancels the activity
            @cancellation_details = nil
            @worker_shutdown_cancellation = worker_shutdown_cancellation
            @payload_converter = payload_converter
            @logger = logger
            @runtime_metric_meter = runtime_metric_meter
            # Populated when the inbound chain is initialized
            @_outbound_impl = nil
            @_server_requested_cancel = false
          end

          # Records a heartbeat through the outbound interceptor chain. Does nothing for local activities.
          #
          # @param details [Array<Object>] Heartbeat details.
          # @param detail_hints [Array<Object>, nil] Converter hints for the details.
          # @raise [RuntimeError] If called before the outbound implementation has been set.
          def heartbeat(*details, detail_hints: nil)
            outbound = _outbound_impl
            raise 'Implementation not set yet' if outbound.nil?
            return if info.local?

            input = Temporalio::Worker::Interceptor::Activity::HeartbeatInput.new(details:, detail_hints:)
            outbound.heartbeat(input)
          end

          # Metric meter tagged with this activity's namespace, task queue and type; built once and memoized.
          def metric_meter
            @metric_meter ||= begin
              attributes = {
                namespace: info.workflow_namespace,
                task_queue: info.task_queue,
                activity_type: info.activity_type
              }
              @runtime_metric_meter.with_additional_attributes(attributes)
            end
          end

          # Client of the owning worker.
          def client
            @worker.client
          end

          # Applies a cancel from core to this activity. Only the first call has any effect.
          #
          # @param reason [String] Reason passed to the cancellation.
          # @param details [Activity::CancellationDetails] Flags describing why the activity was canceled.
          def _cancel(reason:, details:)
            return if @cancellation_details

            @_server_requested_cancel = true
            # Store details before firing so code reacting to the cancellation can already inspect them
            @cancellation_details = details
            _token, trigger = cancellation
            trigger.call(reason:)
          end
        end

        # Innermost inbound implementation. It attaches the outbound implementation to the current activity context
        # and invokes the activity's proc.
        class InboundImplementation < Temporalio::Worker::Interceptor::Activity::Inbound
          # @param worker [ActivityWorker] Activity worker that built this implementation.
          def initialize(worker)
            super(nil) # steep:ignore
            @worker = worker
          end

          # Stores the outbound implementation on the current context.
          #
          # @param outbound [Temporalio::Worker::Interceptor::Activity::Outbound] Outbound implementation to attach.
          # @raise [RuntimeError] If the current context is not a RunningActivity.
          def init(outbound)
            running = Activity::Context.current
            raise 'Unexpected context type' unless running.is_a?(RunningActivity)

            running._outbound_impl = outbound
          end

          # Calls the activity proc with the (already converted) arguments.
          #
          # @param input [Temporalio::Worker::Interceptor::Activity::ExecuteInput] Proc and arguments to call it with.
          # @return [Object] Whatever the activity proc returns.
          def execute(input)
            activity_proc = input.proc
            activity_proc.call(*input.args)
          end
        end

        # Base outbound implementation that records heartbeats for one activity task with the bridge worker.
        class OutboundImplementation < Temporalio::Worker::Interceptor::Activity::Outbound
          # @param worker [ActivityWorker] Activity worker supplying the bridge worker and data converter.
          # @param task_token Token of the activity task heartbeats are recorded for.
          def initialize(worker, task_token)
            super(nil) # steep:ignore
            @worker = worker
            @task_token = task_token
          end

          # Converts the heartbeat details to payloads and records the heartbeat with the bridge worker.
          #
          # @param input [Temporalio::Worker::Interceptor::Activity::HeartbeatInput] Details and optional hints.
          def heartbeat(input)
            payloads = ProtoUtils.convert_to_payload_array(
              @worker.worker.options.client.data_converter,
              input.details,
              hints: input.detail_hints
            )
            message = Bridge::Api::CoreInterface::ActivityHeartbeat.new(
              task_token: @task_token,
              details: payloads
            )
            @worker.bridge_worker.record_activity_heartbeat(message.to_proto)
          end
        end
      end
    end
  end
end