# encoding: utf-8require"lumberjack/beats"require"socket"require"thread"require"openssl"require"zlib"moduleLumberjackmoduleBeatsclassClientdefinitialize(opts={})@opts={:port=>0,:addresses=>[],:ssl_certificate=>nil,:ssl_certificate_key=>nil,:ssl_certificate_authorities=>nil,:ssl=>true,:json=>false,}.merge(opts)@opts[:addresses]=Array(@opts[:addresses])raise"Must set a port."if@opts[:port]==0raise"Must set atleast one address"if@opts[:addresses].empty?==0if@opts[:ssl]if@opts[:ssl_certificate_authorities].nil?&&(@opts[:ssl_certificate].nil?||@opts[:ssl_certificate_key].nil?)raise"Must set a ssl certificate or path"endend@socket=connectendprivatedefconnectaddrs=@opts[:addresses].shufflebeginraise"Could not connect to any hosts"ifaddrs.empty?opts=@optsopts[:address]=addrs.popLumberjack::Beats::Socket.new(opts)rescue*[Errno::ECONNREFUSED,SocketError]retryendendpublicdefwrite(elements,opts={})@socket.write_sync(elements,opts)endpublicdefhost@socket.hostendendclassSocket# Create a new Lumberjack Socket.## - options is a hash. Valid options are:## * :port - the port to listen on# * :address - the host/address to bind to# * :ssl - enable/disable ssl support# * :ssl_certificate - the path to the ssl cert to use.# If ssl_certificate is not set, a plain tcp connection# will be used.attr_reader:sequenceattr_reader:hostdefinitialize(opts={})@sequence=0@last_ack=0@opts={:port=>0,:address=>"127.0.0.1",:ssl_certificate_authorities=>[],# use the same naming as beats' TLS options:ssl_certificate=>nil,:ssl_certificate_key=>nil,:ssl_certificate_password=>nil,:ssl=>true,:json=>false,}.merge(opts)@host=@opts[:address]connection_startendprivatedefconnection_starttcp_socket=TCPSocket.new(@opts[:address],@opts[:port])if!@opts[:ssl]@socket=tcp_socketelse@socket=OpenSSL::SSL::SSLSocket.new(tcp_socket,setup_ssl)@socket.connectendendprivatedefsetup_sslssl_context=OpenSSL::SSL::SSLContext.newssl_context.cert=certificatessl_context.key=private_keyssl_context.verify_mode=OpenSSL::SSL::VERIFY_PEERssl_context.cert_store=trust_storessl_contextendprivatedefcertificateif@opts[:ssl_certificate]OpenSSL::X509::Certificate.new(File.open(@opts[:ssl_certificate]))endendprivatedefprivate_keyOpenSSL::PKey::RSA.new(File.read(@opts[:ssl_certificate_key]),@opts[:ssl_certificate_password])if@opts[:ssl_certificate_key]endprivatedeftrust_storestore=OpenSSL::X509::Store.newArray(@opts[:ssl_certificate_authorities]).eachdo|certificate_authority|ifFile.file?(certificate_authority)store.add_file(certificate_authority)else# add_path is no implemented under jruby# so recursively try to load all the certificate from this directory# https://github.com/jruby/jruby-openssl/blob/master/src/main/java/org/jruby/ext/openssl/X509Store.java#L159if!!(RUBY_PLATFORM=="java")Dir.glob(File.join(certificate_authority,"**","*")).each{|f|store.add_file(f)}elsestore.add_path(certificate_authority)endendendstoreendprivatedefinc@sequence=0if@sequence+1>Lumberjack::Beats::SEQUENCE_MAX@sequence=@sequence+1endprivatedefsend_window_size(size)@socket.syswrite(["1","W",size].pack("AAN"))endprivatedefsend_payload(payload)# SSLSocket has a limit of 16k per message# execute multiple writes if neededbytes_written=0whilebytes_written<payload.bytesizebytes_written+=@socket.syswrite(payload.byteslice(bytes_written..-1))endendpublicdefwrite_sync(elements,opts={})options={:json=>@opts[:json],}.merge(opts)elements=[elements]ifelements.is_a?(Hash)send_window_size(elements.size)encoder=options[:json]?JsonEncoder:FrameEncoderpayload=elements.map{|element|encoder.to_frame(element,inc)}.joincompress=compress_payload(payload)send_payload(compress)ack(elements.size)endprivatedefcompress_payload(payload)compress=Zlib::Deflate.deflate(payload)["1","C",compress.bytesize,compress].pack("AANA*")endprivatedefack(size)_,type=read_version_and_typeraise"Whoa we shouldn't get this frame: #{type}"iftype!="A"@last_ack=read_last_ackendprivatedefunacked_sequence_sizesequence-(@last_ack+1)endprivatedefread_version_and_typeversion=@socket.read(1)type=@socket.read(1)[version,type]endprivatedefread_last_ack@socket.read(4).unpack("N").firstendendmoduleJsonEncoderdefself.to_frame(hash,sequence)json=Lumberjack::Beats::json.dump(hash)json_length=json.bytesizepack="AANNA#{json_length}"frame=["1","J",sequence,json_length,json]frame.pack(pack)endend# JsonEncodermoduleFrameEncoderdefself.to_frame(hash,sequence)frame=["1","D",sequence]pack="AAN"keys=deep_keys(hash)frame<<keys.lengthpack<<"N"keys.eachdo|k|val=deep_get(hash,k)key_length=k.bytesizeval_length=val.bytesizeframe<<key_lengthpack<<"N"frame<<kpack<<"A#{key_length}"frame<<val_lengthpack<<"N"frame<<valpack<<"A#{val_length}"endframe.pack(pack)endprivatedefself.deep_get(hash,key="")returnhashifkey.nil?deep_get(hash[key.split('.').first],key[key.split('.').first.length+1..key.length])endprivatedefself.deep_keys(hash,prefix="")keys=[]hash.eachdo|k,v|keys<<"#{prefix}#{k}"ifv.class==Stringkeys<<deep_keys(hash[k],"#{k}.")ifv.class==Hashendkeys.flattenendend# module Encoderend;end