## Fluentd## Licensed under the Apache License, Version 2.0 (the "License");# you may not use this file except in compliance with the License.# You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing, software# distributed under the License is distributed on an "AS IS" BASIS,# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.# See the License for the specific language governing permissions and# limitations under the License.#require'optparse'require'fluent/env'require'fluent/time'require'fluent/msgpack_factory'require'fluent/version'op=OptionParser.newop.banner+=" <tag>"op.version=Fluent::VERSIONport=24224host='127.0.0.1'unix=falsesocket_path=Fluent::DEFAULT_SOCKET_PATHconfig_path=Fluent::DEFAULT_CONFIG_PATHformat='json'message_key='message'time_as_integer=falseretry_limit=5op.on('-p','--port PORT',"fluent tcp port (default: #{port})",Integer){|i|port=i}op.on('-h','--host HOST',"fluent host (default: #{host})"){|s|host=s}op.on('-u','--unix',"use unix socket instead of tcp",TrueClass){|b|unix=b}op.on('-s','--socket PATH',"unix socket path (default: #{socket_path})"){|s|socket_path=s}op.on('-f','--format FORMAT',"input format (default: #{format})"){|s|format=s}op.on('--json',"same as: -f json",TrueClass){|b|format='json'}op.on('--msgpack',"same as: -f msgpack",TrueClass){|b|format='msgpack'}op.on('--none',"same as: -f none",TrueClass){|b|format='none'}op.on('--message-key KEY',"key field for none format (default: #{message_key})"){|s|message_key=s}op.on('--time-as-integer',"Send time as integer for v0.12 or earlier",TrueClass){|b|time_as_integer=true}op.on('--retry-limit N',"Specify the number of retry limit (default: #{retry_limit})",Integer){|n|retry_limit=n}singleton_class.module_evaldodefine_method(:usage)do|msg|putsop.to_sputs"error: #{msg}"ifmsgexit1endendbeginop.parse!(ARGV)ifARGV.length!=1usagenilendtag=ARGV.shiftrescueusage$!.to_sendrequire'thread'require'monitor'require'socket'require'yajl'require'msgpack'classWriterincludeMonitorMixinRetryLimitError=Class.new(StandardError)classTimerThreaddefinitialize(writer)@writer=writerenddefstart@finish=false@thread=Thread.new(&method(:run))enddefshutdown@finish=true@thread.joinenddefrununtil@finishsleep1@writer.on_timerendendenddefinitialize(tag,connector,time_as_integer: false,retry_limit: 5)@tag=tag@connector=connector@socket=false@socket_time=Time.now.to_i@socket_ttl=10# TODO@error_history=[]@pending=[]@pending_limit=1024# TODO@retry_wait=1@retry_limit=retry_limit@time_as_integer=time_as_integersuper()enddefwrite(record)ifrecord.class!=HashraiseArgumentError,"Input must be a map (got #{record.class})"endtime=Fluent::EventTime.nowtime=time.to_iif@time_as_integerentry=[time,record]synchronize{unlesswrite_impl([entry])# write failed@pending.push(entry)while@pending.size>@pending_limit# exceeds pending limit; trash oldest recordtime,record=@pending.shiftabort_message(time,record)endend}enddefon_timernow=Time.now.to_isynchronize{unless@pending.empty?# flush pending recordsifwrite_impl(@pending)# write succeeded@pending.clearendendif@socket&&@socket_time+@socket_ttl<now# socket is not used @socket_ttl secondscloseend}enddefclose@socket.close@socket=nilenddefstart@timer=TimerThread.new(self)@timer.startselfenddefshutdown@timer.shutdownendprivatedefwrite_impl(array)socket=get_socketunlesssocketreturnfalseendbeginpacker=Fluent::MessagePackFactory.packersocket.writepacker.pack([@tag,array])socket.flushrescue$stderr.puts"write failed: #{$!}"closereturnfalseendreturntrueenddefget_socketunless@socketunlesstry_connectreturnnilendend@socket_time=Time.now.to_ireturn@socketenddeftry_connectbeginnow=Time.now.to_iunless@error_history.empty?# wait before re-connectingwait=1#@retry_wait * (2 ** (@error_history.size-1))ifnow<=@socket_time+waitsleep(wait)try_connectendend@socket=@connector.call@error_history.clearreturntruerescueRetryLimitError=>exraiseexrescue$stderr.puts"connect failed: #{$!}"@error_history<<$!@socket_time=nowif@retry_limit<@error_history.size# abort all pending records@pending.each{|(time,record)|abort_message(time,record)}@pending.clear@error_history.clearraiseRetryLimitError,"exceed retry limit"elseretryendendenddefabort_message(time,record)$stdout.puts"!#{time}:#{Yajl.dump(record)}"endendifunixconnector=Proc.new{UNIXSocket.open(socket_path)}elseconnector=Proc.new{TCPSocket.new(host,port)}endw=Writer.new(tag,connector,time_as_integer: time_as_integer,retry_limit: retry_limit)w.startcaseformatwhen'json'beginwhileline=$stdin.getsrecord=Yajl.load(line)w.write(record)endrescue$stderr.puts$!exit1endwhen'msgpack'require'fluent/engine'beginu=Fluent::Engine.msgpack_factory.unpacker($stdin)u.each{|record|w.write(record)}rescueEOFErrorrescue$stderr.puts$!exit1endwhen'none'beginwhileline=$stdin.getsrecord={message_key=>line.chomp}w.write(record)endrescue$stderr.puts$!exit1endelse$stderr.puts"Unknown format '#{format}'"exit1end