class Temporalio::Internal::Worker::WorkflowInstance
Instance of a user workflow. This is the instance with all state needed to run the workflow and is expected to
be cached by the worker for sticky execution.
# Build an activation completion that reports +error+ as a failure of the activation for +run_id+.
#
# The error is converted with +failure_converter+. If that conversion itself raises, a bare application
# failure whose message carries both error messages is used instead.
def self.new_completion_with_failure(run_id:, error:, failure_converter:, payload_converter:)
  failure_proto =
    begin
      failure_converter.to_failure(error, payload_converter)
    rescue Exception => e # rubocop:disable Lint/RescueException
      # Even the conversion failed, so describe both problems in a minimal failure
      Api::Failure::V1::Failure.new(
        message: "Failed converting error to failure: #{e.message}, " \
                 "original error message: #{error.message}",
        application_failure_info: Api::Failure::V1::ApplicationFailureInfo.new
      )
    end

  Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
    run_id:,
    failed: Bridge::Api::WorkflowCompletion::Failure.new(failure: failure_proto)
  )
end
# Process one activation and return whatever activate_internal produces for it.
def activate(activation)
  # All processing has to happen with the workflow scheduler installed
  run_in_scheduler do
    activate_internal(activation)
  end
end
# Apply all jobs of the activation, run the event loop until every fiber has yielded, and build the completion:
# a success carrying the collected commands, or a failure if an activation error was recorded.
def activate_internal(activation)
  # Reset per-activation state
  @commands = []
  @current_activation_error = nil
  @continue_as_new_suggested = activation.continue_as_new_suggested
  @current_deployment_version = WorkerDeploymentVersion._from_bridge(
    activation.deployment_version_for_current_task
  )
  @current_history_length = activation.history_length
  @current_history_size = activation.history_size_bytes
  @replaying = activation.is_replaying
  @now_timestamp = activation.timestamp

  # Apply jobs and run event loop
  begin
    # Lazily create the user instance on the first activation
    @instance ||= with_context_frozen { create_instance }

    activation.jobs.each do |job|
      apply(job)
    end

    # Schedule primary 'execute' if not already running (i.e. this is the first activation)
    @primary_fiber ||= schedule(top_level: true) { run_workflow }

    @scheduler.run_until_all_yielded
  rescue Exception => e # rubocop:disable Lint/RescueException
    on_top_level_exception(e)
  end

  # If we are not replaying and workflow is complete but not a failure (i.e. success, continue as new, or
  # cancel), we warn for any unfinished handlers.
  unless @replaying
    finished_without_failure = @commands.any? do |cmd|
      !cmd.complete_workflow_execution.nil? ||
        !cmd.continue_as_new_workflow_execution.nil? ||
        !cmd.cancel_workflow_execution.nil?
    end
    warn_on_any_unfinished_handlers if finished_without_failure
  end

  # No activation error means the commands are returned as a successful completion
  unless @current_activation_error
    return Bridge::Api::WorkflowCompletion::WorkflowActivationCompletion.new(
      run_id: activation.run_id,
      successful: Bridge::Api::WorkflowCompletion::Success.new(
        commands: @commands,
        versioning_behavior: @definition_options.versioning_behavior
      )
    )
  end

  @logger.replay_safety_disabled do
    @logger.warn('Failed activation')
    @logger.warn(@current_activation_error)
  end
  WorkflowInstance.new_completion_with_failure(
    run_id: activation.run_id,
    error: @current_activation_error,
    failure_converter: @failure_converter,
    payload_converter: @payload_converter
  )
ensure
  # Activation-scoped state must not leak into the next activation
  @commands = nil
  @current_activation_error = nil
end
# Append a command to the current activation's command list.
#
# Raises Workflow::InvalidWorkflowStateError while the context is frozen.
def add_command(command)
  if @context_frozen
    raise Workflow::InvalidWorkflowStateError, 'Cannot add commands in this context'
  end

  @commands.push(command)
