class LogStash::Inputs::BeatsSupport::CircuitBreaker

Largely inspired by Martin’s fowler circuit breaker

def closed?

def closed?
  current_state = state
  current_state == :close || current_state == :half_open
end

def execute(args = nil)

def execute(args = nil)
  case state
  when :open
    logger.warn("CircuitBreaker::Open", :name => @name)
    raise OpenBreaker, "for #{@name}"
  when :close, :half_open
    if block_given?
      yield args
    else
      @block.call(args)
    end
    if state == :half_open
      logger.warn("CircuitBreaker::Close", :name => @name)
      reset
    end
  end
rescue *@exceptions => e
  logger.warn("CircuitBreaker::rescuing exceptions", :name => @name, :exception => e.class)
  increment_errors(e)
  raise HalfOpenBreaker
end

def increment_errors(exception)

def increment_errors(exception)
  t = Time.now
  @mutex.synchronize do
    @errors_count += 1
    @last_failure_time = t
  end
  logger.debug("CircuitBreaker increment errors", 
               :errors_count => @errors_count, 
               :error_threshold => @error_threshold,
               :exception => exception.class,
               :message => exception.message) if logger.debug?
end

def initialize(name, options = {}, &block)

def initialize(name, options = {}, &block)
  @exceptions = Array(options.fetch(:exceptions, [StandardError]))
  @error_threshold = options.fetch(:error_threshold, DEFAULT_ERROR_THRESHOLD)
  @time_before_retry = options.fetch(:time_before_retry, DEFAULT_TIME_BEFORE_RETRY)
  @block = block
  @name = name
  @mutex = Mutex.new
  reset
end

def logger

def logger
  @logger ||= Cabin::Channel.get(LogStash)
end

def reset

def reset
  @mutex.synchronize do
    @errors_count = 0
    @last_failure_time = nil
  end
end

def state

def state
  t = Time.now
  @mutex.synchronize do
    if @errors_count >= @error_threshold
      if t - @last_failure_time > @time_before_retry
        :half_open
      else
        :open
      end
    else
      :close
    end
  end
end