require 'rbtree'

# Ruby implementation of Ted Dunning's t-digest data structure.
#
# This implementation is imported from github.com/castle/tdigest with
# custom modifications. Huge thanks to Castle for the implementation :beer:
#
# The difference is that we pack with Big Endian (unlike Native Endian in
# Castle's version). Our backend does not permit little endian.
#
# @see github.com/castle/tdigest
# @see github.com/tdunning/t-digest
# @since v3.2.0
# @api private
# rubocop:disable Metrics/ClassLength
class Airbrake::TDigest
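  # NOTE: the constants, struct, accessor, and size reader below are not part
  # of this excerpt; they are a minimal sketch of the support code the methods
  # in this class assume, with names following the upstream castle/tdigest
  # implementation.
  VERBOSE_ENCODING = 1
  SMALL_ENCODING = 2

  # A centroid: a mean, its weight (n), plus cumulative-weight bookkeeping
  # (cumn, mean_cumn) maintained by _cumulate.
  Centroid = Struct.new(:mean, :n, :cumn, :mean_cumn) do
    def as_json(_ = nil)
      { m: mean, n: n }
    end
  end

  attr_accessor :centroids

  # Total weight pushed into the digest so far.
  def size
    @size || 0
  end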
  # rubocop:disable Metrics/PerceivedComplexity, Metrics/MethodLength
  def self.from_bytes(bytes)
    format, compression, size = bytes.unpack('NGN')
    tdigest = new(1 / compression)

    start_idx = 16 # after header
    case format
    when VERBOSE_ENCODING
      array = bytes[start_idx..-1].unpack("G#{size}N#{size}")
      means, counts = array.each_slice(size).to_a if array.any?
    when SMALL_ENCODING
      means = bytes[start_idx..(start_idx + 4 * size)].unpack("g#{size}")
      # Decode delta encoding of means
      x = 0
      means.map! do |m|
        m += x
        x = m
        m
      end

      counts_bytes = bytes[(start_idx + 4 * size)..-1].unpack('C*')
      counts = []
      # Decode variable length integer bytes
      size.times do
        v = counts_bytes.shift
        z = 0x7f & v
        shift = 7
        while (v & 0x80) != 0
          raise 'Shift too large in decode' if shift > 28

          v = counts_bytes.shift || 0
          z += (v & 0x7f) << shift
          shift += 7
        end
        counts << z
      end
      # This shouldn't happen
      raise 'Mismatch' unless counts.size == means.size
    else
      raise 'Unknown compression format'
    end

    means.zip(counts).each { |val| tdigest.push(val[0], val[1]) } if means && counts
    tdigest
  end
  # rubocop:enable Metrics/PerceivedComplexity, Metrics/MethodLength
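  # Wire-format note: per the pack/unpack strings in this class, both
  # encodings share a 16-byte header -- a 4-byte format tag ('N'), an 8-byte
  # big-endian double holding the compression ('G'), and a 4-byte centroid
  # count ('N') -- which is exactly what start_idx = 16 skips above.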
  def self.from_json(array)
    tdigest = new
    # Handle both string and symbol keys
    array.each { |a| tdigest.push(a['m'] || a[:m], a['n'] || a[:n]) }
    tdigest
  end
  def +(other)
    # Uses delta, k and cx from the caller
    t = self.class.new(@delta, @k, @cx)
    data = centroids.values + other.centroids.values
    t.push_centroid(data.delete_at(rand(data.length))) while data.any?
    t
  end
  def _add_weight(centroid, x, n)
    unless x == centroid.mean
      centroid.mean += n * (x - centroid.mean) / (centroid.n + n)
    end

    _cumulate(false, true) if centroid.mean_cumn.nil?

    centroid.cumn += n
    centroid.mean_cumn += n / 2.0
    centroid.n += n
  end
  def _cumulate(exact = false, force = false)
    unless force
      factor = if @last_cumulate == 0
                 Float::INFINITY
               else
                 (@size.to_f / @last_cumulate)
               end
      return if @size == @last_cumulate || (!exact && @cx && @cx > factor)
    end

    cumn = 0
    @centroids.each_value do |c|
      c.mean_cumn = cumn + c.n / 2.0
      cumn = c.cumn = cumn + c.n
    end
    @size = @last_cumulate = cumn
    nil
  end
  # rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity
  def _digest(x, n)
    # Use 'first' and 'last' instead of min/max because of performance
    # reasons. This works because RBTree is sorted.
    min = min.last if (min = @centroids.first)
    max = max.last if (max = @centroids.last)

    nearest = find_nearest(x)
    @size += n

    if nearest && nearest.mean == x
      _add_weight(nearest, x, n)
    elsif nearest == min
      @centroids[x] = Centroid.new(x, n, 0)
    elsif nearest == max
      @centroids[x] = Centroid.new(x, n, @size)
    else
      p = nearest.mean_cumn.to_f / @size
      max_n = (4 * @size * @delta * p * (1 - p)).floor
      if max_n - nearest.n >= n
        _add_weight(nearest, x, n)
      else
        @centroids[x] = Centroid.new(x, n, nearest.cumn)
      end
    end

    _cumulate(false)

    # If the number of centroids has grown to a very large size,
    # it may be due to values being inserted in sorted order.
    # We combat that by replaying the centroids in random order,
    # which is what compress! does.
    compress! if @centroids.size > (@k / @delta)

    nil
  end
  # rubocop:enable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity
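  # A note on the bound above: max_n = 4 * size * delta * p * (1 - p) is the
  # t-digest capacity rule. It is largest near the median (p close to 0.5)
  # and shrinks toward the tails (p close to 0 or 1), which keeps tail
  # centroids small and extreme quantile estimates accurate.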
  def as_bytes
    # compression as defined by Java implementation
    size = @centroids.size
    output = [VERBOSE_ENCODING, compression, size]
    output += @centroids.each_value.map(&:mean)
    output += @centroids.each_value.map(&:n)
    output.pack("NGNG#{size}N#{size}")
  end
  def as_json(_ = nil)
    @centroids.each_value.map(&:as_json)
  end
  def as_small_bytes
    size = @centroids.size
    output = [self.class::SMALL_ENCODING, compression, size]
    x = 0
    # Delta encoding allows saving 4-byte floats
    mean_arr = @centroids.each_value.map do |c|
      val = c.mean - x
      x = c.mean
      val
    end
    output += mean_arr
    # Variable length encoding of numbers
    c_arr = @centroids.each_value.each_with_object([]) do |c, arr|
      k = 0
      n = c.n
      while n < 0 || n > 0x7f
        b = 0x80 | (0x7f & n)
        arr << b
        n = n >> 7
        k += 1
        raise 'Unreasonably large number' if k > 6
      end
      arr << n
    end
    output += c_arr
    output.pack("NGNg#{size}C#{size}")
  end
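  # Compared to as_bytes, which writes each centroid as a 'G' double mean and
  # an 'N' count (12 bytes per centroid after the header), the small encoding
  # stores delta-encoded 4-byte float means and variable-length integer
  # counts, trading float precision for space.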
  def bound_mean(x)
    upper = @centroids.upper_bound(x)
    lower = @centroids.lower_bound(x)
    [lower[1], upper[1]]
  end
  def bound_mean_cumn(cumn)
    last_c = nil
    bounds = []
    @centroids.each_value do |v|
      if v.mean_cumn == cumn
        bounds << v
        break
      elsif v.mean_cumn > cumn
        bounds << last_c
        bounds << v
        break
      else
        last_c = v
      end
    end

    # If still no results, pick lagging value if any
    bounds << last_c if bounds.empty? && !last_c.nil?

    bounds
  end
  def compress!
    points = to_a
    reset!
    push_centroid(points.shuffle)
    _cumulate(true, true)
    nil
  end
  def compression
    1 / @delta
  end
  def find_nearest(x)
    return if size == 0

    upper_key, upper = @centroids.upper_bound(x)
    lower_key, lower = @centroids.lower_bound(x)

    return lower unless upper_key
    return upper unless lower_key

    if (lower_key - x).abs < (upper_key - x).abs
      lower
    else
      upper
    end
  end
  def initialize(delta = 0.01, k = 25, cx = 1.1)
    @delta = delta
    @k = k
    @cx = cx
    @centroids = RBTree.new
    @size = 0
    @last_cumulate = 0
  end
  def merge!(other)
    push_centroid(other.centroids.values.shuffle)
    self
  end
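  # Usage sketch: merge! mutates the receiver, while + builds a new digest
  # and leaves both operands untouched:
  #
  #   a.merge!(b)  # a now also contains b's centroids
  #   c = a + b    # fresh digest; a and b unchanged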
  # rubocop:disable Metrics/PerceivedComplexity, Metrics/AbcSize
  def p_rank(x)
    is_array = x.is_a? Array
    x = [x] unless is_array

    min = @centroids.first
    max = @centroids.last

    x.map! do |item|
      if size == 0
        nil
      elsif item < min[1].mean
        0.0
      elsif item > max[1].mean
        1.0
      else
        _cumulate(true)
        bound = bound_mean(item)
        lower, upper = bound
        mean_cumn = lower.mean_cumn
        if lower != upper
          mean_cumn += (item - lower.mean) * (upper.mean_cumn - lower.mean_cumn) \
            / (upper.mean - lower.mean)
        end
        mean_cumn / @size
      end
    end

    is_array ? x : x.first
  end
  # rubocop:enable Metrics/PerceivedComplexity, Metrics/AbcSize
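  # Usage sketch (hypothetical digest and values):
  #
  #   digest.p_rank(42)       #=> e.g. 0.37 -- estimated fraction of weight <= 42
  #   digest.p_rank([1, 99])  #=> array in, array of ranks out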
  # rubocop:disable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity
  def percentile(p)
    is_array = p.is_a? Array
    p = [p] unless is_array

    p.map! do |item|
      unless (0..1).cover?(item)
        raise ArgumentError, "p should be in [0,1], got #{item}"
      end

      if size == 0
        nil
      else
        _cumulate(true)
        h = @size * item
        lower, upper = bound_mean_cumn(h)
        if lower.nil? && upper.nil?
          nil
        elsif upper == lower || lower.nil? || upper.nil?
          (lower || upper).mean
        elsif h == lower.mean_cumn
          lower.mean
        else
          upper.mean
        end
      end
    end

    is_array ? p : p.first
  end
  # rubocop:enable Metrics/PerceivedComplexity, Metrics/CyclomaticComplexity
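  # Usage sketch (hypothetical digest):
  #
  #   digest.percentile(0.5)          #=> estimated median
  #   digest.percentile([0.5, 0.99])  #=> [median, p99] estimates
  #   digest.percentile(1.5)          # raises ArgumentError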
  def push(x, n = 1)
    x = [x] unless x.is_a? Array
    x.each { |value| _digest(value, n) }
  end
  def push_centroid(c)
    c = [c] unless c.is_a? Array
    c.each { |centroid| _digest(centroid.mean, centroid.n) }
  end
  def reset!
    @centroids.clear
    @size = 0
    @last_cumulate = 0
  end
  def to_a
    @centroids.each_value.to_a
  end
end
# rubocop:enable Metrics/ClassLength
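# A minimal end-to-end sketch (assumes the rbtree gem is installed):
#
#   digest = Airbrake::TDigest.new
#   digest.push((1..10_000).map { rand })
#   digest.percentile(0.99)  #=> close to 0.99
#
#   # Big Endian serialization round trip
#   copy = Airbrake::TDigest.from_bytes(digest.as_bytes)
#   copy.p_rank(0.5)         #=> close to 0.5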