lib/fluent/counter/store.rb



#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/config'
require 'fluent/counter/error'
require 'fluent/plugin/storage_local'
require 'fluent/time'

module Fluent
  module Counter
    class Store
      # Builds the storage key for a counter: the string forms of +scope+ and
      # +key+ joined by a single tab character.
      def self.gen_key(scope, key)
        format("%s\t%s", scope, key)
      end

      # Creates the store on top of a 'local' storage plugin instance.
      #
      # opt[:log]  - logger to use; the global $log is used when it is absent.
      # opt[:path] - when given, the storage is configured as persistent with
      #              this path; otherwise it is configured as non-persistent.
      def initialize(opt = {})
        @log = opt[:log] || $log

        path = opt[:path]
        storage_conf = { 'persistent' => false }
        storage_conf = { 'persistent' => true, 'path' => path } if path

        # Note: auto save is not implemented for this storage.
        owner = DummyParent.new(@log)
        @storage = Plugin.new_storage('local', parent: owner)
        @storage.configure(Fluent::Config::Element.new('storage', {}, storage_conf, []))
      end

      # Stand-in owner for the storage plugin: it behaves as a configurable
      # plugin so that the storage (OwnedByMixin) has a parent to attach to.
      class DummyParent
        include Configurable

        # log is the logger handed over by the enclosing Store.
        def initialize(log)
          @log = log
        end

        attr_reader :log

        # storage_local calls PluginId#plugin_root_dir; this parent has none.
        def plugin_root_dir
          nil
        end

        # The id below is fixed, not taken from any configuration.
        def plugin_id_configured?
          false
        end

        def plugin_id
          'dummy_parent_store'
        end
      end

      # Asks the backing storage to load its data.
      def start
        @storage.load
      end

      # Asks the backing storage to save its data.
      def stop
        @storage.save
      end

      # Registers a new counter under +key+ and returns its response hash.
      #
      # key    - storage key of the counter.
      # data   - Hash of counter attributes, passed to build_value to create
      #          the stored record.
      # ignore - when true, an already existing counter is not an error and
      #          its current state is returned instead.
      #
      # Raises InvalidParams when the counter already exists and +ignore+ is
      # false.
      def init(key, data, ignore: false)
        # Fetch the raw record: a value that already went through
        # build_response must not be fed to build_response a second time,
        # because its 'last_reset_at' is no longer a [sec, nsec] pair.
        existing = get(key, raw: true)

        if existing
          raise InvalidParams.new("#{key} already exists in counter") unless ignore
          return build_response(existing)
        end

        build_response(@storage.put(key, build_value(data)))
      end

      # Looks up the counter stored under +key+.
      #
      # raise_error - when true, a missing counter raises UnknownKey instead
      #               of producing a falsy result.
      # raw         - when true, the stored record is returned as is;
      #               otherwise it is converted with build_response.
      def get(key, raise_error: false, raw: false)
        record = @storage.get(key)
        raise UnknownKey.new("`#{key}` doesn't exist in counter") if raise_error && !record
        return record if raw

        record && build_response(record)
      end

      # Returns true when the storage holds a truthy value for +key+,
      # false otherwise.
      def key?(key)
        @storage.get(key) ? true : false
      end

      # Removes the counter stored under +key+ and returns the response hash
      # built from the removed record.
      #
      # Raises UnknownKey when the storage had nothing to delete for +key+.
      def delete(key)
        removed = @storage.delete(key)
        raise UnknownKey.new("`#{key}` doesn't exist in counter") unless removed

        build_response(removed)
      end

      # Adds data['value'] to both the 'total' and 'current' fields of the
      # counter stored under +key+, refreshes 'last_modified_at' and returns
      # the response hash of the updated counter.
      #
      # key   - storage key of the counter.
      # data  - Hash holding the increment under 'value'; the remaining
      #         entries are used to create the counter when +force+ applies.
      #         The caller's hash is left untouched.
      # force - when true, a missing counter is created first via init.
      #
      # Raises UnknownKey when the counter does not exist (and was not
      # created), and whatever valid_type! raises for an unacceptable value.
      def inc(key, data, force: false)
        # Work on a copy: stripping 'value' from the caller's own hash would
        # be a surprising side effect of incrementing a counter.
        params = data.dup
        value = params.delete('value')
        init(key, params) if force && !key?(key)

        v = get(key, raise_error: true, raw: true)
        valid_type!(v, value)

        v['total'] += value
        v['current'] += value
        t = EventTime.now
        v['last_modified_at'] = [t.sec, t.nsec]
        @storage.put(key, v)

        build_response(v)
      end

      # Resets the 'current' value of the counter stored under +key+ once its
      # reset interval has passed since the last reset.
      #
      # Returns a Hash with:
      #   'elapsed_time' - now minus the previous reset time
      #   'success'      - whether the counter was actually reset
      #   'counter_data' - response hash of the counter as it was before
      #                    this call
      #
      # Raises UnknownKey (through get) when the counter does not exist.
      def reset(key)
        record = get(key, raise_error: true, raw: true)
        snapshot = record.dup
        now = EventTime.now
        previous_reset = EventTime.new(*record['last_reset_at'])

        # A reset is due only when the interval has fully elapsed.
        due = (previous_reset + record['reset_interval']) <= now
        if due
          record['current'] = initial_value(record['type'])
          stamp = [now.sec, now.nsec]
          record['last_reset_at'] = stamp
          record['last_modified_at'] = stamp
          @storage.put(key, record)
        end

        {
          'elapsed_time' => now - previous_reset,
          'success' => due,
          'counter_data' => build_response(snapshot)
        }
      end

      private

      # Converts a stored counter record +d+ into the response hash: the
      # 'name', 'total', 'current', 'type' and 'reset_interval' entries are
      # copied over, and 'last_reset_at' is turned into an EventTime built
      # from the stored value.
      def build_response(d)
        response = {}
        %w[name total current type reset_interval].each do |field|
          response[field] = d[field]
        end
        response['last_reset_at'] = EventTime.new(*d['last_reset_at'])
        response
      end

      # Builds the Hash that is stored for a new counter. The stored value
      # requires these fields:
      # :name, :total, :current, :type, :reset_interval, :last_reset_at, :last_modified_at
      #
      # +data+ supplies the caller-provided entries; 'type' falls back to
      # 'numeric', both timestamps are set to the current time as
      # [sec, nsec], and 'current'/'total' start at the type's initial value.
      def build_value(data)
        counter_type = data['type'] || 'numeric'
        created_at = EventTime.now
        stamp = [created_at.sec, created_at.nsec]
        zero = initial_value(counter_type)

        defaults = {
          'type' => counter_type,
          'last_reset_at' => stamp,
          'last_modified_at' => stamp,
          'current' => zero,
          'total' => zero,
        }
        data.merge(defaults)
      end

      # Returns the starting value for a counter of the given +type+:
      # 0 for 'numeric' and 'integer', 0.0 for 'float'.
      #
      # Raises InvalidParams for any other type.
      def initial_value(type)
        return 0 if %w[numeric integer].include?(type)
        return 0.0 if type == 'float'

        raise InvalidParams.new('`type` should be integer, float, or numeric')
      end

      # Checks that +value+ may be added to the counter record +v+.
      #
      # The value must always be a number (type_str raises InvalidParams for
      # anything else). For counters whose 'type' is 'integer' or 'float' the
      # value must additionally be of exactly that kind; 'numeric' counters
      # accept every number.
      #
      # Returns nil when the value is acceptable; raises InvalidParams
      # otherwise.
      def valid_type!(v, value)
        type = v['type']
        # Classify first, even for 'numeric' counters, so that nil or a
        # String is rejected here instead of failing later in arithmetic.
        actual = type_str(value)
        return if type == 'numeric' || actual == type

        raise InvalidParams.new("`type` is #{type}. You should pass #{type} value as a `value`")
      end

      # Names the counter type that matches the class of +v+: 'integer',
      # 'float', or 'numeric' for any other Numeric.
      #
      # Raises InvalidParams when +v+ is not a number at all.
      def type_str(v)
        # Order matters: Integer and Float are Numeric too, so the specific
        # classes are tested before the general one.
        return 'integer' if Integer === v
        return 'float' if Float === v
        return 'numeric' if Numeric === v

        raise InvalidParams.new("`type` should be integer, float, or numeric")
      end
    end
  end
end