class Lumberjack::Beats::Parser
# State handler for the 4-byte prefix of a compressed frame.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# waits for that many bytes in the :compressed_payload state.
#
# The block is accepted because every state handler is invoked with it;
# this handler does not yield.
def compressed_lead(&block)
  payload_size, = get.unpack("N")
  transition(:compressed_payload, payload_size)
end
# State handler for the body of a compressed frame.
#
# Inflates the pending bytes, resets the parser to expect a 2-byte frame
# header, and parses the inflated bytes as further frames. Whatever those
# frames yield is delivered to the given block.
#
# @return [nil] the return value of #feed
def compressed_payload(&block)
  # NOTE(review): the whole payload is inflated in a single call with no cap
  # on the output size. If these bytes can come from an untrusted peer,
  # consider bounding the inflated length.
  original = Zlib::Inflate.inflate(get)
  transition(:header, 2)

  # Splice the inflated frames in at the current read position rather than
  # appending them to the end of the buffer. Bytes that were already queued
  # behind the compressed frame (possibly only part of the next frame) must
  # stay *after* the inflated data; appending would parse them first and
  # interleave a partial frame with the inflated bytes.
  @buffer.insert(@buffer_offset, original)

  # Parse the uncompressed payload (nothing new to append, just drain).
  feed("", &block)
end
# State handler for a data field's key.
#
# Stores the pending bytes as the current key, then waits for the 4-byte
# length prefix of the matching value. Does not yield.
def data_field_key(&block)
  @key = get
  transition(:data_field_value_len, 4)
end
# State handler for the 4-byte length prefix of a data field's key.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# waits for that many bytes in the :data_field_key state. Does not yield.
def data_field_key_len(&block)
  key_size, = get.unpack("N")
  transition(:data_field_key, key_size)
end
# State handler for a data field's value.
#
# Stores the pending bytes under the current key in the frame's map and
# counts the pair off. While pairs remain it goes back to reading a key
# length; after the last pair it yields `:data, sequence, map` and returns
# to the header state.
def data_field_value(&block)
  @value = get
  @data_count -= 1
  @data[@key] = @value

  # More pairs are still expected for this frame.
  return transition(:data_field_key_len, 4) if @data_count > 0

  # emit the whole map now that we found the end of the data fields list.
  yield :data, @sequence, @data
  transition(:header, 2)
end # def data_field_value
# State handler for the 4-byte length prefix of a data field's value.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer and
# waits for that many bytes in the :data_field_value state. Does not yield.
def data_field_value_len(&block)
  value_size = get.unpack("N").first
  transition(:data_field_value, value_size)
end
# State handler for the 8-byte lead of a data frame.
#
# Decodes two 32-bit big-endian unsigned integers, the frame's sequence
# number and its key/value pair count, and starts a fresh map for the pairs.
#
# With at least one pair announced, parsing continues with the first key
# length. With a pair count of zero there is nothing to read for this
# frame, so the empty map is yielded as `:data, sequence, {}` right away and
# the parser returns to the header state. Without this, the bytes of the
# *next* frame would be consumed as a key/value pair.
def data_lead(&block)
  @sequence, @data_count = get.unpack("NN")
  @data = {}

  if @data_count == 0
    yield :data, @sequence, @data
    transition(:header, 2)
  else
    transition(:data_field_key_len, 4)
  end
end
# Feed data to this parser.
#
# The data is appended to the internal buffer, then the handler named by the
# current state is invoked for as long as the buffer holds the number of
# bytes that state is waiting for. Handlers report what they parse by
# yielding to the given block; nothing is returned to the caller.
#
# @param data [String] the string data to feed into the parser.
# @return [nil] always.
def feed(data, &block)
  # Lengths and offsets in this parser are byte counts. Appending a string
  # tagged with another encoding can retag the buffer (so #size would count
  # characters) or raise Encoding::CompatibilityError, so append a
  # binary-tagged copy and leave the caller's string untouched.
  data = data.dup.force_encoding("BINARY") unless data.encoding == Encoding::BINARY
  @buffer << data

  # Each state is the name of its handler method.
  send(@state, &block) while have?(@need)
  nil
