class Lumberjack::Beats::Parser

def compressed_lead(&block)

def compressed_lead(&block)
  length = get.unpack("N").first
  transition(:compressed_payload, length)
end

def compressed_payload(&block)

def compressed_payload(&block)
  original = Zlib::Inflate.inflate(get)
  transition(:header, 2)
  # Parse the uncompressed payload.
  feed(original, &block)
end

def data_field_key(&block)

def data_field_key(&block)
  @key = get
  transition(:data_field_value_len, 4)
end

def data_field_key_len(&block)

def data_field_key_len(&block)
  key_len = get.unpack("N").first
  transition(:data_field_key, key_len)
end

def data_field_value(&block)

def data_field_value(&block)
  @value = get
  @data_count -= 1
  @data[@key] = @value
  if @data_count > 0
    transition(:data_field_key_len, 4)
  else
    # emit the whole map now that we found the end of the data fields list.
    yield :data, @sequence, @data
    transition(:header, 2)
  end
end # def data_field_value

def data_field_value_len(&block)

def data_field_value_len(&block)
  transition(:data_field_value, get.unpack("N").first)
end

def data_lead(&block)

def data_lead(&block)
  @sequence, @data_count = get.unpack("NN")
  @data = {}
  transition(:data_field_key_len, 4)
end

def feed(data, &block)

Returns:
  • (String, nil) - the websocket message payload, if any, nil otherwise.

Parameters:
  • the (String) -- string data to feed into the parser.
def feed(data, &block)
  @buffer << data
  #p :need => @need
  while have?(@need)
    send(@state, &block)
    #case @state
    #when :header; header(&block)
    #when :window_size; window_size(&block)
    #when :data_lead; data_lead(&block)
    #when :data_field_key_len; data_field_key_len(&block)
    #when :data_field_key; data_field_key(&block)
    #when :data_field_value_len; data_field_value_len(&block)
    #when :data_field_value; data_field_value(&block)
    #when :data_field_value; data_field_value(&block)
    #when :compressed_lead; compressed_lead(&block)
    #when :compressed_payload; compressed_payload(&block)
    #end # case @state
  end
  return nil
end # def <<

def get(length=nil)

Get 'length' string from the buffer.
def get(length=nil)
  length = @need if length.nil?
  data = @buffer[@buffer_offset ... @buffer_offset + length]
  @buffer_offset += length
  if @buffer_offset > 16384
    @buffer = @buffer[@buffer_offset  .. -1]
    @buffer_offset = 0
  end
  return data
end # def get

def handle_version(version, &block)

def handle_version(version, &block)
  if supported_protocol?(version)
    yield :version, version
  else
    raise UnsupportedProtocol, "unsupported protocol #{version}"
  end
end

def have?(length)

Do we have at least 'length' bytes in the buffer?
def have?(length)
  return length <= (@buffer.size - @buffer_offset)
end # def have?

def header(&block)

def header(&block)
  version, frame_type = get.bytes.to_a[0..1]
  version ||= PROTOCOL_VERSION_1
  handle_version(version, &block)
  case frame_type
  when FRAME_WINDOW; transition(:window_size, 4)
  when FRAME_DATA; transition(:data_lead, 8)
  when FRAME_JSON_DATA; transition(:json_data_lead, 8)
  when FRAME_COMPRESSED; transition(:compressed_lead, 4)
  else; raise "Unknown frame type: `#{frame_type}`"
  end
end

def initialize

def initialize
  @buffer_offset = 0
  @buffer = ""
  @buffer.force_encoding("BINARY")
  transition(:header, 2)
end # def initialize

def json_data_lead(&block)

def json_data_lead(&block)
  @sequence, payload_size = get.unpack("NN")
  transition(:json_data_payload, payload_size)
end

def json_data_payload(&block)

def json_data_payload(&block)
  payload = get
  yield :json, @sequence, Lumberjack::Beats::json.load(payload)
  transition(:header, 2)
end

def need(length)

Set the minimum number of bytes we need in the buffer for the next read.
def need(length)
  @need = length
end # def need

def supported_protocol?(version)

def supported_protocol?(version)
  PROTOCOL_VERSION_2 == version || PROTOCOL_VERSION_1 == version
end

def transition(state, next_length)

def transition(state, next_length)
  @state = state
  #puts :transition => state
  # TODO(sissel): Assert this self.respond_to?(state)
  # TODO(sissel): Assert state is in STATES
  # TODO(sissel): Assert next_length is a number
  need(next_length)
end # def transition

def window_size(&block)

def window_size(&block)
  @window_size = get.unpack("N").first
  transition(:header, 2)
  yield :window_size, @window_size
end # def window_size