lib/temporalio/internal/client/implementation.rb



# frozen_string_literal: true

require 'google/protobuf/well_known_types'
require 'securerandom'
require 'temporalio/api'
require 'temporalio/client/activity_id_reference'
require 'temporalio/client/async_activity_handle'
require 'temporalio/client/connection'
require 'temporalio/client/interceptor'
require 'temporalio/client/schedule'
require 'temporalio/client/schedule_handle'
require 'temporalio/client/workflow_execution'
require 'temporalio/client/workflow_execution_count'
require 'temporalio/client/workflow_handle'
require 'temporalio/common_enums'
require 'temporalio/converters'
require 'temporalio/error'
require 'temporalio/error/failure'
require 'temporalio/internal/proto_utils'
require 'temporalio/runtime'
require 'temporalio/search_attributes'
require 'temporalio/workflow/definition'

module Temporalio
  module Internal
    module Client
      class Implementation < Temporalio::Client::Interceptor::Outbound
        def self.with_default_rpc_options(user_rpc_options)
          # If the user did not provide an override_retry, we need to make sure
          # we use an option set that has it as "true"
          if user_rpc_options.nil?
            user_rpc_options = @always_retry_options ||= Temporalio::Client::RPCOptions.new(override_retry: true)
          elsif !user_rpc_options.is_a?(Temporalio::Client::RPCOptions)
            raise ArgumentError, 'rpc_options must be RPCOptions'
          elsif user_rpc_options.override_retry.nil?
            # Copy and set as true
            user_rpc_options = user_rpc_options.dup
            user_rpc_options.override_retry = true
          end
          user_rpc_options
        end

        def initialize(client)
          super(nil)
          @client = client
        end

        def start_workflow(input)
          # TODO(cretz): Signal/update with start
          req = Api::WorkflowService::V1::StartWorkflowExecutionRequest.new(
            request_id: SecureRandom.uuid,
            namespace: @client.namespace,
            workflow_type: Api::Common::V1::WorkflowType.new(
              name: Workflow::Definition._workflow_type_from_workflow_parameter(input.workflow)
            ),
            workflow_id: input.workflow_id,
            task_queue: Api::TaskQueue::V1::TaskQueue.new(name: input.task_queue.to_s),
            input: @client.data_converter.to_payloads(input.args),
            workflow_execution_timeout: ProtoUtils.seconds_to_duration(input.execution_timeout),
            workflow_run_timeout: ProtoUtils.seconds_to_duration(input.run_timeout),
            workflow_task_timeout: ProtoUtils.seconds_to_duration(input.task_timeout),
            identity: @client.connection.identity,
            workflow_id_reuse_policy: input.id_reuse_policy,
            workflow_id_conflict_policy: input.id_conflict_policy,
            retry_policy: input.retry_policy&._to_proto,
            cron_schedule: input.cron_schedule,
            memo: ProtoUtils.memo_to_proto(input.memo, @client.data_converter),
            search_attributes: input.search_attributes&._to_proto,
            workflow_start_delay: ProtoUtils.seconds_to_duration(input.start_delay),
            request_eager_execution: input.request_eager_start,
            header: ProtoUtils.headers_to_proto(input.headers, @client.data_converter)
          )

          # Send request
          begin
            resp = @client.workflow_service.start_workflow_execution(
              req,
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          rescue Error::RPCError => e
            # Unpack and raise already started if that's the error, otherwise default raise
            if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first
              details = e.grpc_status.details.first.unpack(
                Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure
              )
              if details
                raise Error::WorkflowAlreadyStartedError.new(
                  workflow_id: req.workflow_id,
                  workflow_type: req.workflow_type.name,
                  run_id: details.run_id
                )
              end
            end
            raise
          end

          # Return handle
          Temporalio::Client::WorkflowHandle.new(
            client: @client,
            id: input.workflow_id,
            run_id: nil,
            result_run_id: resp.run_id,
            first_execution_run_id: resp.run_id
          )
        end

        def list_workflows(input)
          Enumerator.new do |yielder|
            req = Api::WorkflowService::V1::ListWorkflowExecutionsRequest.new(
              namespace: @client.namespace,
              query: input.query || ''
            )
            loop do
              resp = @client.workflow_service.list_workflow_executions(
                req,
                rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
              )
              resp.executions.each do |raw_info|
                yielder << Temporalio::Client::WorkflowExecution.new(raw_info, @client.data_converter)
              end
              break if resp.next_page_token.empty?

              req.next_page_token = resp.next_page_token
            end
          end
        end

        def count_workflows(input)
          resp = @client.workflow_service.count_workflow_executions(
            Api::WorkflowService::V1::CountWorkflowExecutionsRequest.new(
              namespace: @client.namespace,
              query: input.query || ''
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          Temporalio::Client::WorkflowExecutionCount.new(
            resp.count,
            resp.groups.map do |group|
              Temporalio::Client::WorkflowExecutionCount::AggregationGroup.new(
                group.count,
                group.group_values.map { |payload| SearchAttributes._value_from_payload(payload) }
              )
            end
          )
        end

        def describe_workflow(input)
          resp = @client.workflow_service.describe_workflow_execution(
            Api::WorkflowService::V1::DescribeWorkflowExecutionRequest.new(
              namespace: @client.namespace,
              execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              )
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          Temporalio::Client::WorkflowExecution::Description.new(resp, @client.data_converter)
        end

        def fetch_workflow_history_events(input)
          Enumerator.new do |yielder|
            req = Api::WorkflowService::V1::GetWorkflowExecutionHistoryRequest.new(
              namespace: @client.namespace,
              execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              ),
              wait_new_event: input.wait_new_event,
              history_event_filter_type: input.event_filter_type,
              skip_archival: input.skip_archival
            )
            loop do
              resp = @client.workflow_service.get_workflow_execution_history(
                req,
                rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
              )
              resp.history&.events&.each { |event| yielder << event }
              break if resp.next_page_token.empty?

              req.next_page_token = resp.next_page_token
            end
          end
        end

        def signal_workflow(input)
          @client.workflow_service.signal_workflow_execution(
            Api::WorkflowService::V1::SignalWorkflowExecutionRequest.new(
              namespace: @client.namespace,
              workflow_execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              ),
              signal_name: Workflow::Definition::Signal._name_from_parameter(input.signal),
              input: @client.data_converter.to_payloads(input.args),
              header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter),
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def query_workflow(input)
          begin
            resp = @client.workflow_service.query_workflow(
              Api::WorkflowService::V1::QueryWorkflowRequest.new(
                namespace: @client.namespace,
                execution: Api::Common::V1::WorkflowExecution.new(
                  workflow_id: input.workflow_id,
                  run_id: input.run_id || ''
                ),
                query: Api::Query::V1::WorkflowQuery.new(
                  query_type: Workflow::Definition::Query._name_from_parameter(input.query),
                  query_args: @client.data_converter.to_payloads(input.args),
                  header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter)
                ),
                query_reject_condition: input.reject_condition || 0
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          rescue Error::RPCError => e
            # If the status is INVALID_ARGUMENT, we can assume it's a query failed
            # error
            raise Error::WorkflowQueryFailedError, e.message if e.code == Error::RPCError::Code::INVALID_ARGUMENT

            raise
          end
          unless resp.query_rejected.nil?
            raise Error::WorkflowQueryRejectedError.new(status: ProtoUtils.enum_to_int(
              Api::Enums::V1::WorkflowExecutionStatus, resp.query_rejected.status
            ))
          end

          results = @client.data_converter.from_payloads(resp.query_result)
          warn("Expected 0 or 1 query result, got #{results.size}") if results.size > 1
          results&.first
        end

        def start_workflow_update(input)
          if input.wait_for_stage == Temporalio::Client::WorkflowUpdateWaitStage::ADMITTED
            raise ArgumentError, 'ADMITTED wait stage not supported'
          end

          req = Api::WorkflowService::V1::UpdateWorkflowExecutionRequest.new(
            namespace: @client.namespace,
            workflow_execution: Api::Common::V1::WorkflowExecution.new(
              workflow_id: input.workflow_id,
              run_id: input.run_id || ''
            ),
            request: Api::Update::V1::Request.new(
              meta: Api::Update::V1::Meta.new(
                update_id: input.update_id,
                identity: @client.connection.identity
              ),
              input: Api::Update::V1::Input.new(
                name: Workflow::Definition::Update._name_from_parameter(input.update),
                args: @client.data_converter.to_payloads(input.args),
                header: Internal::ProtoUtils.headers_to_proto(input.headers, @client.data_converter)
              )
            ),
            wait_policy: Api::Update::V1::WaitPolicy.new(
              lifecycle_stage: input.wait_for_stage
            )
          )

          # Repeatedly try to invoke start until the update reaches user-provided
          # wait stage or is at least ACCEPTED (as of the time of this writing,
          # the user cannot specify sooner than ACCEPTED)
          # @type var resp: untyped
          resp = nil
          loop do
            resp = @client.workflow_service.update_workflow_execution(
              req,
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )

            # We're only done if the response stage is after the requested stage
            # or the response stage is accepted
            if resp.stage >= req.wait_policy.lifecycle_stage ||
               resp.stage >= Temporalio::Client::WorkflowUpdateWaitStage::ACCEPTED
              break
            end
          rescue Error::RPCError => e
            # Deadline exceeded or cancel is a special error type
            if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELLED
              raise Error::WorkflowUpdateRPCTimeoutOrCanceledError
            end

            raise
          end

          # If the user wants to wait until completed, we must poll until outcome
          # if not already there
          if input.wait_for_stage == Temporalio::Client::WorkflowUpdateWaitStage::COMPLETED && !resp.outcome
            resp.outcome = @client._impl.poll_workflow_update(
              Temporalio::Client::Interceptor::PollWorkflowUpdateInput.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id,
                update_id: input.update_id,
                rpc_options: input.rpc_options
              )
            )
          end

          Temporalio::Client::WorkflowUpdateHandle.new(
            client: @client,
            id: input.update_id,
            workflow_id: input.workflow_id,
            workflow_run_id: input.run_id,
            known_outcome: resp.outcome
          )
        end

        def poll_workflow_update(input)
          req = Api::WorkflowService::V1::PollWorkflowExecutionUpdateRequest.new(
            namespace: @client.namespace,
            update_ref: Api::Update::V1::UpdateRef.new(
              workflow_execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              ),
              update_id: input.update_id
            ),
            identity: @client.connection.identity,
            wait_policy: Api::Update::V1::WaitPolicy.new(
              lifecycle_stage: Temporalio::Client::WorkflowUpdateWaitStage::COMPLETED
            )
          )

          # Continue polling as long as we have no outcome
          loop do
            resp = @client.workflow_service.poll_workflow_execution_update(
              req,
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
            return resp.outcome if resp.outcome
          rescue Error::RPCError => e
            # Deadline exceeded or cancel is a special error type
            if e.code == Error::RPCError::Code::DEADLINE_EXCEEDED || e.code == Error::RPCError::Code::CANCELLED
              raise Error::WorkflowUpdateRPCTimeoutOrCanceledError
            end

            raise
          end
        end

        def cancel_workflow(input)
          @client.workflow_service.request_cancel_workflow_execution(
            Api::WorkflowService::V1::RequestCancelWorkflowExecutionRequest.new(
              namespace: @client.namespace,
              workflow_execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              ),
              first_execution_run_id: input.first_execution_run_id,
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def terminate_workflow(input)
          @client.workflow_service.terminate_workflow_execution(
            Api::WorkflowService::V1::TerminateWorkflowExecutionRequest.new(
              namespace: @client.namespace,
              workflow_execution: Api::Common::V1::WorkflowExecution.new(
                workflow_id: input.workflow_id,
                run_id: input.run_id || ''
              ),
              reason: input.reason || '',
              first_execution_run_id: input.first_execution_run_id,
              details: @client.data_converter.to_payloads(input.details),
              identity: @client.connection.identity
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def create_schedule(input)
          if input.schedule.state.limited_actions && input.schedule.state.remaining_actions.zero?
            raise 'Must set limited actions to false if there are no remaining actions set'
          end
          if !input.schedule.state.limited_actions && !input.schedule.state.remaining_actions.zero?
            raise 'Must set limited actions to true if there are remaining actions set'
          end

          @client.workflow_service.create_schedule(
            Api::WorkflowService::V1::CreateScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              schedule: input.schedule._to_proto(@client.data_converter),
              initial_patch: if input.trigger_immediately || !input.backfills.empty?
                               Api::Schedule::V1::SchedulePatch.new(
                                 trigger_immediately: if input.trigger_immediately
                                                        Api::Schedule::V1::TriggerImmediatelyRequest.new(
                                                          overlap_policy: input.schedule.policy.overlap
                                                        )
                                                      end,
                                 backfill_request: input.backfills.map(&:_to_proto)
                               )
                             end,
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid,
              memo: ProtoUtils.memo_to_proto(input.memo, @client.data_converter),
              search_attributes: input.search_attributes&._to_proto
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          Temporalio::Client::ScheduleHandle.new(client: @client, id: input.id)
        rescue Error::RPCError => e
          # Unpack and raise already started if that's the error, otherwise default raise
          details = if e.code == Error::RPCError::Code::ALREADY_EXISTS && e.grpc_status.details.first
                      e.grpc_status.details.first.unpack(Api::ErrorDetails::V1::WorkflowExecutionAlreadyStartedFailure)
                    end
          raise Error::ScheduleAlreadyRunningError if details

          raise
        end

        def list_schedules(input)
          Enumerator.new do |yielder|
            req = Api::WorkflowService::V1::ListSchedulesRequest.new(
              namespace: @client.namespace,
              query: input.query || ''
            )
            loop do
              resp = @client.workflow_service.list_schedules(
                req,
                rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
              )
              resp.schedules.each do |raw_entry|
                yielder << Temporalio::Client::Schedule::List::Description.new(
                  raw_entry:,
                  data_converter: @client.data_converter
                )
              end
              break if resp.next_page_token.empty?

              req.next_page_token = resp.next_page_token
            end
          end
        end

        def backfill_schedule(input)
          @client.workflow_service.patch_schedule(
            Api::WorkflowService::V1::PatchScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              patch: Api::Schedule::V1::SchedulePatch.new(
                backfill_request: input.backfills.map(&:_to_proto)
              ),
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def delete_schedule(input)
          @client.workflow_service.delete_schedule(
            Api::WorkflowService::V1::DeleteScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              identity: @client.connection.identity
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def describe_schedule(input)
          Temporalio::Client::Schedule::Description.new(
            id: input.id,
            raw_description: @client.workflow_service.describe_schedule(
              Api::WorkflowService::V1::DescribeScheduleRequest.new(
                namespace: @client.namespace,
                schedule_id: input.id
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            ),
            data_converter: @client.data_converter
          )
        end

        def pause_schedule(input)
          @client.workflow_service.patch_schedule(
            Api::WorkflowService::V1::PatchScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              patch: Api::Schedule::V1::SchedulePatch.new(pause: input.note),
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def trigger_schedule(input)
          @client.workflow_service.patch_schedule(
            Api::WorkflowService::V1::PatchScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              patch: Api::Schedule::V1::SchedulePatch.new(
                trigger_immediately: Api::Schedule::V1::TriggerImmediatelyRequest.new(
                  overlap_policy: input.overlap || 0
                )
              ),
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def unpause_schedule(input)
          @client.workflow_service.patch_schedule(
            Api::WorkflowService::V1::PatchScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              patch: Api::Schedule::V1::SchedulePatch.new(unpause: input.note),
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def update_schedule(input)
          # TODO(cretz): This is supposed to be a retry-conflict loop, but we do
          # not yet have a way to know update failure is due to conflict token
          # mismatch
          update = input.updater.call(
            Temporalio::Client::Schedule::Update::Input.new(
              description: Temporalio::Client::Schedule::Description.new(
                id: input.id,
                raw_description: @client.workflow_service.describe_schedule(
                  Api::WorkflowService::V1::DescribeScheduleRequest.new(
                    namespace: @client.namespace,
                    schedule_id: input.id
                  ),
                  rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
                ),
                data_converter: @client.data_converter
              )
            )
          )
          # Do nothing if update is nil, fail if not an expected update
          return nil if update.nil?

          unless update.is_a?(Temporalio::Client::Schedule::Update)
            raise TypeError,
                  'Expected result of update block to be a Schedule::Update'
          end

          @client.workflow_service.update_schedule(
            Api::WorkflowService::V1::UpdateScheduleRequest.new(
              namespace: @client.namespace,
              schedule_id: input.id,
              schedule: update.schedule._to_proto(@client.data_converter),
              search_attributes: update.search_attributes&._to_proto,
              identity: @client.connection.identity,
              request_id: SecureRandom.uuid
            ),
            rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
          )
          nil
        end

        def heartbeat_async_activity(input)
          resp = if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference)
                   @client.workflow_service.record_activity_task_heartbeat_by_id(
                     Api::WorkflowService::V1::RecordActivityTaskHeartbeatByIdRequest.new(
                       workflow_id: input.task_token_or_id_reference.workflow_id,
                       run_id: input.task_token_or_id_reference.run_id,
                       activity_id: input.task_token_or_id_reference.activity_id,
                       namespace: @client.namespace,
                       identity: @client.connection.identity,
                       details: @client.data_converter.to_payloads(input.details)
                     ),
                     rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
                   )
                 else
                   @client.workflow_service.record_activity_task_heartbeat(
                     Api::WorkflowService::V1::RecordActivityTaskHeartbeatRequest.new(
                       task_token: input.task_token_or_id_reference,
                       namespace: @client.namespace,
                       identity: @client.connection.identity,
                       details: @client.data_converter.to_payloads(input.details)
                     ),
                     rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
                   )
                 end
          raise Error::AsyncActivityCanceledError if resp.cancel_requested

          nil
        end

        def complete_async_activity(input)
          if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference)
            @client.workflow_service.respond_activity_task_completed_by_id(
              Api::WorkflowService::V1::RespondActivityTaskCompletedByIdRequest.new(
                workflow_id: input.task_token_or_id_reference.workflow_id,
                run_id: input.task_token_or_id_reference.run_id,
                activity_id: input.task_token_or_id_reference.activity_id,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                result: @client.data_converter.to_payloads([input.result])
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          else
            @client.workflow_service.respond_activity_task_completed(
              Api::WorkflowService::V1::RespondActivityTaskCompletedRequest.new(
                task_token: input.task_token_or_id_reference,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                result: @client.data_converter.to_payloads([input.result])
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          end
          nil
        end

        def fail_async_activity(input)
          if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference)
            @client.workflow_service.respond_activity_task_failed_by_id(
              Api::WorkflowService::V1::RespondActivityTaskFailedByIdRequest.new(
                workflow_id: input.task_token_or_id_reference.workflow_id,
                run_id: input.task_token_or_id_reference.run_id,
                activity_id: input.task_token_or_id_reference.activity_id,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                failure: @client.data_converter.to_failure(input.error),
                last_heartbeat_details: if input.last_heartbeat_details.empty?
                                          nil
                                        else
                                          @client.data_converter.to_payloads(input.last_heartbeat_details)
                                        end
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          else
            @client.workflow_service.respond_activity_task_failed(
              Api::WorkflowService::V1::RespondActivityTaskFailedRequest.new(
                task_token: input.task_token_or_id_reference,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                failure: @client.data_converter.to_failure(input.error),
                last_heartbeat_details: if input.last_heartbeat_details.empty?
                                          nil
                                        else
                                          @client.data_converter.to_payloads(input.last_heartbeat_details)
                                        end
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          end
          nil
        end

        def report_cancellation_async_activity(input)
          if input.task_token_or_id_reference.is_a?(Temporalio::Client::ActivityIDReference)
            @client.workflow_service.respond_activity_task_canceled_by_id(
              Api::WorkflowService::V1::RespondActivityTaskCanceledByIdRequest.new(
                workflow_id: input.task_token_or_id_reference.workflow_id,
                run_id: input.task_token_or_id_reference.run_id,
                activity_id: input.task_token_or_id_reference.activity_id,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                details: @client.data_converter.to_payloads(input.details)
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          else
            @client.workflow_service.respond_activity_task_canceled(
              Api::WorkflowService::V1::RespondActivityTaskCanceledRequest.new(
                task_token: input.task_token_or_id_reference,
                namespace: @client.namespace,
                identity: @client.connection.identity,
                details: @client.data_converter.to_payloads(input.details)
              ),
              rpc_options: Implementation.with_default_rpc_options(input.rpc_options)
            )
          end
          nil
        end
      end
    end
  end
end