Build a Proxy Testing Framework in Python

A proxy testing framework goes beyond simple connectivity checks. It validates anonymity levels, detects IP leaks, measures performance under load, and verifies geographic accuracy — everything you need to ensure your proxy infrastructure performs reliably in production.

Framework Components

import asyncio
import aiohttp
import time
import json
from dataclasses import dataclass
from typing import List

@dataclass
class TestResult:
    test_name: str
    passed: bool
    details: dict
    duration_ms: float

class ProxyTestFramework:
    def __init__(self, proxy_url: str):
        self.proxy_url = proxy_url
        self.results: List[TestResult] = []

    async def run_all_tests(self):
        tests = [
            self.test_connectivity,
            self.test_latency,
            self.test_anonymity,
            self.test_geo_accuracy,
            self.test_https_support,
            self.test_concurrent_load,
        ]
        for test in tests:
            result = await test()
            self.results.append(result)
            status = "PASS" if result.passed else "FAIL"
            print(f"  [{status}] {result.test_name} ({result.duration_ms:.0f}ms)")
        return self.results

    async def test_connectivity(self) -> TestResult:
        start = time.time()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://httpbin.org/ip",
                    proxy=self.proxy_url,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    data = await resp.json()
                    return TestResult(
                        test_name="Connectivity",
                        passed=resp.status == 200,
                        details={"ip": data.get("origin"), "status": resp.status},
                        duration_ms=(time.time() - start) * 1000,
                    )
        except Exception as e:
            return TestResult("Connectivity", False, {"error": str(e)}, (time.time() - start) * 1000)

    async def test_latency(self, iterations=10) -> TestResult:
        start = time.time()
        latencies = []
        async with aiohttp.ClientSession() as session:
            for _ in range(iterations):
                req_start = time.time()
                try:
                    async with session.get(
                        "https://httpbin.org/ip",
                        proxy=self.proxy_url,
                        timeout=aiohttp.ClientTimeout(total=15),
                    ) as resp:
                        await resp.read()
                        latencies.append((time.time() - req_start) * 1000)
                except Exception:
                    pass

        if not latencies:
            return TestResult("Latency", False, {"error": "No successful requests"}, (time.time() - start) * 1000)

        avg = sum(latencies) / len(latencies)
        p95 = sorted(latencies)[int(len(latencies) * 0.95)]
        return TestResult(
            "Latency",
            passed=avg < 5000,
            details={"avg_ms": round(avg), "p95_ms": round(p95), "min_ms": round(min(latencies)), "max_ms": round(max(latencies))},
            duration_ms=(time.time() - start) * 1000,
        )

    async def test_anonymity(self) -> TestResult:
        start = time.time()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://httpbin.org/headers",
                    proxy=self.proxy_url,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    data = await resp.json()
                    headers = data.get("headers", {})
                    # Headers a proxy may inject that reveal the client or the proxy itself
                    revealing = ["X-Forwarded-For", "Via", "X-Real-Ip", "Forwarded"]
                    found = [h for h in revealing if h in headers]
                    if not found:
                        level = "elite"  # nothing injected
                    elif "X-Forwarded-For" in found:
                        level = "transparent"  # real client IP exposed
                    else:
                        level = "anonymous"  # proxy identified itself, but client IP stays hidden
                    return TestResult("Anonymity", True, {"level": level, "revealing_headers": found}, (time.time() - start) * 1000)
        except Exception as e:
            return TestResult("Anonymity", False, {"error": str(e)}, (time.time() - start) * 1000)

    async def test_geo_accuracy(self) -> TestResult:
        start = time.time()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://ipapi.co/json/",
                    proxy=self.proxy_url,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    data = await resp.json()
                    return TestResult("Geo Accuracy", True, {
                        "country": data.get("country_name"),
                        "city": data.get("city"),
                        "org": data.get("org"),
                    }, (time.time() - start) * 1000)
        except Exception as e:
            return TestResult("Geo Accuracy", False, {"error": str(e)}, (time.time() - start) * 1000)

    async def test_https_support(self) -> TestResult:
        start = time.time()
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    "https://www.google.com",
                    proxy=self.proxy_url,
                    timeout=aiohttp.ClientTimeout(total=15),
                ) as resp:
                    return TestResult("HTTPS Support", resp.status == 200, {"status": resp.status}, (time.time() - start) * 1000)
        except Exception as e:
            return TestResult("HTTPS Support", False, {"error": str(e)}, (time.time() - start) * 1000)

    async def test_concurrent_load(self, concurrent=20) -> TestResult:
        start = time.time()
        success = 0
        total = concurrent
        sem = asyncio.Semaphore(concurrent)
        async with aiohttp.ClientSession() as session:
            async def fetch():
                nonlocal success
                async with sem:
                    try:
                        async with session.get("https://httpbin.org/ip", proxy=self.proxy_url, timeout=aiohttp.ClientTimeout(total=15)) as resp:
                            if resp.status == 200:
                                success += 1
                    except Exception:
                        pass
            await asyncio.gather(*[fetch() for _ in range(total)])
        return TestResult("Concurrent Load", success > total * 0.8, {"success": success, "total": total, "rate": f"{success/total*100:.0f}%"}, (time.time() - start) * 1000)

# Usage
async def main():
    framework = ProxyTestFramework("http://user:pass@proxy:8080")
    print("Running proxy tests...")
    results = await framework.run_all_tests()
    passed = sum(1 for r in results if r.passed)
    print(f"\nResults: {passed}/{len(results)} tests passed")

if __name__ == "__main__":
    asyncio.run(main())

FAQ

How often should I run proxy tests?

Run connectivity tests every 5-15 minutes for production proxies. Full test suites (including load tests) should run hourly or when adding new proxies to your pool.
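
A minimal scheduling sketch using asyncio and the framework above; the 10-minute interval is illustrative:

import asyncio

async def monitor(proxy_url: str, interval_s: float = 600):
    # Re-run the lightweight connectivity test every interval_s seconds (600s = 10 min)
    framework = ProxyTestFramework(proxy_url)
    while True:
        result = await framework.test_connectivity()
        status = "PASS" if result.passed else "FAIL"
        print(f"[{status}] connectivity: {result.details}")
        await asyncio.sleep(interval_s)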

Can I test residential proxies differently from datacenter proxies?

Yes. Residential proxies should additionally be tested for IP consistency (sticky sessions), geographic accuracy, and carrier identification. Datacenter proxies should be tested for subnet diversity and speed.
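
For the sticky-session case, here is a sketch of an IP consistency check; the session-pinned proxy URL format is provider-specific, so treat it as a placeholder:

import aiohttp

async def test_ip_consistency(sticky_proxy_url: str, request_count: int = 5) -> bool:
    # A sticky session should return the same exit IP on every request
    ips = set()
    async with aiohttp.ClientSession() as session:
        for _ in range(request_count):
            async with session.get(
                "https://httpbin.org/ip",
                proxy=sticky_proxy_url,
                timeout=aiohttp.ClientTimeout(total=15),
            ) as resp:
                ips.add((await resp.json())["origin"])
    return len(ips) == 1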

What is an acceptable success rate for a proxy?

Above 95% for premium providers, above 85% for standard residential proxy pools. Below 80% indicates proxy quality issues that need investigation.
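
Those thresholds translate directly into code; a small helper that classifies a pool by its measured success rate:

def classify_success_rate(success: int, total: int) -> str:
    # Maps a measured success rate onto the thresholds above
    rate = success / total if total else 0.0
    if rate >= 0.95:
        return "premium-grade"
    if rate >= 0.85:
        return "acceptable for residential pools"
    if rate >= 0.80:
        return "degraded"
    return "investigate: likely proxy quality issue"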

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
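
The test framework above is async, so the same pattern also needs a non-blocking variant; a minimal sketch (coro_factory is an illustrative name for any zero-argument callable that returns a fresh coroutine):

import asyncio
import random

async def retry_with_backoff_async(coro_factory, max_retries=3, base_delay=1):
    # Same exponential backoff with jitter, but yields to the event loop while waiting
    for attempt in range(max_retries):
        try:
            return await coro_factory()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            await asyncio.sleep(delay)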

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
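
Once configured, every call emits one JSON object per line; the field values below are illustrative:

logger.info("page fetched")
# {"timestamp": "2024-01-01T12:00:00+00:00", "level": "INFO", "message": "page fetched", "module": "main", "function": "<module>"}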

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    # Note: the os.getenv defaults below are read once, when this class is first imported
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if k in cls.__dataclass_fields__})
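
Usage is then a one-liner from either source; the config.yaml path here is hypothetical:

config = ScraperConfig()  # reads defaults from environment variables
# config = ScraperConfig.from_yaml("config.yaml")  # or load from a YAML file
print(config.concurrent_workers, config.rate_limit_per_second)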

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens: wait for one to accrue, then consume it
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.tokens = 0
            self.last_refill = time.time()  # account for tokens accrued while sleeping
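
A sketch of how the limiter might be wired into request code; the rate and URL are illustrative:

import aiohttp

limiter = RateLimiter(rate=5, burst=5)  # illustrative: 5 requests/second

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    await limiter.acquire()  # waits here until a token is available
    async with session.get(url) as resp:
        return await resp.text()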

Data Validation

Validate scraped data before storage:

from typing import Optional
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple[bool, list]:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors

Deployment

Running as a Service

# Using systemd
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable --now scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Assumes requests is in requirements.txt and the app serves /health on port 8000 (see sketch below)
HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]
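
The HEALTHCHECK assumes the scraper serves /health on port 8000, which none of the code above provides; a minimal sketch using aiohttp's built-in web server:

from aiohttp import web

async def health(request: web.Request) -> web.Response:
    return web.Response(text="ok")

def start_health_server() -> None:
    app = web.Application()
    app.router.add_get("/health", health)
    web.run_app(app, port=8000)  # blocking; run in a separate thread or process alongside the scraper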

Testing

Write tests for your scraping tools:

import requests  # pytest collects the Test* class below; no pytest import is needed

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        ips = set()
        proxy = {
            "http": "http://user:pass@rotating-proxy:8080",
            "https": "http://user:pass@rotating-proxy:8080",  # without the https key, HTTPS requests bypass the proxy
        }
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

