class Anthropic::Helpers::Streaming::MessageStream
@generic Elem
MessageStream provides a Ruby Enumerable interface over Server-Sent Events from
the Anthropic API, yielding a mix of raw streaming events and higher-level typed
events while maintaining accumulated message state throughout the stream lifecycle.
@api private
def accumulate_event(event:, current_snapshot:)
-
(Anthropic::Models::Message)- updated message snapshot with event applied
Parameters:
-
current_snapshot(Anthropic::Models::Message, nil) -- current accumulated message state -
event(Anthropic::Models::RawMessageStreamEvent) -- the raw streaming event to process
Other tags:
- Api: - private
# @api private
#
# Folds one raw streaming event into the accumulated message snapshot.
#
# While no snapshot exists yet, only a `:message_start` event is accepted and its
# `message` is returned as-is. Afterwards the snapshot is mutated in place:
# content-block starts append a block, content-block deltas extend the block at
# `event.index`, and message deltas update the stop fields and output token usage.
#
# NOTE(review): this body was restored from a listing that had lost its short lines;
# the two empty `else` arms below are inferred -- confirm against the upstream source.
#
# @param event [Anthropic::Models::RawMessageStreamEvent] the raw streaming event to process
# @param current_snapshot [Anthropic::Models::Message, nil] current accumulated message state
#
# @return [Anthropic::Models::Message] updated message snapshot with event applied
#
# @raise [ArgumentError] if `event` matches neither raw stream event union
# @raise [RuntimeError] if an event other than `:message_start` arrives before a snapshot exists
def accumulate_event(event:, current_snapshot:)
  case event
  in Anthropic::Models::RawMessageStreamEvent | Anthropic::Models::BetaRawMessageStreamEvent
    # Recognised variant; accumulation continues below.
  else
    message = "Expected event to be a variant of RawMessageStreamEvent, got #{event.class}"
    raise ArgumentError.new(message)
  end

  if current_snapshot.nil?
    return event.message if event.type == :message_start

    message = "Unexpected event order, got \"#{event.type}\" before \":message_start\""
    raise RuntimeError.new(message)
  end

  case event
  in Anthropic::Models::RawMessageStartEvent
    # Use the converter to create a new, isolated copy of the message object.
    # This ensures proper type validation and prevents shared object references
    # that could lead to unintended mutations during streaming accumulation.
    # Matches the Python SDK's approach of explicitly constructing Message objects.
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::Message, event.message)
  in Anthropic::Models::BetaRawMessageStartEvent
    return Anthropic::Internal::Type::Converter.coerce(Anthropic::Models::BetaMessage, event.message)
  in Anthropic::Models::RawContentBlockStartEvent | Anthropic::Models::BetaRawContentBlockStartEvent
    current_snapshot.content = (current_snapshot.content || []) + [event.content_block]
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    content = current_snapshot.content[event.index]

    # Each delta is applied only when the target block has the matching type.
    case (delta = event.delta)
    in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content.type == :text
      content.text += delta.text
    in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content.type == :tool_use
      # The partial JSON is buffered as a raw string; `input` mirrors the buffer.
      json_buf = content._json_buf.to_s
      json_buf += delta.partial_json
      content.input = json_buf
      content._json_buf = json_buf
    in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content.type == :text
      content.citations ||= []
      content.citations << delta.citation
    in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content.type == :thinking
      content.thinking += delta.thinking
    in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content.type == :thinking
      content.signature = delta.signature
    in Anthropic::Models::BetaCompactionContentBlockDelta if content.type == :compaction
      content.content = delta.content
    else
      # Unrecognised delta, or delta/block type mismatch: leave the block untouched.
    end
  in Anthropic::Models::RawMessageDeltaEvent | Anthropic::Models::BetaRawMessageDeltaEvent
    current_snapshot.stop_reason = event.delta.stop_reason
    current_snapshot.stop_sequence = event.delta.stop_sequence
    current_snapshot.usage.output_tokens = event.usage.output_tokens
    if event.is_a?(Anthropic::Models::BetaRawMessageDeltaEvent) && !event.usage.iterations.nil?
      current_snapshot.usage.iterations = event.usage.iterations
    end
  else
    # Remaining events (e.g. the stop events handled by build_events) do not change the snapshot.
  end

  current_snapshot
end
def accumulated_message
-
(Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage)-
Other tags:
- Api: - public
# @api public
#
# Consumes the rest of the stream, parses the content blocks of the accumulated
# snapshot in place, and returns that snapshot.
#
# @return [Anthropic::Models::Message, Anthropic::Models::Beta::BetaMessage]
def accumulated_message
  # Drain every remaining event so the snapshot reflects the whole stream.
  until_done
  parse_content_blocks!(@accumated_message_snapshot)
  @accumated_message_snapshot
