class Fluent::BufferedOutput

def before_shutdown

def before_shutdown
  begin
    @buffer.before_shutdown(self)
  rescue
    $log.warn "before_shutdown failed", :error=>$!.to_s
    $log.warn_backtrace
  end
end

def calc_retry_wait

def calc_retry_wait
  # TODO retry pattern
  if @error_history.size <= @retry_limit
    @retry_wait * (2 ** (@error_history.size-1))
  else
    # secondary retry
    @retry_wait * (2 ** (@error_history.size-2-@retry_limit))
  end
end

def configure(conf)

def configure(conf)
  super
  @buffer = Plugin.new_buffer(@buffer_type)
  @buffer.configure(conf)
  if @buffer.respond_to?(:enable_parallel)
    if @num_threads == 1
      @buffer.enable_parallel(false)
    else
      @buffer.enable_parallel(true)
    end
  end
  @writers = (1..@num_threads).map {
    writer = OutputThread.new(self)
    writer.configure(conf)
    writer
  }
  if sconf = conf.elements.select {|e| e.name == 'secondary' }.first
    type = sconf['type'] || conf['type']
    @secondary = Plugin.new_output(type)
    @secondary.configure(sconf)
    if secondary_limit = conf['secondary_limit']
      @secondary_limit = secondary_limit.to_i
      if @secondary_limit < 0
        raise ConfigError, "invalid parameter 'secondary_limit #{secondary_limit}'"
      end
    end
    @secondary.secondary_init(self)
  end
  Status.register(self, "queue_size") { @buffer.queue_size }
  Status.register(self, "emit_count") { @emit_count }
end

def emit(tag, es, chain, key="")

def emit(tag, es, chain, key="")
  @emit_count += 1
  data = format_stream(tag, es)
  if @buffer.emit(key, data, chain)
    submit_flush
  end
end

def enqueue_buffer

def enqueue_buffer
  @buffer.keys.each {|key|
    @buffer.push(key)
  }
end

def flush_secondary(secondary)

def flush_secondary(secondary)
  @buffer.pop(secondary)
end

def force_flush

def force_flush
  enqueue_buffer
  submit_flush
end

def format_stream(tag, es)

def format_stream(tag, es)
  out = ''
  es.each {|time,record|
    out << format(tag, time, record)
  }
  out
end

def initialize

def initialize
  super
  @next_flush_time = 0
  @last_retry_time = 0
  @next_retry_time = 0
  @error_history = []
  @error_history.extend(MonitorMixin)
  @secondary_limit = 8
  @emit_count = 0
end

def shutdown

def shutdown
  @writers.each {|writer| writer.shutdown }
  @secondary.shutdown if @secondary
  @buffer.shutdown
end

def start

def start
  @next_flush_time = Engine.now + @flush_interval
  @buffer.start
  @secondary.start if @secondary
  @writers.each {|writer| writer.start }
end

def submit_flush

def submit_flush
  # TODO roundrobin?
  @writers.first.submit_flush
end

def try_flush

def try_flush
  time = Engine.now
  empty = @buffer.queue_size == 0
  if empty && @next_flush_time < (now = Engine.now)
    @buffer.synchronize do
      if @next_flush_time < now
        enqueue_buffer
        @next_flush_time = now + @flush_interval
        empty = @buffer.queue_size == 0
      end
    end
  end
  if empty
    return time + 1  # TODO 1
  end
  begin
    retrying = !@error_history.empty?
    if retrying
      @error_history.synchronize do
        if retrying = !@error_history.empty?  # re-check in synchronize
          if @next_retry_time >= time
            # allow retrying for only one thread
            return time + 1  # TODO 1
          end
          # assume next retry failes and
          # clear them if when it succeeds
          @last_retry_time = time
          @error_history << time
          @next_retry_time += calc_retry_wait
        end
      end
    end
    if @secondary && @error_history.size > @retry_limit
      has_next = flush_secondary(@secondary)
    else
      has_next = @buffer.pop(self)
    end
    # success
    if retrying
      @error_history.clear
      # Note: don't notify to other threads to prevent
      #       burst to recovered server
      $log.warn "retry succeeded.", :instance=>object_id
    end
    if has_next
      return time  # call try_flush soon
    else
      return time + 1  # TODO 1
    end
  rescue => e
    if retrying
      error_count = @error_history.size
    else
      # first error
      error_count = 0
      @error_history.synchronize do
        if @error_history.empty?
          @last_retry_time = time
          @error_history << time
          @next_retry_time = time + calc_retry_wait
        end
      end
    end
    if error_count < @retry_limit
      $log.warn "failed to flush the buffer, retrying.", :error=>e.to_s, :instance=>object_id
      $log.warn_backtrace e.backtrace
    elsif @secondary
      if error_count == @retry_limit
        $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
        $log.warn "retry count exceededs limit. falling back to secondary output."
        $log.warn_backtrace e.backtrace
        retry  # retry immediately
      elsif error_count <= @retry_limit + @secondary_limit
        $log.warn "failed to flush the buffer, retrying secondary.", :error=>e.to_s, :instance=>object_id
        $log.warn_backtrace e.backtrace
      else
        $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
        $log.warn "secondary retry count exceededs limit."
        $log.warn_backtrace e.backtrace
        write_abort
        @error_history.clear
      end
    else
      $log.warn "failed to flush the buffer.", :error=>e.to_s, :instance=>object_id
      $log.warn "retry count exceededs limit."
      $log.warn_backtrace e.backtrace
      write_abort
      @error_history.clear
    end
    return @next_retry_time
  end
end

def write_abort

def write_abort
  $log.error "throwing away old logs."
  begin
    @buffer.clear!
  rescue
    $log.error "unexpected error while aborting", :error=>$!.to_s
    $log.error_backtrace
  end
end