class Sidekiq::Metrics::ExecutionTracker
# Drains the counters accumulated since the last flush and writes them to
# Redis.
#
# The in-memory state is swapped out through `reset` before anything is
# written, so the data handed to Redis is the snapshot `reset` returned.
#
# time - the moment used to name the per-minute bucket (defaults to now).
#        It is only read; the caller's object is never modified.
#
# Returns nil when the snapshot shows no processed and no failed jobs.
# Otherwise returns whatever `redis` returns for the block, whose last
# expression is the number of counters written.
def flush(time = Time.now)
  totals, jobs, grams = reset
  procd = totals["p"]
  fails = totals["f"]
  return if procd == 0 && fails == 0

  # getutc hands back a UTC copy. Time#utc would convert the caller's object
  # in place, and raises FrozenError when given a frozen local time.
  now = time.getutc
  # nowdate = now.strftime("%Y%m%d")
  # nowhour = now.strftime("%Y%m%d|%-H")
  nowmin = now.strftime("%Y%m%d|%-H:%-M")
  count = 0

  redis do |conn|
    # persist fine-grained histogram data
    unless grams.empty?
      conn.pipelined do |pipe|
        grams.each_value { |gram| gram.persist(pipe, now) }
      end
    end

    # persist coarse grained execution count + execution millis.
    # note as of today we don't use or do anything with the
    # daily or hourly rollups.
    [
      # ["j", jobs, nowdate, LONG_TERM],
      # ["j", jobs, nowhour, MID_TERM],
      ["j", jobs, nowmin, SHORT_TERM]
    ].each do |prefix, data, bucket, ttl|
      conn.pipelined do |xa|
        stats = "#{prefix}|#{bucket}"
        data.each_pair do |key, value|
          xa.hincrby(stats, key, value)
          count += 1
        end
        # every flush re-applies the bucket's expiry
        xa.expire(stats, ttl)
      end
    end
    logger.debug "Flushed #{count} metrics"
    count
  end
end
# Sets up an empty tracker.
#
# config - kept in @config; nothing else in this method reads it.
def initialize(config)
  @lock = Mutex.new
  @config = config

  # Counter hashes default to 0 so keys can be incremented without seeding.
  @totals = Hash.new(0)
  @jobs = Hash.new(0)

  # A Histogram is built on first access to a key and stored under that key.
  @grams = Hash.new do |grams, name|
    grams[name] = Histogram.new(name)
  end
end
# Swaps the accumulated state for fresh, empty containers while holding
# @lock.
#
# Returns the previous [totals, jobs, grams] triple.
def reset
  @lock.synchronize do
    drained = [@totals, @jobs, @grams]
    @totals, @jobs = Hash.new(0), Hash.new(0)
    @grams = Hash.new { |grams, name| grams[name] = Histogram.new(name) }
    drained
  end
end
# Runs the given block and records its outcome under klass.
#
# queue - accepted but not used by this method.
# klass - key under which the histogram and per-job counters are stored.
#
# Every call bumps the "p" counters. A call whose block raises also bumps
# the "f" counters and re-raises the same exception. Only a call whose block
# returns normally adds to the "ms" counters and the histogram.
def track(queue, klass)
  clock = -> { ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :millisecond) }
  started_at = clock.call
  elapsed_ms = 0

  begin
    begin
      yield
    ensure
      elapsed_ms = clock.call - started_at
    end

    # Timing is recorded for successful runs only. Failed jobs have very
    # unpredictable durations, and the average time of successful jobs is
    # what makes a perf regression recognizable.
    ms_key = "#{klass}|ms"
    @lock.synchronize do
      @grams[klass].record_time(elapsed_ms)
      @jobs[ms_key] += elapsed_ms
      @totals["ms"] += elapsed_ms
    end
  rescue Exception
    failed_key = "#{klass}|f"
    @lock.synchronize do
      @jobs[failed_key] += 1
      @totals["f"] += 1
    end
    raise
  ensure
    processed_key = "#{klass}|p"
    @lock.synchronize do
      @jobs[processed_key] += 1
      @totals["p"] += 1
    end
  end
end