class Attio::Util::WebhookSignature
Verifies webhook signatures from Attio to ensure authenticity
def calculate_signature(payload, timestamp, secret)
def calculate_signature(payload, timestamp, secret) # Ensure payload is a string payload_string = payload.is_a?(String) ? payload : JSON.generate(payload) # Create the signed payload signed_payload = "#{timestamp}.#{payload_string}" # Calculate HMAC hmac = OpenSSL::HMAC.hexdigest("SHA256", secret, signed_payload) # Return in the format Attio uses "v1=#{hmac}" end
def extract_from_headers(headers)
def extract_from_headers(headers) signature = headers[SIGNATURE_HEADER] || headers[SIGNATURE_HEADER.upcase] || headers[SIGNATURE_HEADER.tr("-", "_").upcase] timestamp = headers[TIMESTAMP_HEADER] || headers[TIMESTAMP_HEADER.upcase] || headers[TIMESTAMP_HEADER.tr("-", "_").upcase] raise SignatureVerificationError, "Missing signature header: #{SIGNATURE_HEADER}" unless signature raise SignatureVerificationError, "Missing timestamp header: #{TIMESTAMP_HEADER}" unless timestamp { signature: signature, timestamp: timestamp } end
def secure_compare(a, b)
def secure_compare(a, b) return false unless a.bytesize == b.bytesize # Use constant-time comparison res = 0 a.bytes.zip(b.bytes) { |x, y| res |= x ^ y } res == 0 end
def validate_inputs!(payload, signature, timestamp, secret)
def validate_inputs!(payload, signature, timestamp, secret) raise ArgumentError, "Payload cannot be nil" if payload.nil? raise ArgumentError, "Signature cannot be nil or empty" if signature.nil? || signature.empty? raise ArgumentError, "Timestamp cannot be nil or empty" if timestamp.nil? || timestamp.to_s.empty? raise ArgumentError, "Secret cannot be nil or empty" if secret.nil? || secret.empty? end
def verify(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)
def verify(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS) verify!(payload: payload, signature: signature, timestamp: timestamp, secret: secret, tolerance: tolerance) true rescue SignatureVerificationError false end
def verify!(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)
def verify!(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS) validate_inputs!(payload, signature, timestamp, secret) # Check timestamp to prevent replay attacks verify_timestamp!(timestamp, tolerance) # Calculate expected signature expected_signature = calculate_signature(payload, timestamp, secret) # Constant-time comparison to prevent timing attacks raise SignatureVerificationError, "Invalid signature" unless secure_compare(signature, expected_signature) rescue => e raise SignatureVerificationError, "Webhook signature verification failed: #{e.message}" end
def verify_timestamp!(timestamp, tolerance)
def verify_timestamp!(timestamp, tolerance) timestamp_int = timestamp.to_i current_time = Time.now.to_i if timestamp_int < (current_time - tolerance) raise SignatureVerificationError, "Timestamp too old" end if timestamp_int > (current_time + tolerance) raise SignatureVerificationError, "Timestamp too far in the future" end end