module Clacky::MessageFormat::Anthropic
def self.content_to_blocks(content)
Convert content (String or Array) to Anthropic content block array.
def self.content_to_blocks(content) ts blank text blocks — skip empty strings ent.empty? text: content }] normalize_block(b) }.compact s empty? text: str }]
def self.extract_text(content)
def self.extract_text(content) ntent ntent.map { |b| b.is_a?(Hash) ? (b[:text] || "") : b.to_s }.join("\n") ntent.to_s
def self.normalize_block(block)
def self.normalize_block(block) block.is_a?(Hash) ts blank text blocks — drop them instead of sending { type:"text", text:"" } t] t.nil? || text.empty? control if present (placed by Client#apply_message_caching) "text", text: text } trol] = block[:cache_control] if block[:cache_control] image_url, :url) || block[:url] k(url) Anthropic format "tool_use" ough
def self.normalized_effort(effort)
def self.normalized_effort(effort) t.nil? || effort.to_s.empty? .include?(s) ? s : nil
def self.to_api_message(msg, _caching_enabled)
cache_control markers are embedded into messages by Client#apply_message_caching
caching_enabled is kept for signature compatibility but is no longer used here —
Convert a single canonical message to Anthropic API format.
def self.to_api_message(msg, _caching_enabled) le] ntent] ool_calls] ol_calls → content blocks with tool_use nt" && tool_calls&.any? "text", text: content } if content.is_a?(String) && !content.empty? tent_to_blocks(content)) if content.is_a?(Array) o |tc| ction] || tc ame] || tc[:name] [:arguments] || tc[:arguments] s_a?(String) e(raw_args) ::ParserError => e ogger.warn("message_format.anthropic.tool_args_parse_failed", me: name.to_s, ll_id: tc[:id].to_s, n: raw_args.length, ad: raw_args[0, 120], e.message ned?(Clacky::Logger) e: "tool_use", id: tc[:id], name: name, input: input || {} } ssistant", content: blocks } sult (role: "tool") → Anthropic user message with tool_result block _control that Client#apply_message_caching may have msg[:content] (it wraps string content as text:..., cache_control:{...}}]). We hoist that e tool_result block itself below — that's where ts the marker for a tool_result turn. leave cache_control on the inner text block, the tent shape flips between "string" and ontrol}]" depending on whether this message is the reakpoint — which mutates the cached prefix every ys cache_read hit-rate (the classic "cache_read umber" symptom). trol = nil [:content] _a?(Array) && ngth == 1 && rst.is_a?(Hash) && rst[:type] == "text" && rst[:cache_control] ontrol = raw_content.first[:cache_control] aw_content.first[:text] n Array of canonical blocks (e.g. image_url + text from file_reader), ock to Anthropic format via content_to_blocks. ass through unchanged. raw_content.is_a?(Array) content_to_blocks(raw_content) se raw_content d tool_result", tool_use_id: msg[:tool_call_id], content: tool_content } rol] = hoisted_cache_control if hoisted_cache_control ser", content: [block] } native tool result already in user+tool_result format — pass through & content.is_a?(Array) && content.any? { |b| b.is_a?(Hash) && b[:type] == "tool_result" } ser", content: content } stant message ol markers are applied by Client#apply_message_caching before y is called. 
We must NOT add extra cache_control here, because: e_caching already placed the marker on the correct breakpoint message. _control to every user message causes Anthropic to treat every as a cache breakpoint, which invalidates the intended cache boundary in cache misses (cache_read=0) every turn. _blocks(content) messages with an empty content array — use a placeholder text block. text", text: "..." }] if blocks.empty? nt: blocks }
def self.to_api_tool(tool)
def self.to_api_tool(tool) on] || tool , description: func[:description], input_schema: func[:parameters] }
def self.url_to_image_block(url)
def self.url_to_image_block(url) rl "data:") (/^data:([^;]+);base64,(.*)$/) , source: { type: "base64", media_type: match[1], data: match[2] } } , source: { type: "url", url: url } } source: { type: "url", url: url } }
# Assemble the Anthropic request payload from canonical messages.
#
# Messages with role "system" are removed from the conversation and their
# text (via extract_text) is joined with blank lines into the top-level
# :system field. Every other message is converted with to_api_message.
#
# @param messages [Array] canonical messages (may include system)
# @param model [String]
# @param tools [Array, nil] OpenAI-style tool definitions
# @param max_tokens [Integer]
# @param caching_enabled [Boolean]
# @param reasoning_effort [Object, nil] run through normalized_effort; when
#   that returns a truthy value, :thinking and :output_config are added
# @return [Hash] ready to serialize as JSON body
def build_request_body(messages, model, tools, max_tokens, caching_enabled, reasoning_effort: nil)
  system_msgs, conversation = messages.partition { |m| m[:role] == "system" }

  system_text = system_msgs.map { |m| extract_text(m[:content]) }.join("\n\n")
  api_messages = conversation.map { |m| to_api_message(m, caching_enabled) }
  api_tools = tools&.map { |t| to_api_tool(t) }
  has_tools = api_tools&.any?

  # With caching on, the last tool definition carries an ephemeral
  # cache_control marker.
  api_tools.last[:cache_control] = { type: "ephemeral" } if caching_enabled && has_tools

  body = { model: model, max_tokens: max_tokens, messages: api_messages }
  # :system and :tools are omitted entirely when there is nothing to send.
  body[:system] = system_text unless system_text.empty?
  body[:tools] = api_tools if has_tools

  effort = normalized_effort(reasoning_effort)
  if effort
    body[:thinking] = { type: "adaptive" }
    body[:output_config] = { effort: effort }
  end

  body
