class Aws::Binary::EventStreamDecoder
@api private
def emit_event(raw_event)
def emit_event(raw_event) event = @event_parser.apply(raw_event) @events << event @emitter.signal(event.event_type, event) unless @emitter.nil? end
def extract_stream_class(type_class)
def extract_stream_class(type_class) parts = type_class.to_s.split('::') parts.inject(Kernel) do |const, part_name| part_name == 'Types' ? const.const_get('EventStreams') : const.const_get(part_name) end end
def initialize(protocol, rules, output_ref, error_refs, io, event_stream_handler = nil)
-
event_stream_handler
(EventStream|nil
) -- A Service EventStream object -
error_refs
(Array
) -- array of ShapeRefs for errors -
output_ref
(ShapeRef
) -- ShapeRef of output shape -
rules
(ShapeRef
) -- ShapeRef of the eventstream member -
protocol
(String
) --
def initialize(protocol, rules, output_ref, error_refs, io, event_stream_handler = nil) @decoder = Aws::EventStream::Decoder.new @event_parser = EventParser.new(parser_class(protocol), rules, error_refs, output_ref) @stream_class = extract_stream_class(rules.shape.struct_class) @emitter = event_stream_handler.event_emitter @events = [] end
def parser_class(protocol)
def parser_class(protocol) case protocol when 'rest-xml' then Aws::Xml::Parser when 'rest-json' then Aws::Json::Parser when 'json' then Aws::Json::Parser else raise "unsupported protocol #{protocol} for event stream" end end
def write(chunk)
def write(chunk) raw_event, eof = @decoder.decode_chunk(chunk) emit_event(raw_event) if raw_event while !eof # exhaust message_buffer data raw_event, eof = @decoder.decode_chunk emit_event(raw_event) if raw_event end end