class Anthropic::Helpers::Streaming::MessageStream

@generic Elem
events while maintaining accumulated message state throughout the stream lifecycle.
the Anthropic API, yielding a mix of raw streaming events and higher-level typed
MessageStream provides a Ruby Enumerable interface over Server-Sent Events from
@api private

def accumulate_event(event:, current_snapshot:)

Returns:
  • (Anthropic::Models::Message) - updated message snapshot with event applied

Parameters:
  • current_snapshot (Anthropic::Models::Message, nil) -- current accumulated message state
  • event (Anthropic::Models::RawMessageStreamEvent) -- the raw streaming event to process

Other tags:
    Api: - private
def accumulate_event(event:, current_snapshot:)
 event in Anthropic::Models::RawMessageStreamEvent
age = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}"
e ArgumentError.new(message)
rent_snapshot.nil?
rn event.message if event.type == :message_start
age = "Unexpected event order, got \"#{event.type}\" before \":message_start\""
e RuntimeError.new(message)
vent
hropic::Models::RawMessageStartEvent
e the converter to create a new, isolated copy of the message object.
is ensures proper type validation and prevents shared object references
at could lead to unintended mutations during streaming accumulation.
tches the Python SDK's approach of explicitly constructing Message objects.
rn Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.message)
hropic::Models::RawContentBlockStartEvent
ent_snapshot.content = (current_snapshot.content || []) + [event.content_block]
hropic::Models::RawContentBlockDeltaEvent
ent = current_snapshot.content[event.index]
 (delta = event.delta)
nthropic::Models::TextDelta if content.type == :text
ntent.text += delta.text
nthropic::Models::InputJSONDelta if content.type == :tool_use
on_buf = content.json_buf.to_s
on_buf += delta.partial_json
ntent.input = json_buf
ntent.json_buf = json_buf
nthropic::Models::CitationsDelta if content.type == :text
ntent.citations ||= []
ntent.citations << delta.citation
nthropic::Models::ThinkingDelta if content.type == :thinking
ntent.thinking += delta.thinking
nthropic::Models::SignatureDelta if content.type == :thinking
ntent.signature = delta.signature

hropic::Models::RawMessageDeltaEvent
ent_snapshot.stop_reason = event.delta.stop_reason
ent_snapshot.stop_sequence = event.delta.stop_sequence
ent_snapshot.usage.output_tokens = event.usage.output_tokens
t_snapshot

def accumulated_message

Returns:
  • (Anthropic::Models::Message) -

Other tags:
    Api: - public
def accumulated_message
  until_done
  @accumated_message_snapshot
end

def accumulated_text

Returns:
  • (String) -

Other tags:
    Api: - public
def accumulated_text
  message = accumulated_message
  text_blocks = []
  message.content.each do |block|
    if block.type == :text
      text_blocks << block.text
    end
  end
  if text_blocks.empty?
    raise RuntimeError.new("Expected to have received at least 1 text block")
  end
  text_blocks.join
end

def build_events(event:, message_snapshot:)

Returns:
  • (Array) - events to yield (mix of raw and typed events)
    Parameters:
    • message_snapshot (Anthropic::Models::Message) -- current accumulated message state
    • event (Anthropic::Models::RawMessageStreamEvent) -- the raw event to process

    Other tags:
      Api: - private
    def build_events(event:, message_snapshot:)
    _to_yield = []
    vent
    hropic::Models::RawMessageStopEvent
    ts_to_yield << MessageStopEvent.new(
    pe: :message_stop,
    ssage: message_snapshot
    hropic::Models::RawContentBlockDeltaEvent
    ts_to_yield << event
    ent_block = message_snapshot.content[event.index]
     (delta = event.delta)
    nthropic::Models::TextDelta if content_block.type == :text
    ents_to_yield << Anthropic::Streaming::TextEvent.new(
    type: :text,
    text: delta.text,
    snapshot: content_block.text
    nthropic::Models::InputJSONDelta if content_block.type == :tool_use
    ents_to_yield << Anthropic::Streaming::InputJsonEvent.new(
    type: :input_json,
    partial_json: delta.partial_json,
    snapshot: content_block.input
    nthropic::Models::CitationsDelta if content_block.type == :text
    ents_to_yield << Anthropic::Streaming::CitationEvent.new(
    type: :citation,
    citation: delta.citation,
    snapshot: content_block.citations || []
    nthropic::Models::ThinkingDelta if content_block.type == :thinking
    ents_to_yield << Anthropic::Streaming::ThinkingEvent.new(
    type: :thinking,
    thinking: delta.thinking,
    snapshot: content_block.thinking
    nthropic::Models::SignatureDelta if content_block.type == :thinking
    ents_to_yield << Anthropic::Streaming::SignatureEvent.new(
    type: :signature,
    signature: content_block.signature
    
    hropic::Models::RawContentBlockStopEvent
    ent_block = message_snapshot.content[event.index]
    ts_to_yield << ContentBlockStopEvent.new(
    pe: :content_block_stop,
    dex: event.index,
    ntent_block: content_block
    ts_to_yield << event
    _to_yield

    def initialize(raw_stream:)

    Parameters:
    • raw_stream (Anthropic::Internal::Type::BaseStream) --

    Other tags:
      Api: - private
    def initialize(raw_stream:)
      # The underlying Server-Sent Event stream from the Anthropic API.
      @raw_stream = raw_stream
      # Accumulated message state that builds up as events are processed.
      @accumated_message_snapshot = nil
      # Lazy enumerable that transforms raw events into consumable events.
      @iterator = iterator
    end

    def iterator

    Returns:
    • (Enumerable>) -

    Other tags:
      Api: - private
    def iterator
    tor ||= Anthropic::Internal::Util.chain_fused(@stream) do |y|
    _stream.each do |raw_event|
    ccumated_message_snapshot = accumulate_event(
    event: raw_event,
    current_snapshot: @accumated_message_snapshot
    ents_to_yield = build_events(event: raw_event, message_snapshot: @accumated_message_snapshot)
    ents_to_yield.each(&y)

    def text

    Returns:
    • (Enumerable) -

    Other tags:
      Api: - public
    def text
      Anthropic::Internal::Util.chain_fused(@iterator) do |y|
        @iterator.each do |event|
          if event.type == :content_block_delta && event.delta.type == :text_delta
            y << event.delta.text
          end
        end
      end
    end

    def until_done = each {} # rubocop:disable Lint/EmptyBlock

    Returns:
    • (void) -

    Other tags:
      Api: - public
    def until_done = each {} # rubocop:disable Lint/EmptyBlock