class Fluent::Compat::ObjectBufferedOutput
# Class-level reader that hands back the BUFFER_PARAMS constant unchanged.
#
# @return [Object] the BUFFER_PARAMS constant itself (not a copy)
def self.propagate_default_params
  BUFFER_PARAMS
end
# Configures the plugin, accepting either configuration style.
#
# The style is :v1 when CompatOutputUtils.buffer_section(conf) returns a
# truthy value and :v0 otherwise.
#
# For :v0, a 'buffer' element with argument 'tag' is appended to
# conf.elements. Its parameters start from flush_mode=interval and
# retry_type=exponential_backoff; every key of BUFFER_PARAMS present in conf
# is then copied under its mapped (newer) name. Entries whose newer name is
# nil are skipped.
#
# For :v1, the buffer section must be chunked by tag and nothing else
# (#write reads the tag from the chunk metadata).
#
# @param conf [#has_key?, #[], #elements] plugin configuration; mutated in
#   place for :v0 style
# @return [self] the receiver, extended with BufferedChunkMixin
# @raise [Fluent::ConfigError] for :v1 style when the buffer chunk keys are
#   anything other than ['tag']
def configure(conf)
  config_style = CompatOutputUtils.buffer_section(conf) ? :v1 : :v0

  if config_style == :v0
    buf_params = {
      "flush_mode" => "interval",
      "retry_type" => "exponential_backoff",
    }
    BUFFER_PARAMS.each do |older, newer|
      next unless newer && conf.has_key?(older)

      value = conf[older]
      # The older 'exception' action is written as 'throw_exception' under the newer name.
      value = 'throw_exception' if older == 'buffer_queue_full_action' && value == 'exception'
      buf_params[newer] = value
    end

    conf.elements << Fluent::Config::Element.new('buffer', 'tag', buf_params, [])
  end

  ParserUtils.convert_parser_conf(conf)
  FormatterUtils.convert_formatter_conf(conf)

  # @buffer_config is read below and is not set in this method; presumably
  # the parent #configure populates it.
  super

  # Reject anything that is NOT exactly <buffer tag>. The comparison used to
  # be `==`, which raised for the only permitted layout and let every other
  # chunk-key combination through.
  if config_style == :v1 && @buffer_config.chunk_keys != ['tag']
    raise Fluent::ConfigError, "this plugin '#{self.class}' allows <buffer tag> only"
  end

  self.extend BufferedChunkMixin
end
# Logs a warning that process detaching is unsupported, then runs the given
# block in the current process.
#
# @yield the work that would have been detached
# @return [Object] whatever the block returns
def detach_multi_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.call
end
# Logs a warning that process detaching is unsupported, then runs the given
# block in the current process.
#
# @yield the work that would have been detached
# @return [Object] whatever the block returns
def detach_process(&block)
  notice = "detach_process is not supported in this version. ignored."
  log.warn(notice)
  block.call
end
# Always fails: this compat class does not implement placeholder extraction.
#
# @param str [Object] ignored
# @param metadata [Object] ignored
# @raise [RuntimeError] unconditionally
def extract_placeholders(str, metadata)
  raise RuntimeError, "BUG: compat plugin does not support extract_placeholders: use newer plugin API"
end
# Serializes an event stream to msgpack (for BufferedOutputTestDriver).
#
# Uses the compressed serializer when @compress is :gzip and the plain one
# otherwise; @time_as_integer is forwarded as the time_int option either way.
#
# @param tag [Object] not used by this method
# @param es [#to_msgpack_stream, #to_compressed_msgpack_stream] event stream
# @return [Object] whatever the chosen serializer returns
def format_stream(tag, es)
  return es.to_compressed_msgpack_stream(time_int: @time_as_integer) if @compress == :gzip

  es.to_msgpack_stream(time_int: @time_as_integer)
end
# Runs the parent initializer, then prepends Fluent::Compat::CallSuperMixin
# to the instance's class unless it is already among that class's ancestors.
def initialize
  super
  mixin = Fluent::Compat::CallSuperMixin
  klass = self.class
  klass.prepend(mixin) unless klass.ancestors.include?(mixin)
end
# Runs the parent #start, then wires injection into @formatter when needed.
#
# The formatter is touched only if all of the following hold:
# * @formatter is defined and @inject_config is truthy,
# * the formatter's class does not include Fluent::Compat::HandleTagAndTimeMixin,
# * the formatter responds to #owner and currently has no owner.
# In that case this plugin becomes the formatter's owner and
# FormatterUtils::InjectMixin is prepended to the formatter's singleton class.
def start
  super

  return unless instance_variable_defined?(:@formatter) && @inject_config
  return if @formatter.class.ancestors.include?(Fluent::Compat::HandleTagAndTimeMixin)
  return if !@formatter.respond_to?(:owner) || @formatter.owner

  @formatter.owner = self
  @formatter.singleton_class.prepend(FormatterUtils::InjectMixin)
end
# Reports which feature this class supports: only :buffered.
#
# @param feature [Symbol] one of :synchronous, :buffered, :delayed_commit,
#   :custom_format
# @return [Boolean, nil] true for :buffered, false for the other three known
#   features, nil for anything else
def support_in_v12_style?(feature)
  case feature
  when :buffered
    true
  when :synchronous, :delayed_commit, :custom_format
    false
  end
end
# Passes a chunk to #write_objects together with the tag from its metadata.
#
# @param chunk [#metadata] chunk whose metadata responds to #tag
# @return [Object] whatever #write_objects returns
def write(chunk)
  tag = chunk.metadata.tag
  write_objects(tag, chunk)
end