class Anthropic::Helpers::Streaming::MessageStream

@generic Elem
MessageStream provides a Ruby Enumerable interface over Server-Sent Events from
the Anthropic API, yielding a mix of raw streaming events and higher-level typed
events while maintaining accumulated message state throughout the stream lifecycle.
@api private

def accumulate_event(event:, current_snapshot:)

Returns:
  • (Anthropic::Models::Message) - updated message snapshot with event applied

Parameters:
  • current_snapshot (Anthropic::Models::Message, nil) -- current accumulated message state
  • event (Anthropic::Models::RawMessageStreamEvent) -- the raw streaming event to process

Other tags:
    Api: - private
# Applies one raw stream event to the accumulated message snapshot.
#
# As far as the visible fragments show:
#   - an event that is not a RawMessageStreamEvent / BetaRawMessageStreamEvent
#     variant is rejected with ArgumentError;
#   - while no snapshot exists yet, a :message_start event supplies the initial
#     message and any other event type raises RuntimeError ("Unexpected event order");
#   - message-start events are coerced into a fresh Message / BetaMessage;
#   - content-block-start events append `event.content_block` to the content list;
#   - content-block-delta events update the block at `event.index`: text and
#     thinking are appended, tool-use JSON is buffered in `_json_buf` and mirrored
#     into `input`, citations are collected, signature and compaction content
#     are overwritten;
#   - message-delta events copy stop_reason, stop_sequence and
#     usage.output_tokens (plus usage.iterations for the beta event when non-nil).
#
# NOTE(review): this listing appears to have lost the leading characters of
# every body line, and some whole lines (e.g. `else` / `end`) seem to be missing.
# Restore the body from the original source before relying on it.
def accumulate_event(event:, current_snapshot:)
vent
hropic::Models::RawMessageStreamEvent | Anthropic::Models::BetaRawMessageStreamEvent
age = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}"
e ArgumentError.new(message)
rent_snapshot.nil?
rn event.message if event.type == :message_start
age = "Unexpected event order, got \"#{event.type}\" before \":message_start\""
e RuntimeError.new(message)
vent
hropic::Models::RawMessageStartEvent # Use the converter to create a new, isolated copy of the message object.
is ensures proper type validation and prevents shared object references
at could lead to unintended mutations during streaming accumulation.
tches the Python SDK's approach of explicitly constructing Message objects.
rn Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.message)
hropic::Models::BetaRawMessageStartEvent
rn Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::BetaMessage, event.message)
hropic::Models::RawContentBlockStartEvent | Anthropic::Models::BetaRawContentBlockStartEvent
ent_snapshot.content = (current_snapshot.content || []) + [event.content_block]
hropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
ent = current_snapshot.content[event.index]
 (delta = event.delta)
nthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content.type == :text
ntent.text += delta.text
nthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content.type == :tool_use
on_buf = content._json_buf.to_s
on_buf += delta.partial_json
ntent.input = json_buf
ntent._json_buf = json_buf
nthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content.type == :text
ntent.citations ||= []
ntent.citations << delta.citation
nthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content.type == :thinking
ntent.thinking += delta.thinking
nthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content.type == :thinking
ntent.signature = delta.signature
nthropic::Models::BetaCompactionContentBlockDelta if content.type == :compaction
ntent.content = delta.content

hropic::Models::RawMessageDeltaEvent | Anthropic::Models::BetaRawMessageDeltaEvent
ent_snapshot.stop_reason = event.delta.stop_reason
ent_snapshot.stop_sequence = event.delta.stop_sequence
ent_snapshot.usage.output_tokens = event.usage.output_tokens
vent.is_a?(Anthropic::Models::BetaRawMessageDeltaEvent) && !event.usage.iterations.nil?
rrent_snapshot.usage.iterations = event.usage.iterations
t_snapshot

def accumulated_message

Returns:
  • (Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage) -

Other tags:
    Api: - public
# Drains the stream via `until_done`, then hands the accumulated snapshot to
# `parse_content_blocks!` and returns that same snapshot object.
#
# The instance variable keeps its existing spelling (`@accumated_...`) because
# other methods of this class read and write it under that name.
#
# @api public
#
# @return [Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage]
def accumulated_message
  until_done
  @accumated_message_snapshot.tap { |snapshot| parse_content_blocks!(snapshot) }
end

def accumulated_text

Returns:
  • (String) -

Other tags:
    Api: - public
# Concatenates the text of every `:text` content block in the accumulated
# message, in order. Blocks of any other type contribute nothing.
#
# @api public
#
# @return [String]
def accumulated_text
  text_blocks = accumulated_message.content.select { |block| block.type == :text }
  text_blocks.map(&:text).join
end

def build_events(event:, message_snapshot:)