end
# Dispatch a single activation job based on its variant.
#
# Resolutions look up (and remove) the pending entry by sequence and resume/resolve it if it is still
# present. Raises on a variant this method does not know.
def apply(job)
  case job.variant
  when :initialize_workflow
    # Ignore
    nil
  when :fire_timer
    timer = pending_timers.delete(job.fire_timer.seq)
    timer&.resume
  when :update_random_seed
    @random = illegal_call_tracing_disabled do
      Random.new(job.update_random_seed.randomness_seed)
    end
  when :query_workflow
    apply_query(job.query_workflow)
  when :cancel_workflow
    # TODO(cretz): Use the details somehow?
    @cancellation_proc.call(reason: 'Workflow canceled')
  when :signal_workflow
    apply_signal(job.signal_workflow)
  when :resolve_activity
    resolution = job.resolve_activity
    activity = pending_activities.delete(resolution.seq)
    activity&.resume(resolution.result)
  when :notify_has_patch
    @patches_notified << job.notify_has_patch.patch_id
  when :resolve_child_workflow_execution_start
    resolution = job.resolve_child_workflow_execution_start
    child_start = pending_child_workflow_starts.delete(resolution.seq)
    child_start&.resume(resolution)
  when :resolve_child_workflow_execution
    resolution = job.resolve_child_workflow_execution
    child_handle = pending_child_workflows.delete(resolution.seq)
    child_handle&._resolve(resolution.result)
  when :resolve_signal_external_workflow
    resolution = job.resolve_signal_external_workflow
    external_signal = pending_external_signals.delete(resolution.seq)
    external_signal&.resume(resolution)
  when :resolve_request_cancel_external_workflow
    resolution = job.resolve_request_cancel_external_workflow
    external_cancel = pending_external_cancels.delete(resolution.seq)
    external_cancel&.resume(resolution)
  when :do_update
    apply_update(job.do_update)
  else
    raise "Unrecognized activation job variant: #{job.variant}"
  end
end
# Schedule handling of a query job; the scheduled block adds exactly one respond_to_query command, either
# succeeded with the converted result or failed with the converted exception.
def apply_query(job)
  result_hint = nil
  schedule do
    # Built-in queries run without interceptors, everything else goes through the inbound interceptor
    result =
      case job.query_type
      when '__stack_trace'
        # Use raw value built from default converter because we don't want to use user-conversion
        Converters::RawValue.new(Converters::PayloadConverter.default.to_payload(scheduler.stack_trace))
      when '__temporal_workflow_metadata'
        # Use raw value built from default converter because we don't want to use user-conversion
        Converters::RawValue.new(Converters::PayloadConverter.default.to_payload(workflow_metadata))
      else
        # Get query definition, falling back to dynamic if not present and not reserved
        defn = query_handlers[job.query_type]
        defn = query_handlers[nil] unless defn || Internal::ProtoUtils.reserved_name?(job.query_type)
        if defn.nil?
          raise "Query handler for #{job.query_type} expected but not found, " \
                "known queries: [#{query_handlers.keys.compact.sort.join(', ')}]"
        end

        result_hint = defn.result_hint
        with_context_frozen do
          query_input = Temporalio::Worker::Interceptor::Workflow::HandleQueryInput.new(
            id: job.query_id,
            query: job.query_type,
            args: begin
              convert_handler_args(payload_array: job.arguments, defn:)
            rescue StandardError => e
              raise "Failed converting query input arguments: #{e}"
            end,
            definition: defn,
            headers: ProtoUtils.headers_from_proto_map(job.headers, @payload_converter) || {}
          )
          @inbound.handle_query(query_input)
        end
      end

    success = Bridge::Api::WorkflowCommands::QuerySuccess.new(
      response: @payload_converter.to_payload(result, hint: result_hint)
    )
    add_command(
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        respond_to_query: Bridge::Api::WorkflowCommands::QueryResult.new(
          query_id: job.query_id,
          succeeded: success
        )
      )
    )
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Any error raised while handling the query is reported back as the query's failure
    add_command(
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        respond_to_query: Bridge::Api::WorkflowCommands::QueryResult.new(
          query_id: job.query_id,
          failed: @failure_converter.to_failure(e, @payload_converter)
        )
      )
    )
  end
