class Temporalio::Internal::Worker::WorkflowInstance::InboundImplementation

Root implementation of the inbound interceptor.

def execute(input)

def execute(input)
  @instance.instance.execute(*input.args)
end

def handle_query(input)

def handle_query(input)
  invoke_handler(input.query, input)
end

def handle_signal(input)

def handle_signal(input)
  invoke_handler(input.signal, input)
end

def handle_update(input)

def handle_update(input)
  invoke_handler(input.update, input)
end

def init(outbound)

def init(outbound)
  @instance.context._outbound = outbound
end

def initialize(instance)

def initialize(instance)
  super(nil) # steep:ignore
  @instance = instance
end

def invoke_handler(name, input, to_invoke: input.definition.to_invoke)

def invoke_handler(name, input, to_invoke: input.definition.to_invoke)
  args = input.args
  # Add name as first param if dynamic
  args = [name] + args if input.definition.name.nil?
  # Assume symbol or proc
  case to_invoke
  when Symbol
    @instance.instance.send(to_invoke, *args)
  when Proc
    to_invoke.call(*args)
  else
    raise "Unrecognized invocation type #{to_invoke.class}"
  end
end

def validate_update(input)

def validate_update(input)
  invoke_handler(input.update, input, to_invoke: input.definition.validator_to_invoke)
end