lib/temporalio/worker/workflow_replayer.rb

# frozen_string_literal: true

require 'temporalio/api'
require 'temporalio/converters'
require 'temporalio/internal/bridge'
require 'temporalio/internal/bridge/worker'
require 'temporalio/internal/worker/multi_runner'
require 'temporalio/internal/worker/workflow_worker'
require 'temporalio/worker/interceptor'
require 'temporalio/worker/thread_pool'
require 'temporalio/worker/tuner'
require 'temporalio/worker/workflow_executor'
require 'temporalio/workflow'
require 'temporalio/workflow_history'

module Temporalio
  class Worker
    # Replayer to replay workflows from existing history.
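    #
    # @example Minimal sketch: replay a single workflow history
    #   # +MyWorkflow+ and the way +history+ is obtained are assumptions, not part of this file
    #   replayer = Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow])
    #   # Raises on replay failure (e.g. nondeterminism) by default
    #   replayer.replay_workflow(history)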
    class WorkflowReplayer
      Options = Data.define(
        :workflows,
        :namespace,
        :task_queue,
        :data_converter,
        :workflow_executor,
        :interceptors,
        :build_id,
        :identity,
        :logger,
        :illegal_workflow_calls,
        :workflow_failure_exception_types,
        :workflow_payload_codec_thread_pool,
        :debug_mode,
        :runtime
      )

      # Options as returned from {options} representing the options passed to the constructor.
      class Options; end # rubocop:disable Lint/EmptyClass

      # @return [Options] Options for this replayer which has the same attributes as {initialize}.
      attr_reader :options

      # Create a new replayer. This combines some options from both {Worker.initialize} and {Client.initialize}.
      #
      # @param workflows [Array<Class<Workflow::Definition>>] Workflows for this replayer.
      # @param namespace [String] Namespace as set in the workflow info.
      # @param task_queue [String] Task queue as set in the workflow info.
      # @param data_converter [Converters::DataConverter] Data converter to use for all data conversions to/from
      #   payloads.
      # @param workflow_executor [WorkflowExecutor] Workflow executor that workflow tasks run within. This must be a
      #   {WorkflowExecutor::ThreadPool} currently.
      # @param interceptors [Array<Interceptor::Workflow>] Workflow interceptors.
      # @param build_id [String] Unique identifier for the current runtime. This is best set as a unique value
      #   representing all code and should change only when code does. This can be something like a git commit hash. If
      #   unset, default is hash of known Ruby code.
      # @param identity [String, nil] Override the identity for this replayer.
      # @param logger [Logger] Logger to use. Defaults to stdout with warn level. Callers setting this logger are
      #   responsible for closing it.
      # @param illegal_workflow_calls [Hash<String, [:all, Array<Symbol>]>] Set of illegal workflow calls that are
      #   considered unsafe/non-deterministic and will raise if seen. The key of the hash is the fully qualified string
      #   class name (no leading `::`). The value is either `:all` which means any use of the class, or an array of
      #   symbols for methods on the class that cannot be used. The methods refer to either instance or class methods;
      #   there is no way to differentiate at this time.
      # @param workflow_failure_exception_types [Array<Class<Exception>>] Workflow failure exception types. This is the
      #   set of exception types that, if a workflow-thrown exception extends, will cause the workflow/update to fail
      #   instead of suspending the workflow via task failure. These are applied in addition to the
      #   `workflow_failure_exception_type` on the workflow definition class itself. If {::Exception} is set, it
      #   effectively will fail a workflow/update in all user exception cases.
      # @param workflow_payload_codec_thread_pool [ThreadPool, nil] Thread pool to run payload codec encode/decode
      #   within. This is required if a payload codec exists and the worker is not fiber based. Codecs can potentially
      #   block execution which is why they need to be run in the background.
      # @param debug_mode [Boolean] If true, deadlock detection is disabled. Deadlock detection will fail workflow tasks
      #   if they block the thread for too long. This defaults to true if the `TEMPORAL_DEBUG` environment variable is
      #   `true` or `1`.
      # @param runtime [Runtime] Runtime for this replayer.
      #
      # @yield If a block is present, this is the equivalent of calling {with_replay_worker} with the block and
      #   discarding the result.
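      #
      # @example Minimal sketch: replay several histories via the block form
      #   # +MyWorkflow+ and +histories+ are assumptions, not defined in this file
      #   Temporalio::Worker::WorkflowReplayer.new(workflows: [MyWorkflow]) do |worker|
      #     histories.each { |history| worker.replay_workflow(history) }
      #   end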
      def initialize(
        workflows:,
        namespace: 'ReplayNamespace',
        task_queue: 'ReplayTaskQueue',
        data_converter: Converters::DataConverter.default,
        workflow_executor: WorkflowExecutor::ThreadPool.default,
        interceptors: [],
        build_id: Worker.default_build_id,
        identity: nil,
        logger: Logger.new($stdout, level: Logger::WARN),
        illegal_workflow_calls: Worker.default_illegal_workflow_calls,
        workflow_failure_exception_types: [],
        workflow_payload_codec_thread_pool: nil,
        debug_mode: %w[true 1].include?(ENV['TEMPORAL_DEBUG'].to_s.downcase),
        runtime: Runtime.default,
        &
      )
        @options = Options.new(
          workflows:,
          namespace:,
          task_queue:,
          data_converter:,
          workflow_executor:,
          interceptors:,
          build_id:,
          identity:,
          logger:,
          illegal_workflow_calls:,
          workflow_failure_exception_types:,
          workflow_payload_codec_thread_pool:,
          debug_mode:,
          runtime:
        ).freeze
        # Preload definitions and other settings
        @workflow_definitions = Internal::Worker::WorkflowWorker.workflow_definitions(workflows)
        @nondeterminism_as_workflow_fail, @nondeterminism_as_workflow_fail_for_types =
          Internal::Worker::WorkflowWorker.bridge_workflow_failure_exception_type_options(
            workflow_failure_exception_types:, workflow_definitions: @workflow_definitions
          )
        # If there is a block, we'll go ahead and assume it's for with_replay_worker
        with_replay_worker(&) if block_given? # steep:ignore
      end

      # Replay a workflow history.
      #
      # If replaying multiple histories, prefer {replay_workflows} or {with_replay_worker} since they create the
      # replay worker only once rather than once per call as this method does.
      #
      # @param history [WorkflowHistory] History to replay.
      # @param raise_on_replay_failure [Boolean] If true (the default), this will raise an exception on any replay
      #   failure. If false and the replay fails, the failure will be available in {ReplayResult.replay_failure}.
      #
      # @return [ReplayResult] Result of the replay.
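      #
      # @example Minimal sketch: inspect the failure instead of raising
      #   # +history+ is assumed to be a WorkflowHistory obtained elsewhere
      #   result = replayer.replay_workflow(history, raise_on_replay_failure: false)
      #   warn "Replay failed: #{result.replay_failure}" if result.replay_failure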
      def replay_workflow(history, raise_on_replay_failure: true)
        with_replay_worker { |worker| worker.replay_workflow(history, raise_on_replay_failure:) }
      end

      # Replay multiple workflow histories.
      #
      # @param histories [Enumerable<WorkflowHistory>] Histories to replay.
      # @param raise_on_replay_failure [Boolean] If true, this will raise an exception on any replay failure. If false
      #   (the default) and the replay fails, the failure will be available in {ReplayResult.replay_failure}.
      #
      # @return [Array<ReplayResult>] Results of the replay.
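      #
      # @example Minimal sketch: replay many histories and collect failures
      #   # +histories+ is assumed to be an Enumerable of WorkflowHistory
      #   results = replayer.replay_workflows(histories)
      #   results.select(&:replay_failure).each do |result|
      #     warn "#{result.history.workflow_id} failed: #{result.replay_failure}"
      #   end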
      def replay_workflows(histories, raise_on_replay_failure: false)
        with_replay_worker do |worker|
          histories.map { |h| worker.replay_workflow(h, raise_on_replay_failure:) }
        end
      end

      # Run a block of code with a {ReplayWorker} to execute replays.
      #
      # @yield Block of code to run with a replay worker.
      # @yieldparam [ReplayWorker] Worker to run replays on. Note: only one workflow can be replayed at a time.
      # @yieldreturn [Object] Result of the block.
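      #
      # @example Minimal sketch: reuse one replay worker for sequential replays
      #   # +histories+ is an assumption; replays must happen one at a time on the worker
      #   replayer.with_replay_worker do |worker|
      #     histories.map { |history| worker.replay_workflow(history, raise_on_replay_failure: false) }
      #   end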
      def with_replay_worker(&)
        worker = ReplayWorker.new(
          options:,
          workflow_definitions: @workflow_definitions,
          nondeterminism_as_workflow_fail: @nondeterminism_as_workflow_fail,
          nondeterminism_as_workflow_fail_for_types: @nondeterminism_as_workflow_fail_for_types
        )
        begin
          yield worker
        ensure
          worker._shutdown
        end
      end

      # Result of a single workflow replay run.
      class ReplayResult
        # @return [WorkflowHistory] History originally passed in to the replayer.
        attr_reader :history

        # @return [Exception, nil] Failure during replay if any.
        attr_reader :replay_failure

        # @!visibility private
        def initialize(history:, replay_failure:)
          @history = history
          @replay_failure = replay_failure
        end
      end

      # Replay worker that can be used to replay individual workflow runs. Only one call to {replay_workflow} can be
      # made at a time.
      class ReplayWorker
        # @!visibility private
        def initialize(
          options:,
          workflow_definitions:,
          nondeterminism_as_workflow_fail:,
          nondeterminism_as_workflow_fail_for_types:
        )
          # Create the bridge worker and the replayer
          @bridge_replayer, @bridge_worker = Internal::Bridge::Worker::WorkflowReplayer.new(
            options.runtime._core_runtime,
            Internal::Bridge::Worker::Options.new(
              activity: false,
              workflow: true,
              namespace: options.namespace,
              task_queue: options.task_queue,
              tuner: Tuner.create_fixed(
                workflow_slots: 2, activity_slots: 1, local_activity_slots: 1
              )._to_bridge_options,
              build_id: options.build_id,
              identity_override: options.identity,
              max_cached_workflows: 2,
              max_concurrent_workflow_task_polls: 1,
              nonsticky_to_sticky_poll_ratio: 1.0,
              max_concurrent_activity_task_polls: 1,
              no_remote_activities: true,
              sticky_queue_schedule_to_start_timeout: 1.0,
              max_heartbeat_throttle_interval: 1.0,
              default_heartbeat_throttle_interval: 1.0,
              max_worker_activities_per_second: nil,
              max_task_queue_activities_per_second: nil,
              graceful_shutdown_period: 0.0,
              use_worker_versioning: false,
              nondeterminism_as_workflow_fail:,
              nondeterminism_as_workflow_fail_for_types:
            )
          )

          # Create the workflow worker
          @workflow_worker = Internal::Worker::WorkflowWorker.new(
            bridge_worker: @bridge_worker,
            namespace: options.namespace,
            task_queue: options.task_queue,
            workflow_definitions:,
            workflow_executor: options.workflow_executor,
            logger: options.logger,
            data_converter: options.data_converter,
            metric_meter: options.runtime.metric_meter,
            workflow_interceptors: options.interceptors.select do |i|
              i.is_a?(Interceptor::Workflow)
            end,
            disable_eager_activity_execution: false,
            illegal_workflow_calls: options.illegal_workflow_calls,
            workflow_failure_exception_types: options.workflow_failure_exception_types,
            workflow_payload_codec_thread_pool: options.workflow_payload_codec_thread_pool,
            debug_mode: options.debug_mode,
            on_eviction: proc { |_, remove_job| @last_workflow_remove_job = remove_job } # steep:ignore
          )

          # Create the runner
          @runner = Internal::Worker::MultiRunner.new(workers: [self], shutdown_signals: [])
        end

        # Replay a workflow history.
        #
        # @param history [WorkflowHistory] History to replay.
        # @param raise_on_replay_failure [Boolean] If true (the default), this will raise an exception on any replay
        #   failure. If false and the replay fails, the failure will be available in {ReplayResult.replay_failure}.
        #
        # @return [ReplayResult] Result of the replay.
        def replay_workflow(history, raise_on_replay_failure: true)
          raise ArgumentError, 'Expected history as WorkflowHistory' unless history.is_a?(WorkflowHistory)
          # Due to our event processing model, only one can run at a time
          raise 'Already running' if @running
          raise 'Replayer shutdown' if @shutdown

          # Push history proto
          # TODO(cretz): Unset this
          @running = true
          @last_workflow_remove_job = nil
          begin
            @bridge_replayer.push_history(
              history.workflow_id, Api::History::V1::History.new(events: history.events).to_proto
            )

            # Process events until workflow complete
            until @last_workflow_remove_job
              event = @runner.next_event
              case event
              when Internal::Worker::MultiRunner::Event::PollSuccess
                @workflow_worker.handle_activation(
                  runner: @runner,
                  activation: Internal::Bridge::Api::WorkflowActivation::WorkflowActivation.decode(event.bytes),
                  decoded: false
                )
              when Internal::Worker::MultiRunner::Event::WorkflowActivationDecoded
                @workflow_worker.handle_activation(runner: @runner, activation: event.activation, decoded: true)
              when Internal::Worker::MultiRunner::Event::WorkflowActivationComplete
                @workflow_worker.handle_activation_complete(
                  runner: @runner,
                  activation_completion: event.activation_completion,
                  encoded: event.encoded,
                  completion_complete_queue: event.completion_complete_queue
                )
              when Internal::Worker::MultiRunner::Event::WorkflowActivationCompletionComplete
                # Ignore
              else
                raise "Unexpected event: #{event}"
              end
            end

            # Create exception if removal is due to error
            err = if @last_workflow_remove_job.reason == :NONDETERMINISM
                    Workflow::NondeterminismError.new(
                      "#{@last_workflow_remove_job.reason}: #{@last_workflow_remove_job.message}"
                    )
                  elsif !%i[CACHE_FULL LANG_REQUESTED].include?(@last_workflow_remove_job.reason)
                    Workflow::InvalidWorkflowStateError.new(
                      "#{@last_workflow_remove_job.reason}: #{@last_workflow_remove_job.message}"
                    )
                  end
            # Raise if wanting to raise, otherwise return result
            raise err if raise_on_replay_failure && err

            ReplayResult.new(history:, replay_failure: err)
          ensure
            @running = false
          end
        end

        # @!visibility private
        def _shutdown
          @shutdown = true
          @runner.initiate_shutdown
          # Wait for all-pollers-shutdown before finalizing
          until @runner.next_event.is_a?(Internal::Worker::MultiRunner::Event::AllPollersShutDown); end
          @runner.wait_complete_and_finalize_shutdown
          @workflow_worker.on_shutdown_complete
          @workflow_worker = nil
        end

        # @!visibility private
        def _bridge_worker
          @bridge_worker
        end

        # @!visibility private
        def _initiate_shutdown
          _bridge_worker.initiate_shutdown
        end

        # @!visibility private
        def _wait_all_complete
          # Do nothing
        end
      end
    end
  end
end