class Anthropic::Helpers::Streaming::MessageStream
@generic Elem
events while maintaining accumulated message state throughout the stream lifecycle.
the Anthropic API, yielding a mix of raw streaming events and higher-level typed
MessageStream provides a Ruby Enumerable interface over Server-Sent Events from
@api private
def accumulate_event(event:, current_snapshot:)
-
(Anthropic::Models::Message)
- updated message snapshot with event applied
Parameters:
-
current_snapshot
(Anthropic::Models::Message, nil
) -- current accumulated message state -
event
(Anthropic::Models::RawMessageStreamEvent
) -- the raw streaming event to process
Other tags:
- Api: - private
def accumulate_event(event:, current_snapshot:) event in Anthropic::Models::RawMessageStreamEvent age = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}" e ArgumentError.new(message) rent_snapshot.nil? rn event.message if event.type == :message_start age = "Unexpected event order, got \"#{event.type}\" before \":message_start\"" e RuntimeError.new(message) vent hropic::Models::RawMessageStartEvent e the converter to create a new, isolated copy of the message object. is ensures proper type validation and prevents shared object references at could lead to unintended mutations during streaming accumulation. tches the Python SDK's approach of explicitly constructing Message objects. rn Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.message) hropic::Models::RawContentBlockStartEvent ent_snapshot.content = (current_snapshot.content || []) + [event.content_block] hropic::Models::RawContentBlockDeltaEvent ent = current_snapshot.content[event.index] (delta = event.delta) nthropic::Models::TextDelta if content.type == :text ntent.text += delta.text nthropic::Models::InputJSONDelta if content.type == :tool_use on_buf = content.json_buf.to_s on_buf += delta.partial_json ntent.input = json_buf ntent.json_buf = json_buf nthropic::Models::CitationsDelta if content.type == :text ntent.citations ||= [] ntent.citations << delta.citation nthropic::Models::ThinkingDelta if content.type == :thinking ntent.thinking += delta.thinking nthropic::Models::SignatureDelta if content.type == :thinking ntent.signature = delta.signature hropic::Models::RawMessageDeltaEvent ent_snapshot.stop_reason = event.delta.stop_reason ent_snapshot.stop_sequence = event.delta.stop_sequence ent_snapshot.usage.output_tokens = event.usage.output_tokens t_snapshot
def accumulated_message
-
(Anthropic::Models::Message)
-
Other tags:
- Api: - public
def accumulated_message until_done @accumated_message_snapshot end
def accumulated_text
-
(String)
-
Other tags:
- Api: - public
def accumulated_text message = accumulated_message text_blocks = [] message.content.each do |block| if block.type == :text text_blocks << block.text end end if text_blocks.empty? raise RuntimeError.new("Expected to have received at least 1 text block") end text_blocks.join end
def build_events(event:, message_snapshot:)
-
(Array
- events to yield (mix of raw and typed events)
Parameters:
-
message_snapshot
(Anthropic::Models::Message
) -- current accumulated message state -
event
(Anthropic::Models::RawMessageStreamEvent
) -- the raw event to process
Other tags:
- Api: - private
def build_events(event:, message_snapshot:) _to_yield = [] vent hropic::Models::RawMessageStopEvent ts_to_yield << MessageStopEvent.new( pe: :message_stop, ssage: message_snapshot hropic::Models::RawContentBlockDeltaEvent ts_to_yield << event ent_block = message_snapshot.content[event.index] (delta = event.delta) nthropic::Models::TextDelta if content_block.type == :text ents_to_yield << Anthropic::Streaming::TextEvent.new( type: :text, text: delta.text, snapshot: content_block.text nthropic::Models::InputJSONDelta if content_block.type == :tool_use ents_to_yield << Anthropic::Streaming::InputJsonEvent.new( type: :input_json, partial_json: delta.partial_json, snapshot: content_block.input nthropic::Models::CitationsDelta if content_block.type == :text ents_to_yield << Anthropic::Streaming::CitationEvent.new( type: :citation, citation: delta.citation, snapshot: content_block.citations || [] nthropic::Models::ThinkingDelta if content_block.type == :thinking ents_to_yield << Anthropic::Streaming::ThinkingEvent.new( type: :thinking, thinking: delta.thinking, snapshot: content_block.thinking nthropic::Models::SignatureDelta if content_block.type == :thinking ents_to_yield << Anthropic::Streaming::SignatureEvent.new( type: :signature, signature: content_block.signature hropic::Models::RawContentBlockStopEvent ent_block = message_snapshot.content[event.index] ts_to_yield << ContentBlockStopEvent.new( pe: :content_block_stop, dex: event.index, ntent_block: content_block ts_to_yield << event _to_yield
def initialize(raw_stream:)
-
raw_stream
(Anthropic::Internal::Type::BaseStream
) --
Other tags:
- Api: - private
def initialize(raw_stream:) # The underlying Server-Sent Event stream from the Anthropic API. @raw_stream = raw_stream # Accumulated message state that builds up as events are processed. @accumated_message_snapshot = nil # Lazy enumerable that transforms raw events into consumable events. @iterator = iterator end
def iterator
-
(Enumerable
->)
Other tags:
- Api: - private
def iterator tor ||= Anthropic::Internal::Util.chain_fused(@stream) do |y| _stream.each do |raw_event| ccumated_message_snapshot = accumulate_event( event: raw_event, current_snapshot: @accumated_message_snapshot ents_to_yield = build_events(event: raw_event, message_snapshot: @accumated_message_snapshot) ents_to_yield.each(&y)
def text
-
(Enumerable
-)
Other tags:
- Api: - public
def text Anthropic::Internal::Util.chain_fused(@iterator) do |y| @iterator.each do |event| if event.type == :content_block_delta && event.delta.type == :text_delta y << event.delta.text end end end end
def until_done = each {} # rubocop:disable Lint/EmptyBlock
-
(void)
-
Other tags:
- Api: - public
def until_done = each {} # rubocop:disable Lint/EmptyBlock