class Lumberjack::Beats::Connection

def ack_if_needed(sequence, &block)

# Runs the given block, then acknowledges +sequence+ if the current ack
# handler reports that an ack is due for it.
#
# The block always runs first, so an ack is only sent once the block has
# returned without raising.
#
# @param sequence sequence number passed to the ack handler and to send_ack
# @param block work to run before the ack decision; it is invoked
#   unconditionally, so a block must be supplied
# @return the result of send_ack when an ack was sent, nil otherwise
def ack_if_needed(sequence, &block)
  block.call
  return unless @ack_handler.ack?(sequence)

  send_ack(sequence)
end

def close

# Closes the underlying socket; does nothing when it is already closed.
#
# @return whatever @fd.close returns, or nil when the socket was already closed
def close
  return if @fd.closed?

  @fd.close
end

def data(map, &block)

# Hands one decoded event to the caller's block together with the identity
# stream computed for it. Without a block nothing happens.
#
# @param map the decoded event
# @param block receives (map, identity_stream(map))
# @return the block's result, or nil when no block was given
def data(map, &block)
  return unless block_given?

  stream = identity_stream(map)
  block.call(map, stream)
end

def identity_stream(map)

# Builds the identity string for an event.
#
# The beat "id" and the event's "resource_id" are used when both are present;
# otherwise the beat "name" and the event's "source" are used. Parts that are
# nil are dropped and the remaining ones are joined with "-".
#
# @param map [Hash] decoded event; reads "beat" (with "id" / "name"),
#   "resource_id" and "source"
# @return [String] the joined identity, "" when no part is available
def identity_stream(map)
  # "beat" may be present with a nil value; treat that like a missing key
  # instead of failing on nil["id"].
  beat = map.fetch("beat", {}) || {}
  beat_id = beat["id"]
  resource_id = map["resource_id"]

  identity_values =
    if beat_id && resource_id
      [beat_id, resource_id]
    else
      [beat["name"], map["source"]]
    end
  identity_values.compact.join("-")
end

def initialize(fd, server)

# Sets up the connection state around an accepted socket.
#
# @param fd socket-like object; peeraddr is called on it here
# @param server the server this connection belongs to
def initialize(fd, server)
  @parser = Parser.new
  @fd = fd
  @server = server
  # No ack handler exists until reset_next_ack creates one.
  @ack_handler = nil
  # Fetch the details of the host before reading anything from the socket
  # so we can use that information when debugging connection issues with
  # remote hosts.
  @peer = begin
    # Query the peer address once and reuse it for both parts of the string.
    # Presumably index 3 is the numeric address and index 1 the port, as for
    # IPSocket#peeraddr; confirm against the socket type in use.
    address = @fd.peeraddr
    "#{address[3]}:#{address[1]}"
  rescue IOError
    # This can happen if the connection is dropped or closed before the host
    # details are fetched; fall back to a generic string.
    # NOTE(review): only IOError is rescued; any other error raised by
    # peeraddr propagates to the caller.
    PEER_INFORMATION_NOT_AVAILABLE
  end
end

def normalize_v1_metadata_encoding(map)

# Marks every metadata value of a v1 frame as UTF-8, in place.
#
# LSF does not enforce an encoding on the data it sends to the server, so
# fields such as the path or the offset can arrive in another encoding. The
# event validates its data when these values are assigned to it and crashes
# when the encoding is wrong. The value stored under LSF_LOG_LINE_FIELD is
# the only one left untouched.
#
# @param map [Hash] decoded v1 frame; its values are modified in place
# @return [Hash] the same map instance
def normalize_v1_metadata_encoding(map)
  map.each do |key, value|
    next if key == Lumberjack::Beats::LSF_LOG_LINE_FIELD

    value.force_encoding(Encoding::UTF_8)
  end
  map
end

def read_socket(&block)

# Reads one chunk of up to READ_SIZE from the socket, feeds it to the parser
# and dispatches every event the parser yields.
#
# @param block forwarded to data for each decoded event
# @return whatever @parser.feed returns
def read_socket(&block)
  # TODO(sissel): Ack on idle.
  # X: - if any unacked, IO.select
  # X:   - on timeout, ack all.
  # X: Doing so will prevent slow streams from retransmitting
  # X: too many events after errors.
  chunk = @fd.sysread(READ_SIZE)
  @parser.feed(chunk) do |event, *args|
    case event
    when :version
      version(*args)
    when :window_size
      reset_next_ack(*args)
    when :data
      sequence, map = args
      ack_if_needed(sequence) do
        data(normalize_v1_metadata_encoding(map), &block)
      end
    when :json
      # A payload that is an array of items is emitted as multiple events;
      # this behavior was moved from the plugin to the library.
      # see this commit: https://github.com/logstash-plugins/logstash-input-lumberjack/pull/57/files#diff-1b9590423b15f04f215635164e7376ecR158
      sequence, payload = args
      ack_if_needed(sequence) do
        entries = payload.is_a?(Array) ? payload : [payload]
        entries.each { |entry| data(entry, &block) }
      end
    end
  end
end

def reset_next_ack(window_size)

# Replaces the current ack handler with a fresh one for the given window size.
# AckingProtocolV1 is used when version_1? is true, AckingProtocolV2 otherwise.
#
# @param window_size passed unchanged to the handler's constructor
# @return the newly created ack handler
def reset_next_ack(window_size)
  @ack_handler =
    if version_1?
      AckingProtocolV1.new(window_size)
    else
      AckingProtocolV2.new(window_size)
    end
end

def run(&block)

# Keeps reading from the socket until the server reports that it is closed,
# and always attempts to close the connection on the way out.
#
# @param block forwarded to read_socket on every iteration
# @raise [ConnectionClosed] wrapping any error listed in
#   RESCUED_CONNECTION_EXCEPTIONS
# @raise [StandardError] any other error raised while the server is still open
def run(&block)
  until server.closed?
    read_socket(&block)
  end
rescue *RESCUED_CONNECTION_EXCEPTIONS => e
  # EOF or other read errors: nothing to do here except shut down, which the
  # 'ensure' clause takes care of.
  raise ConnectionClosed.new(e)
rescue StandardError
  # While the server is shutting down any exception can safely be ignored.
  # On windows, we can get a `SystemCallErr`
  raise unless server.closed?
ensure
  # Best effort: an error raised while closing is deliberately discarded.
  close rescue 'Already closed stream'
end # def run

def send_ack(sequence)

# Writes the ack frame that the current ack handler builds for +sequence+ to
# the socket.
#
# @param sequence sequence number to acknowledge
# @return whatever @fd.syswrite returns
def send_ack(sequence)
  frame = @ack_handler.ack_frame(sequence)
  @fd.syswrite(frame)
end

def version(version)

# Records the protocol version for this connection; read_socket calls this
# with the arguments of the parser's :version event.
#
# @param version the protocol version value to remember
# @return the value that was stored
def version(version)
  @version = version
end

def version_1?

# Tells whether the recorded protocol version equals
# Parser::PROTOCOL_VERSION_1. The result is false while no version has been
# recorded, unless that constant itself compares equal to nil.
#
# @return [Boolean] result of comparing @version with the constant
def version_1?
  @version == Parser::PROTOCOL_VERSION_1
end