# frozen_string_literal: true
require "forwardable"
require "logger"
require "weakref"
require "thread"
require "concurrent-ruby"
require "semian/version"
require "semian/instrumentable"
require "semian/platform"
require "semian/resource"
require "semian/circuit_breaker"
require "semian/protected_resource"
require "semian/unprotected_resource"
require "semian/simple_sliding_window"
require "semian/simple_integer"
require "semian/simple_state"
require "semian/lru_hash"
require "semian/configuration_validator"
#
# === Overview
#
# Semian is a library that can be used to control access to external services.
#
# It's desirable to control access to external services so that in the case that one
# is slow or not responding, the performance of an entire system is not compromised.
#
# Semian uses the concept of a "resource" as an identifier that controls access
# to some external service. So for example, "mysql" or "redis" would be considered
# resources. If a system is sharded, like a database, you would typically create
# a resource for every shard.
#
# Resources are visible across an IPC namespace. This means that you can register a
# resource in one process and access it from another. This is useful in application
# servers like Unicorn that are multi-process. A resource is persistent. It will
# continue to exist even after the application exits, and will only be destroyed by
# manually removing it with the <code>ipcrm</code> command, calling Resource.destroy,
# or rebooting the machine.
#
# Each resource has a configurable number of tickets. Tickets are what controls
# access to the external service. If a client does not have a ticket, it cannot access
# a service. If there are no tickets available, the client will block for a configurable
# amount of time until a ticket is available. If there are no tickets available after
# the timeout period has elapsed, the client will be unable to access the service and
# an error will be raised.
#
# Resources also integrate a circuit breaker in order to fail faster and to give the
# resource time to recover. If `error_threshold` errors happen in the span of `error_timeout`
# then the circuit will be opened and every attempt to acquire the resource will immediately fail.
#
# Once in the open state, after `error_timeout` has elapsed, the circuit will transition to the half-open state.
# In that state a single error will fully re-open the circuit, and the circuit will transition back to the closed
# state only after the resource is acquired `success_threshold` consecutive times.
#
# A resource is registered by using the Semian.register method.
#
# ==== Examples
#
# ===== Registering a resource
#
# Semian.register(
# :mysql_shard0,
# tickets: 10,
# timeout: 0.5,
# error_threshold: 3,
# error_timeout: 10,
# success_threshold: 2,
# )
#
# This registers a new resource called <code>:mysql_shard0</code> that has 10 tickets and
# a default timeout of 500 milliseconds.
#
# After 3 failures in the span of 10 seconds the circuit will be open.
# After an additional 10 seconds it will transition to half-open.
# And finally after 2 successful acquisitions of the resource it will transition back to the closed state.
#
# ===== Using a resource
#
# Semian[:mysql_shard0].acquire do
# # Perform a MySQL query here
# end
#
# This acquires a ticket for the <code>:mysql_shard0</code> resource. If we use the example above,
# the ticket count would be lowered to 9 when block is executed, then raised to 10 when the block completes.
#
# ===== Overriding the default timeout
#
# Semian[:mysql_shard0].acquire(timeout: 1) do
# # Perform a MySQL query here
# end
#
# This is the same as the previous example, but overrides the timeout
# from the default value of 500 milliseconds to 1 second.
module Semian
# Make every instance method defined in this module callable on the module
# itself (Semian.register, Semian[...], ...).
extend self
extend Instrumentable
# Error classes. All of them derive from BaseError, which
# create_circuit_breaker always appends to the exceptions handed to a circuit
# breaker.
BaseError = Class.new(StandardError)
SyscallError = Class.new(BaseError)
TimeoutError = Class.new(BaseError)
InternalError = Class.new(BaseError)
OpenCircuitError = Class.new(BaseError)
SemaphoreMissingError = Class.new(BaseError)
# Module-wide settings, readable and writable as Semian.<name>.
# No default is assigned to +namespace+ here.
attr_accessor :maximum_lru_size, :minimum_lru_time, :default_permissions, :namespace, :default_force_config_validation
# NOTE(review): maximum_lru_size / minimum_lru_time are presumably read by
# LRUHash - confirm there.
self.maximum_lru_size = 500
self.minimum_lru_time = 300 # 300 seconds / 5 minutes
# Used by create_bulkhead when no +permissions+ option is given.
self.default_permissions = 0660
# NOTE(review): presumably consumed by ConfigurationValidator - confirm.
self.default_force_config_validation = false
# Thread safety can only be switched off for the parts of the code that are on
# the hot path, where locking could have a significant impact. Everything else
# is always thread safe.
#
# Returns the value stored through Semian.thread_safe=, or true when no value
# has been stored yet.
def thread_safe?
  # An explicitly stored false or nil must be honoured, hence defined? rather
  # than a plain truthiness check.
  @thread_safe = true unless defined?(@thread_safe)
  @thread_safe
