class Fluent::StreamInput

obsolete

def initialize

def initialize
  require 'socket'
  require 'yajl'
  super
end

def on_message(msg)

}
3: object record
2: long? time
1: string tag
message Message {

}
2: raw entries # msgpack stream of Entry
1: string tag
message PackedForward {

}
2: list entries
1: string tag
message Forward {

}
2: object record
1: long time
message Entry {
def on_message(msg)
  # TODO format error
  tag = msg[0].to_s
  entries = msg[1]
  if entries.class == String
    # PackedForward
    es = MessagePackEventStream.new(entries)
    router.emit_stream(tag, es)
  elsif entries.class == Array
    # Forward
    es = MultiEventStream.new
    entries.each {|e|
      record = e[1]
      next if record.nil?
      time = e[0]
      time = (now ||= Engine.now) if time.to_i == 0
      es.add(time, record)
    }
    router.emit_stream(tag, es)
  else
    # Message
    record = msg[2]
    return if record.nil?
    time = msg[1]
    time = Engine.now if time.to_i == 0
    router.emit(tag, time, record)
  end
end

def run

def run
  @loop.run(@blocking_timeout)
rescue
  log.error "unexpected error", error: $!.to_s
  log.error_backtrace
end

def shutdown

def shutdown
  @loop.watchers.each {|w| w.detach }
  @loop.stop
  @lsock.close
  @thread.join
  super
end

def start

def start
  super
  @loop = Coolio::Loop.new
  @lsock = listen
  @loop.attach(@lsock)
  @thread = Thread.new(&method(:run))
end