"""
Spam prevention utilities for endorsement forms
"""
import ipaddress
import logging
import re
from datetime import datetime
from typing import Any
from django.conf import settings
from django.http import HttpRequest
from django.utils import timezone
# Optional third-party integrations. Each name is always bound, either to the
# real object or to a falsy/None placeholder, so the rest of the module can
# test ``if akismet`` / ``if validate_email`` without risking a NameError.
try:
    import akismet
except ImportError:
    akismet = None

try:
    from email_validator import EmailNotValidError, validate_email
except ImportError:
    validate_email = None
    # Placeholder so ``except EmailNotValidError`` clauses remain valid.
    EmailNotValidError = Exception
from django_ratelimit.core import is_ratelimited
[docs]
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
[docs]
def secure_ip_key(group: str, request: HttpRequest) -> str:
    """
    Rate-limit key callable backed by the validated client IP.

    Passed as ``key=secure_ip_key`` to the rate limiter. ``group`` is part of
    the expected signature but is not used; the key is whatever
    get_client_ip() reports for ``request``, so spoofed proxy headers cannot
    choose the rate-limit bucket.
    """
    client_ip = get_client_ip(request)
    return client_ip
[docs]
def get_client_ip(request: HttpRequest) -> str:
    """
    Securely extract the client IP address from a request.

    Resolution order:
      1. Without an X-Forwarded-For header, use REMOTE_ADDR.
      2. If REMOTE_ADDR is a public address, the peer is not one of our
         proxies, so the header is ignored and REMOTE_ADDR is used.
      3. Otherwise (peer is private/loopback/link-local, or REMOTE_ADDR is
         unparseable) walk X-Forwarded-For from RIGHT to LEFT and return the
         first valid public address.
      4. If the header holds no valid public address, use REMOTE_ADDR.

    Whenever REMOTE_ADDR itself is missing or invalid, "127.0.0.1" is
    returned in its place.

    Returns the chosen address as a string with surrounding whitespace
    removed.
    """
    fallback_ip = "127.0.0.1"

    def parse_ip(ip_str):
        """Return the parsed address object, or None if not a valid IP."""
        try:
            return ipaddress.ip_address(ip_str.strip())
        except ValueError:  # AddressValueError is a ValueError subclass
            return None

    def is_internal(addr):
        """True for private, loopback, or link-local addresses."""
        return addr.is_private or addr.is_loopback or addr.is_link_local

    # Address of the directly connected peer; tolerate a missing/None value.
    remote_addr = (request.META.get("REMOTE_ADDR") or "").strip()
    remote = parse_ip(remote_addr)
    direct_ip = remote_addr if remote is not None else fallback_ip

    forwarded_for = (request.META.get("HTTP_X_FORWARDED_FOR") or "").strip()
    if not forwarded_for:
        return direct_ip

    # Only trust proxy headers when the peer is internal. A public peer can
    # put anything it likes in the header, so it is ignored in that case.
    if remote is not None and not is_internal(remote):
        return direct_ip

    # Header format is "client, proxy1, proxy2, ...": each proxy APPENDS the
    # address it saw, so the left-hand entries are whatever the original
    # client chose to send. Scanning from the right and skipping internal
    # hops yields the address observed by our own infrastructure instead of
    # an attacker-supplied value. (With a public CDN in front of the private
    # proxy this is the CDN's address, which is still not client-controlled.)
    for candidate in reversed(forwarded_for.split(",")):
        candidate = candidate.strip()
        addr = parse_ip(candidate)
        if addr is not None and not is_internal(addr):
            return candidate

    # No valid public IP found in the chain.
    return direct_ip
