lib/temporalio/worker/activity_executor/thread_pool.rb
# frozen_string_literal: true require 'temporalio/worker/thread_pool' module Temporalio class Worker class ActivityExecutor # Activity executor for scheduling activities in their own thread using {Worker::ThreadPool}. class ThreadPool < ActivityExecutor # @return [ThreadPool] Default/shared thread pool executor using default thread pool. def self.default @default ||= new end # Create a new thread pool executor. # # @param thread_pool [Worker::ThreadPool] Thread pool to use. def initialize(thread_pool = Worker::ThreadPool.default) # rubocop:disable Lint/MissingSuper @thread_pool = thread_pool end # @see ActivityExecutor.execute_activity def execute_activity(_defn, &) @thread_pool.execute(&) end # @see ActivityExecutor.activity_context def activity_context Thread.current[:temporal_activity_context] end # @see ActivityExecutor.set_activity_context def set_activity_context(defn, context) Thread.current[:temporal_activity_context] = context # If they have opted in to raising on cancel, wire that up return unless defn.cancel_raise thread = Thread.current context&.cancellation&.add_cancel_callback do thread.raise(Error::CanceledError.new('Activity canceled')) if thread[:temporal_activity_context] == context end end end end end end