class Fluent::Counter::Store
def self.gen_key(scope, key)
# Builds the storage key for a counter.
#
# @param scope [#to_s] namespace the counter belongs to
# @param key [#to_s] counter name within that scope
# @return [String] scope and key joined by a single tab character
def self.gen_key(scope, key)
  "#{scope}\t#{key}"
end
def build_response(d)
# Converts a stored counter record into the hash returned to callers.
#
# Only the public fields are copied; `last_reset_at` is rebuilt as an
# EventTime from the values stored under that key.
#
# @param d [Hash] stored counter record
# @return [Hash] response with name, total, current, type, reset_interval
#   and last_reset_at
def build_response(d)
  response = {}
  %w[name total current type reset_interval].each do |field|
    response[field] = d[field]
  end
  response['last_reset_at'] = EventTime.new(*d['last_reset_at'])
  response
end
def build_value(data)
The value is a Hash. It requires these fields.
# Builds the record stored for a newly created counter.
#
# The caller's data is kept, and the bookkeeping fields are set on top of it:
# the type (defaulting to 'numeric'), both timestamps set to the current time
# as a [sec, nsec] pair, and `current`/`total` set to the type's initial value.
#
# @param data [Hash] counter attributes supplied by the caller
# @return [Hash] a new hash; `data` itself is not modified
def build_value(data)
  counter_type = data['type'] || 'numeric'
  created_at = EventTime.now
  stamp = [created_at.sec, created_at.nsec]
  zero = initial_value(counter_type)

  bookkeeping = {
    'type' => counter_type,
    'last_reset_at' => stamp,
    'last_modified_at' => stamp,
    'current' => zero,
    'total' => zero,
  }
  data.merge(bookkeeping)
end
def delete(key)
# Removes a counter from the storage.
#
# @param key [String] storage key of the counter
# @return [Hash] response built from the removed record
# @raise [UnknownKey] when the storage returns nothing for `key`
def delete(key)
  removed = @storage.delete(key)
  raise UnknownKey.new("`#{key}` doesn't exist in counter") unless removed

  build_response(removed)
end
def get(key, raise_error: false, raw: false)
# Looks up a counter in the storage.
#
# @param key [String] storage key of the counter
# @param raise_error [Boolean] raise instead of returning a falsy value when
#   the storage has nothing for `key`
# @param raw [Boolean] return the stored record as-is instead of passing it
#   through build_response
# @return [Hash, nil] the record (raw) or its response form; whatever falsy
#   value the storage returned when the key is missing and raise_error is false
# @raise [UnknownKey] when the key is missing and raise_error is true
def get(key, raise_error: false, raw: false)
  record = @storage.get(key)
  raise UnknownKey.new("`#{key}` doesn't exist in counter") if raise_error && !record

  return record if raw

  record && build_response(record)
end
def inc(key, data, force: false)
# Adds `data['value']` to a counter's `total` and `current`.
#
# When `force` is true and the counter does not exist yet, it is created
# first from `data` (without the 'value' entry).
#
# @param key [String] storage key of the counter
# @param data [Hash] request parameters; 'value' holds the amount to add.
#   The hash is not modified.
# @param force [Boolean] create the counter when it is missing
# @return [Hash] response built from the updated record
# @raise [UnknownKey] when the counter is missing and `force` is false
#   (raised by get)
def inc(key, data, force: false)
  value = data['value']
  # 'value' is a request argument, not a counter attribute, so it must not be
  # stored by init. Strip it from a copy rather than deleting it from the
  # caller's hash.
  attributes = data.reject { |name, _| name == 'value' }

  init(key, attributes) if !key?(key) && force
  counter = get(key, raise_error: true, raw: true)
  valid_type!(counter, value)

  counter['total'] += value
  counter['current'] += value
  modified_at = EventTime.now
  counter['last_modified_at'] = [modified_at.sec, modified_at.nsec]
  @storage.put(key, counter)

  build_response(counter)
end
def init(key, data, ignore: false)
# Creates a counter, or returns the existing one when `ignore` is true.
#
# @param key [String] storage key of the counter
# @param data [Hash] counter attributes passed to build_value
# @param ignore [Boolean] when the counter already exists, return it instead
#   of raising
# @return [Hash] response built from the stored record
# @raise [InvalidParams] when the counter already exists and `ignore` is false
def init(key, data, ignore: false)
  # Fetch the raw stored record: the result goes through build_response
  # below, and a non-raw get would already have converted it once, so
  # build_response would then run on a response instead of a stored record.
  existing = get(key, raw: true)

  record =
    if existing
      raise InvalidParams.new("#{key} already exists in counter") unless ignore

      existing
    else
      @storage.put(key, build_value(data))
    end

  build_response(record)
