class Fluent::StreamInput
obsolete
# Loads the libraries this input depends on at construction time, then
# hands over to the parent class initializer.
def initialize
  %w[socket yajl].each { |lib| require lib }
  super
end
# Message shapes accepted by on_message:
#
#   message Entry {
#     1: long time
#     2: object record
#   }
#
#   message Forward {
#     1: string tag
#     2: list<Entry> entries
#   }
#
#   message PackedForward {
#     1: string tag
#     2: raw entries  # msgpack stream of Entry
#   }
#
#   message Message {
#     1: string tag
#     2: long? time
#     3: object record
#   }
#
# Routes one decoded message to the router. The kind of message is decided
# by the class of its second element:
#   * String -> PackedForward: wrapped in a MessagePackEventStream and
#               emitted as a stream.
#   * Array  -> Forward: each [time, record] pair is added to a
#               MultiEventStream, which is emitted as a stream.
#   * other  -> Message: a single [tag, time, record] event is emitted.
#
# Entries/messages whose record is nil are dropped. A time whose to_i is 0
# (including nil) is replaced with Engine.now.
#
# @param msg [Array] decoded message; msg[0] is the tag (converted with to_s)
# @return whatever the router call returns, or nil when a Message has no record
def on_message(msg)
  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]

  if entries.class == String
    # PackedForward
    es = MessagePackEventStream.new(entries)
    router.emit_stream(tag, es)

  elsif entries.class == Array
    # Forward
    es = MultiEventStream.new
    # Declared outside the block on purpose: a variable first assigned inside
    # the block is local to each iteration, which would defeat the `||=`
    # memoization and call Engine.now once per zero-time entry.
    now = nil
    entries.each { |e|
      record = e[1]
      next if record.nil?
      time = e[0]
      time = (now ||= Engine.now) if time.to_i == 0
      es.add(time, record)
    }
    router.emit_stream(tag, es)

  else
    # Message
    record = msg[2]
    return if record.nil?
    time = msg[1]
    time = Engine.now if time.to_i == 0
    router.emit(tag, time, record)
  end
end
# Drives the event loop, passing @blocking_timeout to it. Any StandardError
# escaping the loop is reported through the logger (message plus backtrace)
# instead of propagating to the caller.
def run
  begin
    @loop.run(@blocking_timeout)
  rescue => e
    log.error "unexpected error", error: e.to_s
    log.error_backtrace
  end
end
# Tears the input down in this order: detach every watcher from the event
# loop, stop the loop, close the listening socket, wait for the worker
# thread to finish, then defer to the parent class shutdown.
def shutdown
  @loop.watchers.each(&:detach)
  @loop.stop
  @lsock.close
  @thread.join
  super
end
# Brings the input up: runs the parent class start, creates a fresh
# Coolio event loop, attaches the socket returned by `listen` to it, and
# launches `run` on a new thread (kept in @thread).
def start
  super

  @loop = Coolio::Loop.new
  @lsock = listen
  @loop.attach(@lsock)

  @thread = Thread.new { run }
end