end
# Schedule handling of a signal job. With a matching (or dynamic) definition the signal is sent through the
# inbound interceptor, otherwise the job is buffered under its signal name.
def apply_signal(job)
  # Get signal definition, falling back to dynamic if not present and not reserved
  defn = signal_handlers[job.signal_name]
  defn = signal_handlers[nil] unless defn || Internal::ProtoUtils.reserved_name?(job.signal_name)
  handler_exec = (
    HandlerExecution.new(name: job.signal_name, update_id: nil, unfinished_policy: defn.unfinished_policy) if defn
  )

  # Process as a top level handler so that errors are treated as if in primary workflow method
  schedule(top_level: true, handler_exec:) do
    if defn
      signal_input = Temporalio::Worker::Interceptor::Workflow::HandleSignalInput.new(
        signal: job.signal_name,
        args: begin
          convert_handler_args(payload_array: job.input, defn:)
        rescue StandardError => e
          # Signals argument conversion failure must not fail task
          @logger.error("Failed converting signal input arguments for #{job.signal_name}, dropping signal")
          @logger.error(e)
          next
        end,
        definition: defn,
        headers: ProtoUtils.headers_from_proto_map(job.headers, @payload_converter) || {}
      )
      @inbound.handle_signal(signal_input)
    else
      # No handler yet, keep the job until one is registered
      queue = @buffered_signals[job.signal_name]
      queue = @buffered_signals[job.signal_name] = [] if queue.nil?
      queue << job
    end
  end
end
# Schedule handling of an update job: optional validation, then acceptance, then the update itself.
#
# Errors before acceptance become a rejection. After acceptance only failure exceptions (see
# failure_exception?) become a rejection; anything else is re-raised out of the scheduled block.
def apply_update(job)
  # Get update definition, falling back to dynamic if not present and not reserved
  defn = update_handlers[job.name]
  defn = update_handlers[nil] unless defn || Internal::ProtoUtils.reserved_name?(job.name)
  handler_exec = if defn
                   HandlerExecution.new(name: job.name, update_id: job.id, unfinished_policy: defn.unfinished_policy)
                 end

  # Builds a fresh input each time it is called. To match other SDKs, the args are re-converted between
  # validate and update to disallow user mutation in validator/interceptor.
  build_input = lambda do
    Temporalio::Worker::Interceptor::Workflow::HandleUpdateInput.new(
      id: job.id,
      update: job.name,
      args: begin
        convert_handler_args(payload_array: job.input, defn:)
      rescue StandardError => e
        raise "Failed converting update input arguments: #{e}"
      end,
      definition: defn,
      headers: ProtoUtils.headers_from_proto_map(job.headers, @payload_converter) || {}
    )
  end

  schedule(handler_exec:) do
    # Until this is accepted, all errors are rejections
    accepted = false

    # Set update info
    Fiber[:__temporal_update_info] = Workflow::UpdateInfo.new(id: job.id, name: job.name).freeze

    # Reject if not present
    if defn.nil?
      raise "Update handler for #{job.name} expected but not found, " \
            "known updates: [#{update_handlers.keys.compact.sort.join(', ')}]"
    end

    # To match other SDKs, we are only calling the validation interceptor if there is a validator
    if job.run_validator && defn.validator_to_invoke
      with_context_frozen do
        @inbound.validate_update(build_input.call)
      end
    end

    # We build the input before marking accepted so the exception can reject instead of fail task
    input = build_input.call

    # Accept
    add_command(
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        update_response: Bridge::Api::WorkflowCommands::UpdateResponse.new(
          protocol_instance_id: job.protocol_instance_id,
          accepted: Google::Protobuf::Empty.new
        )
      )
    )
    accepted = true

    # Issue update
    result = @inbound.handle_update(input)
    completed_payload = @payload_converter.to_payload(result, hint: defn.result_hint)
    add_command(
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        update_response: Bridge::Api::WorkflowCommands::UpdateResponse.new(
          protocol_instance_id: job.protocol_instance_id,
          completed: completed_payload
        )
      )
    )
  rescue Exception => e # rubocop:disable Lint/RescueException
    # Re-raise to cause task failure if this is accepted but this is not a failure exception
    raise if accepted && !failure_exception?(e)

    # Reject
    add_command(
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        update_response: Bridge::Api::WorkflowCommands::UpdateResponse.new(
          protocol_instance_id: job.protocol_instance_id,
          rejected: @failure_converter.to_failure(e, @payload_converter)
        )
      )
    )
  end
