class Fluent::StreamOutput::ReformatWriter
# Stores the output that unpacked events will be forwarded to.
#
# @param secondary [#emit] receiver of re-emitted event streams; it is
#   called from #write as emit(tag, event_stream, chain).
def initialize(secondary)
  @secondary = secondary
end
# Replays the contents of a buffered chunk into the secondary output.
#
# The chunk is opened as an IO, decoded with the engine's msgpack unpacker,
# and every decoded [tag, entries] pair is rebuilt into a MultiEventStream
# and emitted to @secondary using the NullOutputChain singleton.
#
# @param chunk [#open] object whose #open yields an IO-like stream.
# @return whatever chunk.open returns.
def write(chunk)
  chain = NullOutputChain.instance

  chunk.open do |io|
    # TODO: use MessagePackIoEventStream
    unpacker = Fluent::Engine.msgpack_factory.unpacker(io)

    begin
      unpacker.each do |(tag, entries)|
        stream = MultiEventStream.new
        # Each entry is indexed as a pair; presumably (time, record) --
        # verify against MultiEventStream#add.
        entries.each { |entry| stream.add(entry[0], entry[1]) }
        @secondary.emit(tag, stream, chain)
      end
    rescue EOFError
      # Deliberately ignored: presumably the unpacker signals the end of the
      # chunk data this way, so it is not treated as a failure here.
    end
  end
end