## Fluentd## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#require'fluent/plugin/input'require'yajl'moduleFluent::PluginclassExecInput<Fluent::Plugin::InputFluent::Plugin.register_input('exec',self)helpers:compat_parameters,:extract,:parser,:child_processdesc'The command (program) to execute.'config_param:command,:stringconfig_section:parsedoconfig_set_default:@type,'tsv'config_set_default:time_type,:floatconfig_set_default:time_key,nilconfig_set_default:estimate_current_event,falseendconfig_section:extractdoconfig_set_default:time_type,:floatenddesc'Tag of the output events.'config_param:tag,:string,default: nildesc'The interval time between periodic program runs.'config_param:run_interval,:time,default: nildesc'The default block size to read if parser requires partial read.'config_param:read_block_size,:size,default: 10240# 10kattr_reader:parserdefconfigure(conf)compat_parameters_convert(conf,:extract,:parser)['parse','extract'].eachdo|subsection_name|ifsubsection=conf.elements(subsection_name).firstifsubsection.has_key?('time_format')subsection['time_type']||='string'endendendsuperif!@tag&&(!@extract_config||!@extract_config.tag_key)raiseFluent::ConfigError,"'tag' or 'tag_key' option is required on exec input"end@parser=parser_createenddefmulti_workers_ready?trueenddefstartsuperif@run_intervalchild_process_execute(:exec_input,@command,interval: @run_interval,mode: [:read],&method(:run))elsechild_process_execute(:exec_input,@command,immediate: true,mode: [:read],&method(:run))endenddefrun(io)casewhen@parser.implement?(:parse_io)@parser.parse_io(io,&method(:on_record))when@parser.implement?(:parse_partial_data)untilio.eof?@parser.parse_partial_data(io.readpartial(@read_block_size),&method(:on_record))endwhen@parser.parser_type==:text_per_lineio.each_linedo|line|@parser.parse(line.chomp,&method(:on_record))endelse@parser.parse(io.read,&method(:on_record))endenddefon_record(time,record)tag=extract_tag_from_record(record)tag||=@tagtime||=extract_time_from_record(record)||Fluent::EventTime.nowrouter.emit(tag,time,record)rescue=>elog.error"exec failed to emit",tag: tag,record: Yajl.dump(record),error: erouter.emit_error_event(tag,time,record,e)iftag&&time&&recordendendend