lib/concurrent-ruby/concurrent/executor/abstract_executor_service.rb
require 'concurrent/errors'
require 'concurrent/concern/deprecation'
require 'concurrent/executor/executor_service'
require 'concurrent/synchronization/lockable_object'

module Concurrent

  # Abstract base class for executor services.
  #
  # It stores the options shared by executors (`:auto_terminate`, `:name`),
  # exposes lifecycle predicates that run under `synchronize`, and builds the
  # fallback action for the configured `fallback_policy`. The lifecycle
  # operations themselves (`shutdown`, `kill`, `wait_for_termination`,
  # `ns_execute`) raise `NotImplementedError` here. `ns_initialize` and the
  # `ns_*?` predicates are called but not defined in this class, so they are
  # expected to come from a subclass or a mixin.
  #
  # @!macro abstract_executor_service_public_api
  # @!visibility private
  class AbstractExecutorService < Synchronization::LockableObject
    include ExecutorService
    include Concern::Deprecation

    # The set of possible fallback policies that may be set at thread pool creation.
    FALLBACK_POLICIES = [:abort, :discard, :caller_runs].freeze

    # @!macro executor_service_attr_reader_fallback_policy
    attr_reader :fallback_policy

    # The name given through the `:name` option; `nil` when none was supplied.
    attr_reader :name

    # Create a new thread pool.
    #
    # @param [Hash] opts executor options. `:auto_terminate` (default `true`)
    #   and `:name` (optional) are read here; the whole hash is then forwarded
    #   to `ns_initialize`.
    # @param block forwarded untouched to `ns_initialize`.
    def initialize(opts = {}, &block)
      # `&nil` makes sure the block is NOT handed to the superclass
      # constructor; it is only meant for `ns_initialize` below.
      super(&nil)
      synchronize do
        @auto_terminate = opts.fetch(:auto_terminate, true)
        # Only assign @name when the caller actually provided the key.
        if opts.key?(:name)
          @name = opts.fetch(:name)
        end
        ns_initialize(opts, &block)
      end
    end

    # String form of the executor. When a name is set, the last character of
    # the inherited representation (presumably the closing `>`) is dropped and
    # ` name: <name>>` is appended; otherwise the inherited string is returned
    # unchanged.
    #
    # @return [String]
    def to_s
      return super unless name

      inherited = super
      "#{inherited[0..-2]} name: #{name}>"
    end

    # Not implemented at this level; always raises.
    #
    # @raise [NotImplementedError]
    #
    # @!macro executor_service_method_shutdown
    def shutdown
      raise NotImplementedError
    end

    # Not implemented at this level; always raises.
    #
    # @raise [NotImplementedError]
    #
    # @!macro executor_service_method_kill
    def kill
      raise NotImplementedError
    end

    # Not implemented at this level; always raises.
    #
    # @param timeout unused here.
    # @raise [NotImplementedError]
    #
    # @!macro executor_service_method_wait_for_termination
    def wait_for_termination(timeout = nil)
      raise NotImplementedError
    end

    # Evaluates `ns_running?` while holding the lock.
    #
    # @!macro executor_service_method_running_question
    def running?
      synchronize do
        ns_running?
      end
    end

    # Evaluates `ns_shuttingdown?` while holding the lock.
    #
    # @!macro executor_service_method_shuttingdown_question
    def shuttingdown?
      synchronize do
        ns_shuttingdown?
      end
    end

    # Evaluates `ns_shutdown?` while holding the lock.
    #
    # @!macro executor_service_method_shutdown_question
    def shutdown?
      synchronize do
        ns_shutdown?
      end
    end

    # Reads the `:auto_terminate` setting captured at construction, under the lock.
    #
    # @!macro executor_service_method_auto_terminate_question
    def auto_terminate?
      synchronize { @auto_terminate }
    end

    # Deprecated setter: `value` is ignored and only a deprecation notice is
    # emitted through `deprecated`.
    #
    # @!macro executor_service_method_auto_terminate_setter
    def auto_terminate=(value)
      deprecated "Method #auto_terminate= has no effect. Set :auto_terminate option when executor is initialized."
    end

    private

    # Builds (but does not run) a lambda implementing the current
    # `fallback_policy`. Handing back a lambda instead of acting immediately
    # lets the caller defer the work until it is outside of synchronization.
    #
    # Resulting lambda, by policy:
    # * `:abort`       - raises `RejectedExecutionError`
    # * `:discard`     - returns `false`
    # * `:caller_runs` - yields `args` to the block given to this method,
    #   passes any rescued `StandardError` to `log` with `DEBUG`, and
    #   returns `true`
    # * anything else  - raises a `RuntimeError` naming the unknown policy
    #
    # @param [Array] args the arguments to the task which is being handled.
    # @return [Proc] a lambda taking no arguments.
    #
    # @!visibility private
    def fallback_action(*args)
      case fallback_policy
      when :abort then -> { raise RejectedExecutionError }
      when :discard then -> { false }
      when :caller_runs
        lambda do
          begin
            yield(*args)
          rescue => error
            # The task's failure is deliberately swallowed; it is only logged.
            log DEBUG, error
          end
          true
        end
      else
        -> { raise "Unknown fallback policy #{fallback_policy}" }
      end
    end

    # Not implemented at this level; always raises.
    #
    # @raise [NotImplementedError]
    def ns_execute(*args, &task)
      raise NotImplementedError
    end

    # @!macro executor_service_method_ns_shutdown_execution
    #
    # Hook intended to run once an orderly shutdown has completed.
    # The implementation here is intentionally empty.
    def ns_shutdown_execution
      # intentionally empty
    end

    # @!macro executor_service_method_ns_kill_execution
    #
    # Hook intended to run when the executor has been killed.
    # The implementation here is intentionally empty.
    def ns_kill_execution
      # intentionally empty
    end

    # Unsynchronized read of the `:auto_terminate` setting.
    def ns_auto_terminate?
      @auto_terminate
    end
  end
end