class Fluent::BufferedOutput
  def before_shutdown
    begin
      @buffer.before_shutdown(self)
    rescue
      $log.warn "before_shutdown failed", :error=>$!.to_s
      $log.warn_backtrace
    end
  end
  def calc_retry_wait
    # TODO retry pattern
    if @error_history.size <= @retry_limit
      @retry_wait * (2 ** (@error_history.size-1))
    else
      # secondary retry
      @retry_wait * (2 ** (@error_history.size-2-@retry_limit))
    end
  end
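  # Backoff sketch (illustrative, assuming the config_param defaults
  # retry_wait = 1.0 and retry_limit = 17 declared elsewhere in this class):
  # the wait doubles with each consecutive error,
  #
  #   1 error  -> 1.0 * 2**0 = 1s
  #   2 errors -> 1.0 * 2**1 = 2s
  #   3 errors -> 1.0 * 2**2 = 4s
  #
  # and once the error count exceeds retry_limit the exponent is rebased so
  # the secondary output starts its own short backoff from retry_wait again.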
  def configure(conf)
    super

    @buffer = Plugin.new_buffer(@buffer_type)
    @buffer.configure(conf)

    if @buffer.respond_to?(:enable_parallel)
      if @num_threads == 1
        @buffer.enable_parallel(false)
      else
        @buffer.enable_parallel(true)
      end
    end

    @writers = (1..@num_threads).map {
      writer = OutputThread.new(self)
      writer.configure(conf)
      writer
    }

    if sconf = conf.elements.select {|e| e.name == 'secondary' }.first
      type = sconf['type'] || conf['type']
      @secondary = Plugin.new_output(type)
      @secondary.configure(sconf)

      if secondary_limit = conf['secondary_limit']
        @secondary_limit = secondary_limit.to_i
        if @secondary_limit < 0
          raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'"
        end
      end

      @secondary.secondary_init(self)
    end

    Status.register(self, "queue_size") { @buffer.queue_size }
    Status.register(self, "emit_count") { @emit_count }
  end
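  # Configuration sketch (illustrative; the plugin types, path and values are
  # example choices, and buffer_type/flush_interval/num_threads are
  # config_params declared elsewhere in this class):
  #
  #   <match app.**>
  #     type forward
  #     buffer_type memory
  #     flush_interval 10s
  #     num_threads 2
  #     <secondary>
  #       type file
  #       path /var/log/fluent/forward-failed
  #     </secondary>
  #   </match>
  #
  # configure builds the buffer named by buffer_type, one OutputThread per
  # num_threads, and wires the <secondary> element (if present) as the
  # fallback output used after retry_limit consecutive flush failures.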
  def emit(tag, es, chain, key="")
    @emit_count += 1
    data = format_stream(tag, es)
    if @buffer.emit(key, data, chain)
      submit_flush
    end
  end
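  # Data path sketch: an input plugin calls emit with an event stream;
  # format_stream (below) concatenates format(tag, time, record) for every
  # event, @buffer.emit appends that string to the chunk for `key`, and when
  # it returns true submit_flush wakes a writer thread, which in turn calls
  # try_flush.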
  def enqueue_buffer
    @buffer.keys.each {|key|
      @buffer.push(key)
    }
  end
  def flush_secondary(secondary)
    @buffer.pop(secondary)
  end
  def force_flush
    enqueue_buffer
    submit_flush
  end
  def format_stream(tag, es)
    out = ''
    es.each {|time,record|
      out << format(tag, time, record)
    }
    out
  end
  def initialize
    super
    @next_flush_time = 0
    @last_retry_time = 0
    @next_retry_time = 0
    @error_history = []
    @error_history.extend(MonitorMixin)
    @secondary_limit = 8
    @emit_count = 0
  end
  def shutdown
    @writers.each {|writer| writer.shutdown }
    @secondary.shutdown if @secondary
    @buffer.shutdown
  end
  def start
    @next_flush_time = Engine.now + @flush_interval
    @buffer.start
    @secondary.start if @secondary
    @writers.each {|writer| writer.start }
  end
  def submit_flush
    # TODO roundrobin?
    @writers.first.submit_flush
  end
  def try_flush
    time = Engine.now

    empty = @buffer.queue_size == 0
    if empty && @next_flush_time < (now = Engine.now)
      @buffer.synchronize do
        if @next_flush_time < now
          enqueue_buffer
          @next_flush_time = now + @flush_interval
          empty = @buffer.queue_size == 0
        end
      end
    end
    if empty
      return time + 1  # TODO 1
    end

    begin
      retrying = !@error_history.empty?

      if retrying
        @error_history.synchronize do
          if retrying = !@error_history.empty?  # re-check in synchronize
            if @next_retry_time >= time
              # allow retrying for only one thread
              return time + 1  # TODO 1
            end
            # assume the next retry fails; clear the history when it succeeds
            @last_retry_time = time
            @error_history << time
            @next_retry_time += calc_retry_wait
          end
        end
      end

      if @secondary && @error_history.size > @retry_limit
        has_next = flush_secondary(@secondary)
      else
        has_next = @buffer.pop(self)
      end

      # success
      if retrying
        @error_history.clear
        # Note: don't notify other threads, to prevent a burst
        #       against the just-recovered server
        $log.warn "retry succeeded.", :instance=>object_id
      end

      if has_next
        return time  # call try_flush soon
      else
        return time + 1  # TODO 1
      end

    rescue => e
      if retrying
        error_count = @error_history.size
      else
        # first error
        error_count = 0
        @error_history.synchronize do
          if @error_history.empty?
            @last_retry_time = time
            @error_history << time
            @next_retry_time = time + calc_retry_wait
          end
        end
      end

      if error_count < @retry_limit
        $log.warn "failed to flush the buffer, retrying.", :error=>e.to_s, :instance=>object_id
        $log.warn_backtrace e.backtrace

      elsif @secondary
        if error_count == @retry_limit
          $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
          $log.warn "retry count exceeds limit. falling back to secondary output."
          $log.warn_backtrace e.backtrace
          retry  # retry immediately
        elsif error_count <= @retry_limit + @secondary_limit
          $log.warn "failed to flush the buffer, retrying secondary.", :error=>e.to_s, :instance=>object_id
          $log.warn_backtrace e.backtrace
        else
          $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
          $log.warn "secondary retry count exceeds limit."
          $log.warn_backtrace e.backtrace
          write_abort
          @error_history.clear
        end

      else
        $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
        $log.warn "retry count exceeds limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @error_history.clear
      end

      return @next_retry_time
    end
  end
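  # Flow sketch for try_flush (illustrative): when the queue is empty it
  # enqueues the per-key buffers once flush_interval has elapsed, then pops
  # one chunk and writes it. On success the error history is cleared; on
  # failure the error is recorded and the next attempt is delayed by
  # calc_retry_wait. After retry_limit consecutive failures, chunks are
  # routed to the <secondary> output (if configured) until
  # retry_limit + secondary_limit failures, at which point write_abort
  # discards the queued data.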
  def write_abort
    $log.error "throwing away old logs."
    begin
      @buffer.clear!
    rescue
      $log.error "unexpected error while aborting", :error=>$!.to_s
      $log.error_backtrace
    end
  end
end
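
# Usage sketch: a hypothetical plugin built on BufferedOutput (the class name,
# plugin type and write behaviour below are invented for illustration, and the
# code assumes it is loaded inside a Fluentd process where Fluent::BufferedOutput
# and Fluent::Plugin are already defined). Subclasses implement format, which
# format_stream calls for every event, and write, which a writer thread calls
# with a buffered chunk; raising inside write triggers the retry logic in
# try_flush.
class Fluent::ExampleBufferedOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('example_buffered', self)

  # Serialize one event; the returned string is appended to the current chunk.
  def format(tag, time, record)
    "#{tag}\t#{time}\t#{record.inspect}\n"
  end

  # Deliver one chunk; this sketch only reads the buffered bytes and drops them.
  def write(chunk)
    chunk.read
  end
end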