class Fluent::Plugin::Output
def acts_as_secondary(primary)
def acts_as_secondary(primary) @as_secondary = true @primary_instance = primary @chunk_keys = @primary_instance.chunk_keys || [] @chunk_key_tag = @primary_instance.chunk_key_tag || false if @primary_instance.chunk_key_time @chunk_key_time = @primary_instance.chunk_key_time @timekey_zone = @primary_instance.timekey_zone @output_time_formatter_cache = {} end self.context_router = primary.context_router singleton_class.module_eval do define_method(:commit_write){ |chunk_id| @primary_instance.commit_write(chunk_id, delayed: delayed_commit, secondary: true) } define_method(:rollback_write){ |chunk_id, update_retry: true| @primary_instance.rollback_write(chunk_id, update_retry) } end end
def after_shutdown
def after_shutdown try_rollback_all if @buffering && !@as_secondary # rollback regardless with @delayed_commit, because secondary may do it @secondary.after_shutdown if @secondary if @buffering && @buffer @buffer.after_shutdown @output_flush_threads_running = false if @output_flush_threads && !@output_flush_threads.empty? @output_flush_threads.each do |state| # to wakeup thread and make it to stop by itself state.mutex.synchronize { if state.thread && state.thread.status state.next_clock = 0 state.cond_var.signal end } Thread.pass state.thread.join end end end super end
def after_start
def after_start super @secondary.after_start if @secondary end
def backup_chunk(chunk, using_secondary, delayed_commit)
def backup_chunk(chunk, using_secondary, delayed_commit) if @buffer_config.disable_chunk_backup log.warn "disable_chunk_backup is true. #{dump_unique_id_hex(chunk.unique_id)} chunk is thrown away" else unique_id = dump_unique_id_hex(chunk.unique_id) safe_plugin_id = plugin_id.gsub(/[ "\/\\:;|*<>?]/, '_') backup_base_dir = system_config.root_dir || DEFAULT_BACKUP_DIR backup_file = File.join(backup_base_dir, 'backup', "worker#{fluentd_worker_id}", safe_plugin_id, "#{unique_id}.log") backup_dir = File.dirname(backup_file) log.warn "bad chunk is moved to #{backup_file}" FileUtils.mkdir_p(backup_dir) unless Dir.exist?(backup_dir) File.open(backup_file, 'ab', system_config.file_permission || 0644) { |f| chunk.write_to(f) } end commit_write(chunk.unique_id, secondary: using_secondary, delayed: delayed_commit) end
def before_shutdown
def before_shutdown @secondary.before_shutdown if @secondary if @buffering && @buffer if @flush_at_shutdown force_flush end @buffer.before_shutdown # Need to ensure to stop enqueueing ... after #shutdown, we cannot write any data @output_enqueue_thread_running = false if @output_enqueue_thread && @output_enqueue_thread.alive? @output_enqueue_thread.wakeup @output_enqueue_thread.join end end super end
def calculate_timekey(time)
def calculate_timekey(time) time_int = time.to_i if @timekey_use_utc (time_int - (time_int % @timekey)).to_i else offset = @calculate_offset ? @calculate_offset.call(time) : @offset (time_int - ((time_int + offset)% @timekey)).to_i end end
def check_slow_flush(start)
def check_slow_flush(start) elapsed_time = Fluent::Clock.now - start elapsed_millsec = (elapsed_time * 1000).to_i @counters_monitor.synchronize { @flush_time_count += elapsed_millsec } if elapsed_time > @slow_flush_log_threshold @counters_monitor.synchronize { @slow_flush_count += 1 } log.warn "buffer flush took longer time than slow_flush_log_threshold:", elapsed_time: elapsed_time, slow_flush_log_threshold: @slow_flush_log_threshold, plugin_id: self.plugin_id end end
def chunk_for_test(tag, time, record)
def chunk_for_test(tag, time, record) require 'fluent/plugin/buffer/memory_chunk' m = metadata_for_test(tag, time, record) Fluent::Plugin::Buffer::MemoryChunk.new(m) end
def close
def close @buffer.close if @buffering && @buffer @secondary.close if @secondary super end
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false)
def commit_write(chunk_id, delayed: @delayed_commit, secondary: false) log.on_trace { log.trace "committing write operation to a chunk", chunk: dump_unique_id_hex(chunk_id), delayed: delayed } if delayed @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } end end @buffer.purge_chunk(chunk_id) @retry_mutex.synchronize do if @retry # success to flush chunks in retries if secondary log.warn "retry succeeded by secondary.", chunk_id: dump_unique_id_hex(chunk_id) else log.warn "retry succeeded.", chunk_id: dump_unique_id_hex(chunk_id) end @retry = nil end end end
def configure(conf)
def configure(conf) unless implement?(:synchronous) || implement?(:buffered) || implement?(:delayed_commit) raise "BUG: output plugin must implement some methods. see developer documents." end has_buffer_section = (conf.elements(name: 'buffer').size > 0) has_flush_interval = conf.has_key?('flush_interval') super if has_buffer_section unless implement?(:buffered) || implement?(:delayed_commit) raise Fluent::ConfigError, "<buffer> section is configured, but plugin '#{self.class}' doesn't support buffering" end @buffering = true else # no buffer sections if implement?(:synchronous) if !implement?(:buffered) && !implement?(:delayed_commit) if @as_secondary raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't." end @buffering = false else if @as_secondary # secondary plugin always works as buffered plugin without buffer instance @buffering = true else # @buffering.nil? shows that enabling buffering or not will be decided in lazy way in #start @buffering = nil end end else # buffered or delayed_commit is supported by `unless` of first line in this method @buffering = true end end if @as_secondary if !@buffering && !@buffering.nil? raise Fluent::ConfigError, "secondary plugin '#{self.class}' must support buffering, but doesn't" end end if (@buffering || @buffering.nil?) && !@as_secondary # When @buffering.nil?, @buffer_config was initialized with default value for all parameters. # If so, this configuration MUST success. @chunk_keys = @buffer_config.chunk_keys.dup @chunk_key_time = !!@chunk_keys.delete('time') @chunk_key_tag = !!@chunk_keys.delete('tag') if @chunk_keys.any? { |key| begin k = Fluent::PluginHelper::RecordAccessor::Accessor.parse_parameter(key) if k.is_a?(String) k !~ CHUNK_KEY_PATTERN else if key.start_with?('$[') raise Fluent::ConfigError, "in chunk_keys: bracket notation is not allowed" else false end end rescue => e raise Fluent::ConfigError, "in chunk_keys: #{e.message}" end } raise Fluent::ConfigError, "chunk_keys specification includes invalid char" else @chunk_key_accessors = Hash[@chunk_keys.map { |key| [key.to_sym, Fluent::PluginHelper::RecordAccessor::Accessor.new(key)] }] end if @chunk_key_time raise Fluent::ConfigError, "<buffer ...> argument includes 'time', but timekey is not configured" unless @buffer_config.timekey Fluent::Timezone.validate!(@buffer_config.timekey_zone) @timekey_zone = @buffer_config.timekey_use_utc ? '+0000' : @buffer_config.timekey_zone @timekey = @buffer_config.timekey @timekey_use_utc = @buffer_config.timekey_use_utc @offset = Fluent::Timezone.utc_offset(@timekey_zone) @calculate_offset = @offset.respond_to?(:call) ? @offset : nil @output_time_formatter_cache = {} end if (@chunk_key_tag ? 1 : 0) + @chunk_keys.size >= CHUNKING_FIELD_WARN_NUM log.warn "many chunk keys specified, and it may cause too many chunks on your system." end # no chunk keys or only tags (chunking can be done without iterating event stream) @simple_chunking = !@chunk_key_time && @chunk_keys.empty? @flush_mode = @buffer_config.flush_mode if @flush_mode == :default if has_flush_interval log.info "'flush_interval' is configured at out side of <buffer>. 'flush_mode' is set to 'interval' to keep existing behaviour" @flush_mode = :interval else @flush_mode = (@chunk_key_time ? :lazy : :interval) end end buffer_type = @buffer_config[:@type] buffer_conf = conf.elements(name: 'buffer').first || Fluent::Config::Element.new('buffer', '', {}, []) @buffer = Plugin.new_buffer(buffer_type, parent: self) @buffer.configure(buffer_conf) @flush_at_shutdown = @buffer_config.flush_at_shutdown if @flush_at_shutdown.nil? @flush_at_shutdown = if @buffer.persistent? false else true # flush_at_shutdown is true in default for on-memory buffer end elsif !@flush_at_shutdown && !@buffer.persistent? buf_type = Plugin.lookup_type_from_class(@buffer.class) log.warn "'flush_at_shutdown' is false, and buffer plugin '#{buf_type}' is not persistent buffer." log.warn "your configuration will lose buffered data at shutdown. please confirm your configuration again." end if (@flush_mode != :interval) && buffer_conf.has_key?('flush_interval') if buffer_conf.has_key?('flush_mode') raise Fluent::ConfigError, "'flush_interval' can't be specified when 'flush_mode' is not 'interval' explicitly: '#{@flush_mode}'" else log.warn "'flush_interval' is ignored because default 'flush_mode' is not 'interval': '#{@flush_mode}'" end end if @buffer.queued_chunks_limit_size.nil? @buffer.queued_chunks_limit_size = @buffer_config.flush_thread_count end end if @secondary_config raise Fluent::ConfigError, "Invalid <secondary> section for non-buffered plugin" unless @buffering raise Fluent::ConfigError, "<secondary> section cannot have <buffer> section" if @secondary_config.buffer raise Fluent::ConfigError, "<secondary> section cannot have <secondary> section" if @secondary_config.secondary if @buffer_config.retry_forever log.warn "<secondary> with 'retry_forever', only unrecoverable errors are moved to secondary" end secondary_type = @secondary_config[:@type] unless secondary_type secondary_type = conf['@type'] # primary plugin type end secondary_conf = conf.elements(name: 'secondary').first @secondary = Plugin.new_output(secondary_type) unless @secondary.respond_to?(:acts_as_secondary) raise Fluent::ConfigError, "Failed to setup secondary plugin in '#{conf['@type']}'. '#{secondary_type}' plugin in not allowed due to non buffered output" end @secondary.acts_as_secondary(self) @secondary.configure(secondary_conf) if (self.class != @secondary.class) && (@custom_format || @secondary.implement?(:custom_format)) log.warn "secondary type should be same with primary one", primary: self.class.to_s, secondary: @secondary.class.to_s end else @secondary = nil end self end
def emit_buffered(tag, es)
def emit_buffered(tag, es) @counters_monitor.synchronize{ @emit_count += 1 } begin execute_chunking(tag, es, enqueue: (@flush_mode == :immediate)) if !@retry && @buffer.queued? submit_flush_once end rescue # TODO: separate number of errors into emit errors and write/flush errors @counters_monitor.synchronize{ @num_errors += 1 } raise end end
def emit_events(tag, es)
def emit_events(tag, es) # actually this method will be overwritten by #configure if @buffering emit_buffered(tag, es) else emit_sync(tag, es) end end
def emit_sync(tag, es)
def emit_sync(tag, es) @counters_monitor.synchronize{ @emit_count += 1 } begin process(tag, es) @counters_monitor.synchronize{ @emit_records += es.size } rescue @counters_monitor.synchronize{ @num_errors += 1 } raise end end
def enqueue_thread_run
def enqueue_thread_run value_for_interval = nil if @flush_mode == :interval value_for_interval = @buffer_config.flush_interval end if @chunk_key_time if !value_for_interval || @buffer_config.timekey < value_for_interval value_for_interval = [@buffer_config.timekey, @buffer_config.timekey_wait].min end end unless value_for_interval raise "BUG: both of flush_interval and timekey are disabled" end interval = value_for_interval / 11.0 if interval < @buffer_config.flush_thread_interval interval = @buffer_config.flush_thread_interval end while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "enqueue_thread actually running" begin while @output_enqueue_thread_running now_int = Time.now.to_i if @output_flush_interrupted sleep interval next end @output_enqueue_thread_mutex.lock begin if @flush_mode == :interval flush_interval = @buffer_config.flush_interval.to_i # This block should be done by integer values. # If both of flush_interval & flush_thread_interval are 1s, expected actual flush timing is 1.5s. # If we use integered values for this comparison, expected actual flush timing is 1.0s. @buffer.enqueue_all{ |metadata, chunk| chunk.created_at.to_i + flush_interval <= now_int } end if @chunk_key_time timekey_unit = @buffer_config.timekey timekey_wait = @buffer_config.timekey_wait current_timekey = now_int - now_int % timekey_unit @buffer.enqueue_all{ |metadata, chunk| metadata.timekey < current_timekey && metadata.timekey + timekey_unit + timekey_wait <= now_int } end rescue => e raise if @under_plugin_development log.error "unexpected error while checking flushed chunks. ignored.", error: e log.error_backtrace ensure @output_enqueue_thread_waiting = false @output_enqueue_thread_mutex.unlock end sleep interval end rescue => e # normal errors are rescued by inner begin-rescue clause. log.error "error on enqueue thread", error: e log.error_backtrace raise end end
def enqueue_thread_wait
def enqueue_thread_wait @output_enqueue_thread_mutex.synchronize do @output_flush_interrupted = false @output_enqueue_thread_waiting = true end require 'timeout' Timeout.timeout(10) do Thread.pass while @output_enqueue_thread_waiting end end
def execute_chunking(tag, es, enqueue: false)
def execute_chunking(tag, es, enqueue: false) if @simple_chunking handle_stream_simple(tag, es, enqueue: enqueue) elsif @custom_format handle_stream_with_custom_format(tag, es, enqueue: enqueue) else handle_stream_with_standard_format(tag, es, enqueue: enqueue) end end
def extract_placeholders(str, chunk)
def extract_placeholders(str, chunk) metadata = if chunk.is_a?(Fluent::Plugin::Buffer::Chunk) chunk_passed = true chunk.metadata else chunk_passed = false # For existing plugins. Old plugin passes Chunk.metadata instead of Chunk chunk end if metadata.empty? str.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } else rvalue = str.dup # strftime formatting if @chunk_key_time # this section MUST be earlier than rest to use raw 'str' @output_time_formatter_cache[str] ||= Fluent::Timezone.formatter(@timekey_zone, str) rvalue = @output_time_formatter_cache[str].call(metadata.timekey) end # ${tag}, ${tag[0]}, ${tag[1]}, ... , ${tag[-2]}, ${tag[-1]} if @chunk_key_tag if str.include?('${tag}') rvalue = rvalue.gsub('${tag}', metadata.tag) end if str =~ CHUNK_TAG_PLACEHOLDER_PATTERN hash = {} tag_parts = metadata.tag.split('.') tag_parts.each_with_index do |part, i| hash["${tag[#{i}]}"] = part hash["${tag[#{i-tag_parts.size}]}"] = part end rvalue = rvalue.gsub(CHUNK_TAG_PLACEHOLDER_PATTERN, hash) end if rvalue =~ CHUNK_TAG_PLACEHOLDER_PATTERN log.warn "tag placeholder '#{$1}' not replaced. tag:#{metadata.tag}, template:#{str}" end end # ${a_chunk_key}, ... if !@chunk_keys.empty? && metadata.variables hash = {'${tag}' => '${tag}'} # not to erase this wrongly @chunk_keys.each do |key| hash["${#{key}}"] = metadata.variables[key.to_sym] end rvalue = rvalue.gsub(CHUNK_KEY_PLACEHOLDER_PATTERN, hash) end if rvalue =~ CHUNK_KEY_PLACEHOLDER_PATTERN log.warn "chunk key placeholder '#{$1}' not replaced. template:#{str}" end rvalue.sub(CHUNK_ID_PLACEHOLDER_PATTERN) { if chunk_passed dump_unique_id_hex(chunk.unique_id) else log.warn "${chunk_id} is not allowed in this plugin. Pass Chunk instead of metadata in extract_placeholders's 2nd argument" end } end end
def flush_thread_run(state)
def flush_thread_run(state) flush_thread_interval = @buffer_config.flush_thread_interval state.next_clock = Fluent::Clock.now + flush_thread_interval while !self.after_started? && !self.stopped? sleep 0.5 end log.debug "flush_thread actually running" state.mutex.lock begin # This thread don't use `thread_current_running?` because this thread should run in `before_shutdown` phase while @output_flush_threads_running current_clock = Fluent::Clock.now next_retry_time = nil @retry_mutex.synchronize do next_retry_time = @retry ? @retry.next_time : nil end if state.next_clock > current_clock interval = state.next_clock - current_clock elsif next_retry_time && next_retry_time > Time.now interval = next_retry_time.to_f - Time.now.to_f else state.mutex.unlock begin try_flush # next_flush_time uses flush_thread_interval or flush_thread_burst_interval (or retrying) interval = next_flush_time.to_f - Time.now.to_f # TODO: if secondary && delayed-commit, next_flush_time will be much longer than expected # because @retry still exists (#commit_write is not called yet in #try_flush) # @retry should be cleared if delayed commit is enabled? Or any other solution? state.next_clock = Fluent::Clock.now + interval ensure state.mutex.lock end end if @dequeued_chunks_mutex.synchronize{ !@dequeued_chunks.empty? && @dequeued_chunks.first.expired? } unless @output_flush_interrupted state.mutex.unlock begin try_rollback_write ensure state.mutex.lock end end end state.cond_var.wait(state.mutex, interval) if interval > 0 end rescue => e # normal errors are rescued by output plugins in #try_flush # so this rescue section is for critical & unrecoverable errors log.error "error on output thread", error: e log.error_backtrace raise ensure state.mutex.unlock end end
def flush_thread_wakeup
def flush_thread_wakeup @output_flush_threads.each do |state| state.mutex.synchronize { if state.thread && state.thread.status state.next_clock = 0 state.cond_var.signal end } Thread.pass end end
def force_flush
def force_flush if @buffering @buffer.enqueue_all(true) submit_flush_all end end
def format(tag, time, record)
def format(tag, time, record) # standard msgpack_event_stream chunk will be used if this method is not implemented in plugin subclass raise NotImplementedError, "BUG: output plugins MUST implement this method" end
def formatted_to_msgpack_binary
def formatted_to_msgpack_binary formatted_to_msgpack_binary? end
def formatted_to_msgpack_binary?
def formatted_to_msgpack_binary? # To indicate custom format method (#format) returns msgpack binary or not. # If #format returns msgpack binary, override this method to return true. false end
def generate_format_proc
def generate_format_proc if @buffer && @buffer.compress == :gzip @time_as_integer ? FORMAT_COMPRESSED_MSGPACK_STREAM_TIME_INT : FORMAT_COMPRESSED_MSGPACK_STREAM else @time_as_integer ? FORMAT_MSGPACK_STREAM_TIME_INT : FORMAT_MSGPACK_STREAM end end
def get_placeholders_keys(str)
def get_placeholders_keys(str) str.scan(CHUNK_KEY_PLACEHOLDER_PATTERN).map{|ph| ph[2..-2]}.reject{|s| (s == "tag") || (s == 'chunk_id') }.sort end
def get_placeholders_tag(str)
def get_placeholders_tag(str) # [["tag"],["tag[0]"]] parts = [] str.scan(CHUNK_TAG_PLACEHOLDER_PATTERN).map(&:first).each do |ph| if ph == "tag" parts << -1 elsif ph =~ /^tag\[(-?\d+)\]$/ parts << $1.to_i end end parts.sort end
def get_placeholders_time(str)
def get_placeholders_time(str) base_str = TIMESTAMP_CHECK_BASE_TIME.strftime(str) TIME_KEY_PLACEHOLDER_THRESHOLDS.each do |triple| sec = triple.first return triple if (TIMESTAMP_CHECK_BASE_TIME + sec).strftime(str) != base_str end nil end
def handle_stream_simple(tag, es, enqueue: false)
def handle_stream_simple(tag, es, enqueue: false) format_proc = nil meta = metadata((@chunk_key_tag ? tag : nil), nil, nil) records = es.size if @custom_format records = 0 data = [] es.each do |time, record| res = format(tag, time, record) if res data << res records += 1 end end else format_proc = generate_format_proc data = es end write_guard do @buffer.write({meta => data}, format: format_proc, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += records } true end
def handle_stream_with_custom_format(tag, es, enqueue: false)
iteration of event stream, and it should be done just once even if total event stream size
For custom format, formatting will be done here. Custom formatting always requires
`@buffer.write` will do this splitting.
"whole event stream" may be a split of "es" here when it's bigger than chunk_limit_size.
For standard format, formatting should be done for whole event stream, but
(custom format) metadata => array of formatted event
(standard format) metadata => event stream
metadata_and_data is a Hash of:
def handle_stream_with_custom_format(tag, es, enqueue: false) meta_and_data = {} records = 0 es.each do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= [] res = format(tag, time, record) if res meta_and_data[meta] << res records += 1 end end write_guard do @buffer.write(meta_and_data, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += records } true end
def handle_stream_with_standard_format(tag, es, enqueue: false)
def handle_stream_with_standard_format(tag, es, enqueue: false) format_proc = generate_format_proc meta_and_data = {} records = 0 es.each do |time, record| meta = metadata(tag, time, record) meta_and_data[meta] ||= MultiEventStream.new meta_and_data[meta].add(time, record) records += 1 end write_guard do @buffer.write(meta_and_data, format: format_proc, enqueue: enqueue) end @counters_monitor.synchronize{ @emit_records += records } true end
def implement?(feature)
def implement?(feature) methods_of_plugin = self.class.instance_methods(false) case feature when :synchronous then methods_of_plugin.include?(:process) || support_in_v12_style?(:synchronous) when :buffered then methods_of_plugin.include?(:write) || support_in_v12_style?(:buffered) when :delayed_commit then methods_of_plugin.include?(:try_write) when :custom_format then methods_of_plugin.include?(:format) || support_in_v12_style?(:custom_format) else raise ArgumentError, "Unknown feature for output plugin: #{feature}" end end
def initialize
def initialize super @counters_monitor = Monitor.new @buffering = false @delayed_commit = false @as_secondary = false @primary_instance = nil # TODO: well organized counters @num_errors = 0 @emit_count = 0 @emit_records = 0 @write_count = 0 @rollback_count = 0 @flush_time_count = 0 @slow_flush_count = 0 # How to process events is decided here at once, but it will be decided in delayed way on #configure & #start if implement?(:synchronous) if implement?(:buffered) || implement?(:delayed_commit) @buffering = nil # do #configure or #start to determine this for full-featured plugins else @buffering = false end else @buffering = true end @custom_format = implement?(:custom_format) @enable_msgpack_streamer = false # decided later @buffer = nil @secondary = nil @retry = nil @dequeued_chunks = nil @dequeued_chunks_mutex = nil @output_enqueue_thread = nil @output_flush_threads = nil @simple_chunking = nil @chunk_keys = @chunk_key_accessors = @chunk_key_time = @chunk_key_tag = nil @flush_mode = nil @timekey_zone = nil @retry_for_error_chunk = false end
def interrupt_flushes
def interrupt_flushes @output_flush_interrupted = true end
def metadata(tag, time, record)
def metadata(tag, time, record) # this arguments are ordered in output plugin's rule # Metadata 's argument order is different from this one (timekey, tag, variables) raise ArgumentError, "tag must be a String: #{tag.class}" unless tag.nil? || tag.is_a?(String) raise ArgumentError, "time must be a Fluent::EventTime (or Integer): #{time.class}" unless time.nil? || time.is_a?(Fluent::EventTime) || time.is_a?(Integer) raise ArgumentError, "record must be a Hash: #{record.class}" unless record.nil? || record.is_a?(Hash) if @chunk_keys.nil? && @chunk_key_time.nil? && @chunk_key_tag.nil? # for tests return Struct.new(:timekey, :tag, :variables).new end # timekey is int from epoch, and `timekey - timekey % 60` is assumed to mach with 0s of each minutes. # it's wrong if timezone is configured as one which supports leap second, but it's very rare and # we can ignore it (especially in production systems). if @chunk_keys.empty? if !@chunk_key_time && !@chunk_key_tag @buffer.metadata() elsif @chunk_key_time && @chunk_key_tag timekey = calculate_timekey(time) @buffer.metadata(timekey: timekey, tag: tag) elsif @chunk_key_time timekey = calculate_timekey(time) @buffer.metadata(timekey: timekey) else @buffer.metadata(tag: tag) end else timekey = if @chunk_key_time calculate_timekey(time) else nil end pairs = Hash[@chunk_key_accessors.map { |k, a| [k, a.call(record)] }] @buffer.metadata(timekey: timekey, tag: (@chunk_key_tag ? tag : nil), variables: pairs) end end
def metadata_for_test(tag, time, record)
def metadata_for_test(tag, time, record) raise "BUG: #metadata_for_test is available only when no actual metadata exists" unless @buffer.metadata_list.empty? m = metadata(tag, time, record) @buffer.metadata_list_clear! m end
def multi_workers_ready?
def multi_workers_ready? false end
def next_flush_time
def next_flush_time if @buffer.queued? @retry_mutex.synchronize do @retry ? @retry.next_time : Time.now + @buffer_config.flush_thread_burst_interval end else Time.now + @buffer_config.flush_thread_interval end end
def placeholder_validate!(name, str)
def placeholder_validate!(name, str) placeholder_validators(name, str).each do |v| v.validate! end end
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys)
def placeholder_validators(name, str, time_key = (@chunk_key_time && @buffer_config.timekey), tag_key = @chunk_key_tag, chunk_keys = @chunk_keys) validators = [] sec, title, example = get_placeholders_time(str) if sec || time_key validators << PlaceholderValidator.new(name, str, :time, {sec: sec, title: title, example: example, timekey: time_key}) end parts = get_placeholders_tag(str) if tag_key || !parts.empty? validators << PlaceholderValidator.new(name, str, :tag, {parts: parts, tagkey: tag_key}) end keys = get_placeholders_keys(str) if chunk_keys && !chunk_keys.empty? || !keys.empty? validators << PlaceholderValidator.new(name, str, :keys, {keys: keys, chunkkeys: chunk_keys}) end validators end
def prefer_buffered_processing
def prefer_buffered_processing # override this method to return false only when all of these are true: # * plugin has both implementation for buffered and non-buffered methods # * plugin is expected to work as non-buffered plugin if no `<buffer>` sections specified true end
def prefer_delayed_commit
def prefer_delayed_commit # override this method to decide which is used of `write` or `try_write` if both are implemented true end
def process(tag, es)
def process(tag, es) raise NotImplementedError, "BUG: output plugins MUST implement this method" end
def retry_state(randomize)
def retry_state(randomize) if @secondary retry_state_create( :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout, forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base, max_interval: @buffer_config.retry_max_interval, secondary: true, secondary_threshold: @buffer_config.retry_secondary_threshold, randomize: randomize ) else retry_state_create( :output_retries, @buffer_config.retry_type, @buffer_config.retry_wait, @buffer_config.retry_timeout, forever: @buffer_config.retry_forever, max_steps: @buffer_config.retry_max_times, backoff_base: @buffer_config.retry_exponential_backoff_base, max_interval: @buffer_config.retry_max_interval, randomize: randomize ) end end
def rollback_write(chunk_id, update_retry: true)
update_retry parameter is for preventing busy loop by async write
def rollback_write(chunk_id, update_retry: true) # This API is to rollback chunks explicitly from plugins. # 3rd party plugins can depend it on automatic rollback of #try_rollback_write @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{ |info| info.chunk_id == chunk_id } end # returns true if chunk was rollbacked as expected # false if chunk was already flushed and couldn't be rollbacked unexpectedly # in many cases, false can be just ignored if @buffer.takeback_chunk(chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } if update_retry primary = @as_secondary ? @primary_instance : self primary.update_retry_state(chunk_id, @as_secondary) end true else false end end
def shutdown
def shutdown @secondary.shutdown if @secondary @buffer.shutdown if @buffering && @buffer super end
def start
def start super if @buffering.nil? @buffering = prefer_buffered_processing if !@buffering && @buffer @buffer.terminate # it's not started, so terminate will be enough # At here, this plugin works as non-buffered plugin. # Un-assign @buffer not to show buffering metrics (e.g., in_monitor_agent) @buffer = nil end end if @buffering m = method(:emit_buffered) singleton_class.module_eval do define_method(:emit_events, m) end @custom_format = implement?(:custom_format) @enable_msgpack_streamer = @custom_format ? formatted_to_msgpack_binary : true @delayed_commit = if implement?(:buffered) && implement?(:delayed_commit) prefer_delayed_commit else implement?(:delayed_commit) end @delayed_commit_timeout = @buffer_config.delayed_commit_timeout else # !@buffering m = method(:emit_sync) singleton_class.module_eval do define_method(:emit_events, m) end end if @buffering && !@as_secondary @retry = nil @retry_mutex = Mutex.new @buffer.start @output_enqueue_thread = nil @output_enqueue_thread_running = true @output_flush_threads = [] @output_flush_threads_mutex = Mutex.new @output_flush_threads_running = true # mainly for test: detect enqueue works as code below: # @output.interrupt_flushes # # emits # @output.enqueue_thread_wait @output_flush_interrupted = false @output_enqueue_thread_mutex = Mutex.new @output_enqueue_thread_waiting = false @dequeued_chunks = [] @dequeued_chunks_mutex = Mutex.new @buffer_config.flush_thread_count.times do |i| thread_title = "flush_thread_#{i}".to_sym thread_state = FlushThreadState.new(nil, nil, Mutex.new, ConditionVariable.new) thread = thread_create(thread_title) do flush_thread_run(thread_state) end thread_state.thread = thread @output_flush_threads_mutex.synchronize do @output_flush_threads << thread_state end end @output_flush_thread_current_position = 0 if !@under_plugin_development && (@flush_mode == :interval || @chunk_key_time) @output_enqueue_thread = thread_create(:enqueue_thread, &method(:enqueue_thread_run)) end end @secondary.start if @secondary end
def statistics
def statistics stats = { 'emit_records' => @emit_records, # Respect original name # https://github.com/fluent/fluentd/blob/45c7b75ba77763eaf87136864d4942c4e0c5bfcd/lib/fluent/plugin/in_monitor_agent.rb#L284 'retry_count' => @num_errors, 'emit_count' => @emit_count, 'write_count' => @write_count, 'rollback_count' => @rollback_count, 'slow_flush_count' => @slow_flush_count, 'flush_time_count' => @flush_time_count, } if @buffer && @buffer.respond_to?(:statistics) (@buffer.statistics['buffer'] || {}).each do |k, v| stats["buffer_#{k}"] = v end end { 'output' => stats } end
def stop
def stop @secondary.stop if @secondary @buffer.stop if @buffering && @buffer super end
def submit_flush_all
def submit_flush_all while !@retry && @buffer.queued? submit_flush_once sleep @buffer_config.flush_thread_burst_interval end end
def submit_flush_once
def submit_flush_once # Without locks: it is rough but enough to select "next" writer selection @output_flush_thread_current_position = (@output_flush_thread_current_position + 1) % @buffer_config.flush_thread_count state = @output_flush_threads[@output_flush_thread_current_position] state.mutex.synchronize { if state.thread && state.thread.status # "run"/"sleep"/"aborting" or false(successfully stop) or nil(killed by exception) state.next_clock = 0 state.cond_var.signal else log.warn "thread is already dead" end } Thread.pass end
def support_in_v12_style?(feature)
def support_in_v12_style?(feature) # for plugins written in v0.12 styles case feature when :synchronous then false when :buffered then false when :delayed_commit then false when :custom_format then false else raise ArgumentError, "unknown feature: #{feature}" end end
def terminate
def terminate @buffer.terminate if @buffering && @buffer @secondary.terminate if @secondary super end
def try_flush
def try_flush chunk = @buffer.dequeue_chunk return unless chunk log.on_trace { log.trace "trying flush for a chunk", chunk: dump_unique_id_hex(chunk.unique_id) } output = self using_secondary = false if @retry_mutex.synchronize{ @retry && @retry.secondary? } output = @secondary using_secondary = true end if @enable_msgpack_streamer chunk.extend ChunkMessagePackEventStreamer end begin chunk_write_start = Fluent::Clock.now if output.delayed_commit log.trace "executing delayed write and commit", chunk: dump_unique_id_hex(chunk.unique_id) @counters_monitor.synchronize{ @write_count += 1 } @dequeued_chunks_mutex.synchronize do # delayed_commit_timeout for secondary is configured in <buffer> of primary (<secondary> don't get <buffer>) @dequeued_chunks << DequeuedChunkInfo.new(chunk.unique_id, Time.now, self.delayed_commit_timeout) end output.try_write(chunk) check_slow_flush(chunk_write_start) else # output plugin without delayed purge chunk_id = chunk.unique_id dump_chunk_id = dump_unique_id_hex(chunk_id) log.trace "adding write count", instance: self.object_id @counters_monitor.synchronize{ @write_count += 1 } log.trace "executing sync write", chunk: dump_chunk_id output.write(chunk) check_slow_flush(chunk_write_start) log.trace "write operation done, committing", chunk: dump_chunk_id commit_write(chunk_id, delayed: false, secondary: using_secondary) log.trace "done to commit a chunk", chunk: dump_chunk_id end rescue *UNRECOVERABLE_ERRORS => e if @secondary if using_secondary log.warn "got unrecoverable error in secondary.", error: e log.warn_backtrace backup_chunk(chunk, using_secondary, output.delayed_commit) else if (self.class == @secondary.class) log.warn "got unrecoverable error in primary and secondary type is same as primary. Skip secondary", error: e log.warn_backtrace backup_chunk(chunk, using_secondary, output.delayed_commit) else # Call secondary output directly without retry update. # In this case, delayed commit causes inconsistent state in dequeued chunks so async output in secondary is not allowed for now. if @secondary.delayed_commit log.warn "got unrecoverable error in primary and secondary is async output. Skip secondary for backup", error: e log.warn_backtrace backup_chunk(chunk, using_secondary, output.delayed_commit) else log.warn "got unrecoverable error in primary. Skip retry and flush chunk to secondary", error: e log.warn_backtrace begin @secondary.write(chunk) commit_write(chunk_id, delayed: output.delayed_commit, secondary: true) rescue => e log.warn "got an error in secondary for unrecoverable error", error: e log.warn_backtrace backup_chunk(chunk, using_secondary, output.delayed_commit) end end end end else log.warn "got unrecoverable error in primary and no secondary", error: e log.warn_backtrace backup_chunk(chunk, using_secondary, output.delayed_commit) end rescue => e log.debug "taking back chunk for errors.", chunk: dump_unique_id_hex(chunk.unique_id) if output.delayed_commit @dequeued_chunks_mutex.synchronize do @dequeued_chunks.delete_if{|d| d.chunk_id == chunk.unique_id } end end if @buffer.takeback_chunk(chunk.unique_id) @counters_monitor.synchronize { @rollback_count += 1 } end update_retry_state(chunk.unique_id, using_secondary, e) raise if @under_plugin_development && !@retry_for_error_chunk end end
def try_rollback_all
def try_rollback_all return unless @dequeued_chunks @dequeued_chunks_mutex.synchronize do until @dequeued_chunks.empty? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } log.info "delayed commit for buffer chunks was cancelled in shutdown", chunk_id: dump_unique_id_hex(info.chunk_id) primary = @as_secondary ? @primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) end end end end
def try_rollback_write
def try_rollback_write @dequeued_chunks_mutex.synchronize do while @dequeued_chunks.first && @dequeued_chunks.first.expired? info = @dequeued_chunks.shift if @buffer.takeback_chunk(info.chunk_id) @counters_monitor.synchronize{ @rollback_count += 1 } log.warn "failed to flush the buffer chunk, timeout to commit.", chunk_id: dump_unique_id_hex(info.chunk_id), flushed_at: info.time primary = @as_secondary ? @primary_instance : self primary.update_retry_state(info.chunk_id, @as_secondary) end end end end
def try_write(chunk)
def try_write(chunk) raise NotImplementedError, "BUG: output plugins MUST implement this method" end
def update_retry_state(chunk_id, using_secondary, error = nil)
def update_retry_state(chunk_id, using_secondary, error = nil) @retry_mutex.synchronize do @counters_monitor.synchronize{ @num_errors += 1 } chunk_id_hex = dump_unique_id_hex(chunk_id) unless @retry @retry = retry_state(@buffer_config.retry_randomize) if error log.warn "failed to flush the buffer.", retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error log.warn_backtrace error.backtrace end return end # @retry exists if @retry.limit? if error records = @buffer.queued_records msg = "failed to flush the buffer, and hit limit for retries. dropping all chunks in the buffer queue." log.error msg, retry_times: @retry.steps, records: records, error: error log.error_backtrace error.backtrace end @buffer.clear_queue! log.debug "buffer queue cleared" @retry = nil else @retry.step if error if using_secondary msg = "failed to flush the buffer with secondary output." log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error log.warn_backtrace error.backtrace else msg = "failed to flush the buffer." log.warn msg, retry_time: @retry.steps, next_retry_seconds: @retry.next_time, chunk: chunk_id_hex, error: error log.warn_backtrace error.backtrace end end end end end
def write(chunk)
def write(chunk) raise NotImplementedError, "BUG: output plugins MUST implement this method" end
def write_guard(&block)
def write_guard(&block) begin block.call rescue Fluent::Plugin::Buffer::BufferOverflowError log.warn "failed to write data into buffer by buffer overflow", action: @buffer_config.overflow_action case @buffer_config.overflow_action when :throw_exception raise when :block log.debug "buffer.write is now blocking" until @buffer.storable? if self.stopped? log.error "breaking block behavior to shutdown Fluentd" # to break infinite loop to exit Fluentd process raise end log.trace "sleeping until buffer can store more data" sleep 1 end log.debug "retrying buffer.write after blocked operation" retry when :drop_oldest_chunk begin oldest = @buffer.dequeue_chunk if oldest log.warn "dropping oldest chunk to make space after buffer overflow", chunk_id: dump_unique_id_hex(oldest.unique_id) @buffer.purge_chunk(oldest.unique_id) else log.error "no queued chunks to be dropped for drop_oldest_chunk" end rescue # ignore any errors end raise unless @buffer.storable? retry else raise "BUG: unknown overflow_action '#{@buffer_config.overflow_action}'" end end end