class LogStash::Inputs::BeatsSupport::ConnectionHandler
-
Should we just take the raw content of the parsed json frame
- Should we use a codec on specific field?
from the beat library.
Decide which Process should be used to decode the data coming
Handle the data coming from a connection
def accept
def accept @logger.debug("Beats input: waiting from new events from remote host", :peer => @connection.peer) @connection.run { |hash, identity_stream| process(hash, identity_stream) } end
def flush(&block)
remote host closed the connection, its better to make sure we clear their
this will be call on any exception raised, either is a circuit breaker or the
This method is a bit tricky to decide when to be called, in the current case,
OOB call to flush the codec buffer,
def flush(&block) @logger.debug? && @logger.debug("Beats input, out of band call for flushing the content of this connection", :peer => @connection.peer) @codec.flush(&block) end
def from_filebeat?(hash)
def from_filebeat?(hash) !hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD].nil? end
def from_logstash_forwarder?(hash)
def from_logstash_forwarder?(hash) !hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD].nil? end
def initialize(connection, input, queue)
def initialize(connection, input, queue) @connection = connection @input = input @queue = queue @logger = input.logger # We need to clone the codec per connection, so we can flush a specific # codec when a connection is closed. @codec = input.codec.dup @nocodec_transformer = RawEventTransform.new(@input) @codec_transformer = DecodedEventTransform.new(@input) end
def process(hash, identity_stream)
def process(hash, identity_stream) @logger.debug? && @logger.debug("Beats input: new event received", :event_hash => hash, :identity_stream => identity_stream, :peer => @connection.peer) # Filebeats uses the `message` key and LSF `line` target_field = if from_filebeat?(hash) hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD) elsif from_logstash_forwarder?(hash) hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD) end if target_field.nil? @logger.debug? && @logger.debug("Beats input: not using the codec for this event, can't find the codec target field", :target_field_for_codec => @input.target_field_for_codec, :event_hash => hash) event = LogStash::Event.new(hash) @nocodec_transformer.transform(event) raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong if !@queue.offer(event) else @logger.debug? && @logger.debug("Beats input: decoding this event with the codec", :target_field_value => target_field) @codec.accept(CodecCallbackListener.new(target_field, hash, identity_stream, @codec_transformer, @queue)) end end