end
# Sets the module-wide thread-safety flag returned by thread_safe?.
#
# +thread_safe+: The new value. It is stored as given, without coercion.
def thread_safe=(thread_safe)
@thread_safe = thread_safe
end
# Guards the lazy creation of the resource and consumer registries, and their
# replacement in reset!.
@reset_mutex = Mutex.new
# Logs, at most once, why semaphore operations are no-ops: either sysv
# semaphores are not supported on this platform, or Semian is disabled.
# Nothing is logged when neither condition holds.
#
# Returns the result of the logger call, or nil when nothing was logged.
def issue_disabled_semaphores_warning
  return if defined?(@warning_issued)

  # Set the marker before logging so that later calls return immediately.
  @warning_issued = true

  unless sysv_semaphores_supported?
    return logger.info("Semian sysv semaphores are not supported on #{RUBY_PLATFORM} - all operations will no-op")
  end

  logger.info("Semian semaphores are disabled, is this what you really want? - all operations will no-op") if disabled?
end
# Mixin for error classes that lets an error carry the identifier of the
# Semian resource it relates to.
module AdapterError
  attr_accessor :semian_identifier

  # Returns the inherited message, prefixed with "[<semian_identifier>] " when
  # an identifier is set and the message does not already start with that
  # prefix.
  def to_s
    text = super
    return text unless @semian_identifier

    tag = "[#{@semian_identifier}] "
    # An error built from another error's message may already carry the tag;
    # do not add it twice.
    text.start_with?(tag) ? text : "#{tag}#{text}"
  end
end
# Logger used for Semian's own messages. Defaults to a Logger writing to
# standard error and can be replaced through Semian.logger=.
attr_accessor :logger
self.logger = Logger.new($stderr)
# Registers a resource.
#
# +name+: Name of the resource - this can be either a string or symbol. (required)
#
# +circuit_breaker+: Boolean controlling whether a circuit breaker is created for the resource. Default true.
#
# +bulkhead+: Boolean controlling whether a bulkhead is created for the resource. Default true.
#
# +tickets+: Number of tickets. If this value is 0, the ticket count will not be set,
# but the resource must have been previously registered otherwise an error will be raised.
# Mutually exclusive with the 'quota' argument.
#
# +quota+: Calculate tickets as a ratio of the number of registered workers.
# Must be greater than 0, less than or equal to 1. There will always be at least 1 ticket, as it
# is calculated as (workers * quota).ceil
# Mutually exclusive with the 'tickets' argument. (bulkhead)
#
# +permissions+: Octal permissions of the resource. Default to +Semian.default_permissions+ (0660). (bulkhead)
#
# +timeout+: Default timeout in seconds. Default 0. (bulkhead)
#
# +error_timeout+: The duration in seconds since the last error after which the error count is reset to 0.
# (circuit breaker required)
#
# +error_threshold+: The amount of errors that must happen within error_timeout amount of time to open
# the circuit. (circuit breaker required)
#
# +error_threshold_timeout+: The duration in seconds to examine number of errors to compare with error_threshold.
# Default same as error_timeout. (circuit breaker)
#
# +error_threshold_timeout_enabled+: flag to enable/disable filter time window based error eviction
# (error_threshold_timeout). Default true. (circuit breaker)
#
# +success_threshold+: The number of consecutive successes after which a half-open circuit will be fully closed.
# (circuit breaker required)
#
# +exceptions+: An array of exception classes that should be accounted as resource errors. Default [].
# (circuit breaker)
#
# Returns the registered resource.
def register(name, **options)
  # With SEMIAN_DISABLED set, hand back a stand-in without validating the
  # options or touching the registry.
  if ENV.key?("SEMIAN_DISABLED")
    return UnprotectedResource.new(name)
  end

  # Validate the options before anything is created.
  ConfigurationValidator.new(name, options).validate!

  # Either part is nil when it is disabled (see the create_* helpers).
  breaker = create_circuit_breaker(name, **options)
  bulkhead = create_bulkhead(name, **options)

  # Store the new resource under its name; the assignment's value is returned.
  resources[name] = ProtectedResource.new(name, bulkhead, breaker)