end
def accumulated_text
-
(String)-
Other tags:
- Api: - public
# @api public
#
# Concatenates the text of every `:text` content block in the accumulated message.
# Blocks of any other type contribute nothing.
#
# @return [String]
def accumulated_text
  accumulated_message.content.filter_map { _1.text if _1.type == :text }.join
end
def build_events(event:, message_snapshot:)
-
(Array) - events to yield (mix of raw and typed events)
Parameters:
-
message_snapshot(Anthropic::Models::Message) -- current accumulated message state -
event(Anthropic::Models::RawMessageStreamEvent) -- the raw event to process
Other tags:
- Api: - private
# @api private
#
# Translates one raw event into the list of events handed to consumers.
#
# - message stop: a single MessageStopEvent carrying the snapshot;
# - content-block delta: the raw event, followed by a typed event when the delta
#   type matches the type of the block at `event.index`;
# - content-block stop: a single ContentBlockStopEvent carrying the finished block;
# - anything else: the raw event unchanged.
#
# NOTE(review): this body was restored from a listing that had lost its short lines;
# the inner empty `else` and the outer pass-through `else` are inferred -- confirm
# against the upstream source.
#
# @param event [Anthropic::Models::RawMessageStreamEvent] the raw event to process
# @param message_snapshot [Anthropic::Models::Message] current accumulated message state
#
# @return [Array] events to yield (mix of raw and typed events)
def build_events(event:, message_snapshot:)
  events_to_yield = []

  case event
  in Anthropic::Models::RawMessageStopEvent | Anthropic::Models::BetaRawMessageStopEvent
    events_to_yield << MessageStopEvent.new(
      type: :message_stop,
      message: message_snapshot
    )
  in Anthropic::Models::RawContentBlockDeltaEvent | Anthropic::Models::BetaRawContentBlockDeltaEvent
    events_to_yield << event
    content_block = message_snapshot.content[event.index]

    # Snapshots are read from the already-updated block, so they include this delta.
    case (delta = event.delta)
    in Anthropic::Models::TextDelta | Anthropic::Models::BetaTextDelta if content_block.type == :text
      events_to_yield << Anthropic::Streaming::TextEvent.new(
        type: :text,
        text: delta.text,
        snapshot: content_block.text
      )
    in Anthropic::Models::InputJSONDelta | Anthropic::Models::BetaInputJSONDelta if content_block.type == :tool_use
      events_to_yield << Anthropic::Streaming::InputJsonEvent.new(
        type: :input_json,
        partial_json: delta.partial_json,
        snapshot: content_block.input
      )
    in Anthropic::Models::CitationsDelta | Anthropic::Models::BetaCitationsDelta if content_block.type == :text
      events_to_yield << Anthropic::Streaming::CitationEvent.new(
        type: :citation,
        citation: delta.citation,
        snapshot: content_block.citations || []
      )
    in Anthropic::Models::ThinkingDelta | Anthropic::Models::BetaThinkingDelta if content_block.type == :thinking
      events_to_yield << Anthropic::Streaming::ThinkingEvent.new(
        type: :thinking,
        thinking: delta.thinking,
        snapshot: content_block.thinking
      )
    in Anthropic::Models::SignatureDelta | Anthropic::Models::BetaSignatureDelta if content_block.type == :thinking
      events_to_yield << Anthropic::Streaming::SignatureEvent.new(
        type: :signature,
        signature: content_block.signature
      )
    in Anthropic::Models::BetaCompactionContentBlockDelta if content_block.type == :compaction
      events_to_yield << Anthropic::Streaming::CompactionEvent.new(
        type: :compaction,
        content: content_block.content
      )
    else
      # No typed counterpart for this delta; only the raw event is yielded.
    end
  in Anthropic::Models::RawContentBlockStopEvent | Anthropic::Models::BetaRawContentBlockStopEvent
    content_block = message_snapshot.content[event.index]

    events_to_yield << Anthropic::Streaming::ContentBlockStopEvent.new(
      type: :content_block_stop,
      index: event.index,
      content_block: content_block
    )
  else
    events_to_yield << event
  end

  events_to_yield
end
def initialize(raw_stream:, tools: {}, models: {})
-
models(Hash{String=>Class}) -- Mapping of names to model classes; the class of the first entry is used to parse JSON text blocks -
tools(Hash{String=>Class}) -- Mapping of tool names to model classes -
raw_stream(Anthropic::Internal::Type::BaseStream) --
Other tags:
- Api: - private
# @api private
#
# Wraps a raw event stream and prepares the accumulation state.
#
# @param raw_stream [Anthropic::Internal::Type::BaseStream]
# @param tools [Hash{String=>Class}] Mapping of tool names to model classes
# @param models [Hash{String=>Class}] Mapping of names to model classes
def initialize(raw_stream:, tools: {}, models: {})
  # The underlying Server-Sent Event stream from the Anthropic API.
  @raw_stream = raw_stream

  # Accumulated message state that builds up as events are processed.
  @accumated_message_snapshot = nil

  # Mapping of tool names to model classes for parsing.
  @tools = tools
  @models = models

  # Lazy enumerable that transforms raw events into consumable events.
  @iterator = iterator

  # Response metadata is copied from the wrapped stream; `@model` is read straight
  # from the raw stream's instance variable rather than through a reader.
  @status = raw_stream.status
  @headers = raw_stream.headers
  @model = raw_stream.instance_variable_get(:@model)
