class LogStash::Inputs::BeatsSupport::CircuitBreaker
Largely inspired by Martin Fowler's circuit breaker.
def closed?
# Tells whether the breaker currently lets calls through.
#
# The state is sampled once so both comparisons see the same value.
#
# @return [Boolean] true when the state is :close or :half_open
def closed?
  [:close, :half_open].include?(state)
end
def execute(args = nil)
# Runs the protected code through the breaker.
#
# The block given to this call is used when present; otherwise the block
# stored in @block is called. Either one receives +args+. The block's
# result is not returned to the caller.
#
# @param args [Object, nil] value handed to the protected block
# @raise [OpenBreaker] when the breaker is open; the block is not run
# @raise [HalfOpenBreaker] when an exception listed in @exceptions is
#   rescued; the failure is recorded first
def execute(args = nil)
  # The open check lives outside the rescued region on purpose: OpenBreaker
  # is a StandardError, so a rescue list containing StandardError would
  # otherwise swallow it, record the rejection as a new failure (refreshing
  # the last failure time) and surface HalfOpenBreaker instead.
  if state == :open
    logger.warn("CircuitBreaker::Open", :name => @name)
    raise OpenBreaker, "for #{@name}"
  end

  begin
    if block_given?
      yield args
    else
      @block.call(args)
    end

    # A successful call while half open closes the breaker again.
    if state == :half_open
      logger.warn("CircuitBreaker::Close", :name => @name)
      reset
    end
  rescue *@exceptions => e
    logger.warn("CircuitBreaker::rescuing exceptions", :name => @name, :exception => e.class)
    increment_errors(e)

    raise HalfOpenBreaker
  end
end
def increment_errors(exception)
# Records one failure: bumps the error counter and stamps the failure time.
#
# The timestamp is taken before the lock is acquired; both fields are then
# updated together under @mutex. A debug entry is emitted only when the
# logger has debug enabled.
#
# @param exception [Exception] the failure being recorded (used for logging)
def increment_errors(exception)
  failed_at = Time.now

  @mutex.synchronize do
    @errors_count += 1
    @last_failure_time = failed_at
  end

  return unless logger.debug?

  logger.debug("CircuitBreaker increment errors",
               :errors_count => @errors_count,
               :error_threshold => @error_threshold,
               :exception => exception.class,
               :message => exception.message)
end
def initialize(name, options = {}, &block)
# Builds a breaker and puts it in its initial (reset) condition.
#
# @param name [Object] identifier stored in @name
# @param options [Hash] optional settings:
#   :exceptions        - class or list of classes to rescue
#                        (defaults to [StandardError])
#   :error_threshold   - defaults to DEFAULT_ERROR_THRESHOLD
#   :time_before_retry - defaults to DEFAULT_TIME_BEFORE_RETRY
# @param block [Proc] optional block stored in @block
def initialize(name, options = {}, &block)
  @name = name
  @block = block
  @mutex = Mutex.new

  # Array() lets callers pass either a single class or a list of them.
  rescued = options.fetch(:exceptions, [StandardError])
  @exceptions = Array(rescued)
  @error_threshold = options.fetch(:error_threshold, DEFAULT_ERROR_THRESHOLD)
  @time_before_retry = options.fetch(:time_before_retry, DEFAULT_TIME_BEFORE_RETRY)

  # reset needs @mutex, so it runs last.
  reset
end
def logger
# Returns the logger for this breaker, looking it up on first use.
#
# The channel is fetched from Cabin::Channel with LogStash as the key and
# then kept in @logger for later calls.
#
# @return [Object] the value held in @logger
def logger
  return @logger if @logger

  @logger = Cabin::Channel.get(LogStash)
end
def reset
# Clears the recorded failures: zeroes the error counter and forgets the
# last failure time, both under @mutex.
#
# @return [nil]
def reset
  @mutex.synchronize do
    @last_failure_time = nil
    @errors_count = 0
    nil
  end
end
def state
# Computes the breaker's current state from the failure bookkeeping.
#
# The clock is read before taking the lock; the counters are read under
# @mutex.
#
# @return [Symbol] :close while the error count is below the threshold;
#   otherwise :half_open once more than @time_before_retry has elapsed
#   since the last failure, else :open
def state
  now = Time.now

  @mutex.synchronize do
    next :close unless @errors_count >= @error_threshold

    elapsed = now - @last_failure_time
    elapsed > @time_before_retry ? :half_open : :open
  end
end