class LogStash::Inputs::BeatsSupport::ConnectionHandler

  Handle the data coming from a connection.
  Decide which process should be used to decode the data coming
  from the beat library:
    - Should we use a codec on a specific field?
    - Should we just take the raw content of the parsed JSON frame?

def accept

# Log that this handler is waiting for events, then run the connection,
# handing every (hash, identity_stream) pair it yields to #process.
#
# Returns whatever `@connection.run` returns.
def accept
  # Guarded like the other debug calls in this class, so `@connection.peer`
  # is only evaluated when debug logging is actually enabled.
  if @logger.debug?
    @logger.debug("Beats input: waiting from new events from remote host",
                  :peer => @connection.peer)
  end
  @connection.run { |hash, identity_stream| process(hash, identity_stream) }
end

def flush(&block)

OOB call to flush the codec buffer.

This method is a bit tricky to decide when to call. In the current case,
it will be called on any exception raised, whether it is a circuit breaker or the
remote host closing the connection. It is better to make sure we clear their
data and create duplicates than to lose the data.
# Out-of-band flush of this connection's codec buffer.
#
# The given block is forwarded to `@codec.flush`, and the result of that
# call is returned.
def flush(&block)
  if @logger.debug?
    @logger.debug("Beats input, out of band call for flushing the content of this connection",
                  :peer => @connection.peer)
  end

  @codec.flush(&block)
end

def from_filebeat?(hash)

# True when the event hash holds a non-nil value under
# Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD.
def from_filebeat?(hash)
  log_line = hash[Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD]
  !log_line.nil?
end

def from_logstash_forwarder?(hash)

# True when the event hash holds a non-nil value under
# Lumberjack::Beats::LSF_LOG_LINE_FIELD.
def from_logstash_forwarder?(hash)
  log_line = hash[Lumberjack::Beats::LSF_LOG_LINE_FIELD]
  !log_line.nil?
end

def initialize(connection, input, queue)

# Build a handler for a single connection.
#
# connection - the connection whose events this handler processes
# input      - the owning input; supplies the logger and the codec
# queue      - the queue that events are offered to
def initialize(connection, input, queue)
  @connection = connection
  @queue = queue
  @input = input
  @logger = @input.logger

  # Every connection works on its own copy of the codec, so a specific
  # codec can be flushed when its connection is closed.
  @codec = @input.codec.dup

  @nocodec_transformer = RawEventTransform.new(@input)
  @codec_transformer = DecodedEventTransform.new(@input)
end

def process(hash, identity_stream)

# Handle one event hash received from the connection.
#
# When the hash carries a Filebeat or LSF log line, that value is removed
# from the hash and handed to the codec through a CodecCallbackListener;
# the result of `@codec.accept` is returned. Otherwise the hash is wrapped
# in a LogStash::Event, passed through the no-codec transformer and offered
# to the queue.
#
# Raises LogStash::Inputs::Beats::InsertingToQueueTakeTooLong when the
# queue refuses the event.
def process(hash, identity_stream)
  if @logger.debug?
    @logger.debug("Beats input: new event received",
                  :event_hash => hash,
                  :identity_stream => identity_stream,
                  :peer => @connection.peer)
  end

  # Filebeat uses the `message` key and LSF uses `line`.
  log_line = nil
  if from_filebeat?(hash)
    log_line = hash.delete(Lumberjack::Beats::FILEBEAT_LOG_LINE_FIELD)
  elsif from_logstash_forwarder?(hash)
    log_line = hash.delete(Lumberjack::Beats::LSF_LOG_LINE_FIELD)
  end

  # Codec path: a log line was found, so let the codec decode it.
  unless log_line.nil?
    if @logger.debug?
      @logger.debug("Beats input: decoding this event with the codec",
                    :target_field_value => log_line)
    end
    listener = CodecCallbackListener.new(log_line,
                                         hash,
                                         identity_stream,
                                         @codec_transformer,
                                         @queue)
    return @codec.accept(listener)
  end

  # Raw path: no log line to decode, so the hash becomes the event as is.
  if @logger.debug?
    @logger.debug("Beats input: not using the codec for this event, can't find the codec target field",
                  :target_field_for_codec => @input.target_field_for_codec,
                  :event_hash => hash)
  end
  event = LogStash::Event.new(hash)
  @nocodec_transformer.transform(event)
  raise LogStash::Inputs::Beats::InsertingToQueueTakeTooLong unless @queue.offer(event)
end