Build a Proxy Health Monitor: Real-Time Status Tracking

A proxy health monitor continuously validates your proxy pool, detects failing proxies before they impact scraping operations, and provides alerts when pool health degrades. This is essential for maintaining high success rates in web scraping infrastructure.

Implementation

import asyncio
import aiohttp
import time
from datetime import datetime
from dataclasses import dataclass
from typing import List

@dataclass
class ProxyHealthData:
    url: str
    is_healthy: bool = True
    latency_ms: float = 0
    last_check: str = ""
    consecutive_failures: int = 0
    total_checks: int = 0
    successful_checks: int = 0
    external_ip: str = ""
    country: str = ""

class ProxyHealthMonitor:
    def __init__(self, proxies: List[str], check_interval: int = 60):
        self.proxies = {url: ProxyHealthData(url=url) for url in proxies}
        self.check_interval = check_interval
        self.check_url = "https://httpbin.org/ip"
        self.alerts: List[str] = []

    async def check_proxy(self, session: aiohttp.ClientSession, proxy_url: str):
        data = self.proxies[proxy_url]
        data.total_checks += 1
        data.last_check = datetime.now().isoformat()

        try:
            start = time.time()
            async with session.get(
                self.check_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    latency = (time.time() - start) * 1000
                    body = await resp.json()
                    data.is_healthy = True
                    data.latency_ms = latency
                    data.consecutive_failures = 0
                    data.successful_checks += 1
                    data.external_ip = body.get("origin", "")
                else:
                    self._record_failure(data)
        except Exception:
            self._record_failure(data)

    def _record_failure(self, data: ProxyHealthData):
        data.consecutive_failures += 1
        if data.consecutive_failures >= 3:
            if data.is_healthy:
                self.alerts.append(f"Proxy DOWN: {data.url} ({data.consecutive_failures} failures)")
            data.is_healthy = False

    async def check_all(self):
        async with aiohttp.ClientSession() as session:
            tasks = [self.check_proxy(session, url) for url in self.proxies]
            await asyncio.gather(*tasks)

        # Generate report
        healthy = sum(1 for d in self.proxies.values() if d.is_healthy)
        total = len(self.proxies)
        avg_latency = sum(d.latency_ms for d in self.proxies.values() if d.is_healthy) / max(healthy, 1)

        report = {
            "timestamp": datetime.now().isoformat(),
            "total": total,
            "healthy": healthy,
            "unhealthy": total - healthy,
            "health_pct": f"{healthy/total*100:.1f}%",
            "avg_latency_ms": round(avg_latency),
        }

        if self.alerts:
            report["alerts"] = self.alerts
            self.alerts = []

        return report

    async def run_continuous(self):
        print(f"Starting health monitor for {len(self.proxies)} proxies")
        print(f"Check interval: {self.check_interval}s")
        while True:
            report = await self.check_all()
            print(f"[{report['timestamp']}] Health: {report['health_pct']} "
                  f"({report['healthy']}/{report['total']}) "
                  f"Avg latency: {report['avg_latency_ms']}ms")
            if "alerts" in report:
                for alert in report["alerts"]:
                    print(f"  ALERT: {alert}")
            await asyncio.sleep(self.check_interval)

# Usage
monitor = ProxyHealthMonitor([
    "http://user:pass@proxy1:8080",
    "http://user:pass@proxy2:8080",
    "http://user:pass@proxy3:8080",
], check_interval=60)

asyncio.run(monitor.run_continuous())
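
In a real pipeline, the monitor usually runs alongside the scraper, which pulls only healthy proxies before each batch. A minimal sketch, assuming the monitor above has already run at least one check pass (the get_healthy_proxies helper is illustrative, not part of the class):

def get_healthy_proxies(monitor: ProxyHealthMonitor) -> List[str]:
    # Keep only proxies currently marked healthy, fastest first
    healthy = [d for d in monitor.proxies.values() if d.is_healthy]
    healthy.sort(key=lambda d: d.latency_ms)
    return [d.url for d in healthy]

# e.g. run one check pass, then hand the healthy subset to the scraper:
# await monitor.check_all()
# proxy_pool = get_healthy_proxies(monitor)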

FAQ

What check interval should I use?

For production scraping, check every 30-60 seconds. For less critical operations, every 5 minutes is sufficient. More frequent checks consume proxy bandwidth and may trigger rate limits on the test URL.
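
For example, you might split the pool and check the proxies serving production traffic more aggressively than standby ones (the proxy lists here are placeholders):

# production_proxies / standby_proxies: placeholder lists of proxy URLs
production_monitor = ProxyHealthMonitor(production_proxies, check_interval=30)
standby_monitor = ProxyHealthMonitor(standby_proxies, check_interval=300)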

How do I differentiate between a dead proxy and a temporary network issue?

Use the consecutive-failure threshold. A single failure might be a network glitch, so don't act on it. Three consecutive failures strongly suggest the proxy is down; at five, treat it as dead and remove it from the pool, as shown in the sketch below. Most proxy pool managers use a similar threshold-based approach.
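
A sketch of that escalation, as a hypothetical variant of the _record_failure method above (the is_dead flag is not part of the ProxyHealthData class as written):

DOWN_THRESHOLD = 3   # consecutive failures before a proxy is marked unhealthy
DEAD_THRESHOLD = 5   # consecutive failures before it is dropped from the pool

def record_failure(data: ProxyHealthData) -> None:
    data.consecutive_failures += 1
    if data.consecutive_failures >= DOWN_THRESHOLD:
        data.is_healthy = False   # temporarily out of rotation
    if data.consecutive_failures >= DEAD_THRESHOLD:
        data.is_dead = True       # illustrative flag, not in ProxyHealthData above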

Can I monitor proxies from multiple providers?

Yes. Tag proxies by provider in your configuration and generate per-provider health reports. This helps compare proxy providers based on real uptime data.
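
A minimal sketch of per-provider aggregation, assuming you keep a simple url-to-provider mapping alongside the monitor (the provider_of dict is illustrative):

from collections import defaultdict

def per_provider_report(monitor: ProxyHealthMonitor, provider_of: dict[str, str]) -> dict[str, str]:
    # Aggregate health counts by provider name using a url -> provider mapping
    stats = defaultdict(lambda: {"healthy": 0, "total": 0})
    for url, data in monitor.proxies.items():
        provider = provider_of.get(url, "unknown")
        stats[provider]["total"] += 1
        if data.is_healthy:
            stats[provider]["healthy"] += 1
    return {p: f"{s['healthy']}/{s['total']} healthy" for p, s in stats.items()}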

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
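
For example, wrapping a single request (the URL is a placeholder, and raise_for_status turns HTTP error codes into retryable failures):

import requests

def fetch():
    resp = requests.get("https://httpbin.org/ip", timeout=10)
    resp.raise_for_status()  # treat 4xx/5xx as failures worth retrying
    return resp

response = retry_with_backoff(fetch, max_retries=3)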

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
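
Each call then emits one JSON object per line, which log aggregators can parse directly (the timestamp and module values below are placeholders):

logger.info("Scrape batch finished")
# -> {"timestamp": "...", "level": "INFO", "message": "Scrape batch finished", "module": "...", "function": "..."}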

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})
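
Usage might look like this, assuming a hypothetical config.yaml whose keys match the dataclass fields:

# config.yaml (hypothetical)
# concurrent_workers: 20
# request_timeout: 30
# output_format: csv

config = ScraperConfig.from_yaml("config.yaml")
print(config.concurrent_workers)  # 20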

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return

            # Not enough tokens: wait until one accrues, then consume it
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.tokens = 0
            self.last_refill = time.time()  # count the sleep as already spent
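
Typical usage inside an async worker; the session and URL are placeholders:

import aiohttp

limiter = RateLimiter(rate=5, burst=5)  # roughly 5 requests per second

async def fetch(session: aiohttp.ClientSession, url: str) -> str:
    await limiter.acquire()  # waits here whenever the bucket is empty
    async with session.get(url) as resp:
        return await resp.text()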

Data Validation

Validate scraped data before storage:

from typing import Optional, List
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
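
For example, filtering records before storage (scraped_records is a placeholder for whatever your scraper produced):

validator = DataValidator()
clean_records = []
for record in scraped_records:  # scraped_records: placeholder list of dicts
    ok, errors = validator.validate_record(record)
    if ok:
        clean_records.append(record)
    else:
        print(f"Dropping {record.get('url', '<no url>')}: {errors}")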

Deployment

Running as a Service

# Using systemd
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper
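
Once the unit is running, verify it and follow its logs:

sudo systemctl status scraper
sudo journalctl -u scraper -f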

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]
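
Build and run the image; the image name and environment values are placeholders:

docker build -t scraper .
docker run -d --name scraper \
  -e PROXY_URL=http://user:pass@proxy:8080 \
  -e LOG_LEVEL=INFO \
  scraper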

Testing

Write tests for your scraping tools:

import requests

# DataValidator is the class from the Data Validation section above
class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        proxy = {"http": "http://user:pass@rotating-proxy:8080"}
        ips = set()
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0
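
Run them with pytest; the file name is a placeholder, and note that the first two tests hit a live endpoint through real proxies, so you may want to keep them separate from pure unit tests:

pytest test_scraping.py -v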

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