end
# Returns the resource already registered under +name+, registering it with
# the remaining options when there is none yet.
#
# +name+: Name of the resource.
#
# +consumer+: Optional object asking for the resource. It is removed from the
# options before they are forwarded to register.
#
# +dynamic+: When truthy the consumer is not tracked. The option itself is
# still forwarded to register.
#
# All other options are passed to register unchanged.
def retrieve_or_register(name, **args)
  consumer = args.delete(:consumer)

  # Remember consumers whose class includes Semian::Adapter so that unregister
  # can later clear the resource reference they hold. They are tracked in a
  # WeakMap kept per resource name.
  adapter_backed = consumer&.class&.include?(Semian::Adapter)
  if adapter_backed && !args[:dynamic]
    tracked = consumers.compute_if_absent(name) { ObjectSpace::WeakMap.new }
    tracked[consumer] = true
  end

  existing = self[name]
  return existing if existing

  register(name, **args)
end
# Retrieves a resource by name.
#
# +name+: The name the resource was registered under.
#
# Returns whatever the resource registry holds for +name+.
# NOTE(review): retrieve_or_register relies on a falsy result when nothing is
# registered - confirm LRUHash#[] returns nil in that case.
def [](name)
resources[name]
end
# Removes the resource registered under +name+ from the registry and calls
# +destroy+ on it.
#
# Returns the result of that +destroy+ call, or nil when the registry had
# nothing to remove for +name+.
def destroy(name)
  resources.delete(name)&.destroy
end
# Calls +destroy+ on every registered resource, then empties the registry.
#
# Returns the result of clearing the registry.
def destroy_all_resources
  resources.values.each do |resource|
    resource.destroy
  end
  resources.clear
end
# Removes every reference to the resource registered under +name+ without
# destroying it.
#
# Unlike Semian.destroy, which removes the underlying resource,
# Semian.unregister preserves the underlying semian resource (and sysV
# semaphore). It only:
# * removes the resource from the hash of registered resources,
# * decreases the number of registered workers, and
# * clears the semian resource held by any semian adapter that used it, as
#   long as the weak reference to that adapter is still alive.
#
# Does nothing and returns nil when no resource is registered under +name+.
def unregister(name)
  resource = resources.delete(name)
  return unless resource

  # A resource registered without a bulkhead has no worker to unregister.
  resource.bulkhead&.unregister_worker

  # Fall back to an empty map so the iteration below is a no-op when no
  # consumer was ever tracked for this resource.
  tracked = consumers.delete(name) || ObjectSpace::WeakMap.new
  tracked.each_key(&:clear_semian_resource)
end
# Unregisters every resource currently in the registry, one name at a time,
# through unregister.
#
# Returns the list of names that were processed.
def unregister_all_resources
  # Iterate over a snapshot of the names: unregister removes entries from the
  # registry as it goes.
  resources.keys.each { |name| unregister(name) }
end
# Replaces the consumer and resource registries with fresh, empty ones while
# holding the reset mutex.
#
# The previous registries are simply dropped: nothing in them is destroyed or
# unregistered here.
def reset!
@reset_mutex.synchronize do
@consumers = Concurrent::Map.new
@resources = LRUHash.new
end
end
# Name of the thread variable that marks bulkheads as disabled for a thread.
# Private: callers go through bulkheads_disabled_in_thread? and
# disable_bulkheads_for_thread.
THREAD_BULKHEAD_DISABLED_VAR = :semian_bulkheads_disabled
private_constant(:THREAD_BULKHEAD_DISABLED_VAR)
# Tells whether bulkheads are currently disabled for +thread+.
#
# +thread+: The Thread to inspect.
#
# Returns the raw value of the thread variable: true inside
# disable_bulkheads_for_thread, otherwise whatever was stored before
# (nil when it was never set).
def bulkheads_disabled_in_thread?(thread)
thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR)
end
# Runs the given block with bulkheads marked as disabled for +thread+, then
# restores the previous marker value, even when the block raises.
#
# +thread+: The Thread whose marker is toggled.
#
# Returns the value of the block.
def disable_bulkheads_for_thread(thread)
  # Remember the current value so that nested calls restore correctly.
  previous = thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR)
  thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, true)

  yield
ensure
  thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, previous)
end
# Returns the registry of resources, creating it on first use.
#
# An existing registry is returned without taking the lock; creation happens
# under the reset mutex.
def resources
  if defined?(@resources) && @resources
    @resources
  else
    @reset_mutex.synchronize { @resources ||= LRUHash.new }
  end