end
# Convert a payload array into arguments for a workflow method or handler.
#
# When +method_name+ names a method defined on the workflow class and that method has no splat, the payloads
# are checked against the number of required parameters (ArgumentError if too few) and trimmed to the number
# of required plus optional parameters. With +ignore_first_param+ one parameter is excluded from both counts.
# Payloads are wrapped as raw values when +raw_args+ is set, otherwise converted with the payload converter.
def convert_args(payload_array:, method_name:, raw_args:, arg_hints:, ignore_first_param: false)
  # Just in case it is not an array
  payload_array = payload_array.to_ary

  # We want to discard extra arguments if we can, which needs a known method without a :rest parameter
  if method_name && @definition.workflow_class.method_defined?(method_name)
    param_types = @definition.workflow_class.instance_method(method_name).parameters.map(&:first)
    unless param_types.include?(:rest)
      req_count = param_types.count(:req)
      count = req_count + param_types.count(:opt)
      if ignore_first_param
        count -= 1
        req_count -= 1
      end
      if req_count > payload_array.size
        # We have to fail here instead of let Ruby fail the invocation because some handlers, such as signals,
        # want to log and ignore invalid arguments instead of fail and if we used Ruby failure, we can't
        # differentiate between too-few-param caused by us or somewhere else by a user.
        raise ArgumentError, "wrong number of required arguments for #{method_name} " \
                             "(given #{payload_array.size}, expected #{req_count})"
      end
      # Trim off excess
      payload_array = payload_array.take(count)
    end
  end

  return payload_array.map { |payload| Converters::RawValue.new(payload) } if raw_args

  ProtoUtils.convert_from_payload_array(@payload_converter, payload_array, hints: arg_hints)
end
# Convert payloads for a handler definition by delegating to convert_args with the definition's settings.
def convert_handler_args(payload_array:, defn:)
  handler = defn.to_invoke
  # Only a symbol refers to a method whose parameters can be inspected
  method_name = handler.is_a?(Symbol) ? handler : nil
  raw_args = defn.raw_args
  arg_hints = defn.arg_hints
  # Dynamic handlers have no name
  dynamic = defn.name.nil?

  convert_args(payload_array:, method_name:, raw_args:, arg_hints:, ignore_first_param: dynamic)
end
# Convert the workflow arguments, build the interceptor chain, instantiate the user workflow class and apply
# any dynamic options it provides. Returns the new user instance.
def create_instance
  # Convert workflow arguments
  @workflow_arguments = convert_args(
    payload_array: @init_job.arguments,
    method_name: :execute,
    raw_args: @definition.raw_args,
    arg_hints: @definition.arg_hints
  )

  # Initialize interceptors, wrapping in reverse so the first interceptor ends up outermost
  inbound = InboundImplementation.new(self)
  @interceptors.reverse_each do |interceptor|
    inbound = interceptor.intercept_workflow(inbound)
  end
  @inbound = inbound
  @inbound.init(OutboundImplementation.new(self))

  # Create the user instance, passing the arguments only when the definition asks for them
  workflow_class = @definition.workflow_class
  instance = @definition.init ? workflow_class.new(*@workflow_arguments) : workflow_class.new

  # Run Dynamic config getter
  if @definition.dynamic_options_method
    dynamic_options = instance.send(@definition.dynamic_options_method)
    versioning_behavior = dynamic_options&.versioning_behavior
    @definition_options.versioning_behavior = versioning_behavior if versioning_behavior
    failure_types = dynamic_options&.failure_exception_types
    @definition_options.failure_exception_types = failure_types if failure_types
  end

  instance
