class Fluent::Counter::Client
# Builds the request hash sent to the counter server.
#
# `id` and `method` are always present. `scope`, `params` and `options`
# are added (in that order) only when the given value is truthy.
def build_request(method, id, scope = nil, params = nil, options = nil)
  request = { id: id, method: method }
  { scope: scope, params: params, options: options }.each do |key, value|
    request[key] = value if value
  end
  request
end
# Sends a 'delete' request for the given params under the established scope.
#
# Raises (via `exist_scope!`) when no scope has been established.
# Without a block, returns whatever `send_request` returns.
# With a block, returns a Thread that yields `get` of that result to the block.
def delete(*params, options: {})
  exist_scope!
  pending = send_request('delete', @scope, params, options)
  return pending unless block_given?

  Thread.start { yield pending.get }
end
# Sends an 'establish' request for `scope` and stores the first element of
# the response data in @scope (which is also the return value).
#
# The whole exchange runs under Timeout.timeout(@timeout). If the response
# reports errors, the first one is handed to Fluent::Counter.raise_error.
# A Timeout::Error is converted into a RuntimeError with a fixed message;
# @scope is left untouched in that case.
def establish(scope)
  @scope = Timeout.timeout(@timeout) do
    reply = send_request('establish', nil, [scope])
    Fluent::Counter.raise_error(reply.errors.first) if reply.errors?
    reply.data.first
  end
rescue Timeout::Error
  raise "Can't establish the connection to counter server due to timeout"
end
# Guard used by the request methods: raises a RuntimeError when @scope is
# not set (nil/false); otherwise returns nil.
def exist_scope!
  return if @scope

  raise 'Call `establish` method to get a `scope` before calling this method'
end
# Returns the current request id and advances the counter, all while
# holding @id_mutex. Once the advanced counter exceeds ID_LIMIT_COUNT it
# is reset to 0, so returned ids range over 0..ID_LIMIT_COUNT.
def generate_id
  @id_mutex.synchronize do
    current = @id
    @id += 1
    @id = 0 if ID_LIMIT_COUNT < @id
    current
  end
end
# Sends a 'get' request for the given params under the established scope.
#
# Raises (via `exist_scope!`) when no scope has been established.
# Without a block, returns whatever `send_request` returns.
# With a block, returns a Thread that yields `get` of that result to the block.
def get(*params, options: {})
  exist_scope!
  pending = send_request('get', @scope, params, options)
  return pending unless block_given?

  Thread.start { yield pending.get }
end
# === Example
# `inc` receives various arguments.
#
# 1. inc(name: 'name')
# 2. inc({ name: 'name',value: 20 }, options: {})
# 3. inc([{ name: 'name1',value: 20 }, { name: 'name2',value: 20 }])
#
# NOTE(review): on Ruby 3+, form 1 may be parsed as keyword arguments
# rather than as `params` -- confirm against the supported Ruby versions.
#
# A non-Array `params` is wrapped in a one-element Array before sending.
# Raises (via `exist_scope!`) when no scope has been established.
# Without a block, returns whatever `send_request` returns.
# With a block, returns a Thread that yields `get` of that result to the block.
def inc(params, options: {})
  exist_scope!
  entries = params.is_a?(Array) ? params : [params]
  pending = send_request('inc', @scope, entries, options)
  return pending unless block_given?

  Thread.start { yield pending.get }
end
# === Example
# `init` receives various arguments.
#
# 1. init(name: 'name')
# 2. init({ name: 'name',reset_interval: 20 }, options: {})
# 3. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }])
# 4. init([{ name: 'name1',reset_interval: 20 }, { name: 'name2',reset_interval: 20 }], options: {})
#
# NOTE(review): on Ruby 3+, form 1 may be parsed as keyword arguments
# rather than as `params` -- confirm against the supported Ruby versions.
#
# A non-Array `params` is wrapped in a one-element Array before sending.
# Raises (via `exist_scope!`) when no scope has been established.
def init(params, options: {})
  exist_scope!
  entries = params.is_a?(Array) ? params : [params]
  pending = send_request('init', @scope, entries, options)

  # Without a block the object returned by `send_request` is handed back
  # as-is; with a block its `get` value is yielded from a new Thread and
  # that Thread is returned.
  return pending unless block_given?

  Thread.start { yield pending.get }
end
# Sets up client state and opens the connection.
#
# loop:: event loop to use; a new Coolio::Loop is created when nil/false.
# opt::  Hash read with the keys :port, :host, :log and :timeout. A missing
#        or falsy value falls back to DEFAULT_PORT, DEFAULT_ADDR, the global
#        $log and DEFAULT_TIMEOUT respectively.
def initialize(loop = nil, opt = {})
  # Bookkeeping for in-flight requests and id generation.
  @id = 0
  @id_mutex = Mutex.new
  @loop_mutex = Mutex.new
  @responses = {}

  @loop = loop || Coolio::Loop.new
  @host = opt[:host] || DEFAULT_ADDR
  @port = opt[:port] || DEFAULT_PORT
  @timeout = opt[:timeout] || DEFAULT_TIMEOUT
  @log = opt[:log] || $log

  # on_message is handed to the connection as a Method object; presumably
  # it is invoked for each incoming message -- confirm in Connection.
  @conn = Connection.connect(@host, @port, method(:on_message))
end
# Handles one incoming message.
#
# The entry registered under data['id'] is removed from @responses and
# receives the message through `set`. When no entry exists for that id,
# a warning is logged instead.
def on_message(data)
  pending = @responses.delete(data['id'])
  return @log.warn("Receiving missing id data: #{data}") unless pending

  pending.set(data)
end
# Sends a 'reset' request for the given params under the established scope.
#
# Raises (via `exist_scope!`) when no scope has been established.
# Without a block, returns whatever `send_request` returns.
# With a block, returns a Thread that yields `get` of that result to the block.
def reset(*params, options: {})
  exist_scope!
  pending = send_request('reset', @scope, params, options)
  return pending unless block_given?

  Thread.start { yield pending.get }
end
# Builds a request, registers a Future for its response, sends the request
# over the connection and returns the Future.
#
# `opt` is passed to `build_request` as the request options.
def send_request(method, scope, params, opt = {})
  request_id = generate_id
  future = Future.new(@loop, @loop_mutex)
  # Registered before sending: `on_message` looks the future up by id and
  # sets the response value on it.
  @responses[request_id] = future

  payload = build_request(method, request_id, scope, params, opt)
  @log.debug(payload)
  @conn.send_data payload
  future
end
# Attaches the connection to the event loop and returns self.
#
# Any StandardError raised while starting is not propagated: it is reported
# through @log.error, or printed to STDERR when no logger is set. In that
# case the return value is whatever the reporting call returns, not self.
def start
  @loop.attach(@conn)
  # The rescue below already allows for a missing logger, so the success
  # path must not call it unconditionally; otherwise a successful attach
  # would be reported as a NoMethodError and nil returned instead of self.
  @log.debug("starting counter client: #{@host}:#{@port}") if @log
  self
rescue => e
  if @log
    @log.error e
  else
    STDERR.puts e
  end
end
# Closes the connection, then writes a debug log line naming the endpoint.
def stop
  @conn.close
  endpoint = "#{@host}:#{@port}"
  @log.debug("calling stop in counter client: #{endpoint}")
end