# frozen_string_literal: truemoduleMultiwoven::Integrations::SourcemoduleAnthropicincludeMultiwoven::Integrations::CoreclassClient<SourceConnectorAPI_VERSION="2023-06-01"defcheck_connection(connection_config)connection_config=prepare_config(connection_config)response=make_request(ANTHROPIC_URL,HTTP_POST,connection_config[:request_format],connection_config)success?(response)?success_status:failure_status(nil)rescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:CHECK_CONNECTION:EXCEPTION",type: "error"})failure_status(e)enddefdiscover(_connection_config=nil)catalog_json=read_json(CATALOG_SPEC_PATH)catalog=build_catalog(catalog_json)catalog.to_multiwoven_messagerescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:DISCOVER:EXCEPTION",type: "error"})enddefread(sync_config)# The server checks the ConnectorQueryType.# If it's "ai_ml," the server calculates the payload and passes it as a query in the sync config model protocol.# This query is then sent to the AI/ML model.connection_config=prepare_config(sync_config.source.connection_specification)stream=connection_config[:is_stream]||=falsepayload=sync_config.model.queryifstreamrun_model_stream(connection_config,payload){|message|yieldmessageifblock_given?}elserun_model(connection_config,payload)endrescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:READ:EXCEPTION",type: "error"})endprivatedefprepare_config(config)config.with_indifferent_access.tapdo|conf|conf[:config][:timeout]||=30endenddefparse_json(json_string)JSON.parse(json_string)rescueJSON::ParserError=>ehandle_exception(e,{context: "ANTHROPIC:PARSE_JSON:EXCEPTION",type: "error"}){}enddefbuild_headers(connection_config,streaming: false){"x-api-key"=>connection_config[:api_key],"anthropic-version"=>API_VERSION,"content-type"=>"application/json"}.tapdo|headers|headers["transfer-encoding"]="chunked"ifstreamingendenddefmake_request(url,http_method,payload,connection_config)send_request(url: url,http_method: http_method,payload: JSON.parse(payload),headers: build_headers(connection_config,streaming: false),config: connection_config[:config])enddefrun_model(connection_config,payload)response=make_request(ANTHROPIC_URL,HTTP_POST,payload,connection_config)process_response(response)rescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:RUN_MODEL:EXCEPTION",type: "error"})enddefrun_model_stream(connection_config,payload)send_streaming_request(url: ANTHROPIC_URL,http_method: HTTP_POST,payload: JSON.parse(payload),headers: build_headers(connection_config,streaming: true),config: connection_config[:config])do|chunk|process_streaming_response(chunk){|message|yieldmessageifblock_given?}endrescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:RUN_STREAM_MODEL:EXCEPTION",type: "error"})enddefprocess_response(response)ifsuccess?(response)data=JSON.parse(response.body)[RecordMessage.new(data: data,emitted_at: Time.now.to_i).to_multiwoven_message]elsecreate_log_message("ANTHROPIC:RUN_MODEL","error","request failed: #{response.body}")endrescueStandardError=>ehandle_exception(e,{context: "ANTHROPIC:PROCESS_RESPONSE:EXCEPTION",type: "error"})enddefcheck_chunk_error(chunk)returnunlesschunk.include?("{\"type\":\"error\"")data=JSON.parse(chunk)raiseStandardError,"Error: #{data["error"]}"ifdata["error"]&&data["error"]["message"]enddefextract_content_event(chunk)events=chunk.split("\n\n")events.find{|e|e.include?("event: content_block_delta")}enddefprocess_streaming_response(chunk)check_chunk_error(chunk)chunk.each_linedo|event|nextunlessevent.include?("\"type\":\"content_block_delta\"")json_string=event.split("\n").find{|line|line.start_with?("data: ")}&.sub(/^data: /,"")nextunlessjson_stringparsed_data=JSON.parse(json_string)yield[RecordMessage.new(data: parsed_data,emitted_at: Time.now.to_i).to_multiwoven_message]ifblock_given?endendendendend