end # def <<
# Consume bytes from the front of the unread part of the buffer.
#
# Callers are expected to have checked #have? first. If fewer bytes are
# available than requested, the returned string is shorter than `length`.
#
# @param length [Integer, nil] how many bytes to take; defaults to the
#   amount the current state asked for via #need.
# @return [String] the consumed bytes.
def get(length=nil)
  length ||= @need
  data = @buffer[@buffer_offset...(@buffer_offset + length)]
  @buffer_offset += length

  # Drop the consumed prefix only once it has grown past 16 KiB, so small
  # reads do not copy the buffer every time.
  if @buffer_offset > 16384
    # slice! edits the buffer in place and clamps at its end, so @buffer
    # stays a String even if the offset ran past the data. Reassigning from
    # `@buffer[@buffer_offset .. -1]` would yield nil in that case.
    @buffer.slice!(0, @buffer_offset)
    @buffer_offset = 0
  end

  data
end # def get
# Report the protocol version of a frame, rejecting unsupported ones.
#
# @param version the version value read from the frame header
# @yield [:version, version] when the version is supported
# @raise [UnsupportedProtocol] when #supported_protocol? rejects the version
def handle_version(version, &block)
  unless supported_protocol?(version)
    raise UnsupportedProtocol, "unsupported protocol #{version}"
  end

  yield :version, version
end
# Tell whether the unread part of the buffer holds at least `length` units
# (as counted by String#size).
#
# @param length [Integer] the amount wanted
# @return [Boolean]
def have?(length)
  unread = @buffer.size - @buffer_offset
  length <= unread
end # def have?
# State handler for the 2-byte frame header.
#
# The first byte is the protocol version (PROTOCOL_VERSION_1 is assumed when
# it is missing) and is checked and reported through #handle_version. The
# second byte is the frame type, which selects the next state and how many
# bytes that state needs.
#
# @raise [RuntimeError] when the frame type is not one of the known ones
def header(&block)
  version, frame_type = get.bytes.first(2)
  handle_version(version || PROTOCOL_VERSION_1, &block)

  next_state, next_length =
    case frame_type
    when FRAME_WINDOW then [:window_size, 4]
    when FRAME_DATA then [:data_lead, 8]
    when FRAME_JSON_DATA then [:json_data_lead, 8]
    when FRAME_COMPRESSED then [:compressed_lead, 4]
    else raise "Unknown frame type: `#{frame_type}`"
    end

  transition(next_state, next_length)
end
# Set up an empty binary buffer with the read offset at its start, and
# begin in the :header state waiting for 2 bytes.
def initialize
  @buffer = "".force_encoding("BINARY")
  @buffer_offset = 0
  transition(:header, 2)
end # def initialize
# State handler for the 8-byte lead of a JSON data frame.
#
# Decodes two 32-bit big-endian unsigned integers: the frame's sequence
# number, which is remembered, and the size of the JSON payload, which is
# how many bytes the :json_data_payload state will wait for. Does not yield.
def json_data_lead(&block)
  lead = get.unpack("NN")
  @sequence = lead[0]
  transition(:json_data_payload, lead[1])
end
# State handler for the body of a JSON data frame.
#
# Loads the pending bytes with the loader returned by
# `Lumberjack::Beats.json`, yields `:json, sequence, document`, and then
# returns to the header state.
def json_data_payload(&block)
  raw_json = get
  document = Lumberjack::Beats.json.load(raw_json)
  yield :json, @sequence, document
  transition(:header, 2)
end
# Record how much input the current state is waiting for. #feed compares
# this against the buffer before invoking the state's handler.
#
# @param length [Integer] the amount required
# @return the value that was stored
def need(length)
  @need = length
end # def need
# Tell whether this parser accepts the given protocol version.
#
# @param version the version value read from a frame header
# @return [Boolean] true for PROTOCOL_VERSION_2 or PROTOCOL_VERSION_1
def supported_protocol?(version)
  return true if PROTOCOL_VERSION_2 == version

  PROTOCOL_VERSION_1 == version
end
# Move the parser to a new state.
#
# @param state [Symbol] the name of the handler method that #feed will invoke
# @param next_length [Integer] how much input that handler needs first
# @return the value returned by #need
def transition(state, next_length)
  # TODO(sissel): Assert this self.respond_to?(state)
  # TODO(sissel): Assert state is in STATES
  # TODO(sissel): Assert next_length is a number
  @state = state
  need(next_length)
end # def transition
# State handler for the 4-byte body of a window frame.
#
# Decodes the pending bytes as one 32-bit big-endian unsigned integer,
# remembers it as the window size, returns to the header state, and only
# then yields `:window_size, size`.
def window_size(&block)
  @window_size, = get.unpack("N")
  transition(:header, 2)
  yield :window_size, @window_size
end # def window_size