class Fluent::ForwardOutput::FailureDetector
def add(now)
def add(now) if @window.empty? @window << @init_gap @last = now else gap = now - @last @window << (gap * 1e6).to_i @window.shift if @window.length > SAMPLE_SIZE @last = now end end
def clear
def clear @window.clear @last = 0 end
def hard_timeout?(now)
def hard_timeout?(now) now - @last > @hard_timeout end
def initialize(heartbeat_interval, hard_timeout, init_last)
def initialize(heartbeat_interval, hard_timeout, init_last) @heartbeat_interval = heartbeat_interval @last = init_last @hard_timeout = hard_timeout # microsec @init_gap = (heartbeat_interval * 1e6).to_i @window = [@init_gap] end
def phi(now)
def phi(now) size = @window.size return 0.0 if size == 0 # Calculate weighted moving average mean_usec = 0 fact = 0 @window.each_with_index {|gap,i| mean_usec += gap * (1+i) fact += (1+i) } mean_usec = mean_usec / fact # Normalize arrive intervals into 1sec mean = (mean_usec.to_f / 1e6) - @heartbeat_interval + 1 # Calculate phi of the phi accrual failure detector t = now - @last phi = PHI_FACTOR * t / mean return phi end
def sample_size
def sample_size @window.size end