class Airbrake::ThreadPool
@since v4.6.1
@api private
#=> ECHO: 0…ECHO: 1…ECHO: 2…
10.times { |i| thread_pool << i }
# Send work.
)
block: proc { |message| print "ECHO: #{message}…" }
queue_size: 100,
worker_size: 5,
name: 'performance-notifier',
thread_pool = ThreadPool.new(
# Initialize a new thread pool with 5 workers and a queue size of 100, and set the block to be run concurrently.
@example
ThreadPool implements a simple thread pool with a configurable number of worker threads and a configurable size of the queue to process.
def <<(message)
-
(Boolean)
- true if the message was successfully sent to the pool, false if the queue is full
Parameters:
-
message (Object) -- The message that gets passed to the block
# Adds a new message to the thread pool. A message is anything that the
# pool's block knows how to handle.
#
# The message is dropped (and the drop is logged at INFO level) when the
# backlog has reached +@queue_size+. The push itself is non-blocking: if
# another producer fills the queue between the capacity check and the push,
# the message is rejected instead of blocking the calling thread.
#
# @param message [Object] the message that gets passed to the block
# @return [Boolean] true if the message was queued, false if it was dropped
#   because the queue is full
def <<(message)
  if backlog < @queue_size
    begin
      # SizedQueue#push with non_block=true raises ThreadError when the
      # queue is full rather than suspending the caller.
      @queue.push(message, true)
      return true
    rescue ThreadError
      # Lost the race with another producer: fall through and report the
      # message as rejected.
    end
  end

  logger.info do
    "#{LOG_LABEL} ThreadPool has reached its capacity of " \
    "#{@queue_size} and the following message will not be " \
    "processed: #{message.inspect}"
  end
  false
end
def backlog
-
(Integer)
- how big the queue is at the moment
# Reports the number of messages currently waiting in the queue.
#
# @return [Integer] how big the queue is at the moment
def backlog
  @queue.length
end
def close
-
(Airbrake::Error)
- when invoked more than one time
Returns:
-
(void)
-
# Closes the thread pool: enqueues one +:stop+ marker per worker, marks the
# pool as closed and then waits for the worker threads to finish.
#
# @return [void]
# @raise [Airbrake::Error] when invoked more than one time
def close
  pending_workers = @mutex.synchronize do
    if @closed
      raise Airbrake::Error, 'this thread pool is closed already'
    end

    unless @queue.empty?
      notice = "#{LOG_LABEL} waiting to process #{@queue.size} task(s)..."
      logger.debug("#{notice} (Ctrl-C to abort)")
    end

    # One :stop marker per worker; each worker exits when it pops one.
    @worker_size.times { @queue.push(:stop) }
    @closed = true

    # Snapshot the thread list so joining happens outside the lock.
    @workers.list.dup
  end

  pending_workers.each { |worker| worker.join }
  logger.debug("#{LOG_LABEL} #{@name} thread pool closed")
end
def closed?
# Tells whether the pool has been closed.
#
# @return [Boolean] the current value of the +@closed+ flag
def closed?
  @closed
end
def has_workers?
- See: https://goo.gl/oydz8h - Example of at_exit that prevents exit
Returns:
-
(Boolean)
- true if the instance isn't closed and has at least one live worker thread
# Checks whether the pool has live worker threads, spawning a fresh set of
# workers first when the recorded process id differs from the current one
# and no worker is alive (presumably the situation after a fork — confirm
# against callers). Because +@pid+ starts out as nil, the first call always
# spawns the workers.
#
# @return [Boolean] true if the pool is not closed and has at least one live
#   worker; false otherwise
def has_workers?
  @mutex.synchronize do
    next false if @closed

    stale_process = @pid != Process.pid
    if stale_process && @workers.list.empty?
      @pid = Process.pid
      @workers = ThreadGroup.new
      spawn_workers
    end

    !@closed && @workers.list.any?
  end
end
def initialize(worker_size:, queue_size:, block:, name: nil)
# Builds a thread pool and immediately starts its workers (via
# +has_workers?+).
#
# @param worker_size [Integer] how many worker threads to spawn
# @param queue_size [Integer] capacity of the internal SizedQueue
# @param block [#call] invoked by a worker with every message it pops
# @param name [String, nil] label used in the log line emitted on close
def initialize(worker_size:, queue_size:, block:, name: nil)
  # Configuration supplied by the caller.
  @block = block
  @name = name
  @queue_size = queue_size
  @worker_size = worker_size

  # Internal state.
  @closed = false
  @pid = nil
  @mutex = Mutex.new
  @workers = ThreadGroup.new
  @queue = SizedQueue.new(@queue_size)

  # @pid is nil at this point, so this call spawns the initial workers.
  has_workers?
end
def spawn_worker
# Starts one worker thread. The worker pops messages off the queue and hands
# each of them to +@block+ until it pops the +:stop+ marker.
#
# +nil+ and +false+ are treated as ordinary messages and passed to the block
# instead of terminating the worker. A +nil+ popped from a closed queue
# still ends the worker, because a closed and drained queue yields +nil+
# forever.
#
# @return [Thread] the newly started worker thread
def spawn_worker
  Thread.new do
    until (message = @queue.pop) == :stop
      # Queue#pop returns nil once the queue is closed and empty.
      break if message.nil? && @queue.closed?

      @block.call(message)
    end
  end
end
def spawn_workers
# Starts +@worker_size+ worker threads and registers each of them in the
# +@workers+ thread group.
#
# @return [Integer] the value of +@worker_size+ (the result of Integer#times)
def spawn_workers
  @worker_size.times do
    worker = spawn_worker
    @workers.add(worker)
  end
end