class Fluent::Counter::Server
# Deletes the counters named in +params+ under +scope+.
#
# +params+ is run through an ArrayValidator built with :empty and :key;
# every entry that survives is converted to a store key with
# Store.gen_key(scope, entry). For each key the value returned by
# store.delete is pushed onto the response, or the raised error is pushed
# instead, so one failing key does not abort the others.
#
# options['random'] selects @mutex_hash.synchronize_keys instead of
# @mutex_hash.synchronize (presumably a different locking strategy --
# confirm against MutexHash).
#
# Returns the response converted with to_hash.
def delete(params, scope, options)
  valid_params, errors = Fluent::Counter::ArrayValidator.new(:empty, :key).call(params)
  res = Response.new(errors)
  keys = valid_params.map { |name| Store.gen_key(scope, name) }

  # Invoked once per key with the store handed over by the mutex hash.
  remover = ->(store, key) do
    begin
      removed = store.delete(key)
      @log.debug("delete a key: #{key}")
      res.push_data removed
    rescue StandardError => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*keys, &remover)
  else
    @mutex_hash.synchronize(*keys, &remover)
  end

  res.to_hash
end
# Builds the scope string a client should use for later requests.
#
# +params+ is validated with an ArrayValidator built with :empty and :scope.
# Only the first valid entry is used: it is joined to this server's @name
# with a tab character, pushed onto the response and logged at debug level.
# When there is no valid entry the response carries only the validation
# errors. The +_scope+ and +_options+ arguments are ignored.
#
# Returns the response converted with to_hash.
def establish(params, _scope, _options)
  valid_params, errors = Fluent::Counter::ArrayValidator.new(:empty, :scope).call(params)
  res = Response.new(errors)

  requested = valid_params.first
  return res.to_hash unless requested

  new_scope = "#{@name}\t#{requested}"
  res.push_data new_scope
  @log.debug("Establish new key: #{new_scope}")

  res.to_hash
end
# Reads the current value of each counter named in +params+ under +scope+.
#
# +params+ is validated with an ArrayValidator built with :empty and :key,
# and each valid entry becomes a store key via Store.gen_key(scope, entry).
# Keys are looked up directly on @store (not through @mutex_hash) with
# raise_error: true; each result is pushed as data, and each raised error
# is pushed as an error so the remaining keys are still processed.
# The +_options+ argument is ignored.
#
# Returns the response converted with to_hash.
def get(params, scope, _options)
  valid_params, errors = Fluent::Counter::ArrayValidator.new(:empty, :key).call(params)
  res = Response.new(errors)

  # All keys are generated before the first lookup happens.
  lookup_keys = valid_params.map { |name| Store.gen_key(scope, name) }

  lookup_keys.each do |key|
    begin
      value = @store.get(key, raise_error: true)
      @log.debug("Get counter value: #{key}")
      res.push_data value
    rescue StandardError => e
      res.push_error e
    end
  end

  res.to_hash
end
# Increments the counters described by +params+ under +scope+.
#
# +params+ is validated with a HashValidator built with :empty, :name and
# :value, plus :reset_interval when options['force'] is truthy. Each valid
# entry is indexed by Store.gen_key(scope, entry['name']); when two entries
# produce the same key, the later entry replaces the earlier one.
#
# For every key, store.inc(key, entry, force: options['force']) is called;
# its result is pushed as data, or the raised error is pushed as an error so
# the remaining keys are still processed.
#
# options['random'] selects @mutex_hash.synchronize_keys instead of
# @mutex_hash.synchronize (presumably a different locking strategy --
# confirm against MutexHash).
#
# Returns the response converted with to_hash.
def inc(params, scope, options)
  validate_param = [:empty, :name, :value]
  validate_param << :reset_interval if options['force']
  validator = Fluent::Counter::HashValidator.new(*validate_param)
  valid_params, errors = validator.call(params)
  res = Response.new(errors)

  # Fill a single hash in place; merging a one-entry hash per element would
  # allocate a new hash on every iteration.
  key_hash = valid_params.each_with_object({}) do |vp, acc|
    acc[Store.gen_key(scope, vp['name'])] = vp
  end

  # Invoked once per key with the store handed over by the mutex hash.
  do_inc = lambda do |store, key|
    begin
      param = key_hash[key]
      v = store.inc(key, param, force: options['force'])
      @log.debug("Increment #{key} by #{param['value']}")
      res.push_data v
    rescue => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*key_hash.keys, &do_inc)
  else
    @mutex_hash.synchronize(*key_hash.keys, &do_inc)
  end

  res.to_hash
end
# Creates the counters described by +params+ under +scope+.
#
# +params+ is validated with a HashValidator built with :empty, :name and
# :reset_interval. Each valid entry is indexed by
# Store.gen_key(scope, entry['name']); when two entries produce the same
# key, the later entry replaces the earlier one.
#
# For every key, store.init(key, entry, ignore: options['ignore']) is
# called; its result is pushed as data, or the raised error is pushed as an
# error so the remaining keys are still processed.
#
# options['random'] selects @mutex_hash.synchronize_keys instead of
# @mutex_hash.synchronize (presumably a different locking strategy --
# confirm against MutexHash).
#
# Returns the response converted with to_hash.
def init(params, scope, options)
  validator = Fluent::Counter::HashValidator.new(:empty, :name, :reset_interval)
  valid_params, errors = validator.call(params)
  res = Response.new(errors)

  # Fill a single hash in place; merging a one-entry hash per element would
  # allocate a new hash on every iteration.
  key_hash = valid_params.each_with_object({}) do |vp, acc|
    acc[Store.gen_key(scope, vp['name'])] = vp
  end

  # Invoked once per key with the store handed over by the mutex hash.
  do_init = lambda do |store, key|
    begin
      param = key_hash[key]
      v = store.init(key, param, ignore: options['ignore'])
      @log.debug("Create new key: #{param['name']}")
      res.push_data v
    rescue => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*key_hash.keys, &do_init)
  else
    @mutex_hash.synchronize(*key_hash.keys, &do_init)
  end

  res.to_hash
