class Fluent::Plugin::ExecOutput
def configure(conf)
def configure(conf) compat_parameters_convert(conf, :inject, :formatter, :buffer, default_chunk_key: 'time') super @formatter = formatter_create end
def format(tag, time, record)
def format(tag, time, record) record = inject_values_to_record(tag, time, record) if @formatter.formatter_type == :text_per_line @formatter.format(tag, time, record).chomp + NEWLINE else @formatter.format(tag, time, record) end end
def multi_workers_ready?
def multi_workers_ready? true end
def try_write(chunk)
def try_write(chunk) tmpfile = nil prog = if chunk.respond_to?(:path) "#{@command} #{chunk.path}" else tmpfile = Tempfile.new("fluent-plugin-out-exec-") tmpfile.binmode chunk.write_to(tmpfile) tmpfile.close "#{@command} #{tmpfile.path}" end chunk_id = chunk.unique_id callback = ->(status){ begin if tmpfile tmpfile.delete rescue nil end if status && status.success? commit_write(chunk_id) elsif status # #rollback_write will be done automatically if it isn't called at here. # But it's after command_timeout, and this timeout should be longer than users expectation. # So here, this plugin calls it explicitly. rollback_write(chunk_id) log.warn "command exits with error code", prog: prog, status: status.exitstatus, signal: status.termsig else rollback_write(chunk_id) log.warn "command unexpectedly exits without exit status", prog: prog end rescue => e log.error "unexpected error in child process callback", error: e end } child_process_execute(:out_exec_process, prog, stderr: :connect, immediate: true, parallel: true, mode: [], wait_timeout: @command_timeout, on_exit_callback: callback) end