end
# Format tool results into canonical messages to append to @messages.
#
# Produces one message per tool call in the response, in tool-call order.
# A tool call with no matching entry in tool_results gets a JSON error
# payload as its content, so every call still receives an answer.
#
# @param response [Hash] canonical response (has :tool_calls)
# @param tool_results [Array<Hash>, nil] results, each with :id and :content
# @return [Array<Hash>] messages shaped { role: "tool", tool_call_id:, content: }
def format_tool_results(response, tool_results)
  # A nil tool_results / :tool_calls is treated as "none" instead of raising
  # NoMethodError.
  results_by_id = (tool_results || []).each_with_object({}) { |r, h| h[r[:id]] = r }

  (response[:tool_calls] || []).map do |tc|
    result = results_by_id[tc[:id]]
    {
      role: "tool",
      tool_call_id: tc[:id],
      content: result ? result[:content] : { error: "Tool result missing" }.to_json
    }
  end
end
# Convert a parsed Anthropic response body into the canonical response shape.
#
# @param data [Hash] parsed JSON response body
# @return [Hash] canonical response: { content:, tool_calls:, finish_reason:, usage: }
#   plus :raw_api_usage holding the unmodified "usage" object
def parse_response(data)
  blocks = data["content"] || []
  raw_usage = data["usage"] || {}

  # All text blocks are concatenated in order with no separator.
  text = blocks.select { |blk| blk["type"] == "text" }.map { |blk| blk["text"] }.join("")

  # Each tool_use block becomes a tool call whose arguments are a JSON string
  # (a String input is passed through, anything else is serialized).
  tool_calls = blocks.each_with_object([]) do |blk, calls|
    next unless blk["type"] == "tool_use"

    input = blk["input"]
    calls << {
      id: blk["id"],
      type: "function",
      name: blk["name"],
      arguments: input.is_a?(String) ? input : input.to_json
    }
  end

  # Map Anthropic stop reasons to canonical finish reasons; unknown values
  # (including nil) pass through unchanged.
  stop_reason = data["stop_reason"]
  finish_reason = {
    "end_turn" => "stop",
    "tool_use" => "tool_calls",
    "max_tokens" => "length"
  }.fetch(stop_reason, stop_reason)

  # Anthropic's native input_tokens counts only the non-cached, freshly billed
  # input; cache_read_input_tokens and cache_creation_input_tokens are
  # reported separately and are disjoint from it. Normalise to the codebase's
  # OpenAI-style canonical shape so downstream consumers stay
  # provider-agnostic:
  #
  #   prompt_tokens     = non-cached input + cache_read (includes cache reads,
  #                       excludes cache writes)
  #   completion_tokens = output
  #   total_tokens      = this turn's new compute volume
  #                     = raw input + cache_creation + output
  #                       (cache_read is excluded; cache_creation is new work
  #                       this turn)
  fresh_input, cache_read, cache_creation, output =
    %w[input_tokens cache_read_input_tokens cache_creation_input_tokens output_tokens]
    .map { |key| raw_usage[key].to_i }

  usage_data = {
    prompt_tokens: fresh_input + cache_read,
    completion_tokens: output,
    total_tokens: fresh_input + cache_creation + output,
    # Signals that total_tokens is already a per-turn delta rather than a
    # running cumulative, so consumers should not subtract a previous total.
    total_is_per_turn: true
  }
  # Cache counters are only reported when non-zero.
  usage_data[:cache_read_input_tokens] = cache_read if cache_read.positive?
  usage_data[:cache_creation_input_tokens] = cache_creation if cache_creation.positive?

  {
    content: text,
    tool_calls: tool_calls,
    finish_reason: finish_reason,
    usage: usage_data,
    raw_api_usage: raw_usage
  }
end
# Returns true if the message is an Anthropic-native tool result stored in
# @messages (role: "user" with a content array containing tool_result blocks).
#
# NOTE: After the refactor, new tool results are stored in canonical format
# (role: "tool"). This helper handles legacy messages that might still be
# present.
#
# @param msg [Hash] message with :role and :content
# @return [Boolean]
def tool_result_message?(msg)
  return false unless msg[:role] == "user"

  blocks = msg[:content]
  return false unless blocks.is_a?(Array)

  blocks.any? { |blk| blk.is_a?(Hash) && blk[:type] == "tool_result" }
end
# Collect the tool_use_id of every tool_result block in an Anthropic-native
# tool-result message.
#
# @param msg [Hash] message with :role and :content
# @return [Array] tool_use_id values in block order; [] when msg is not a
#   tool-result message (as decided by tool_result_message?)
def tool_use_ids(msg)
  return [] unless tool_result_message?(msg)

  # Content arrays can hold non-Hash entries next to tool_result blocks;
  # check the type before indexing with a Symbol, which would otherwise raise
  # on a String or nil element.
  msg[:content]
    .select { |blk| blk.is_a?(Hash) && blk[:type] == "tool_result" }
    .map { |blk| blk[:tool_use_id] }
end