class Fluent::Plugin::Output

def flush_thread_run(state)

# Body of an output flush thread.
#
# Polls until the plugin reports `after_started?` (or `stopped?`), then loops
# while @output_flush_threads_running is truthy. Each round either computes how
# long to wait (scheduled clock not reached yet, or a retry is scheduled in the
# future) or calls #try_flush and reschedules from #next_flush_time. It then
# rolls back the first dequeued chunk's write if that chunk has expired, and
# finally waits on state.cond_var for the computed interval.
#
# state - object providing #next_clock / #next_clock=, #mutex and #cond_var.
#
# state.mutex is held for the whole loop except while #try_flush and
# #try_rollback_write run, and is always released when the method exits.
# A StandardError escaping the loop is logged and re-raised.
def flush_thread_run(state)
  thread_interval = @buffer_config.flush_thread_interval
  state.next_clock = Fluent::Clock.now + thread_interval

  # Do nothing until the plugin has started, or was stopped before starting.
  sleep 0.5 until self.after_started? || self.stopped?
  log.debug "flush_thread actually running"

  state.mutex.lock
  begin
    # `thread_current_running?` is intentionally not the loop condition here,
    # because this thread should keep running in the `before_shutdown` phase.
    while @output_flush_threads_running
      clock_now = Fluent::Clock.now
      retry_at = @retry_mutex.synchronize { @retry ? @retry.next_time : nil }

      interval =
        if state.next_clock > clock_now
          # The scheduled flush time is still ahead: wait for the remainder.
          state.next_clock - clock_now
        elsif retry_at && retry_at > Time.now
          # A retry is scheduled in the future: wait until it is due.
          retry_at.to_f - Time.now.to_f
        else
          # Flush with state.mutex released; it is re-acquired even if the flush raises.
          state.mutex.unlock
          begin
            try_flush
            # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying)
            until_next_flush = next_flush_time.to_f - Time.now.to_f
            # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected
            #       because @retry still exists (#commit_write is not called yet in #try_flush)
            #       @retry should be cleared if delayed commit is enabled? Or any other solution?
            state.next_clock = Fluent::Clock.now + until_next_flush
            until_next_flush
          ensure
            state.mutex.lock
          end
        end

      # Only the first dequeued chunk is checked for expiry.
      rollback_due = @dequeued_chunks_mutex.synchronize do
        !@dequeued_chunks.empty? && @dequeued_chunks.first.expired?
      end
      if rollback_due && !@output_flush_interrupted
        # Roll back with state.mutex released, same as for the flush above.
        state.mutex.unlock
        begin
          try_rollback_write
        ensure
          state.mutex.lock
        end
      end

      # A zero or negative interval means the next round starts immediately.
      state.cond_var.wait(state.mutex, interval) if interval > 0
    end
  rescue => fatal
    # normal errors are rescued by output plugins in #try_flush,
    # so only critical & unrecoverable errors reach this point
    log.error "error on output thread", error: fatal
    log.error_backtrace
    raise
  ensure
    state.mutex.unlock
  end
end