end
def iterator
-
(Enumerable) - enumerable of the events built from each raw stream event
Other tags:
- Api: - private
# @api private
#
# Builds (once) the enumerable that drives the stream: every raw event is first
# folded into the accumulated snapshot, then expanded into the events to yield.
# The result is memoised in `@iterator`.
#
# NOTE(review): `@stream` is passed to `chain_fused` but is not assigned by the
# constructor shown for this class -- presumably supplied elsewhere; confirm.
#
# @return [Enumerable]
def iterator
  @iterator ||= Anthropic::Internal::Util.chain_fused(@stream) do |y|
    @raw_stream.each do |raw_event|
      # Accumulate first so the typed events see a snapshot that includes this event.
      @accumated_message_snapshot = accumulate_event(
        event: raw_event,
        current_snapshot: @accumated_message_snapshot
      )
      events_to_yield = build_events(event: raw_event, message_snapshot: @accumated_message_snapshot)
      events_to_yield.each(&y)
    end
  end
end
def parse_content_blocks!(message)
-
(Anthropic::Models::Message)- The message with parsed content
Parameters:
-
message(Anthropic::Models::Message) -- The message to parse
Other tags:
- Api: - private
# @api private
#
# Replaces `:tool_use` and `:text` blocks of `message.content`, in place, with
# blocks that additionally carry a `parsed` value.
#
# - `:tool_use` blocks are processed only when `@tools` has an entry for the block
#   name. String input is JSON-decoded (symbolized keys) before coercion. If that
#   fails, `parsed` holds the rescued exception object itself.
# - `:text` blocks are processed only when `@models` has at least one entry; the
#   class of its first entry is used. If decoding or coercion fails, `parsed` is
#   `{error: <message>}`.
#
# Blocks that are an `Anthropic::ContentBlock` are rebuilt with the non-beta block
# class, all others with the Beta class.
#
# @param message [Anthropic::Models::Message] The message to parse
#
# @return [Anthropic::Models::Message] The message with parsed content
def parse_content_blocks!(message)
  return message unless message&.content

  # rubocop:disable Metrics/BlockLength
  message.content.each_with_index do |content, index|
    case content.type
    when :tool_use
      next unless (tool = @tools[content.name])

      parsed =
        begin
          parsed_input =
            if content.input.is_a?(String)
              JSON.parse(content.input, symbolize_names: true)
            else
              content.input
            end
          Anthropic::Internal::Type::Converter.coerce(tool, parsed_input)
        rescue StandardError => e
          # The exception object itself becomes the parsed value here, unlike the
          # text branch below, which stores {error: message}.
          e
        end

      cls =
        case content
        in Anthropic::ContentBlock
          Anthropic::Models::ToolUseBlock
        else
          Anthropic::Models::BetaToolUseBlock
        end

      message.content[index] = cls.new(
        id: content.id,
        input: content.input,
        name: content.name,
        type: content.type,
        parsed: parsed
      )
    when :text
      next unless (model = @models.first&.last)

      parsed =
        begin
          json = JSON.parse(content.text, symbolize_names: true)
          Anthropic::Internal::Type::Converter.coerce(model, json)
        rescue StandardError => e
          {error: e.message}
        end

      cls =
        case content
        in Anthropic::ContentBlock
          Anthropic::Models::TextBlock
        else
          Anthropic::Models::BetaTextBlock
        end

      message.content[index] = cls.new(
        citations: content.citations,
        text: content.text,
        type: content.type,
        parsed: parsed
      )
    end
  end
  # rubocop:enable Metrics/BlockLength

  message
end
def text
-
(Enumerable) - the text of each text_delta event, in stream order
Other tags:
- Api: - public
# @api public
#
# Returns an enumerable over the text fragments of the stream: the `text` of every
# `:content_block_delta` event whose delta type is `:text_delta`. All other events
# are skipped.
#
# @return [Enumerable]
def text
  Anthropic::Internal::Util.chain_fused(@iterator) do |y|
    @iterator.each do |event|
      next unless event.type == :content_block_delta && event.delta.type == :text_delta

      y << event.delta.text
    end
  end
end
def until_done = each {} # rubocop:disable Lint/EmptyBlock
-
(void)-
Other tags:
- Api: - public
# @api public
#
# Drains the stream by iterating over every event and discarding it.
#
# @return [void]
def until_done
  each { |_event| nil }
end