end
def initial_value(type)
# Returns the zero value a counter of the given type starts from.
#
# @param type [String] 'numeric', 'integer' or 'float'
# @return [Integer, Float] 0 for 'numeric'/'integer', 0.0 for 'float'
# @raise [InvalidParams] for any other type
def initial_value(type)
  case type
  when 'numeric', 'integer'
    0
  when 'float'
    0.0
  else
    raise InvalidParams.new('`type` should be integer, float, or numeric')
  end
end
def initialize(opt = {})
# Sets up the logger and the 'local' storage that holds the counters.
#
# @param opt [Hash] options
#   - :log  logger to use; falls back to the global $log
#   - :path when given, the storage is configured as persistent at this path;
#     otherwise it is configured as non-persistent
def initialize(opt = {})
  @log = opt[:log] || $log

  # Note: auto-save is not implemented for this storage.
  @storage = Plugin.new_storage('local', parent: DummyParent.new(@log))

  storage_conf = opt[:path] ? { 'persistent' => true, 'path' => opt[:path] } : { 'persistent' => false }
  @storage.configure(Fluent::Config::Element.new('storage', {}, storage_conf, []))
end
def key?(key)
# Tells whether the storage holds a truthy value for `key`.
#
# @param key [String] storage key of the counter
# @return [Boolean]
def key?(key)
  @storage.get(key) ? true : false
end
def reset(key)
# Resets a counter's `current` value if its reset interval has elapsed.
#
# The reset happens only when last_reset_at + reset_interval is not later
# than the current time. The returned `counter_data` is always built from a
# copy of the record taken before any reset.
#
# @param key [String] storage key of the counter
# @return [Hash] 'elapsed_time' (now minus the previous reset time),
#   'success' (whether a reset was performed) and 'counter_data'
# @raise [UnknownKey] when the counter does not exist (raised by get)
def reset(key)
  counter = get(key, raise_error: true, raw: true)
  performed = false
  before_reset = counter.dup
  now = EventTime.now
  previous_reset = EventTime.new(*counter['last_reset_at'])

  # Only reset once the configured interval has passed.
  if (previous_reset + counter['reset_interval']) <= now
    performed = true
    counter['current'] = initial_value(counter['type'])
    stamp = [now.sec, now.nsec]
    counter['last_reset_at'] = stamp
    counter['last_modified_at'] = stamp
    @storage.put(key, counter)
  end

  {
    'elapsed_time' => now - previous_reset,
    'success' => performed,
    'counter_data' => build_response(before_reset)
  }
end
def start
# Starts the store by calling `load` on the backing storage.
#
# @return whatever the storage's `load` returns
def start
  @storage.load
end
def stop
# Stops the store by calling `save` on the backing storage.
#
# @return whatever the storage's `save` returns
def stop
  @storage.save
end
def type_str(v)
# Maps a value to the counter type name it belongs to.
#
# Integer and Float are checked before the general Numeric case, so only
# other Numeric kinds are reported as 'numeric'.
#
# @param v [Object] value to classify
# @return [String] 'integer', 'float' or 'numeric'
# @raise [InvalidParams] when `v` is not a Numeric
def type_str(v)
  case v
  when Integer then 'integer'
  when Float   then 'float'
  when Numeric then 'numeric'
  else
    raise InvalidParams.new("`type` should be integer, float, or numeric")
  end
end
def valid_type!(v, value)
# Checks that `value` is acceptable for the counter record `v`.
#
# The value is always classified first, so a non-Numeric value is rejected
# with InvalidParams for every counter type, including 'numeric'. A 'numeric'
# counter then accepts any Numeric; other counters require the value's type
# name to equal the counter's type.
#
# @param v [Hash] stored counter record; its 'type' entry is read
# @param value [Object] amount about to be added to the counter
# @return [nil] when the value is acceptable
# @raise [InvalidParams] when `value` is not a Numeric, or does not match
#   the counter's type
def valid_type!(v, value)
  expected = v['type']
  # type_str raises InvalidParams for anything that is not a Numeric. Running
  # it unconditionally keeps nil or String values from reaching the caller's
  # arithmetic on 'numeric' counters.
  actual = type_str(value)
  return if expected == 'numeric' || actual == expected

  raise InvalidParams.new("`type` is #{expected}. You should pass #{expected} value as a `value`")
end