class Lumberjack::Beats::Connection
# Runs the caller's block for a received frame and then acknowledges the
# frame when the current ack handler asks for it.
#
# sequence - sequence number of the frame being processed.
# block    - work to perform before any acknowledgement is written.
#
# Returns the result of send_ack when an ack was sent, nil otherwise.
def ack_if_needed(sequence, &block)
  # Process first, acknowledge second: the ack is only written once the
  # block has returned.
  block.call
  return unless @ack_handler.ack?(sequence)

  send_ack(sequence)
end
# Closes the underlying socket, doing nothing when it is already closed.
def close
  return if @fd.closed?

  @fd.close
end
# Hands a decoded event to the caller's block together with the identity
# stream computed for it by identity_stream.
#
# map   - the decoded event.
# block - receives (map, identity_stream(map)); optional.
#
# Returns the block's result, or nil when no block was given.
def data(map, &block)
  return unless block_given?

  block.call(map, identity_stream(map))
end
# Builds the string that identifies the stream an event belongs to.
#
# When the event carries both a beat id and a "resource_id", those two
# values are used; otherwise the beat name and the "source" field are used.
# Missing (nil) parts are dropped and the rest are joined with "-".
#
# map - the decoded event hash.
#
# Returns a String (empty when no identifying field is present).
def identity_stream(map)
  beat = map.fetch("beat", {})
  resource_id = map["resource_id"]

  parts =
    if beat["id"] && resource_id
      [beat["id"], resource_id]
    else
      [beat["name"], map["source"]]
    end

  parts.compact.join("-")
end
# Sets up the state for one client connection.
#
# fd     - the client socket; must respond to peeraddr.
# server - the server that owns this connection.
#
# The ack handler starts out as nil and is created later by reset_next_ack.
def initialize(fd, server)
  @parser = Parser.new
  @fd = fd
  @server = server
  @ack_handler = nil

  # Fetch the details of the host before reading anything from the socket
  # so we can use that information when debugging connection issues with
  # remote hosts.
  begin
    # Ask the socket once and reuse the answer, so host and port always come
    # from the same lookup.
    # NOTE(review): indexes assume the IPSocket#peeraddr layout
    # [family, port, hostname, numeric_address] -- confirm for wrapped sockets.
    peer_address = @fd.peeraddr
    @peer = "#{peer_address[3]}:#{peer_address[1]}"
  rescue IOError
    # This can happen if the connection is dropped or closed before the host
    # details are fetched; fall back to a generic string.
    @peer = PEER_INFORMATION_NOT_AVAILABLE
  end
end
# Re-tags the metadata values of a v1 frame as UTF-8, in place.
#
# LSF does not enforce an encoding when it sends data to the server, so
# fields such as the path or offset can arrive in another encoding. The
# event validates encodings when the data is assigned to it and crashes on
# a wrong one, so everything is normalized here first. The log line field
# itself (Lumberjack::Beats::LSF_LOG_LINE_FIELD) is left untouched.
#
# map - the decoded v1 frame; its values are mutated.
#
# Returns the same map.
def normalize_v1_metadata_encoding(map)
  map.each_key do |key|
    next if key == Lumberjack::Beats::LSF_LOG_LINE_FIELD

    map[key].force_encoding(Encoding::UTF_8)
  end
  map
end
# Reads one chunk (up to READ_SIZE bytes) from the socket, feeds it to the
# parser and dispatches every event the parser emits.
#
# block - forwarded to data for each decoded event.
#
# Returns whatever @parser.feed returns.
def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any unacked, IO.select
  # X: - on timeout, ack all.
  # X: Doing so will prevent slow streams from retransmitting
  # X: too many events after errors.
  chunk = @fd.sysread(READ_SIZE)

  @parser.feed(chunk) do |event, *args|
    case event
    when :version then version(*args)
    when :window_size then reset_next_ack(*args)
    when :data
      sequence, payload = args
      ack_if_needed(sequence) { data(normalize_v1_metadata_encoding(payload), &block) }
    when :json
      # If the payload is an array of items we will emit multiple events;
      # this behavior was moved from the plugin to the library.
      # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      sequence, payload = args
      ack_if_needed(sequence) do
        entries = payload.is_a?(Array) ? payload : [payload]
        entries.each { |entry| data(entry, &block) }
      end
    end
  end
end
# Replaces the ack handler with a fresh one for the announced window size.
# AckingProtocolV1 is used when version_1? is true, AckingProtocolV2
# otherwise.
#
# window_size - value passed straight to the handler's constructor.
#
# Returns the new ack handler.
def reset_next_ack(window_size)
  @ack_handler =
    if version_1?
      AckingProtocolV1.new(window_size)
    else
      AckingProtocolV2.new(window_size)
    end
end
# Keeps reading from the socket until the server reports it is closed,
# forwarding the block to read_socket on every iteration. The connection is
# always closed on the way out.
#
# block - forwarded to read_socket.
#
# Raises ConnectionClosed (wrapping the original error) when one of
# RESCUED_CONNECTION_EXCEPTIONS is raised while reading. Any other
# StandardError is re-raised unless the server is already closed.
def run(&block)
  read_socket(&block) until server.closed?
rescue *RESCUED_CONNECTION_EXCEPTIONS => e
  # EOF or other read errors: the only action is to shut down, which happens
  # in the ensure clause below.
  raise ConnectionClosed.new(e)
rescue
  # When the server is shutting down we can safely ignore any exception.
  # On Windows, we can get a `SystemCallErr` here.
  raise unless server.closed?
ensure
  begin
    close
  rescue StandardError
    # Errors raised while closing (e.g. an already closed stream) are ignored.
  end
end # def run
# Writes the acknowledgement frame for the given sequence number to the
# socket. The frame bytes come from the current ack handler.
#
# sequence - sequence number being acknowledged.
#
# Returns whatever @fd.syswrite returns.
def send_ack(sequence)
  frame = @ack_handler.ack_frame(sequence)
  @fd.syswrite(frame)
end
# Records the protocol version announced by the peer; version_1? reads it.
#
# version - the version value emitted by the parser.
#
# Returns the stored version.
def version(version)
  @version = version
end
# Tells whether the recorded protocol version equals
# Parser::PROTOCOL_VERSION_1.
#
# Returns true or false.
def version_1?
  @version == Parser::PROTOCOL_VERSION_1
end