Returns:
  • (Array) - events to yield (mix of raw and typed events)
    Parameters:
    • message_snapshot (Anthropic::Models::Message) -- current accumulated message state
    • event (Anthropic::Models::RawMessageStreamEvent) -- the raw event to process

    Other tags:
      Api: - private
    # Builds the list of events to yield for one raw stream event.
    #
    # As far as the visible fragments show:
    #   - a message-stop event yields a MessageStopEvent carrying the message snapshot;
    #   - a content-block-delta event yields the raw event followed by a typed
    #     event chosen by delta class and block type (TextEvent, InputJsonEvent,
    #     CitationEvent, ThinkingEvent, SignatureEvent, CompactionEvent), each
    #     populated from the block at `event.index` in the snapshot;
    #   - a content-block-stop event yields a ContentBlockStopEvent with the
    #     index and the finished content block;
    #   - presumably every other event is yielded unchanged (the trailing
    #     `<< event` line) -- confirm against the original source.
    #
    # NOTE(review): this listing appears to have lost the leading characters of
    # every body line, and some whole lines (e.g. `else` / `end` / `)`) seem to
    # be missing. Restore the body from the original source before relying on it.
    def build_events(event:, message_snapshot:)
    _to_yield = []
    vent
    hropic::Models::RawMessageStopEvent | Anthropic::Models::BetaRawMessageStopEvent
    ts_to_yield << MessageStopEvent.new(
    pe: :message_stop,
    ssage: message_snapshot
    hropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    ts_to_yield << event
    ent_block = message_snapshot.content[event.index]
     (delta = event.delta)
    nthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content_block.type == :text
    ents_to_yield << Anthropic::Streaming::TextEvent.new(
    type: :text,
    text: delta.text,
    snapshot: content_block.text
    nthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content_block.type == :tool_use
    ents_to_yield << Anthropic::Streaming::InputJsonEvent.new(
    type: :input_json,
    partial_json: delta.partial_json,
    snapshot: content_block.input
    nthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content_block.type == :text
    ents_to_yield << Anthropic::Streaming::CitationEvent.new(
    type: :citation,
    citation: delta.citation,
    snapshot: content_block.citations || []
    nthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content_block.type == :thinking
    ents_to_yield << Anthropic::Streaming::ThinkingEvent.new(
    type: :thinking,
    thinking: delta.thinking,
    snapshot: content_block.thinking
    nthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content_block.type == :thinking
    ents_to_yield << Anthropic::Streaming::SignatureEvent.new(
    type: :signature,
    signature: content_block.signature
    nthropic::Models::BetaCompactionContentBlockDelta if content_block.type == :compaction
    ents_to_yield << Anthropic::Streaming::CompactionEvent.new(
    type: :compaction,
    content: content_block.content
    
    hropic::Models::RawContentBlockStopEvent | Anthropic::Models::BetaRawContentBlockStopEvent
    ent_block = message_snapshot.content[event.index]
    ts_to_yield << Anthropic::Streaming::ContentBlockStopEvent.new(
    pe: :content_block_stop,
    dex: event.index,
    ntent_block: content_block
    ts_to_yield << event
    _to_yield

    def initialize(raw_stream:, tools: {}, models: {})

    Parameters:
    • models (Hash{String=>Class}) -- Mapping of names to model classes; the first entry's class is used to parse text blocks as JSON
    • tools (Hash{String=>Class}) -- Mapping of tool names to model classes
    • raw_stream (Anthropic::Internal::Type::BaseStream) --

    Other tags:
      Api: - private
    # Wraps a raw event stream and prepares the state used to accumulate a
    # message while that stream is consumed.
    #
    # @api private
    #
    # @param raw_stream [Anthropic::Internal::Type::BaseStream]
    # @param tools [Hash{String=>Class}] Mapping of tool names to model classes
    # @param models [Hash{String=>Class}] Mapping of names to model classes
    def initialize(raw_stream:, tools: {}, models: {})
      # Event source, plus the (initially empty) accumulated message.
      @raw_stream = raw_stream
      @accumated_message_snapshot = nil

      # Lookup tables consulted when content blocks are parsed.
      @tools, @models = tools, models

      # Built only after the fields above are in place.
      @iterator = iterator

      # Response metadata copied from the raw stream. The model is read
      # straight from the raw stream's `@model` instance variable.
      @status = raw_stream.status
      @headers = raw_stream.headers
      @model = raw_stream.instance_variable_get(:@model)
    end

    def iterator

    Returns:
    • (Enumerable) - lazily yields the events built for each raw stream event

    Other tags:
      Api: - private
    # Returns the enumerable of events for this stream.
    #
    # As far as the visible fragments show, the enumerable is created through
    # `Anthropic::Internal::Util.chain_fused` and, for each raw event, it:
    #   1. folds the event into `@accumated_message_snapshot` via `accumulate_event`;
    #   2. calls `build_events` with the raw event and the updated snapshot;
    #   3. yields every resulting event.
    # The leading `||=` suggests the result is memoized -- presumably in
    # `@iterator`; confirm against the original source.
    #
    # NOTE(review): `chain_fused` receives `@stream`, while the constructor only
    # assigns `@raw_stream`; confirm `@stream` is set elsewhere.
    # NOTE(review): this listing appears to have lost the leading characters of
    # every body line, and its closing lines (`)` / `end`) seem to be missing.
    def iterator
    tor ||= Anthropic::Internal::Util.chain_fused(@stream) do |y|
    _stream.each do |raw_event|
    ccumated_message_snapshot = accumulate_event(
    event: raw_event,
    current_snapshot: @accumated_message_snapshot
    ents_to_yield = build_events(event: raw_event, message_snapshot: @accumated_message_snapshot)
    ents_to_yield.each(&y)

    def parse_content_blocks!(message)

    Returns:
    • (Anthropic::Models::Message) - The message with parsed content

    Parameters:
    • message (Anthropic::Models::Message) -- The message to parse

    Other tags:
      Api: - private
    # Replaces content blocks of the message, in place, with copies that carry
    # a `parsed` value.
    #
    # As far as the visible fragments show:
    #   - a message without content is passed through untouched;
    #   - a `:tool_use` block is handled only when `@tools` has an entry for the
    #     block's name; its input (JSON-parsed first when it is a String) is
    #     coerced to that class, and the block is replaced by a ToolUseBlock /
    #     BetaToolUseBlock with the result in `parsed`;
    #   - a `:text` block is handled only when `@models` has a first entry; its
    #     text is JSON-parsed and coerced to that entry's class, and the block is
    #     replaced by a TextBlock / BetaTextBlock with the result in `parsed`.
    #
    # NOTE(review): the two branches report failures differently -- the tool
    # branch stores the rescued exception object in `parsed`, while the text
    # branch stores `{error: e.message}`. Confirm this asymmetry is intended.
    # NOTE(review): this listing appears to have lost the leading characters of
    # many body lines, and some whole lines (e.g. `end` / `)`) seem to be missing.
    def parse_content_blocks!(message)
     message unless message&.content
    cop:disable Metrics/BlockLength
    e.content.each_with_index do |content, index|
     content.type
     :tool_use
    xt unless (tool = @tools[content.name])
    rsed =
    begin
      parsed_input = if content.input.is_a?(String)
        JSON.parse(content.input, symbolize_names: true)
      else
        content.input
      end
      Anthropic::Internal::Type::Converter.coerce(tool, parsed_input)
    rescue StandardError => e
      e
    end
    s =
    case content
    in Anthropic::ContentBlock
      Anthropic::Models::ToolUseBlock
    else
      Anthropic::Models::BetaToolUseBlock
    end
    ssage.content[index] = cls.new(
    id: content.id,
    input: content.input,
    name: content.name,
    type: content.type,
    parsed: parsed
     :text
    xt unless (model = @models.first&.last)
    rsed =
    begin
      json = JSON.parse(content.text, symbolize_names: true)
      Anthropic::Internal::Type::Converter.coerce(model, json)
    rescue StandardError => e
      {error: e.message}
    end
    s =
    case content
    in Anthropic::ContentBlock
      Anthropic::Models::TextBlock
    else
      Anthropic::Models::BetaTextBlock
    end
    ssage.content[index] = cls.new(
    citations: content.citations,
    text: content.text,
    type: content.type,
    parsed: parsed
    cop:enable Metrics/BlockLength
    e

    def text

    Returns:
    • (Enumerable) -

    Other tags:
      Api: - public
    # Returns an enumerable over the text fragments of the stream: for every
    # `:content_block_delta` event whose delta is a `:text_delta`, the delta's
    # text is emitted. All other events are skipped.
    #
    # @api public
    #
    # @return [Enumerable]
    def text
      Anthropic::Internal::Util.chain_fused(@iterator) do |yielder|
        @iterator.each do |event|
          # Only delta events are asked for their delta.
          next unless event.type == :content_block_delta

          delta = event.delta
          yielder << delta.text if delta.type == :text_delta
        end
      end
    end

    def until_done = each {} # rubocop:disable Lint/EmptyBlock

    Returns:
    • (void) -

    Other tags:
      Api: - public
    # Runs the stream to completion by iterating over every event and
    # discarding each one. Returns whatever `each` returns.
    #
    # @api public
    #
    # @return [void]
    def until_done
      each { |_event| } # rubocop:disable Lint/EmptyBlock
    end