end
# Whether +err+ is a Temporal failure, a timeout error, or an instance of one of the worker-level or
# definition-level failure exception types.
def failure_exception?(err)
  return true if err.is_a?(Error::Failure) || err.is_a?(Timeout::Error)

  matches_any = ->(types) { types&.any? { |cls| err.is_a?(cls) } }
  matches_any.call(@workflow_failure_exception_types) ||
    matches_any.call(@definition_options.failure_exception_types)
end
# Run the given block with illegal call tracing disabled.
#
# @tracer is only created when illegal calls are configured, so without a tracer there is nothing to disable
# and the block is simply run as is (the same fallback run_in_scheduler uses when enabling tracing).
# Previously this raised NoMethodError on nil in that situation.
def illegal_call_tracing_disabled(&)
  if @tracer
    @tracer.disable(&)
  else
    yield
  end
end
# Set up all per-run state from the worker-provided details and the initial activation.
#
# Raises if the first activation does not contain an initialize job.
def initialize(details)
  # Initialize general state
  @context = Context.new(self)
  illegal_calls = details.illegal_calls
  @tracer = IllegalCallTracer.new(illegal_calls) if illegal_calls && !illegal_calls.empty?
  @logger = ReplaySafeLogger.new(logger: details.logger, instance: self)
  @logger.scoped_values_getter = proc { scoped_logger_info }
  @runtime_metric_meter = details.metric_meter
  @io_enabled = details.unsafe_workflow_io_enabled
  @scheduler = Scheduler.new(self)
  @payload_converter = details.payload_converter
  @failure_converter = details.failure_converter
  @disable_eager_activity_execution = details.disable_eager_activity_execution

  # Pending operations, all keyed by sequence. Values are fibers to resume with the proto result, except for
  # child workflows where the value is the ChildWorkflowHandle to resolve with the proto result.
  @pending_activities = {}
  @pending_timers = {}
  @pending_child_workflow_starts = {}
  @pending_child_workflows = {}
  @pending_external_signals = {}
  @pending_external_cancels = {}

  # Keyed by signal name, value is array of signal jobs
  @buffered_signals = {}
  # TODO(cretz): Should these be sets instead? Both should be fairly low counts.
  @in_progress_handlers = [] # Value is HandlerExecution
  @patches_notified = []

  @definition = details.definition
  @interceptors = details.interceptors
  @cancellation, @cancellation_proc = Cancellation.new
  @continue_as_new_suggested = false
  @current_history_length = 0
  @current_history_size = 0
  @replaying = false
  @workflow_failure_exception_types = details.workflow_failure_exception_types

  @signal_handlers = HandlerHash.new(details.definition.signals, Workflow::Definition::Signal) do |defn|
    # New definition, drain buffer. If it's dynamic (i.e. no name) drain them all.
    to_drain =
      if defn.name.nil?
        everything = @buffered_signals.values.flatten
        @buffered_signals.clear
        everything
      else
        @buffered_signals.delete(defn.name)
      end
    to_drain&.each do |job|
      apply_signal(job)
    end
  end
  @query_handlers = HandlerHash.new(details.definition.queries, Workflow::Definition::Query)
  @update_handlers = HandlerHash.new(details.definition.updates, Workflow::Definition::Update)
  @definition_options = Workflow::DefinitionOptions.new(
    failure_exception_types: details.definition.failure_exception_types,
    versioning_behavior: details.definition.versioning_behavior
  )

  @assert_valid_local_activity = details.assert_valid_local_activity

  # Create all things needed from initial job
  init_wrapper = details.initial_activation.jobs.find { |j| !j.initialize_workflow.nil? }
  @init_job = init_wrapper&.initialize_workflow
  raise 'Missing init job from first activation' unless @init_job

  illegal_call_tracing_disabled do
    init = @init_job
    @info = Workflow::Info.new(
      attempt: init.attempt,
      continued_run_id: ProtoUtils.string_or(init.continued_from_execution_run_id),
      cron_schedule: ProtoUtils.string_or(init.cron_schedule),
      execution_timeout: ProtoUtils.duration_to_seconds(init.workflow_execution_timeout),
      headers: ProtoUtils.headers_from_proto_map(init.headers, @payload_converter) || {},
      last_failure: (
        @failure_converter.from_failure(init.continued_failure, @payload_converter) if init.continued_failure
      ),
      last_result: (
        @payload_converter.from_payloads(init.last_completion_result).first if init.last_completion_result
      ),
      namespace: details.namespace,
      parent: if init.parent_workflow_info
                Workflow::Info::ParentInfo.new(
                  namespace: init.parent_workflow_info.namespace,
                  run_id: init.parent_workflow_info.run_id,
                  workflow_id: init.parent_workflow_info.workflow_id
                )
              end,
      priority: Priority._from_proto(init.priority),
      retry_policy: (RetryPolicy._from_proto(init.retry_policy) if init.retry_policy),
      root: if init.root_workflow
              Workflow::Info::RootInfo.new(
                run_id: init.root_workflow.run_id,
                workflow_id: init.root_workflow.workflow_id
              )
            end,
      run_id: details.initial_activation.run_id,
      run_timeout: ProtoUtils.duration_to_seconds(init.workflow_run_timeout),
      start_time: ProtoUtils.timestamp_to_time(init.start_time) || raise,
      task_queue: details.task_queue,
      task_timeout: ProtoUtils.duration_to_seconds(init.workflow_task_timeout) || raise,
      workflow_id: init.workflow_id,
      workflow_type: init.workflow_type
    ).freeze

    @random = Random.new(init.randomness_seed)
  end
