module Concurrent
def self.available_processor_count
Returns:
- (Float) - number of available processors

def self.available_processor_count
  processor_counter.available_processor_count
end
def self.cpu_quota
Returns:
- (nil, Float) - maximum number of available processors as set by a cgroup CPU quota, or nil if none set

def self.cpu_quota
  processor_counter.cpu_quota
end
def self.cpu_shares
Returns:
- (Float, nil) - CPU shares requested by the process, or nil if not set

def self.cpu_shares
  processor_counter.cpu_shares
end
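The following sketch (not part of the library source) shows how these counters can be inspected together, which is useful when the process runs inside a container with cgroup CPU limits:

  require 'concurrent'

  # Raw processor count as seen by the OS / Java runtime.
  puts "processor_count:           #{Concurrent.processor_count}"

  # Count of processors actually available for scheduling; may be lower
  # than processor_count when a cgroup CPU quota is in effect.
  puts "available_processor_count: #{Concurrent.available_processor_count}"

  # nil unless the process is subject to a cgroup CPU quota / shares limit.
  puts "cpu_quota:  #{Concurrent.cpu_quota.inspect}"
  puts "cpu_shares: #{Concurrent.cpu_shares.inspect}"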
def self.create_simple_logger(level = :FATAL, output = $stderr)
def self.create_simple_logger(level = :FATAL, output = $stderr)
  level = Concern::Logging.const_get(level) unless level.is_a?(Integer)

  # TODO (pitr-ch 24-Dec-2016): figure out why it had to be replaced, stdlogger was deadlocking
  lambda do |severity, progname, message = nil, &block|
    return false if severity < level

    message           = block ? block.call : message
    formatted_message = case message
                        when String
                          message
                        when Exception
                          format "%s (%s)\n%s",
                                 message.message, message.class, (message.backtrace || []).join("\n")
                        else
                          message.inspect
                        end

    output.print format "[%s] %5s -- %s: %s\n",
                        Time.now.strftime('%Y-%m-%d %H:%M:%S.%L'),
                        Concern::Logging::SEV_LABEL[severity],
                        progname,
                        formatted_message
    true
  end
end
def self.create_stdlib_logger(level = :FATAL, output = $stderr)
def self.create_stdlib_logger(level = :FATAL, output = $stderr)
  require 'logger'
  logger           = Logger.new(output)
  logger.level     = level
  logger.formatter = lambda do |severity, datetime, progname, msg|
    formatted_message = case msg
                        when String
                          msg
                        when Exception
                          format "%s (%s)\n%s",
                                 msg.message, msg.class, (msg.backtrace || []).join("\n")
                        else
                          msg.inspect
                        end
    format "[%s] %5s -- %s: %s\n",
           datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
           severity,
           progname,
           formatted_message
  end

  lambda do |loglevel, progname, message = nil, &block|
    logger.add loglevel, message, progname, &block
  end
end
def self.disable_at_exit_handlers!
- Has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841.
Other tags:
- Note: - This method should *never* be called
- Note: - this option should be needed only because of `at_exit` ordering

def self.disable_at_exit_handlers!
  deprecated "Method #disable_at_exit_handlers! has no effect since it is no longer needed, see https://github.com/ruby-concurrency/concurrent-ruby/pull/841."
end
def self.executor(executor_identifier)
Returns:
- (Executor)

Parameters:
- executor_identifier (Symbol, Executor) -- symbols:

def self.executor(executor_identifier)
  Options.executor(executor_identifier)
end
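An illustrative sketch (not library source): `Concurrent.executor` resolves a symbolic name to one of the global executors, and passes an executor instance through unchanged. The symbols shown here (:io, :fast, :immediate) are the ones commonly accepted by `Options.executor`; treat them as an assumption if your version differs:

  require 'concurrent'

  io_pool   = Concurrent.executor(:io)        # same object as Concurrent.global_io_executor
  fast_pool = Concurrent.executor(:fast)      # same object as Concurrent.global_fast_executor
  inline    = Concurrent.executor(:immediate) # runs tasks on the calling thread

  # An executor instance is returned as-is.
  own_pool = Concurrent::FixedThreadPool.new(2)
  Concurrent.executor(own_pool).equal?(own_pool) # => true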
def self.global_fast_executor
Returns:
- (ThreadPoolExecutor) - the thread pool

def self.global_fast_executor
  GLOBAL_FAST_EXECUTOR.value!
end
def self.global_immediate_executor
def self.global_immediate_executor
  GLOBAL_IMMEDIATE_EXECUTOR
end
def self.global_io_executor
Returns:
- (ThreadPoolExecutor) - the thread pool

def self.global_io_executor
  GLOBAL_IO_EXECUTOR.value!
end
def self.global_logger
def self.global_logger
  GLOBAL_LOGGER.value
end
def self.global_logger=(value)
def self.global_logger=(value)
  GLOBAL_LOGGER.value = value
end
def self.global_timer_set
Returns:
- (Concurrent::TimerSet) - the thread pool

def self.global_timer_set
  GLOBAL_TIMER_SET.value!
end
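For illustration only (a sketch, not library source), here is one typical way the global pools above are used: blocking work goes to the IO pool, short CPU-bound work to the fast pool, and deferred tasks to the global timer set:

  require 'concurrent'

  # Blocking I/O belongs on the unbounded IO pool.
  Concurrent.global_io_executor.post do
    File.read('/etc/hostname')
  end

  # Short, CPU-bound work suits the fixed-size fast pool.
  Concurrent.global_fast_executor.post do
    (1..1_000).reduce(:+)
  end

  # Run a task roughly 5 seconds from now on the global timer set.
  Concurrent.global_timer_set.post(5) do
    puts 'five seconds later'
  end

  sleep 6 # give the background tasks time to run before the script exits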
def self.mutex_owned_per_thread?
def self.mutex_owned_per_thread?
  return false if Concurrent.on_jruby? || Concurrent.on_truffleruby?

  mutex = Mutex.new
  # Lock the mutex:
  mutex.synchronize do
    # Check if the mutex is still owned in a child fiber:
    Fiber.new { mutex.owned? }.resume
  end
end
def self.new_fast_executor(opts = {})
def self.new_fast_executor(opts = {})
  FixedThreadPool.new(
      [2, Concurrent.processor_count].max,
      auto_terminate:  opts.fetch(:auto_terminate, true),
      idletime:        60, # 1 minute
      max_queue:       0, # unlimited
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "fast"
  )
end
def self.new_io_executor(opts = {})
def self.new_io_executor(opts = {})
  CachedThreadPool.new(
      auto_terminate:  opts.fetch(:auto_terminate, true),
      fallback_policy: :abort, # shouldn't matter -- 0 max queue
      name:            "io"
  )
end
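As a sketch (assuming these factories behave as shown above), a dedicated pool can be created instead of sharing the globals, and should be shut down when no longer needed:

  require 'concurrent'

  pool = Concurrent.new_io_executor(auto_terminate: false)

  10.times do |i|
    pool.post { puts "task #{i} on thread #{Thread.current.object_id}" }
  end

  pool.shutdown
  pool.wait_for_termination(10)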
def self.physical_processor_count
- See: http://linux.die.net/man/8/sysctl
- See: http://www.unix.com/man-page/osx/1/HWPREFS/
- See: http://msdn.microsoft.com/en-us/library/aa394373(v=vs.85).aspx
- See: https://github.com/grosser/parallel/blob/4fc8b89d08c7091fe0419ca8fba1ec3ce5a8d185/lib/parallel.rb

Returns:
- (Integer) - number of physical processor cores on the current system
def self.physical_processor_count
  processor_counter.physical_processor_count
end
def self.processor_count
- See: http://docs.oracle.com/javase/6/docs/api/java/lang/Runtime.html#availableProcessors()

Returns:
- (Integer) - number of processors seen by the OS or Java runtime

def self.processor_count
  processor_counter.processor_count
end
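A small usage sketch (not library source): the two counters above are commonly used to size thread pools, for example one thread per logical core for CPU-bound work:

  require 'concurrent'

  puts "logical cores:  #{Concurrent.processor_count}"
  puts "physical cores: #{Concurrent.physical_processor_count}"

  # One worker per logical core for CPU-bound tasks.
  cpu_bound_pool = Concurrent::FixedThreadPool.new(Concurrent.processor_count)
  cpu_bound_pool.shutdown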
def self.use_simple_logger(level = :FATAL, output = $stderr)
def self.use_simple_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_simple_logger level, output
end
def self.use_stdlib_logger(level = :FATAL, output = $stderr)
def self.use_stdlib_logger(level = :FATAL, output = $stderr)
  Concurrent.global_logger = create_stdlib_logger level, output
end
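For illustration (a sketch, not library source), the logging helpers above are usually combined like this: pick a severity and an IO object, then either install a logger globally or build one and assign it yourself:

  require 'concurrent'

  # Install the built-in simple logger at DEBUG level on stderr.
  Concurrent.use_simple_logger(:DEBUG)

  # Or build a stdlib-Logger-backed logger writing to a file and install it manually.
  log_file = File.open('concurrent.log', 'a')
  Concurrent.global_logger = Concurrent.create_stdlib_logger(:WARN, log_file)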
def abort_transaction
def abort_transaction
  raise Transaction::AbortError.new
end
def atomically
Run a block that reads and writes `TVar`s as a single atomic transaction.
With respect to the value of `TVar` objects, the transaction is atomic, in
that it either happens or it does not, consistent, in that the `TVar`
objects involved will never enter an illegal state, and isolated, in that
transactions never interfere with each other. You may recognise these
properties from database transactions.

There are some very important and unusual semantics that you must be aware of:

* Most importantly, the block that you pass to atomically may be executed
  more than once. In most cases your code should be free of
  side-effects, except for via TVar.
* If an exception escapes an atomically block it will abort the transaction.
* It is undefined behaviour to use callcc or Fiber with atomically.
* If you create a new thread within an atomically, it will not be part of
  the transaction. Creating a thread counts as a side-effect.

Transactions within transactions are flattened to a single transaction.

@example
  a = TVar.new(100_000)
  b = TVar.new(100)

  Concurrent::atomically do
    a.value -= 10
    b.value += 10
  end

def atomically
  raise ArgumentError.new('no block given') unless block_given?

  # Get the current transaction
  transaction = Transaction::current

  # Are we not already in a transaction (not nested)?
  if transaction.nil?
    # New transaction
    begin
      # Retry loop
      loop do
        # Create a new transaction
        transaction = Transaction.new
        Transaction::current = transaction

        # Run the block, aborting on exceptions
        begin
          result = yield
        rescue Transaction::AbortError => e
          transaction.abort
          result = Transaction::ABORTED
        rescue Transaction::LeaveError => e
          transaction.abort
          break result
        rescue => e
          transaction.abort
          raise e
        end

        # If we can commit, break out of the loop
        if result != Transaction::ABORTED
          if transaction.commit
            break result
          end
        end
      end
    ensure
      # Clear the current transaction
      Transaction::current = nil
    end
  else
    # Nested transaction - flatten it and just run the block
    yield
  end
end
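A hedged sketch of how the transaction helpers fit together, assuming `atomically`, `abort_transaction` and `leave_transaction` are exposed as module functions (as the `Concurrent::atomically` example above suggests): `leave_transaction` exits the block without committing, while `abort_transaction` discards the writes and retries the whole block:

  require 'concurrent'

  account_a = Concurrent::TVar.new(100)
  account_b = Concurrent::TVar.new(0)

  Concurrent::atomically do
    # Give up without committing if there is not enough money to transfer.
    Concurrent::leave_transaction if account_a.value < 50

    account_a.value -= 50
    account_b.value += 50
  end

  puts account_a.value # => 50
  puts account_b.value # => 50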
def call_dataflow(method, executor, *inputs, &block)
def call_dataflow(method, executor, *inputs, &block)
  raise ArgumentError.new('an executor must be provided') if executor.nil?
  raise ArgumentError.new('no block given') unless block_given?
  unless inputs.all? { |input| input.is_a? IVar }
    raise ArgumentError.new("Not all dependencies are IVars.\nDependencies: #{ inputs.inspect }")
  end

  result = Future.new(executor: executor) do
    values = inputs.map { |input| input.send(method) }
    block.call(*values)
  end

  if inputs.empty?
    result.execute
  else
    counter = DependencyCounter.new(inputs.size) { result.execute }

    inputs.each do |input|
      input.add_observer counter
    end
  end

  result
end
def dataflow(*inputs, &block)
Raises:
- (ArgumentError) - if any of the inputs are not `IVar`s
- (ArgumentError) - if no block is given

Returns:
- (Object) - the result of all the operations

Other tags:
- Yield: - The operation to perform once all the dependencies are met
- Yieldparam: inputs - each of the `Future` inputs to the dataflow
- Yieldreturn: - the result of the block operation

Parameters:
- inputs (Future) -- zero or more `Future` operations that this dataflow depends upon

def dataflow(*inputs, &block)
  dataflow_with(Concurrent.global_io_executor, *inputs, &block)
end
def dataflow!(*inputs, &block)
def dataflow!(*inputs, &block)
  dataflow_with!(Concurrent.global_io_executor, *inputs, &block)
end
def dataflow_with(executor, *inputs, &block)
def dataflow_with(executor, *inputs, &block)
  call_dataflow(:value, executor, *inputs, &block)
end
def dataflow_with!(executor, *inputs, &block)
def dataflow_with!(executor, *inputs, &block)
  call_dataflow(:value!, executor, *inputs, &block)
end
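An illustrative sketch (not library source) of the dataflow helpers above: each input must be an `IVar`-like future, and the block runs on the chosen executor once every input has resolved. The bang variants go through `value!`, so a failed input raises instead of yielding nil:

  require 'concurrent'

  width  = Concurrent::Future.execute { 4 }
  height = Concurrent::Future.execute { 5 }

  # Runs on the global IO executor once both inputs are fulfilled.
  area = Concurrent.dataflow(width, height) { |w, h| w * h }
  puts area.value # => 20

  # Same computation, but pinned to an explicit executor.
  pool  = Concurrent::FixedThreadPool.new(2)
  area2 = Concurrent.dataflow_with(pool, width, height) { |w, h| w * h }
  puts area2.value # => 20
  pool.shutdown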
def leave_transaction
def leave_transaction
  raise Transaction::LeaveError.new
end
def monotonic_time(unit = :float_second)
Returns:
- (Float) - the current monotonic time since some unspecified starting point

Parameters:
- unit (Symbol) -- the time unit to be returned, can be either :float_second, :float_millisecond, :float_microsecond, :second, :millisecond, :microsecond, or :nanosecond
def monotonic_time(unit = :float_second)
  Process.clock_gettime(Process::CLOCK_MONOTONIC, unit)
end
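As a usage sketch (not library source), `monotonic_time` is the right clock for measuring elapsed time, because unlike wall-clock time it never jumps backwards:

  require 'concurrent'

  started = Concurrent.monotonic_time
  sleep 0.25
  elapsed = Concurrent.monotonic_time - started
  puts format('took %.3f s', elapsed)

  # Other units delegate straight to Process.clock_gettime, e.g. whole milliseconds.
  puts Concurrent.monotonic_time(:millisecond)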