class Attio::Util::WebhookSignature

Verifies webhook signatures from Attio to ensure authenticity

def calculate_signature(payload, timestamp, secret)

Calculate signature for a payload
def calculate_signature(payload, timestamp, secret)
  # Ensure payload is a string
  payload_string = payload.is_a?(String) ? payload : JSON.generate(payload)
  # Create the signed payload
  signed_payload = "#{timestamp}.#{payload_string}"
  # Calculate HMAC
  hmac = OpenSSL::HMAC.hexdigest("SHA256", secret, signed_payload)
  # Return in the format Attio uses
  "v1=#{hmac}"
end

def extract_from_headers(headers)

Extract signature from headers
def extract_from_headers(headers)
  signature = headers[SIGNATURE_HEADER] || headers[SIGNATURE_HEADER.upcase] || headers[SIGNATURE_HEADER.tr("-", "_").upcase]
  timestamp = headers[TIMESTAMP_HEADER] || headers[TIMESTAMP_HEADER.upcase] || headers[TIMESTAMP_HEADER.tr("-", "_").upcase]
  raise SignatureVerificationError, "Missing signature header: #{SIGNATURE_HEADER}" unless signature
  raise SignatureVerificationError, "Missing timestamp header: #{TIMESTAMP_HEADER}" unless timestamp
  {
    signature: signature,
    timestamp: timestamp
  }
end

def secure_compare(a, b)

def secure_compare(a, b)
  return false unless a.bytesize == b.bytesize
  # Use constant-time comparison
  res = 0
  a.bytes.zip(b.bytes) { |x, y| res |= x ^ y }
  res == 0
end

def validate_inputs!(payload, signature, timestamp, secret)

def validate_inputs!(payload, signature, timestamp, secret)
  raise ArgumentError, "Payload cannot be nil" if payload.nil?
  raise ArgumentError, "Signature cannot be nil or empty" if signature.nil? || signature.empty?
  raise ArgumentError, "Timestamp cannot be nil or empty" if timestamp.nil? || timestamp.to_s.empty?
  raise ArgumentError, "Secret cannot be nil or empty" if secret.nil? || secret.empty?
end

def verify(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)

Verify webhook signature (returns boolean)
def verify(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)
  verify!(payload: payload, signature: signature, timestamp: timestamp, secret: secret, tolerance: tolerance)
  true
rescue SignatureVerificationError
  false
end

def verify!(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)

Verify webhook signature (raises exception on failure)
def verify!(payload:, signature:, timestamp:, secret:, tolerance: TOLERANCE_SECONDS)
  validate_inputs!(payload, signature, timestamp, secret)
  # Check timestamp to prevent replay attacks
  verify_timestamp!(timestamp, tolerance)
  # Calculate expected signature
  expected_signature = calculate_signature(payload, timestamp, secret)
  # Constant-time comparison to prevent timing attacks
  raise SignatureVerificationError, "Invalid signature" unless secure_compare(signature, expected_signature)
rescue => e
  raise SignatureVerificationError, "Webhook signature verification failed: #{e.message}"
end

def verify_timestamp!(timestamp, tolerance)

def verify_timestamp!(timestamp, tolerance)
  timestamp_int = timestamp.to_i
  current_time = Time.now.to_i
  if timestamp_int < (current_time - tolerance)
    raise SignatureVerificationError, "Timestamp too old"
  end
  if timestamp_int > (current_time + tolerance)
    raise SignatureVerificationError, "Timestamp too far in the future"
  end
end