end
# The user workflow instance; raises if it has not been created yet.
def instance
  raise 'Instance accessed before created' unless @instance

  @instance
end
# Memo from the initialize job, converted and wrapped on first access and cached afterwards.
def memo
  return @memo if @memo

  converted = ProtoUtils.memo_from_proto(@init_job.memo, payload_converter)
  @memo = ExternallyImmutableHash.new(converted || {})
end
# Replay-safe metric meter built on first access from the runtime meter with namespace, task queue and
# workflow type attributes added; cached afterwards.
def metric_meter
  return @metric_meter if @metric_meter

  workflow_attributes = {
    namespace: info.namespace,
    task_queue: info.task_queue,
    workflow_type: info.workflow_type
  }
  @metric_meter = ReplaySafeMetric::Meter.new(
    @runtime_metric_meter.with_additional_attributes(workflow_attributes)
  )
end
# Current workflow time, converted from the activation timestamp on every call; raises if conversion yields
# nothing.
def now
  current = ProtoUtils.timestamp_to_time(@now_timestamp)
  raise 'Time unexpectedly not present' unless current

  current
end
# Translate an exception raised from top-level workflow code into a workflow command (continue as new,
# cancel, or fail). Any other exception is recorded as the activation error unless one is already recorded.
def on_top_level_exception(err)
  command =
    if err.is_a?(Workflow::ContinueAsNewError)
      @logger.debug('Workflow requested continue as new')
      workflow_type, defn_arg_hints, =
        if err.workflow
          Workflow::Definition._workflow_type_and_hints_from_workflow_parameter(err.workflow)
        else
          [nil, @definition.arg_hints, nil]
        end
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        continue_as_new_workflow_execution: Bridge::Api::WorkflowCommands::ContinueAsNewWorkflowExecution.new(
          workflow_type:,
          task_queue: err.task_queue,
          arguments: ProtoUtils.convert_to_payload_array(payload_converter, err.args,
                                                         hints: err.arg_hints || defn_arg_hints),
          workflow_run_timeout: ProtoUtils.seconds_to_duration(err.run_timeout),
          workflow_task_timeout: ProtoUtils.seconds_to_duration(err.task_timeout),
          memo: ProtoUtils.memo_to_proto_hash(err.memo, payload_converter),
          headers: ProtoUtils.headers_to_proto_hash(err.headers, payload_converter),
          search_attributes: err.search_attributes&._to_proto,
          retry_policy: err.retry_policy&._to_proto
        )
      )
    elsif @cancellation.canceled? && Error.canceled?(err)
      # If cancel was ever requested and this is a cancellation or an activity/child cancellation, we add a
      # cancel command. Technically this means that a swallowed cancel followed by, say, an activity cancel
      # later on will show the workflow as cancelled. But this is a Temporal limitation in that cancellation is
      # a state not an event.
      @logger.debug('Workflow requested to cancel and properly raised cancel')
      @logger.debug(err)
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        cancel_workflow_execution: Bridge::Api::WorkflowCommands::CancelWorkflowExecution.new
      )
    elsif failure_exception?(err)
      @logger.debug('Workflow raised failure')
      @logger.debug(err)
      Bridge::Api::WorkflowCommands::WorkflowCommand.new(
        fail_workflow_execution: Bridge::Api::WorkflowCommands::FailWorkflowExecution.new(
          failure: @failure_converter.to_failure(err, @payload_converter)
        )
      )
    end

  unless command
    # Not a workflow outcome, so keep it as the activation error (the first one wins)
    return (@current_activation_error ||= err)
  end

  add_command(command)