end
# Returns the map of tracked consumers (keyed by resource name in
# retrieve_or_register), creating it on first use.
#
# An existing map is returned without taking the lock; creation happens under
# the reset mutex.
def consumers
  if defined?(@consumers) && @consumers
    @consumers
  else
    @reset_mutex.synchronize { @consumers ||= Concurrent::Map.new }
  end
end
private
# Builds the circuit breaker for the resource +name+ from the register
# options.
#
# Returns nil when SEMIAN_CIRCUIT_BREAKER_DISABLED is set in the environment
# or when the +circuit_breaker+ option is falsy (it defaults to true).
# Otherwise returns a new CircuitBreaker.
def create_circuit_breaker(name, **options)
  return if ENV.key?("SEMIAN_CIRCUIT_BREAKER_DISABLED")
  return unless options.fetch(:circuit_breaker, true)

  # Only a missing (nil) value falls back to the default; an explicit false
  # for the flag is kept.
  window_enabled = options[:error_threshold_timeout_enabled]
  window_enabled = true if window_enabled.nil?
  lumping = options[:lumping_interval]
  lumping = 0 if lumping.nil?

  # Semian's own errors are always tracked, on top of the caller's list.
  tracked_errors = Array(options[:exceptions] || []) + [::Semian::BaseError]

  CircuitBreaker.new(
    name,
    success_threshold: options[:success_threshold],
    error_threshold: options[:error_threshold],
    error_threshold_timeout: options[:error_threshold_timeout],
    error_timeout: options[:error_timeout],
    error_threshold_timeout_enabled: window_enabled,
    lumping_interval: lumping,
    exceptions: tracked_errors,
    half_open_resource_timeout: options[:half_open_resource_timeout],
    implementation: implementation(**options),
  )
end
# Picks the implementation namespace handed to the circuit breaker.
#
# +thread_safety_disabled+: Per-resource override that is slated to be replaced
# by the global Semian.thread_safe= setting. When it is given (non-nil) a
# notice is logged; a truthy value selects ::Semian::Simple and false selects
# ::Semian::ThreadSafe.
#
# Without the override, Semian.thread_safe? decides: ::Semian::ThreadSafe when
# it is truthy, ::Semian::Simple otherwise.
def implementation(**options)
  override = options[:thread_safety_disabled]

  if override.nil?
    thread_safe = Semian.thread_safe?
  else
    # The adjacent literals are concatenated as-is, so each fragment carries
    # its own separator; without them the sentences run into each other.
    logger.info(
      "NOTE: thread_safety_disabled will be replaced by a global setting. " \
      "Semian is thread safe by default. It is possible " \
      "to modify the value by using Semian.thread_safe=",
    )
    thread_safe = !override
  end

  thread_safe ? ::Semian::ThreadSafe : ::Semian::Simple
end
# Builds the bulkhead (a ::Semian::Resource) for the resource +name+ from the
# register options.
#
# Returns nil when SEMIAN_BULKHEAD_DISABLED is set in the environment, when
# bulkheads are disabled for the current thread, or when the +bulkhead+ option
# is falsy (it defaults to true). Otherwise returns a new ::Semian::Resource.
def create_bulkhead(name, **options)
  return if ENV.key?("SEMIAN_BULKHEAD_DISABLED")
  return if bulkheads_disabled_in_thread?(Thread.current)
  return unless options.fetch(:bulkhead, true)

  settings = {
    tickets: options[:tickets],
    quota: options[:quota],
    # Missing permissions fall back to the module-wide default.
    permissions: options[:permissions] || default_permissions,
    # Missing timeout means 0.
    timeout: options[:timeout] || 0,
  }

  ::Semian::Resource.new(name, **settings)
end
end
# Load "semian/semian" only when semaphores are enabled; otherwise define
# MAX_TICKETS as 0 so that the constant exists either way.
# NOTE(review): Semian.semaphores_enabled? is not defined in this file
# (presumably it comes from semian/platform), and "semian/semian" is
# presumably the native extension that normally supplies MAX_TICKETS - confirm.
if Semian.semaphores_enabled?
require "semian/semian"
else
Semian::MAX_TICKETS = 0
end
# When ActiveSupport is already loaded, defer requiring the Rails integration
# until the :active_record load hook runs.
if defined?(ActiveSupport)
  ActiveSupport.on_load(:active_record) { require "semian/rails" }
end