module Semian
def [](name)
def [](name) resources[name] end
def bulkheads_disabled_in_thread?(thread)
def bulkheads_disabled_in_thread?(thread) thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR) end
def consumers
def consumers @consumers ||= {} end
def create_bulkhead(name, **options)
def create_bulkhead(name, **options) return if ENV.key?("SEMIAN_BULKHEAD_DISABLED") || bulkheads_disabled_in_thread?(Thread.current) return unless options.fetch(:bulkhead, true) permissions = options[:permissions] || default_permissions timeout = options[:timeout] || 0 ::Semian::Resource.new( name, tickets: options[:tickets], quota: options[:quota], permissions: permissions, timeout: timeout, ) end
def create_circuit_breaker(name, **options)
def create_circuit_breaker(name, **options) return if ENV.key?("SEMIAN_CIRCUIT_BREAKER_DISABLED") return unless options.fetch(:circuit_breaker, true) require_keys!([:success_threshold, :error_threshold, :error_timeout], options) exceptions = options[:exceptions] || [] CircuitBreaker.new( name, success_threshold: options[:success_threshold], error_threshold: options[:error_threshold], error_threshold_timeout: options[:error_threshold_timeout], error_timeout: options[:error_timeout], error_threshold_timeout_enabled: if options[:error_threshold_timeout_enabled].nil? true else options[:error_threshold_timeout_enabled] end, exceptions: Array(exceptions) + [::Semian::BaseError], half_open_resource_timeout: options[:half_open_resource_timeout], implementation: implementation(**options), ) end
def destroy(name)
def destroy(name) resource = resources.delete(name) resource&.destroy end
def destroy_all_resources
def destroy_all_resources resources.values.each(&:destroy) resources.clear end
def disable_bulkheads_for_thread(thread)
def disable_bulkheads_for_thread(thread) old_value = thread.thread_variable_get(THREAD_BULKHEAD_DISABLED_VAR) thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, true) yield ensure thread.thread_variable_set(THREAD_BULKHEAD_DISABLED_VAR, old_value) end
def disabled?
def disabled? ENV.key?("SEMIAN_SEMAPHORES_DISABLED") || ENV.key?("SEMIAN_DISABLED") end
def implementation(**options)
def implementation(**options) # thread_safety_disabled will be replaced by a global setting # Semian is thread safe by default. It is possible # to modify the value by using Semian.thread_safe= unless options[:thread_safety_disabled].nil? logger.info( "NOTE: thread_safety_disabled will be replaced by a global setting" \ "Semian is thread safe by default. It is possible" \ "to modify the value by using Semian.thread_safe=", ) end thread_safe = options[:thread_safety_disabled].nil? ? Semian.thread_safe? : !options[:thread_safety_disabled] thread_safe ? ::Semian::ThreadSafe : ::Semian::Simple end
def issue_disabled_semaphores_warning
def issue_disabled_semaphores_warning return if defined?(@warning_issued) @warning_issued = true if !sysv_semaphores_supported? logger.info("Semian sysv semaphores are not supported on #{RUBY_PLATFORM} - all operations will no-op") elsif disabled? logger.info("Semian semaphores are disabled, is this what you really want? - all operations will no-op") end end
def register(name, **options)
(circuit breaker)
+exceptions+: An array of exception classes that should be accounted as resource errors. Default [].
(circuit breaker required)
+success_threshold+: The number of consecutive success after which an half-open circuit will be fully closed.
(error_threshold_timeout). Default true. (circuit breaker)
+error_threshold_timeout_enabled+: flag to enable/disable filter time window based error eviction
Default same as error_timeout. (circuit breaker)
+error_threshold_timeout+: The duration in seconds to examine number of errors to compare with error_threshold.
the circuit. (circuit breaker required)
+error_threshold+: The amount of errors that must happen within error_timeout amount of time to open
(circuit breaker required)
+error_timeout+: The duration in seconds since the last error after which the error count is reset to 0.
+timeout+: Default timeout in seconds. Default 0. (bulkhead)
+permissions+: Octal permissions of the resource. Default to +Semian.default_permissions+ (0660). (bulkhead)
but the resource must have been previously registered otherwise an error will be raised. (bulkhead)
Mutually exclusive with the 'ticket' argument.
is calculated as (workers * quota).ceil
Must be greater than 0, less than or equal to 1. There will always be at least 1 ticket, as it
+quota+: Calculate tickets as a ratio of the number of registered workers.
Mutually exclusive with the 'quota' argument.
but the resource must have been previously registered otherwise an error will be raised.
+tickets+: Number of tickets. If this value is 0, the ticket count will not be set,
+bulkhead+: The boolean if you want a bulkhead to be acquired for your resource. Default true.
+circuit_breaker+: The boolean if you want a circuit breaker acquired for your resource. Default true.
+name+: Name of the resource - this can be either a string or symbol. (required)
Registers a resource.
def register(name, **options) return UnprotectedResource.new(name) if ENV.key?("SEMIAN_DISABLED") circuit_breaker = create_circuit_breaker(name, **options) bulkhead = create_bulkhead(name, **options) if circuit_breaker.nil? && bulkhead.nil? raise ArgumentError, "Both bulkhead and circuitbreaker cannot be disabled." end resources[name] = ProtectedResource.new(name, bulkhead, circuit_breaker) end
def require_keys!(required, options)
def require_keys!(required, options) diff = required - options.keys unless diff.empty? raise ArgumentError, "Missing required arguments for Semian: #{diff}" end end
def reset!
def reset! @consumers = {} @resources = LRUHash.new end
def resources
def resources @resources ||= LRUHash.new end
def retrieve_or_register(name, **args)
def retrieve_or_register(name, **args) # If consumer who retrieved / registered by a Semian::Adapter, keep track # of who the consumer was so that we can clear the resource reference if needed. consumer = args.delete(:consumer) if consumer&.class&.include?(Semian::Adapter) consumers[name] ||= [] consumers[name] << WeakRef.new(consumer) end self[name] || register(name, **args) end
def semaphores_enabled?
def semaphores_enabled? !disabled? && sysv_semaphores_supported? end
def sysv_semaphores_supported?
def sysv_semaphores_supported? /linux/.match(RUBY_PLATFORM) end
def thread_safe=(thread_safe)
def thread_safe=(thread_safe) @thread_safe = thread_safe end
def thread_safe?
def thread_safe? return @thread_safe if defined?(@thread_safe) @thread_safe = true end
def unregister(name)
Also clears any semian_resources
the underlying semian resource (and sysV semaphore).
Semian.unregister will remove all references, while preserving
Semian.destroy removes the underlying resource, but
the number of registered workers.
remove it from the hash of registered resources, and decrease
Unregister will not destroy the semian resource, but it will
def unregister(name) resource = resources.delete(name) if resource resource.bulkhead&.unregister_worker consumers_for_resource = consumers.delete(name) || [] consumers_for_resource.each do |consumer| if consumer.weakref_alive? consumer.clear_semian_resource end rescue WeakRef::RefError next end end end
def unregister_all_resources
def unregister_all_resources resources.keys.each do |resource| unregister(resource) end end