end
# Decide whether the patch identified by +patch_id+ is in effect and memoize the answer per patch ID.
#
# A patch is in effect when not replaying or when the patch was notified. The first time a patch is found
# to be in effect a set_patch_marker command is added; memoized answers never add another command.
def patch(patch_id:, deprecated:)
  patch_id = patch_id.to_s
  @patches_memoized ||= {}
  # Use memoized result if present. If this is being deprecated, we can still use memoized result and skip
  # the command.
  return @patches_memoized[patch_id] if @patches_memoized.key?(patch_id)

  patched = !replaying || @patches_notified.include?(patch_id)
  @patches_memoized[patch_id] = patched
  if patched
    marker = Bridge::Api::WorkflowCommands::SetPatchMarker.new(patch_id:, deprecated:)
    add_command(Bridge::Api::WorkflowCommands::WorkflowCommand.new(set_patch_marker: marker))
  end
  patched
end
# Run the block with the workflow scheduler installed as the fiber scheduler, enabling the illegal call
# tracer around it when there is one. The fiber scheduler is always unset again afterwards.
def run_in_scheduler(&)
  Fiber.set_scheduler(@scheduler)
  begin
    @tracer ? @tracer.enable(&) : yield
  ensure
    Fiber.set_scheduler(nil)
  end
end
# Run the workflow through the inbound interceptor and add a complete_workflow_execution command carrying
# the converted result.
def run_workflow
  execute_input = Temporalio::Worker::Interceptor::Workflow::ExecuteInput.new(
    args: @workflow_arguments,
    headers: @info.headers
  )
  result = @inbound.execute(execute_input)

  completion = Bridge::Api::WorkflowCommands::CompleteWorkflowExecution.new(
    result: @payload_converter.to_payload(result, hint: @definition.result_hint)
  )
  add_command(Bridge::Api::WorkflowCommands::WorkflowCommand.new(complete_workflow_execution: completion))
end
# Schedule the block on a new fiber via Fiber.schedule.
#
# When +handler_exec+ is given it is tracked in in_progress_handlers until the block finishes. Exceptions
# from the block go to on_top_level_exception when +top_level+ is set; otherwise they are recorded as the
# activation error unless one is already recorded.
def schedule(top_level: false, handler_exec: nil, &)
  in_progress_handlers << handler_exec if handler_exec
  Fiber.schedule do
    begin
      yield
    rescue Exception => e # rubocop:disable Lint/RescueException
      top_level ? on_top_level_exception(e) : (@current_activation_error ||= e)
    ensure
      in_progress_handlers.delete(handler_exec) if handler_exec
    end
  end
end
# Values attached to log entries: a cached hash of workflow info, plus update ID and name when the current
# fiber carries update info (in which case a merged copy is returned).
def scoped_logger_info
  base = (@scoped_logger_info ||= {
    attempt: info.attempt,
    namespace: info.namespace,
    run_id: info.run_id,
    task_queue: info.task_queue,
    workflow_id: info.workflow_id,
    workflow_type: info.workflow_type
  })

  # Append update info if there is any
  update_info = Fiber[:__temporal_update_info]
  if update_info
    base.merge({ update_id: update_info.id, update_name: update_info.name })
  else
    base
  end
