class Fluent::TimeSlicedOutput
def configure(conf)
def configure(conf) super # TODO timezone if conf['utc'] @localtime = false elsif conf['localtime'] @localtime = true end if @localtime @time_slicer = Proc.new {|time| Time.at(time).strftime(@time_slice_format) } else @time_slicer = Proc.new {|time| Time.at(time).utc.strftime(@time_slice_format) } end @time_slice_cache_interval = time_slice_cache_interval @before_tc = nil @before_key = nil if @flush_interval if conf['time_slice_wait'] $log.warn "time_slice_wait is ignored if flush_interval is specified: #{conf}" end @enqueue_buffer_proc = Proc.new do @buffer.keys.each {|key| @buffer.push(key) } end else @flush_interval = [60, @time_slice_cache_interval].min @enqueue_buffer_proc = Proc.new do nowslice = @time_slicer.call(Engine.now.to_i - @time_slice_wait) @buffer.keys.each {|key| if key < nowslice @buffer.push(key) end } end end end
def emit(tag, es, chain)
def emit(tag, es, chain) es.each {|time,record| tc = time / @time_slice_cache_interval if @before_tc == tc key = @before_key else @before_tc = tc key = @time_slicer.call(time) @before_key = key end data = format(tag, time, record) if @buffer.emit(key, data, chain) submit_flush end } end
def enqueue_buffer
def enqueue_buffer @enqueue_buffer_proc.call end
def initialize
def initialize super @localtime = true #@ignore_old = false # TODO end
def time_slice_cache_interval
def time_slice_cache_interval if @time_slicer.call(0) != @time_slicer.call(60-1) return 1 elsif @time_slicer.call(0) != @time_slicer.call(60*60-1) return 30 elsif @time_slicer.call(0) != @time_slicer.call(24*60*60-1) return 60*30 else return 24*60*30 end end