module Concurrent

def self.available_processor_count

Returns:
  • (Float) - number of available processors
def self.available_processor_count
  processor_counter.available_processor_count
end

def self.cpu_quota

Returns:
  • (nil, Float) - Maximum number of available processors as set by a cgroup CPU quota, or nil if none set
def self.cpu_quota
  processor_counter.cpu_quota
end

def self.cpu_shares

Returns:
  • (Float, nil) - CPU shares requested by the process, or nil if not set
def self.cpu_shares
  processor_counter.cpu_shares
end

def self.create_simple_logger(level = :FATAL, output = $stderr)

Create a simple logger with provided level and output.
def self.create_simple_logger(level = :FATAL, output = $stderr)
  level = Concern::Logging.const_get(level) unless level.is_a?(Integer)
  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level
    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end
    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Concern::Logging::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end

def self.create_stdlib_logger(level = :FATAL, output = $stderr)

Deprecated:
def self.create_stdlib_logger(level = :FATAL, output = $stderr)
  require 'logger'
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end
  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end

def self.disable_at_exit_handlers!

Deprecated:
  • Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.

Other tags:
    Note: - This method should *never* be called
    Note: - this option should be needed only because of `at_exit` ordering
def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end

def self.executor(executor_identifier)

Returns:
  • (Executor) -

Parameters:
  • executor_identifier (Symbol, Executor) -- symbols:
def self.executor(executor_identifier)
  Options.executor(executor_identifier)
end

def self.global_fast_executor

Returns:
  • (ThreadPoolExecutor) - the thread pool
def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value!
end

def self.global_immediate_executor

def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end

def self.global_io_executor

Returns:
  • (ThreadPoolExecutor) - the thread pool
def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value!
end

def self.global_logger

def self.global_logger
  GLOBAL_LOGGER.value
end

def self.global_logger=(value)

def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end

def self.global_timer_set

Returns:
  • (Concurrent::TimerSet) - the thread pool
def self.global_timer_set
  GLOBAL_TIMER_SET.value!
end

def self.mutex_owned_per_thread?

@!visibility private
def self.mutex_owned_per_thread?
  return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?
  mutex = Mutex.new
  # Lock the mutex:
  mutex.synchronize do
    # Check if the mutex is still owned in a child fiber:
    Fiber.new { mutex.owned? }.resume
  end
end

def self.new_fast_executor(opts = {})

def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end

def self.new_io_executor(opts = {})

def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end

def self.physical_processor_count

Other tags:
    See: http://linux.die.net/man/8/sysctl -
    See: http://www.unix.com/man-page/osx/1/HWPREFS/ -
    See: http://msdn.microsoft.com/en-us/library/aa394373(v=vs.85).aspx -
    See: https://github.com/grosser/parallel/blob/4fc8b89d08c7091fe0419ca8fba1ec3ce5a8d185/lib/parallel.rb -

Returns:
  • (Integer) - number physical processor cores on the current system
def self.physical_processor_count
  processor_counter.physical_processor_count
end

def self.processor_count

Other tags:
    See: http://docs.oracle.com/javase/6/docs/api/java/lang/Runtime.html#availableProcessors() -

Returns:
  • (Integer) - number of processors seen by the OS or Java runtime
def self.processor_count
  processor_counter.processor_count
end

def self.use_simple_logger(level = :FATAL, output = $stderr)

Use logger created by #create_simple_logger to log concurrent-ruby messages.
def self.use_simple_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end

def self.use_stdlib_logger(level = :FATAL, output = $stderr)

Deprecated:
def self.use_stdlib_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end

def abort_transaction

Abort a currently running transaction - see `Concurrent::atomically`.
def abort_transaction
  raise Transaction::AbortError.new
end

def atomically

end
b.value += 10
a.value -= 10
Concurrent::atomically do

b = new TVar(100)
a = new TVar(100_000)
@example

Transactions within transactions are flattened to a single transaction.

the transaction. Creating a thread counts as a side-effect.
* If you create a new thread within an atomically, it will not be part of

* It is undefined behaviour to use callcc or Fiber with atomically.

* If an exception escapes an atomically block it will abort the transaction.

side-effects, except for via TVar.
more than once. In most cases your code should be free of
* Most importantly, the block that you pass to atomically may be executed

There are some very important and unusual semantics that you must be aware of:

properties from database transactions.
transactions never interfere with each other. You may recognise these
objects involved will never enter an illegal state, and isolated, in that
that it either happens or it does not, consistent, in that the `TVar`
With respect to the value of `TVar` objects, the transaction is atomic, in
Run a block that reads and writes `TVar`s as a single atomic transaction.
def atomically
  raise ArgumentError.new('no block given') unless block_given?
  # Get the current transaction
  transaction = Transaction::current
  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do
        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction
        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end
        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end

def call_dataflow(method, executor, *inputs, &block)

def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end
  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end
  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }
    inputs.each do |input|
      input.add_observer counter
    end
  end
  result
end

def dataflow(*inputs, &block)

Raises:
  • (ArgumentError) - if any of the inputs are not `IVar`s
  • (ArgumentError) - if no block is given

Returns:
  • (Object) - the result of all the operations

Other tags:
    Yieldreturn: - the result of the block operation

Other tags:
    Yieldparam: inputs - each of the `Future` inputs to the dataflow

Other tags:
    Yield: - The operation to perform once all the dependencies are met

Parameters:
  • inputs (Future) -- zero or more `Future` operations that this dataflow depends upon
def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end

def dataflow!(*inputs, &block)

def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end

def dataflow_with(executor, *inputs, &block)

def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end

def dataflow_with!(executor, *inputs, &block)

def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end

def leave_transaction

Leave a transaction without committing or aborting - see `Concurrent::atomically`.
def leave_transaction
  raise Transaction::LeaveError.new
end

def monotonic_time(unit = :float_second)

Returns:
  • (Float) - The current monotonic time since some unspecified

Parameters:
  • unit (Symbol) -- the time unit to be returned, can be either
def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end