end
# Search attributes from the initialize job, built on first access and cached afterwards; raises if the
# conversion yields nothing.
def search_attributes
  return @search_attributes if @search_attributes

  @search_attributes = SearchAttributes._from_proto(
    @init_job.search_attributes,
    disable_mutations: true,
    never_nil: true
  ) || raise
end
# Warn about handlers that are still in progress and whose unfinished policy is WARN_AND_ABANDON.
#
# Emits at most two warnings via +warn+: one listing unfinished updates (name and ID) and one listing
# unfinished signals (name and count, highest count first). Handlers with an update ID are treated as
# updates, all others as signals.
def warn_on_any_unfinished_handlers
  warnable = in_progress_handlers.select do |handler|
    handler.unfinished_policy == Workflow::HandlerUnfinishedPolicy::WARN_AND_ABANDON
  end
  updates, signals = warnable.partition(&:update_id)

  unless updates.empty?
    updates_str = JSON.generate(updates.map { |u| { name: u.name, id: u.update_id } })
    # The suggested policy is spelled with `::` because it is a Ruby constant, not a method
    warn(
      "[TMPRL1102] Workflow #{info.workflow_id} finished while update handlers are still running. This may " \
      'have interrupted work that the update handler was doing, and the client that sent the update will ' \
      "receive a 'workflow execution already completed' RPCError instead of the update result. You can wait " \
      'for all update and signal handlers to complete by using ' \
      '`Temporalio::Workflow.wait_condition { Temporalio::Workflow.handlers_finished? }`. ' \
      'Alternatively, if both you and the clients sending the update are okay with interrupting running ' \
      'handlers when the workflow finishes, and causing clients to receive errors, then you can disable this ' \
      'warning via the update handler definition: ' \
      '`workflow_update unfinished_policy: Temporalio::Workflow::HandlerUnfinishedPolicy::ABANDON`. ' \
      "The following updates were unfinished (and warnings were not disabled for their handler): #{updates_str}"
    )
  end

  return if signals.empty?

  # Count unfinished signal handlers per signal name, most frequent first
  signal_counts = signals.map(&:name).tally.sort_by { |_, count| -count }
  signals_str = JSON.generate(signal_counts.map { |name, count| { name:, count: } })
  warn(
    "[TMPRL1102] Workflow #{info.workflow_id} finished while signal handlers are still running. This may " \
    'have interrupted work that the signal handler was doing. You can wait for all update and signal ' \
    'handlers to complete by using ' \
    '`Temporalio::Workflow.wait_condition { Temporalio::Workflow.handlers_finished? }`. ' \
    'Alternatively, if both you and the clients sending the signal are okay with interrupting running ' \
    'handlers when the workflow finishes, then you can disable this warning via the signal handler ' \
    'definition: ' \
    '`workflow_signal unfinished_policy: Temporalio::Workflow::HandlerUnfinishedPolicy::ABANDON`. ' \
    "The following signals were unfinished (and warnings were not disabled for their handler): #{signals_str}"
  )
end
# Run the block with the context marked frozen (add_command raises while it is); the flag is reset to false
# afterwards even if the block raises.
def with_context_frozen(&)
  @context_frozen = true
  begin
    yield
  ensure
    @context_frozen = false
  end
end
# Build the workflow metadata proto: workflow type, the currently registered query, signal and update
# definitions (nil names and descriptions become empty strings), and the current details.
def workflow_metadata
  to_interaction_definitions = lambda do |handlers|
    handlers.values.map do |defn|
      Temporalio::Api::Sdk::V1::WorkflowInteractionDefinition.new(
        name: defn.name || '',
        description: defn.description || ''
      )
    end
  end

  Temporalio::Api::Sdk::V1::WorkflowMetadata.new(
    definition: Temporalio::Api::Sdk::V1::WorkflowDefinition.new(
      type: info.workflow_type,
      query_definitions: to_interaction_definitions.call(query_handlers),
      signal_definitions: to_interaction_definitions.call(signal_handlers),
      update_definitions: to_interaction_definitions.call(update_handlers)
    ),
    current_details: current_details || ''
  )
end