[docs]
class SpamPreventionService:
"""Service for preventing spam in endorsement submissions"""
# Rate limiting settings
[docs]
# Length of the rate-limit window in seconds (it is formatted as "<n>s" when
# the rate string is built). Read from settings.ENDORSEMENT_RATE_LIMIT_WINDOW,
# defaulting to 300.
RATE_LIMIT_WINDOW = getattr(
settings,
"ENDORSEMENT_RATE_LIMIT_WINDOW",
300,
) # 5 minutes
[docs]
# Number of attempts allowed per window. Read from
# settings.ENDORSEMENT_RATE_LIMIT_MAX_ATTEMPTS, defaulting to 3.
RATE_LIMIT_MAX_ATTEMPTS = getattr(
settings,
"ENDORSEMENT_RATE_LIMIT_MAX_ATTEMPTS",
3,
)
# Known disposable email domains. Consulted by the basic fallback checks,
# which compare the lower-cased domain part for an exact match; those checks
# run when email-validator is unavailable or reported no problems.
[docs]
SUSPICIOUS_DOMAINS = [
"mailinator.com",
"10minutemail.com",
"guerrillamail.com",
"temp-mail.org",
"throwaway.email",
]
@classmethod
[docs]
def check_rate_limit(
    cls,
    request: HttpRequest,
) -> dict[str, Any]:
    """
    Check if request has exceeded rate limit using django-ratelimit

    The counter is only read here (``increment=False``), so calling this
    does not itself count as an attempt.

    Returns dict with:
      - 'allowed': False when the limit is exceeded, True otherwise
      - 'remaining': 0 when blocked; otherwise the configured maximum.
        The current count is not read, so this is an upper bound rather
        than an exact number of attempts left.
      - 'reset_in': the configured window length in seconds
      - 'message': human-readable explanation, present only when blocked
    """
    window = cls.RATE_LIMIT_WINDOW
    max_attempts = cls.RATE_LIMIT_MAX_ATTEMPTS
    exceeded = is_ratelimited(
        request=request,
        group="endorsement_submission",
        key=secure_ip_key,
        rate=f"{max_attempts}/{window}s",
        increment=False,  # Just check, don't increment yet
    )
    if not exceeded:
        return {
            "allowed": True,
            "remaining": max_attempts,
            "reset_in": window,
        }

    # Round up to whole minutes (ceiling division) with a floor of one, so a
    # window shorter than a minute is not reported as "0 minutes" and the
    # caller is never told to retry before the window can have elapsed.
    wait_minutes = max(1, -(-window // 60))
    return {
        "allowed": False,
        "remaining": 0,
        "reset_in": window,
        "message": (
            f"Rate limit exceeded. Try again in "
            f"{wait_minutes} minutes."
        ),
    }
@classmethod
[docs]
def record_submission_attempt(
    cls,
    request: HttpRequest,
) -> None:
    """
    Count one submission attempt against the client's rate-limit bucket.

    Uses the same group, key function and rate string as the rate-limit
    check, but with ``increment=True``. The boolean returned by
    django-ratelimit is intentionally discarded.
    """
    limiter_options = {
        "request": request,
        "group": "endorsement_submission",
        "key": secure_ip_key,
        "rate": f"{cls.RATE_LIMIT_MAX_ATTEMPTS}/{cls.RATE_LIMIT_WINDOW}s",
        "increment": True,  # bump the counter
    }
    is_ratelimited(**limiter_options)
@classmethod
[docs]
def validate_honeypot(cls, form_data: dict[str, Any]) -> bool:
    """
    Check the hidden honeypot fields of a submitted form.

    The fields "website", "url", "homepage" and "confirm_email" are expected
    to stay empty; any truthy value in one of them is treated as spam and a
    warning is logged for the first such field.

    Returns True when every honeypot field is empty or absent, False
    otherwise.
    """
    trap_fields = ("website", "url", "homepage", "confirm_email")
    # First honeypot field (in declaration order) carrying a truthy value.
    tripped = next((name for name in trap_fields if form_data.get(name)), None)
    if tripped is None:
        return True
    logger.warning(
        f"Honeypot field '{tripped}' filled, potential spam detected",
    )
    return False
@classmethod
[docs]
def validate_timing(cls, form_data: dict[str, Any]) -> bool:
    """
    Validate form submission timing
    Too fast = bot, too slow = potentially abandoned

    Reads ``form_data["form_start_time"]`` as an ISO 8601 timestamp and
    compares it with the current time.

    Returns False when the form was submitted less than 5 seconds or more
    than 30 minutes after the start time. Returns True otherwise, including
    when the timestamp is missing or cannot be parsed (both cases are logged
    and the submission is allowed).
    """
    min_seconds = 5  # anything faster is treated as automated
    max_seconds = 1800  # 30 minutes

    form_start_time = form_data.get("form_start_time")
    if not form_start_time:
        # If no timing data, allow submission but log it
        logger.warning("No form timing data provided")
        return True

    try:
        raw_value = form_start_time
        # datetime.fromisoformat() only accepts a trailing "Z" (UTC) from
        # Python 3.11 on. Rewrite it as an explicit offset so timestamps in
        # that form are parsed on older interpreters too, instead of being
        # treated as invalid (which would let the submission through).
        if isinstance(raw_value, str) and raw_value.endswith("Z"):
            raw_value = raw_value[:-1] + "+00:00"
        start_time = datetime.fromisoformat(raw_value)
        # Make timezone-aware if it's naive
        if start_time.tzinfo is None:
            start_time = timezone.make_aware(start_time)
        time_taken = (timezone.now() - start_time).total_seconds()
    except (ValueError, TypeError) as e:
        logger.warning(f"Invalid form timing data: {e}")
        return True  # Allow if timing data is invalid

    # Too fast (likely bot); a start time in the future also lands here
    if time_taken < min_seconds:
        logger.warning(f"Form submitted too quickly: {time_taken}s")
        return False
    # Too slow (likely abandoned/suspicious)
    if time_taken > max_seconds:
        logger.warning(f"Form submitted after long delay: {time_taken}s")
        return False
    return True
@classmethod
def _validate_with_email_validator(cls, email: str) -> list[str]:
    """
    Validate email using email-validator library.

    Runs validate_email() with deliverability checking enabled, then applies
    two pattern heuristics to the normalized address.

    Returns list of validation failure reasons (empty when nothing was
    flagged). Errors other than EmailNotValidError are logged and do not add
    a reason, so a validation-service failure never blocks a submission.
    """
    reasons = []
    try:
        # Validate email with deliverability and domain checks
        validated_email = validate_email(
            email,
            check_deliverability=True,  # DNS-based deliverability check
        )
        # email-validator 2.x exposes the normalized address as
        # ``.normalized`` and deprecates ``.email``; prefer the new name and
        # fall back to the old one for 1.x installations.
        normalized_email = (
            getattr(validated_email, "normalized", None) or validated_email.email
        ).lower()
        # Check for test patterns in validated email
        if "test" in normalized_email and "+" in normalized_email:
            reasons.append("Test email pattern detected")
        # Check for the same digit three times in a row (common in spam)
        if any(str(i) * 3 in normalized_email for i in range(10)):
            reasons.append("Sequential number pattern in email")
    except EmailNotValidError as e:
        # Email is invalid according to email-validator
        reasons.append(f"Invalid email address: {str(e)}")
        logger.info(f"Email validation failed for {email}: {e}")
    except Exception as e:
        # Network or other errors - log but don't block
        logger.warning(f"Email validation service error for {email}: {e}")
    return reasons
@classmethod
def _validate_with_basic_checks(cls, email: str) -> list[str]:
"""
Perform basic email validation when email-validator is unavailable.
Returns list of validation failure reasons.
"""
reasons = []
# Basic email format validation - must contain @ and have parts before/after
if "@" not in email:
reasons.append("Invalid email format: missing @ symbol")
elif email.count("@") != 1:
reasons.append("Invalid email format: multiple @ symbols")
else:
parts = email.split("@")
if not parts[0] or not parts[1]:
reasons.append("Invalid email format: missing local or domain part")
else:
domain = parts[1].lower()
# Check domain against known disposable email providers
if domain in cls.SUSPICIOUS_DOMAINS:
reasons.append(f"Disposable email domain: {domain}")
return reasons
@classmethod
def _check_email_patterns(cls, email: str) -> list[str]:
"""
Check email for suspicious patterns.
Returns list of pattern detection reasons.
"""
reasons = []
email_lower = email.lower()
# Basic pattern checks
if "+" in email_lower and "test" in email_lower:
reasons.append("Test email pattern detected")
# Check for sequential numbers
if any(str(i) * 3 in email_lower for i in range(10)):
reasons.append("Sequential number pattern in email")
return reasons
@classmethod
[docs]
def check_email_reputation(cls, email: str) -> dict[str, Any]:
    """
    Check email address reputation using email-validator
    Falls back to basic checks if email-validator is unavailable

    The checks run in this order:
      1. email-validator, when the library was importable
      2. the basic structural/disposable-domain checks, when the library is
         missing or reported nothing
      3. the pattern heuristics, always

    Returns dict with 'suspicious' boolean and 'reasons' list. Each distinct
    reason appears once, in the order it was first reported.
    """
    # Treat a missing address as an empty string so the helpers always
    # receive a str; the structural check then reports it as invalid.
    email = email or ""

    reasons = []
    # Use email-validator for comprehensive validation if available
    if validate_email:
        reasons.extend(cls._validate_with_email_validator(email))
    # Fallback to basic domain checks if email-validator unavailable or
    # it reported no problems
    if not validate_email or not reasons:
        reasons.extend(cls._validate_with_basic_checks(email))
    # Always check for suspicious patterns
    reasons.extend(cls._check_email_patterns(email))

    # Several of the steps above can report the same pattern; collapse exact
    # duplicates while preserving first-seen order.
    reasons = list(dict.fromkeys(reasons))
    return {
        "suspicious": bool(reasons),
        "reasons": reasons,
    }
@classmethod
[docs]
def check_content_quality(
    cls,
    stakeholder_data: dict[str, Any],
    statement: str,
    ip_address: str = None,
    user_agent: str = None,
) -> dict[str, Any]:
    """
    Check content quality for spam indicators using Akismet
    Falls back to custom checks if Akismet is unavailable

    Akismet is consulted only when the library was importable and both
    settings.AKISMET_SECRET_API_KEY and settings.SITE_URL are set; any
    Akismet failure is logged and ignored. The custom heuristics on name,
    organization and statement always run afterwards.

    ``ip_address`` and ``user_agent`` are forwarded to Akismet, defaulting
    to "127.0.0.1" and "Coalition Builder" when not supplied.

    Returns dict with 'suspicious' boolean and 'reasons' list
    """
    reasons = []
    # Try Akismet first if available and configured
    akismet_key = getattr(settings, "AKISMET_SECRET_API_KEY", None)
    site_url = getattr(settings, "SITE_URL", None)
    if akismet and akismet_key and site_url:
        try:
            api = akismet.Akismet(
                key=akismet_key,
                blog_url=site_url,
            )
            # Verify API key is valid
            if api.verify_key():
                # Prepare comment data for Akismet
                comment_data = {
                    "comment_type": "endorsement",
                    "comment_author": stakeholder_data.get("name", ""),
                    "comment_author_email": stakeholder_data.get("email", ""),
                    "comment_content": statement,
                    "user_ip": ip_address or "127.0.0.1",
                    "user_agent": user_agent or "Coalition Builder",
                    "blog_lang": "en",
                    "blog_charset": "UTF-8",
                }
                # Check with Akismet
                is_spam = api.comment_check(**comment_data)
                if is_spam:
                    reasons.append("Content flagged as spam by Akismet")
                    logger.info(
                        f"Akismet flagged content as spam from "
                        f"{stakeholder_data.get('email', 'unknown')}",
                    )
            else:
                logger.warning("Akismet API key verification failed")
        except Exception as e:
            # Best effort: an Akismet outage must not block submissions.
            logger.warning(f"Akismet check failed: {e}")

    # Custom checks (always run as additional layer).
    # ``or ""`` covers keys that are present but set to None, which would
    # otherwise raise AttributeError on .lower().
    name = (stakeholder_data.get("name") or "").lower()
    org = (stakeholder_data.get("organization") or "").lower()
    if "test" in name or "fake" in name:
        reasons.append("Suspicious name pattern")
    # Note: a missing or empty organization is shorter than 3 characters and
    # is therefore flagged as well.
    if "test" in org or "fake" in org or len(org) < 3:
        reasons.append("Suspicious organization name")
    # Check for excessive character repetition (same character 4+ times)
    if statement and re.search(r"(.)\1{3,}", statement.lower()):
        reasons.append("Excessive character repetition in statement")
    # Check for missing required context
    if not statement and not stakeholder_data.get("role"):
        reasons.append("Minimal content provided")
    return {
        "suspicious": len(reasons) > 0,
        "reasons": reasons,
    }
@classmethod
[docs]
def comprehensive_spam_check(
    cls,
    request: HttpRequest,
    stakeholder_data: dict[str, Any],
    statement: str,
    form_data: dict[str, Any],
    user_agent: str = None,
    skip_rate_limiting: bool = False,
) -> dict[str, Any]:
    """
    Run comprehensive spam check

    Steps, in order:
      1. Rate limiting (unless ``skip_rate_limiting``): the attempt is
         recorded, then checked; exceeding the limit returns immediately
         with is_spam=True and a score of 1.0.
      2. Honeypot: a filled honeypot field returns immediately with
         is_spam=True and a score of 1.0.
      3. Timing (+0.5), email reputation (+0.3) and content quality (+0.3)
         add to the score; the total is capped at 1.0.
      4. The score selects one recommendation: >= 0.7 block (is_spam=True),
         >= 0.4 require additional verification, >= 0.2 flag for manual
         review, otherwise allow.

    Returns dict with keys 'is_spam', 'confidence_score', 'reasons',
    'rate_limit' (None when rate limiting was skipped) and
    'recommendations'.
    """
    # Extract IP address from request with validation and spoofing protection
    ip_address = get_client_ip(request)
    results = {
        "is_spam": False,
        "confidence_score": 0.0,  # 0.0 = definitely human, 1.0 = definitely spam
        "reasons": [],
        "rate_limit": None,
        "recommendations": [],
    }

    # Handle rate limiting if not skipped
    if not skip_rate_limiting:
        # Record this attempt first so that it counts towards the limit
        # evaluated immediately afterwards.
        cls.record_submission_attempt(request)
        rate_limit_result = cls.check_rate_limit(request)
        results["rate_limit"] = rate_limit_result
        if not rate_limit_result["allowed"]:
            results["is_spam"] = True
            results["confidence_score"] = 1.0
            results["reasons"].append("Rate limit exceeded")
            return results

    # Check honeypot
    if not cls.validate_honeypot(form_data):
        results["is_spam"] = True
        results["confidence_score"] = 1.0
        results["reasons"].append("Honeypot field filled")
        return results

    score = 0.0

    # Check timing
    if not cls.validate_timing(form_data):
        score += 0.5
        results["reasons"].append("Suspicious submission timing")

    # Check email reputation; a missing or None email is checked as ""
    email_check = cls.check_email_reputation(stakeholder_data.get("email") or "")
    if email_check["suspicious"]:
        score += 0.3
        results["reasons"].extend(email_check["reasons"])

    # Check content quality
    content_check = cls.check_content_quality(
        stakeholder_data,
        statement,
        ip_address,
        user_agent,
    )
    if content_check["suspicious"]:
        score += 0.3
        results["reasons"].extend(content_check["reasons"])

    # All three signals together sum to 1.1; cap the total so the reported
    # value stays on the documented 0.0-1.0 scale.
    score = min(score, 1.0)
    results["confidence_score"] = score

    # Final determination
    if score >= 0.7:
        results["is_spam"] = True
        results["recommendations"].append("Block submission")
    elif score >= 0.4:
        results["recommendations"].append("Require additional verification")
    elif score >= 0.2:
        results["recommendations"].append("Flag for manual review")
    else:
        results["recommendations"].append("Allow submission")
    return results