class Airbrake::ThreadPool

ThreadPool implements a simple thread pool that can configure the number of
worker threads and the size of the queue to process.

@example
  # Initialize a new thread pool with 5 workers and a queue size of 100. Set
  # the block to be run concurrently.
  thread_pool = ThreadPool.new(
    name: 'performance-notifier',
    worker_size: 5,
    queue_size: 100,
    block: proc { |message| print "ECHO: #{message}..." }
  )

  # Send work.
  10.times { |i| thread_pool << i }
  #=> ECHO: 0...ECHO: 1...ECHO: 2...

@api private
@since v4.6.1

def <<(message)

Returns:
  • (Boolean) - true if the message was successfully sent to the pool,
    false if the queue is full and the message was dropped

Parameters:
  • message (Object) -- The message that gets passed to the block
# Queues +message+ for the workers unless the backlog has already reached the
# configured queue size; in that case the message is dropped and the drop is
# logged at info level.
#
# @param message [Object] the message that gets passed to the block
# @return [Boolean] true if the message was queued, false if it was rejected
#   because the backlog reached +@queue_size+
def <<(message)
  if backlog < @queue_size
    @queue << message
    return true
  end

  # Passed as a block, so the string is only built if the logger yields.
  logger.info do
    "#{LOG_LABEL} ThreadPool has reached its capacity of #{@queue_size} " \
    "and the following message will not be processed: #{message.inspect}"
  end
  false
end

def backlog

Returns:
  • (Integer) - how big the queue is at the moment
# Reports how many messages are currently sitting in the queue.
#
# @return [Integer] how big the queue is at the moment
def backlog
  @queue.length
end

def close

Raises:
  • (Airbrake::Error) - when invoked more than one time

Returns:
  • (void) -
# Shuts the pool down: enqueues one +:stop+ message per configured worker,
# marks the pool as closed and then joins the worker threads.
#
# @raise [Airbrake::Error] when invoked more than one time
# @return [void]
def close
  workers = @mutex.synchronize do
    raise Airbrake::Error, 'this thread pool is closed already' if @closed

    unless @queue.empty?
      logger.debug(
        "#{LOG_LABEL} waiting to process #{@queue.size} task(s)... " \
        '(Ctrl-C to abort)',
      )
    end

    # One sentinel per worker so that each worker loop sees its own :stop.
    @worker_size.times { @queue.push(:stop) }
    @closed = true
    @workers.list.dup
  end

  # Joining happens outside the mutex, after the lock has been released.
  workers.each { |worker| worker.join }
  logger.debug("#{LOG_LABEL} #{@name} thread pool closed")
end

def closed?

# Reports the closed flag, which +initialize+ sets to false and +close+ sets
# to true.
#
# @return [Boolean] the current value of +@closed+
def closed?
  @closed
end

def has_workers?

Other tags:
    See: https://goo.gl/oydz8h - Example of at_exit that prevents exit

Returns:
  • (Boolean) - true if an instance wasn't closed, but has no workers
# Checks whether the pool has worker threads, spawning a fresh set first when
# the recorded pid differs from the current process and the worker group
# lists no threads (the pid is nil right after construction; presumably it
# also differs in a forked child).
#
# @return [Boolean] false when the pool is closed; otherwise whether the
#   worker group lists any threads
# @see https://goo.gl/oydz8h Example of at_exit that prevents exit
def has_workers?
  @mutex.synchronize do
    next false if @closed

    needs_workers = @pid != Process.pid && @workers.list.empty?
    if needs_workers
      @pid = Process.pid
      @workers = ThreadGroup.new
      spawn_workers
    end

    @closed ? false : @workers.list.any?
  end
end

def initialize(worker_size:, queue_size:, block:, name: nil)

# Sets up the pool state and immediately calls +has_workers?+, which spawns
# the initial workers because no pid has been recorded yet.
#
# @param worker_size [Integer] how many worker threads get spawned
# @param queue_size [Integer] capacity handed to the SizedQueue; also the
#   backlog limit checked by +<<+
# @param block [#call] invoked with every message popped by a worker
# @param name [String, nil] label interpolated into the log line emitted by
#   +close+
def initialize(worker_size:, queue_size:, block:, name: nil)
  @name = name
  @block = block
  @worker_size = worker_size
  @queue_size = queue_size

  @queue = SizedQueue.new(@queue_size)
  @mutex = Mutex.new
  @workers = ThreadGroup.new
  @closed = false
  @pid = nil

  has_workers?
end

def spawn_worker

# Starts one worker thread that keeps popping messages off the queue and
# passing them to the block until it pops the +:stop+ sentinel.
#
# Falsy messages (+nil+, +false+) are delivered to the block like any other
# message. Previously the loop condition treated them as a signal to exit,
# so a single falsy message silently terminated the worker.
#
# @return [Thread] the newly started worker thread
def spawn_worker
  Thread.new do
    until (message = @queue.pop) == :stop
      # Queue#pop returns nil once the queue is closed and empty; stop then
      # instead of calling the block with nil forever.
      break if message.nil? && @queue.closed?

      @block.call(message)
    end
  end
end

def spawn_workers

# Starts +@worker_size+ workers via +spawn_worker+ and adds each resulting
# thread to the +@workers+ thread group.
#
# @return [Integer] +@worker_size+ (the result of Integer#times)
def spawn_workers
  @worker_size.times do
    worker = spawn_worker
    @workers.add(worker)
  end
end