end
# Sets up a counter server called +name+.
#
# Raises a RuntimeError when +name+ does not match Validator::VALID_NAME.
#
# Recognised keys in +opt+ (each falls back to the stated default when the
# value is nil or false):
#   :addr - listen address, default DEFAULT_ADDR
#   :port - listen port, default DEFAULT_PORT
#   :loop - event loop to attach to, default a new Coolio::Loop
#   :log  - logger, default the global $log
# The whole +opt+ hash is also handed to Fluent::Counter::Store.new.
#
# The TCP server is created here but not attached to the loop; no thread is
# started until #start is called.
def initialize(name, opt = {})
  unless Validator::VALID_NAME =~ name
    raise 'Counter server name is invalid'
  end

  @name = name
  @opt = opt

  # Connection and runtime settings, each with a fallback.
  @addr = opt[:addr] || DEFAULT_ADDR
  @port = opt[:port] || DEFAULT_PORT
  @loop = opt[:loop] || Coolio::Loop.new
  @log = opt[:log] || $log

  # Counter storage and the mutex hash that wraps it.
  @store = Fluent::Counter::Store.new(opt)
  @mutex_hash = MutexHash.new(@store)

  # Incoming requests are routed to #on_message through Handler.
  message_callback = method(:on_message)
  @server = Coolio::TCPServer.new(@addr, @port, Handler, message_callback)

  @thread = nil
  @running = false
end
# Handles one decoded request and produces the reply hash.
#
# The request is first checked with Validator.request. If that reports
# errors, the reply contains the request id, an empty 'data' array and those
# errors. Otherwise the method named by data['method'] is invoked with
# data['params'], data['scope'] and data['options'] inside #safe_run, and
# the request id is merged into its result.
#
# Any StandardError raised in this method itself is logged via @log.error;
# in that case the return value is whatever @log.error returns.
#
# NOTE(review): the method name passed to +send+ comes from the request.
# Presumably Validator.request restricts it to the supported operations --
# confirm before relying on it.
def on_message(data)
  errors = Validator.request(data)

  if errors.empty?
    result = safe_run do
      send(data['method'], data['params'], data['scope'], data['options'])
    end
    result.merge('id' => data['id'])
  else
    { 'id' => data['id'], 'data' => [], 'errors' => errors }
  end
rescue StandardError => e
  @log.error e.to_s
end
# Resets the counters named in +params+ under +scope+.
#
# +params+ is validated with an ArrayValidator built with :empty and :key,
# and each valid entry becomes a store key via Store.gen_key(scope, entry).
# For each key the value returned by store.reset is pushed onto the
# response, or the raised error is pushed instead, so one failing key does
# not abort the others.
#
# options['random'] selects @mutex_hash.synchronize_keys instead of
# @mutex_hash.synchronize (presumably a different locking strategy --
# confirm against MutexHash).
#
# Returns the response converted with to_hash.
def reset(params, scope, options)
  valid_params, errors = Fluent::Counter::ArrayValidator.new(:empty, :key).call(params)
  res = Response.new(errors)
  keys = valid_params.map { |name| Store.gen_key(scope, name) }

  # Invoked once per key with the store handed over by the mutex hash.
  resetter = ->(store, key) do
    begin
      outcome = store.reset(key)
      @log.debug("Reset #{key}'s' counter value")
      res.push_data outcome
    rescue StandardError => e
      res.push_error e
    end
  end

  if options['random']
    @mutex_hash.synchronize_keys(*keys, &resetter)
  else
    @mutex_hash.synchronize(*keys, &resetter)
  end

  res.to_hash
end
# Runs the given block and returns its value.
#
# If the block raises a StandardError, the error is wrapped in an
# InternalServerError and a reply-shaped hash is returned instead: the
# wrapped error (as a hash) under 'errors' and an empty array under 'data'.
def safe_run
  begin
    yield
  rescue StandardError => e
    failure = InternalServerError.new(e).to_hash
    { 'errors' => [failure], 'data' => [] }
  end
end
# Starts serving requests.
#
# Attaches the TCP server to the event loop, runs the loop on a new thread
# (stored in @thread), logs the listen address and starts the mutex hash.
# @running is set to true just before the loop runs and back to false once
# @loop.run returns.
#
# Returns self so calls can be chained.
def start
  @server.attach(@loop)

  # Body of the background thread: @running brackets the loop run.
  loop_runner = proc do
    @running = true
    @loop.run(0.5)
    @running = false
  end
  @thread = Thread.new(&loop_runner)

  @log.debug("starting counter server #{@addr}:#{@port}")
  @mutex_hash.start
  self
end
# Shuts the server down.
#
# Closes the TCP server, stops the event loop if it is marked as running,
# stops the mutex hash, waits for the loop thread (when one exists) and
# finally logs the shutdown at debug level.
def stop
  # Brief pause that exists for the tests: it gives `@loop` time to begin
  # running before the shutdown sequence starts.
  sleep 0.1

  @server.close
  if @running
    @loop.stop
  end
  @mutex_hash.stop
  if @thread
    @thread.join
  end

  @log.debug("calling stop in counter server #{@addr}:#{@port}")
end