class Attio::OAuth::Client
OAuth client for handling the OAuth 2.0 authorization flow
def authorization_url(scopes: DEFAULT_SCOPES, state: nil, extras: {})
def authorization_url(scopes: DEFAULT_SCOPES, state: nil, extras: {}) state ||= generate_state scopes = validate_scopes(scopes) params = { client_id: client_id, redirect_uri: redirect_uri, response_type: "code", scope: scopes.join(" "), state: state }.merge(extras) uri = URI.parse(OAUTH_BASE_URL) uri.query = URI.encode_www_form(params) { url: uri.to_s, state: state } end
def create_oauth_connection
def create_oauth_connection Faraday.new(url: "https://app.attio.com") do |faraday| faraday.response :json, parser_options: {symbolize_names: true} faraday.adapter Faraday.default_adapter end end
def exchange_code_for_token(code:, state: nil)
def exchange_code_for_token(code:, state: nil) raise ArgumentError, "Authorization code is required" if code.nil? || code.empty? params = { grant_type: "authorization_code", code: code, redirect_uri: redirect_uri, client_id: client_id, client_secret: client_secret } response = make_token_request(params) Token.new(response.merge(client: self)) end
def generate_state
def generate_state SecureRandom.urlsafe_base64(32) end
def handle_oauth_error(response)
def handle_oauth_error(response) error_body = begin response.body rescue {} end error_message = if error_body.is_a?(Hash) error_body[:error_description] || error_body[:error] || "OAuth error" else "OAuth error" end case response.status when 400 raise BadRequestError, error_message when 401 raise AuthenticationError, error_message when 403 raise ForbiddenError, error_message when 404 raise NotFoundError, error_message else raise Error, "OAuth error: #{error_message} (status: #{response.status})" end end
def initialize(client_id:, client_secret:, redirect_uri:)
def initialize(client_id:, client_secret:, redirect_uri:) @client_id = client_id @client_secret = client_secret @redirect_uri = redirect_uri validate_config! end
def introspect_token(token)
def introspect_token(token) token_value = token.is_a?(Token) ? token.access_token : token params = { token: token_value, client_id: client_id, client_secret: client_secret } # Use Faraday directly for OAuth endpoints conn = create_oauth_connection response = conn.post("/oauth/introspect") do |req| req.headers["Content-Type"] = "application/x-www-form-urlencoded" req.body = URI.encode_www_form(params) end if response.success? response.body else handle_oauth_error(response) end end
def make_token_request(params)
def make_token_request(params) conn = Faraday.new do |faraday| faraday.request :url_encoded faraday.response :json, parser_options: {symbolize_names: true} faraday.adapter Faraday.default_adapter end response = conn.post(TOKEN_URL, params) do |req| req.headers["Accept"] = "application/json" end if response.success? response.body else handle_oauth_error(response) end end
def refresh_token(refresh_token)
def refresh_token(refresh_token) raise ArgumentError, "Refresh token is required" if refresh_token.nil? || refresh_token.empty? params = { grant_type: "refresh_token", refresh_token: refresh_token, client_id: client_id, client_secret: client_secret } response = make_token_request(params) Token.new(response.merge(client: self)) end
def revoke_token(token)
def revoke_token(token) token_value = token.is_a?(Token) ? token.access_token : token params = { token: token_value, client_id: client_id, client_secret: client_secret } # Use Faraday directly for OAuth endpoints conn = create_oauth_connection response = conn.post("/oauth/revoke") do |req| req.headers["Content-Type"] = "application/x-www-form-urlencoded" req.body = URI.encode_www_form(params) end response.success? rescue => e # Log the error if debug mode is enabled warn "OAuth token revocation failed: #{e.message}" if Attio.configuration.debug false end
def validate_config!
def validate_config! raise ArgumentError, "client_id is required" if client_id.nil? || client_id.empty? raise ArgumentError, "client_secret is required" if client_secret.nil? || client_secret.empty? raise ArgumentError, "redirect_uri is required" if redirect_uri.nil? || redirect_uri.empty? unless redirect_uri.start_with?("http://", "https://") raise ArgumentError, "redirect_uri must be a valid HTTP(S) URL" end end
def validate_scopes(scopes)
def validate_scopes(scopes) scopes = Array(scopes).map(&:to_s) return DEFAULT_SCOPES if scopes.empty? invalid_scopes = scopes - ScopeValidator::VALID_SCOPES unless invalid_scopes.empty? raise ArgumentError, "Invalid scopes: #{invalid_scopes.join(", ")}" end scopes end