class Multiwoven::Integrations::Source::WatsonxAi::Client

def check_connection(connection_config)
  get_access_token(connection_config[:api_key])
  url = format(
    WATSONX_HEALTH_DEPLOYMENT_URL,
    region: connection_config[:region],
    version: API_VERSION
  )
  response = send_request(
    url: url,
    http_method: HTTP_GET,
    payload: {},
    headers: auth_headers(@access_token),
    config: connection_config[:config]
  )
  evaluate_deployment_status(response, connection_config[:deployment_id])
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:CHECK_CONNECTION:EXCEPTION", type: "error" })
  failure_status(e)
end
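
# check_connection usage sketch (values are placeholders, not a verified
# configuration):
#   client = Multiwoven::Integrations::Source::WatsonxAi::Client.new
#   client.check_connection(
#     api_key: "<ibm-cloud-api-key>",
#     region: "us-south",
#     deployment_id: "<deployment-id>",
#     config: { timeout: 30 }
#   )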

def discover(_connection_config)
  catalog_json = read_json(CATALOG_SPEC_PATH)
  catalog = build_catalog(catalog_json)
  catalog.to_multiwoven_message
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:DISCOVER:EXCEPTION", type: "error" })
end
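
# discover ignores its argument and serves the static catalog bundled at
# CATALOG_SPEC_PATH, wrapped as a Multiwoven catalog message.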

def evaluate_deployment_status(response, deployment_id)
  response_body = JSON.parse(response.body)
  deployment = response_body["resources"]&.find { |res| res.dig("metadata", "id") == deployment_id }
  return failure_status unless deployment
  deployment.dig("entity", "status", "state") == "ready" ? success_status : failure_status
end
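
# Shape of the deployments response evaluate_deployment_status inspects
# (values illustrative):
#   { "resources" => [
#       { "metadata" => { "id" => "<deployment-id>" },
#         "entity"   => { "status" => { "state" => "ready" } } } ] }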

def extract_data_entries(chunk)
  chunk.split(/^data: /).map(&:strip).reject(&:empty?)
end
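
# extract_data_entries example (assumed SSE-style chunk):
#   extract_data_entries("data: {\"a\":1}\n\ndata: {\"b\":2}\n\n")
#   # => ["{\"a\":1}", "{\"b\":2}"]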

def format_data(response_body)
  messages = response_body.split("\n\n")
  messages.map do |message|
    match = message.match(/data:\s*(\{.*\})/)
    match ? JSON.parse(match[1]) : nil
  end.compact
end
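
# format_data example (SSE body with two events; payloads illustrative):
#   format_data("id: 1\ndata: {\"x\":1}\n\nid: 2\ndata: {\"x\":2}")
#   # => [{ "x" => 1 }, { "x" => 2 }]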

def get_access_token(api_key)
  # Fall back to a memoized in-process store when Rails.cache is unavailable;
  # building a fresh MemoryStore on every call would never return a cached token.
  cache = defined?(Rails) && Rails.respond_to?(:cache) ? Rails.cache : (@token_cache ||= ActiveSupport::Cache::MemoryStore.new)
  cache_key = "watsonx_ai_#{api_key}"
  cached_token = cache.read(cache_key)
  if cached_token
    @access_token = cached_token
  else
    new_token = get_iam_token(api_key)
    # maximum expiration is 3 minutes; there is no way to make it higher
    cache.write(cache_key, new_token, expires_in: 180)
    @access_token = new_token
  end
end
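
# Tokens are cached per API key, so repeated requests within the 3-minute
# window reuse one IAM exchange instead of hitting the token endpoint again.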

def get_iam_token(api_key)
  uri = URI("https://iam.cloud.ibm.com/identity/token")
  request = Net::HTTP::Post.new(uri)
  request["Content-Type"] = "application/x-www-form-urlencoded"
  request.body = "grant_type=urn:ibm:params:oauth:grant-type:apikey&apikey=#{api_key}"
  response = Net::HTTP.start(uri.hostname, uri.port, use_ssl: true) do |http|
    http.request(request)
  end
  raise "Failed to get IAM token: #{response.body}" unless response.is_a?(Net::HTTPSuccess)
  JSON.parse(response.body)["access_token"]
end
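
# The IAM endpoint replies with JSON along these lines (abridged; expires_in
# reflects IBM's default token lifetime):
#   { "access_token" => "eyJ...", "token_type" => "Bearer", "expires_in" => 3600 }
# Only access_token is consumed here.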

def parse_json(json_string)
  JSON.parse(json_string)
rescue JSON::ParserError => e
  handle_exception(e, { context: "OPEN AI:PARSE_JSON:EXCEPTION", type: "error" })
  {}
end
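
# parse_json examples:
#   parse_json('{"a":1}')  # => { "a" => 1 }
#   parse_json("not json") # logs the parse failure and returns {}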

def prepare_config_and_payload(sync_config)
  config = sync_config.source.connection_specification
  connection_config = config.with_indifferent_access.tap do |conf|
    conf[:config] ||= {}
    conf[:config][:timeout] ||= 30
    conf[:is_stream] ||= false
  end
  payload = sync_config.model.query
  [connection_config, payload]
end
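
# prepare_config_and_payload returns the source's connection specification
# (with timeout defaulting to 30s and is_stream to false) alongside the
# model's query string, which run_model later parses as JSON.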

def process_model_request(connection_config, payload)
  if connection_config[:is_stream] && connection_config[:model_type] == "Prompt template"
    run_model_stream(connection_config, payload) { |message| yield message if block_given? }
  else
    run_model(connection_config, payload)
  end
end
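
# Routing note: streaming is only used for "Prompt template" deployments with
# is_stream enabled; everything else (including machine learning models) goes
# through the blocking run_model path.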

def process_response(response)
  if success?(response)
    if response.body.start_with?("{", "[")
      data = JSON.parse(response.body)
      [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message]
    else
      data = format_data(response.body)
      RecordMessage.new(data: { responses: data }, emitted_at: Time.now.to_i).to_multiwoven_message
    end
  else
    create_log_message("WATSONX AI:RUN_MODEL", "error", "request failed: #{response.body}")
  end
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:PROCESS_RESPONSE:EXCEPTION", type: "error" })
end
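
# Note the two return shapes: a JSON body yields an array containing one
# RecordMessage, while an SSE-formatted body yields a single RecordMessage
# wrapping every parsed event under :responses.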

def process_streaming_response(chunk)
  data_entries = extract_data_entries(chunk)
  data_entries.each do |entry|
    data, = entry.split("\n", 2) # keep the JSON line, drop trailing SSE fields
    next if data == "id: 1" # skip the stream's leading id-only entry
    data = parse_json(data)
    raise StandardError, "Error: #{data["errors"][0]["message"]}" if data["errors"] && data["errors"][0]["message"]
    next if data.dig("results", 0, "stop_reason") != "not_finished"
    yield [RecordMessage.new(data: data, emitted_at: Time.now.to_i).to_multiwoven_message] if block_given?
  end
end
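
# Each entry handed to process_streaming_response looks roughly like
# (payload illustrative):
#   "{\"results\":[{\"generated_text\":\"...\",\"stop_reason\":\"not_finished\"}]}\nid: 2"
# Finished events (any stop_reason other than "not_finished") are skipped
# rather than yielded.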

def read(sync_config)
  connection_config, payload = prepare_config_and_payload(sync_config)
  process_model_request(connection_config, payload) { |message| yield message if block_given? }
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:READ:EXCEPTION", type: "error" })
end
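
# read usage sketch (sync_config is supplied by the framework; the handler
# below is a placeholder for caller logic):
#   client.read(sync_config) { |messages| messages.each { |m| process(m) } }
# Without a block, non-streaming results are returned directly.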

def run_model(connection_config, payload)
  get_access_token(connection_config[:api_key])
  url_template = if connection_config[:model_type] == "Machine learning model"
                   WATSONX_PREDICTION_DEPLOYMENT_URL
                 else
                   WATSONX_GENERATION_DEPLOYMENT_URL
                 end
  url = format(
    url_template,
    region: connection_config[:region],
    deployment_id: connection_config[:deployment_id],
    version: API_VERSION
  )
  response = send_request(
    url: url,
    http_method: HTTP_POST,
    payload: JSON.parse(payload),
    headers: auth_headers(@access_token),
    config: connection_config[:config]
  )
  process_response(response)
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:RUN_MODEL:EXCEPTION", type: "error" })
end
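
# run_model payload sketch for a machine learning deployment (field names
# follow watsonx.ai's scoring format, but treat this as an unverified
# example):
#   run_model(connection_config, '{"input_data":[{"fields":["f1"],"values":[[1]]}]}')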

def run_model_stream(connection_config, payload)
  get_access_token(connection_config[:api_key])
  url = format(
    WATSONX_STREAM_DEPLOYMENT_URL,
    region: connection_config[:region],
    deployment_id: connection_config[:deployment_id],
    version: API_VERSION
  )
  send_streaming_request(
    url: url,
    http_method: HTTP_POST,
    payload: JSON.parse(payload),
    headers: auth_headers(@access_token),
    config: connection_config[:config]
  ) do |chunk|
    process_streaming_response(chunk) { |message| yield message if block_given? }
  end
rescue StandardError => e
  handle_exception(e, { context: "WATSONX AI:RUN_STREAM_MODEL:EXCEPTION", type: "error" })
end
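
# run_model_stream sketch (hypothetical prompt-template payload; field names
# are not verified against the watsonx.ai API):
#   run_model_stream(connection_config, '{"parameters":{"prompt_variables":{"input":"hi"}}}') do |messages|
#     messages.each { |m| puts m }
#   end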