lib/temporalio/workflow/definition.rb



# frozen_string_literal: true

require 'temporalio/workflow'
require 'temporalio/workflow/handler_unfinished_policy'

module Temporalio
  module Workflow
    # Base class for all workflows.
    #
    # Workflows are instances of this class and must implement {execute}. Inside the workflow code, class methods on
    # {Workflow} can be used.
    #
    # By default, the workflow is named as its unqualified class name. This can be customized with {workflow_name}.
    class Definition
      class << self
        protected

        # Customize the workflow name. By default the workflow is named the unqualified class name of the class provided
        # to the worker.
        #
        # @param workflow_name [String, Symbol] Name to use.
        def workflow_name(workflow_name)
          if !workflow_name.is_a?(Symbol) && !workflow_name.is_a?(String)
            raise ArgumentError,
                  'Workflow name must be a symbol or string'
          end

          @workflow_name = workflow_name.to_s
        end

        # Set a workflow as dynamic. Dynamic workflows do not have names and handle any workflow that is not otherwise
        # registered. A worker can only have one dynamic workflow. It is often useful to use {workflow_raw_args} with
        # this.
        #
        # @param value [Boolean] Whether the workflow is dynamic.
        def workflow_dynamic(value = true) # rubocop:disable Style/OptionalBooleanParameter
          @workflow_dynamic = value
        end

        # Have workflow arguments delivered to `execute` (and `initialize` if {workflow_init} in use) as
        # {Converters::RawValue}s. These are wrappers for the raw payloads that have not been converted to types (but
        # they have been decoded by the codec if present). They can be converted with {Workflow.payload_converter}.
        #
        # @param value [Boolean] Whether the workflow accepts raw arguments.
        def workflow_raw_args(value = true) # rubocop:disable Style/OptionalBooleanParameter
          @workflow_raw_args = value
        end

        # Configure workflow failure exception types. This sets the types of exceptions that, if a
        # workflow-thrown exception extends, will cause the workflow/update to fail instead of suspending the workflow
        # via task failure. These are applied in addition to the worker option. If {::Exception} is set, it effectively
        # will fail a workflow/update in all user exception cases.
        #
        # @param types [Array<Class<Exception>>] Exception types to turn into workflow failures.
        def workflow_failure_exception_type(*types)
          types.each do |t|
            raise ArgumentError, 'All types must classes inheriting Exception' unless t.is_a?(Class) && t < Exception
          end
          @workflow_failure_exception_types ||= []
          @workflow_failure_exception_types.concat(types)
        end

        # Expose an attribute as a method and as a query. A `workflow_query_attr_reader :foo` is the equivalent of:
        # ```
        # workflow_query
        # def foo
        #   @foo
        # end
        # ```
        # This means it is a superset of `attr_reader`` and will not work if also using `attr_reader` or
        # `attr_accessor`. If a writer is needed alongside this, use `attr_writer`.
        #
        # @param attr_names [Array<Symbol>] Attributes to expose.
        def workflow_query_attr_reader(*attr_names)
          @workflow_queries ||= {}
          attr_names.each do |attr_name|
            raise 'Expected attr to be a symbol' unless attr_name.is_a?(Symbol)

            if method_defined?(attr_name, false)
              raise 'Method already defined for this attr name. ' \
                    'Note that a workflow_query_attr_reader includes attr_reader behavior. ' \
                    'If you also want a writer for this attribute, use a separate attr_writer.'
            end

            # Just run this as if done manually
            workflow_query
            define_method(attr_name) { instance_variable_get("@#{attr_name}") }
          end
        end

        # Mark an `initialize` as needing the workflow start arguments. Otherwise, `initialize` must accept no required
        # arguments. This must be placed above the `initialize` method or it will fail.
        #
        # @param value [Boolean] Whether the start parameters will be passed to `initialize`.
        def workflow_init(value = true) # rubocop:disable Style/OptionalBooleanParameter
          self.pending_handler_details = { type: :init, value: }
        end

        # Mark the next method as a workflow signal with a default name as the name of the method. Signals cannot return
        # values.
        #
        # @param name [String, Symbol, nil] Override the default name.
        # @param dynamic [Boolean] If true, make the signal dynamic. This means it receives all other signals without
        #   handlers. This cannot have a name override since it is nameless. The first parameter will be the name. Often
        #   it is useful to have the second parameter be `*args` and `raw_args` be true.
        # @param raw_args [Boolean] If true, does not convert arguments, but instead provides each argument as
        #   {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}.
        # @param unfinished_policy [HandlerUnfinishedPolicy] How to treat unfinished handlers if they are still running
        #   when the workflow ends. The default warns, but this can be disabled.
        def workflow_signal(
          name: nil,
          dynamic: false,
          raw_args: false,
          unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON
        )
          raise 'Cannot provide name if dynamic is true' if name && dynamic

          self.pending_handler_details = { type: :signal, name:, dynamic:, raw_args:, unfinished_policy: }
        end

        # Mark the next method as a workflow query with a default name as the name of the method. Queries can not have
        # any side effects, meaning they should never mutate state or try to wait on anything.
        #
        # @param name [String, Symbol, nil] Override the default name.
        # @param dynamic [Boolean] If true, make the query dynamic. This means it receives all other queries without
        #   handlers. This cannot have a name override since it is nameless. The first parameter will be the name. Often
        #   it is useful to have the second parameter be `*args` and `raw_args` be true.
        # @param raw_args [Boolean] If true, does not convert arguments, but instead provides each argument as
        #   {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}.
        def workflow_query(
          name: nil,
          dynamic: false,
          raw_args: false
        )
          raise 'Cannot provide name if dynamic is true' if name && dynamic

          self.pending_handler_details = { type: :query, name:, dynamic:, raw_args: }
        end

        # Mark the next method as a workflow update with a default name as the name of the method. Updates can return
        # values. Separate validation methods can be provided via {workflow_update_validator}.
        #
        # @param name [String, Symbol, nil] Override the default name.
        # @param dynamic [Boolean] If true, make the update dynamic. This means it receives all other updates without
        #   handlers. This cannot have a name override since it is nameless. The first parameter will be the name. Often
        #   it is useful to have the second parameter be `*args` and `raw_args` be true.
        # @param raw_args [Boolean] If true, does not convert arguments, but instead provides each argument as
        #   {Converters::RawValue} which is a raw payload wrapper, convertible with {Workflow.payload_converter}.
        # @param unfinished_policy [HandlerUnfinishedPolicy] How to treat unfinished handlers if they are still running
        #   when the workflow ends. The default warns, but this can be disabled.
        def workflow_update(
          name: nil,
          dynamic: false,
          raw_args: false,
          unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON
        )
          raise 'Cannot provide name if dynamic is true' if name && dynamic

          self.pending_handler_details = { type: :update, name:, dynamic:, raw_args:, unfinished_policy: }
        end

        # Mark the next method as a workflow update validator to the given update method. The validator is expected to
        # have the exact same parameter signature. It will run before an update and if it raises an exception, the
        # update will be rejected, possibly before even reaching history. Validators cannot have any side effects or do
        # any waiting, and they do not return values.
        #
        # @param update_method [Symbol] Name of the update method.
        def workflow_update_validator(update_method)
          self.pending_handler_details = { type: :update_validator, update_method: }
        end

        private

        attr_reader :pending_handler_details

        def pending_handler_details=(value)
          if value.nil?
            @pending_handler_details = value
            return
          elsif @pending_handler_details
            raise "Previous #{@pending_handler_details[:type]} handler was not put on method before this handler"
          end

          @pending_handler_details = value
        end
      end

      # @!visibility private
      def self.method_added(method_name)
        super

        # Nothing to do if there are no pending handler details
        handler = pending_handler_details
        return unless handler

        # Reset details
        self.pending_handler_details = nil

        # Initialize class variables if not done already
        @workflow_signals ||= {}
        @workflow_queries ||= {}
        @workflow_updates ||= {}
        @workflow_update_validators ||= {}
        @defined_methods ||= []

        defn, hash, other_hashes =
          case handler[:type]
          when :init
            raise "workflow_init was applied to #{method_name} instead of initialize" if method_name != :initialize

            @workflow_init = handler[:value]
            return
          when :update_validator
            other = @workflow_update_validators[handler[:update_method]]
            if other && (other[:method_name] != method_name || other[:update_method] != handler[:update_method])
              raise "Workflow update validator on #{method_name} for #{handler[:update_method]} defined separately " \
                    "on #{other[:method_name]} for #{other[:update_method]}"
            end

            # Just store this, we'll apply validators to updates at definition
            # building time
            @workflow_update_validators[handler[:update_method]] = { method_name:, **handler }
            return
          when :signal
            [Signal.new(
              name: handler[:dynamic] ? nil : (handler[:name] || method_name).to_s,
              to_invoke: method_name,
              raw_args: handler[:raw_args],
              unfinished_policy: handler[:unfinished_policy]
            ), @workflow_signals, [@workflow_queries, @workflow_updates]]
          when :query
            [Query.new(
              name: handler[:dynamic] ? nil : (handler[:name] || method_name).to_s,
              to_invoke: method_name,
              raw_args: handler[:raw_args]
            ), @workflow_queries, [@workflow_signals, @workflow_updates]]
          when :update
            [Update.new(
              name: handler[:dynamic] ? nil : (handler[:name] || method_name).to_s,
              to_invoke: method_name,
              raw_args: handler[:raw_args],
              unfinished_policy: handler[:unfinished_policy]
            ), @workflow_updates, [@workflow_signals, @workflow_queries]]
          else
            raise "Unrecognized handler type #{handler[:type]}"
          end

        # We only allow dupes with the same method name (override/redefine)
        # TODO(cretz): Should we also check that everything else is the same?
        other = hash[defn.name]
        if other && other.to_invoke != method_name
          raise "Workflow #{handler[:type].name} #{defn.name || '<dynamic>'} defined on " \
                "different methods #{other.to_invoke} and #{method_name}"
        elsif defn.name && other_hashes.any? { |h| h.include?(defn.name) }
          raise "Workflow signal #{defn.name} already defined as a different handler type"
        end
        hash[defn.name] = defn

        # Define class method for referencing the definition only if non-dynamic
        return unless defn.name

        define_singleton_method(method_name) { defn }
        @defined_methods.push(method_name)
      end

      # @!visibility private
      def self.singleton_method_added(method_name)
        super
        # We need to ensure class methods are not added after we have defined a method
        return unless @defined_methods&.include?(method_name)

        raise 'Attempting to override Temporal-defined class definition method'
      end

      # @!visibility private
      def self._workflow_definition
        @workflow_definition ||= _build_workflow_definition
      end

      # @!visibility private
      def self._workflow_type_from_workflow_parameter(workflow)
        case workflow
        when Class
          unless workflow < Definition
            raise ArgumentError, "Class '#{workflow}' does not extend Temporalio::Workflow::Definition"
          end

          info = Info.from_class(workflow)
          info.name || raise(ArgumentError, 'Cannot pass dynamic workflow to start')
        when Info
          workflow.name || raise(ArgumentError, 'Cannot pass dynamic workflow to start')
        when String, Symbol
          workflow.to_s
        else
          raise ArgumentError, 'Workflow is not a workflow class or string/symbol'
        end
      end

      # @!visibility private
      def self._build_workflow_definition
        # Make sure there isn't dangling pending handler details
        if pending_handler_details
          raise "Leftover #{pending_handler_details&.[](:type)} handler not applied to a method"
        end

        # Apply all update validators before merging with super
        updates = @workflow_updates&.dup || {}
        @workflow_update_validators&.each_value do |validator|
          update = updates.values.find { |u| u.to_invoke == validator[:update_method] }
          unless update
            raise "Unable to find update #{validator[:update_method]} pointed to by " \
                  "validator on #{validator[:method_name]}"
          end
          if instance_method(validator[:method_name])&.parameters !=
             instance_method(validator[:update_method])&.parameters
            raise "Validator on #{validator[:method_name]} does not have " \
                  "exact parameter signature of #{validator[:update_method]}"
          end

          updates[update.name] = update._with_validator_to_invoke(validator[:method_name])
        end

        # If there is a superclass, apply some values and check others
        override_name = @workflow_name
        dynamic = @workflow_dynamic
        init = @workflow_init
        raw_args = @workflow_raw_args
        signals = @workflow_signals || {}
        queries = @workflow_queries || {}
        if superclass && superclass != Temporalio::Workflow::Definition
          # @type var super_info: Temporalio::Workflow::Definition::Info
          super_info = superclass._workflow_definition # steep:ignore

          # Override values if not set here
          override_name = super_info.override_name if override_name.nil?
          dynamic = super_info.dynamic if dynamic.nil?
          init = super_info.init if init.nil?
          raw_args = super_info.raw_args if raw_args.nil?

          # Make sure handlers on the same method at least have the same name
          # TODO(cretz): Need to validate any other handler override details?
          # Probably not because we only care that caller-needed values remain
          # unchanged (method and name), implementer-needed values can be
          # overridden/changed.
          self_handlers = signals.values + queries.values + updates.values
          super_handlers = super_info.signals.values + super_info.queries.values + super_info.updates.values
          super_handlers.each do |super_handler|
            self_handler = self_handlers.find { |h| h.to_invoke == super_handler.to_invoke }
            next unless self_handler

            if super_handler.class != self_handler.class
              raise "Superclass handler on #{self_handler.to_invoke} is a #{super_handler.class} " \
                    "but current class expects #{self_handler.class}"
            end
            if super_handler.name != self_handler.name
              raise "Superclass handler on #{self_handler.to_invoke} has name #{super_handler.name} " \
                    "but current class expects #{self_handler.name}"
            end
          end

          # Merge handlers. We will merge such that handlers defined here
          # override ones from superclass by _name_ (not method to invoke).
          signals = super_info.signals.merge(signals)
          queries = super_info.queries.merge(queries)
          updates = super_info.updates.merge(updates)
        end

        # If init is true, validate initialize and execute signatures are identical
        if init && instance_method(:initialize)&.parameters&.size != instance_method(:execute)&.parameters&.size
          raise 'workflow_init present, so parameter count of initialize and execute must be the same'
        end

        raise 'Workflow cannot be given a name and be dynamic' if dynamic && override_name

        Info.new(
          workflow_class: self,
          override_name:,
          dynamic: dynamic || false,
          init: init || false,
          raw_args: raw_args || false,
          failure_exception_types: @workflow_failure_exception_types || [],
          signals:,
          queries:,
          updates:
        )
      end

      # Execute the workflow. This is the primary workflow method. The workflow is completed when this method completes.
      # This must be implemented by all workflows.
      def execute(*args)
        raise NotImplementedError, 'Workflow did not implement "execute"'
      end

      # Information about the workflow definition. This is usually not used directly.
      class Info
        attr_reader :workflow_class, :override_name, :dynamic, :init, :raw_args,
                    :failure_exception_types, :signals, :queries, :updates

        # Derive the workflow definition info from the class.
        #
        # @param workflow_class [Class<Definition>] Workflow class.
        # @return [Info] Built info.
        def self.from_class(workflow_class)
          unless workflow_class.is_a?(Class) && workflow_class < Definition
            raise "Workflow '#{workflow_class}' must be a class and must extend Temporalio::Workflow::Definition"
          end

          workflow_class._workflow_definition
        end

        # Create a definition info. This should usually not be used directly, but instead a class that extends
        # {Workflow::Definition} should be used.
        def initialize(
          workflow_class:,
          override_name: nil,
          dynamic: false,
          init: false,
          raw_args: false,
          failure_exception_types: [],
          signals: {},
          queries: {},
          updates: {}
        )
          @workflow_class = workflow_class
          @override_name = override_name
          @dynamic = dynamic
          @init = init
          @raw_args = raw_args
          @failure_exception_types = failure_exception_types.dup.freeze
          @signals = signals.dup.freeze
          @queries = queries.dup.freeze
          @updates = updates.dup.freeze
        end

        # @return [String] Workflow name.
        def name
          dynamic ? nil : (override_name || workflow_class.name.to_s.split('::').last)
        end
      end

      # A signal definition. This is usually built as a result of a {Definition.workflow_signal} method, but can be
      # manually created to set at runtime on {Workflow.signal_handlers}.
      class Signal
        attr_reader :name, :to_invoke, :raw_args, :unfinished_policy

        # @!visibility private
        def self._name_from_parameter(signal)
          case signal
          when Workflow::Definition::Signal
            signal.name || raise(ArgumentError, 'Cannot call dynamic signal directly')
          when String, Symbol
            signal.to_s
          else
            raise ArgumentError, 'Signal is not a definition or string/symbol'
          end
        end

        # Create a signal definition manually. See {Definition.workflow_signal} for more details on some of the
        # parameters.
        #
        # @param name [String, nil] Name or nil if dynamic.
        # @param to_invoke [Symbol, Proc] Method name or proc to invoke.
        # @param raw_args [Boolean] Whether the parameters should be raw values.
        # @param unfinished_policy [HandlerUnfinishedPolicy] How the workflow reacts when this handler is still running
        #   on workflow completion.
        def initialize(
          name:,
          to_invoke:,
          raw_args: false,
          unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON
        )
          @name = name
          @to_invoke = to_invoke
          @raw_args = raw_args
          @unfinished_policy = unfinished_policy
        end
      end

      # A query definition. This is usually built as a result of a {Definition.workflow_query} method, but can be
      # manually created to set at runtime on {Workflow.query_handlers}.
      class Query
        attr_reader :name, :to_invoke, :raw_args

        # @!visibility private
        def self._name_from_parameter(query)
          case query
          when Workflow::Definition::Query
            query.name || raise(ArgumentError, 'Cannot call dynamic query directly')
          when String, Symbol
            query.to_s
          else
            raise ArgumentError, 'Query is not a definition or string/symbol'
          end
        end

        # Create a query definition manually. See {Definition.workflow_query} for more details on some of the
        # parameters.
        #
        # @param name [String, nil] Name or nil if dynamic.
        # @param to_invoke [Symbol, Proc] Method name or proc to invoke.
        # @param raw_args [Boolean] Whether the parameters should be raw values.
        def initialize(
          name:,
          to_invoke:,
          raw_args: false
        )
          @name = name
          @to_invoke = to_invoke
          @raw_args = raw_args
        end
      end

      # An update definition. This is usually built as a result of a {Definition.workflow_update} method, but can be
      # manually created to set at runtime on {Workflow.update_handlers}.
      class Update
        attr_reader :name, :to_invoke, :raw_args, :unfinished_policy, :validator_to_invoke

        # @!visibility private
        def self._name_from_parameter(update)
          case update
          when Workflow::Definition::Update
            update.name || raise(ArgumentError, 'Cannot call dynamic update directly')
          when String, Symbol
            update.to_s
          else
            raise ArgumentError, 'Update is not a definition or string/symbol'
          end
        end

        # Create an update definition manually. See {Definition.workflow_update} for more details on some of the
        # parameters.
        #
        # @param name [String, nil] Name or nil if dynamic.
        # @param to_invoke [Symbol, Proc] Method name or proc to invoke.
        # @param raw_args [Boolean] Whether the parameters should be raw values.
        # @param unfinished_policy [HandlerUnfinishedPolicy] How the workflow reacts when this handler is still running
        #   on workflow completion.
        # @param validator_to_invoke [Symbol, Proc, nil] Method name or proc validator to invoke.
        def initialize(
          name:,
          to_invoke:,
          raw_args: false,
          unfinished_policy: HandlerUnfinishedPolicy::WARN_AND_ABANDON,
          validator_to_invoke: nil
        )
          @name = name
          @to_invoke = to_invoke
          @raw_args = raw_args
          @unfinished_policy = unfinished_policy
          @validator_to_invoke = validator_to_invoke
        end

        # @!visibility private
        def _with_validator_to_invoke(validator_to_invoke)
          Update.new(
            name:,
            to_invoke:,
            raw_args:,
            unfinished_policy:,
            validator_to_invoke:
          )
        end
      end
    end
  end
end