Build a Proxy Health Monitor: Real-Time Status Tracking
A proxy health monitor continuously validates your proxy pool, detects failing proxies before they impact scraping operations, and provides alerts when pool health degrades. This is essential for maintaining high success rates in web scraping infrastructure.
Implementation
import asyncio
import aiohttp
import time
import json
import smtplib
from datetime import datetime
from dataclasses import dataclass, field
from typing import List, Dict

@dataclass
class ProxyHealthData:
    url: str
    is_healthy: bool = True
    latency_ms: float = 0
    last_check: str = ""
    consecutive_failures: int = 0
    total_checks: int = 0
    successful_checks: int = 0
    external_ip: str = ""
    country: str = ""

class ProxyHealthMonitor:
    def __init__(self, proxies: List[str], check_interval: int = 60):
        self.proxies = {url: ProxyHealthData(url=url) for url in proxies}
        self.check_interval = check_interval
        self.check_url = "https://httpbin.org/ip"
        self.alerts: List[str] = []

    async def check_proxy(self, session: aiohttp.ClientSession, proxy_url: str):
        data = self.proxies[proxy_url]
        data.total_checks += 1
        data.last_check = datetime.now().isoformat()
        try:
            start = time.time()
            async with session.get(
                self.check_url,
                proxy=proxy_url,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    latency = (time.time() - start) * 1000
                    body = await resp.json()
                    data.is_healthy = True
                    data.latency_ms = latency
                    data.consecutive_failures = 0
                    data.successful_checks += 1
                    data.external_ip = body.get("origin", "")
                else:
                    self._record_failure(data)
        except Exception:
            self._record_failure(data)

    def _record_failure(self, data: ProxyHealthData):
        data.consecutive_failures += 1
        if data.consecutive_failures >= 3:
            if data.is_healthy:
                self.alerts.append(f"Proxy DOWN: {data.url} ({data.consecutive_failures} failures)")
            data.is_healthy = False

    async def check_all(self):
        async with aiohttp.ClientSession() as session:
            tasks = [self.check_proxy(session, url) for url in self.proxies]
            await asyncio.gather(*tasks)
        # Generate report
        healthy = sum(1 for d in self.proxies.values() if d.is_healthy)
        total = len(self.proxies)
        avg_latency = sum(d.latency_ms for d in self.proxies.values() if d.is_healthy) / max(healthy, 1)
        report = {
            "timestamp": datetime.now().isoformat(),
            "total": total,
            "healthy": healthy,
            "unhealthy": total - healthy,
            "health_pct": f"{healthy/total*100:.1f}%",
            "avg_latency_ms": round(avg_latency),
        }
        if self.alerts:
            report["alerts"] = self.alerts
            self.alerts = []
        return report

    async def run_continuous(self):
        print(f"Starting health monitor for {len(self.proxies)} proxies")
        print(f"Check interval: {self.check_interval}s")
        while True:
            report = await self.check_all()
            print(f"[{report['timestamp']}] Health: {report['health_pct']} "
                  f"({report['healthy']}/{report['total']}) "
                  f"Avg latency: {report['avg_latency_ms']}ms")
            if "alerts" in report:
                for alert in report["alerts"]:
                    print(f"  ALERT: {alert}")
            await asyncio.sleep(self.check_interval)

# Usage
monitor = ProxyHealthMonitor([
    "http://user:pass@proxy1:8080",
    "http://user:pass@proxy2:8080",
    "http://user:pass@proxy3:8080",
], check_interval=60)
asyncio.run(monitor.run_continuous())

FAQ
What check interval should I use?
For production scraping, check every 30-60 seconds. For less critical operations, every 5 minutes is sufficient. More frequent checks consume proxy bandwidth and may trigger rate limits on the test URL.
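If rate limits on the test URL are a concern, one option is to rotate health checks across several echo endpoints so no single service sees every request. A minimal sketch, assuming the ProxyHealthMonitor class from the implementation section; the MultiEndpointMonitor name and the alternate endpoint are illustrative, not part of the original code:

import itertools

class MultiEndpointMonitor(ProxyHealthMonitor):
    """Rotates the health-check URL so no single endpoint receives every check."""

    def __init__(self, proxies, check_interval=60, check_urls=None):
        super().__init__(proxies, check_interval)
        # Hypothetical endpoint list; substitute any IP-echo services you trust.
        # Note: the parent parses httpbin's "origin" field, so external_ip may
        # stay empty for endpoints with a different response shape.
        self._check_urls = itertools.cycle(
            check_urls or ["https://httpbin.org/ip", "https://api.ipify.org?format=json"]
        )

    async def check_proxy(self, session, proxy_url):
        # Pick the next endpoint for this check, then reuse the parent logic
        self.check_url = next(self._check_urls)
        await super().check_proxy(session, proxy_url)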
How do I differentiate between a dead proxy and a temporary network issue?
Use the consecutive failure threshold. A single failure might be a network glitch. Three consecutive failures strongly suggest the proxy is down. Five failures should mark it as dead. This approach is used by most proxy pool managers.
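The monitor above hard-codes a single threshold of three failures. A minimal sketch of separating "down" from "dead", assuming the ProxyHealthMonitor class from the implementation section; the ThresholdMonitor name, the threshold values, and the dead set are illustrative:

class ThresholdMonitor(ProxyHealthMonitor):
    """Marks a proxy unhealthy after down_threshold consecutive failures and
    flags it for removal from the pool after dead_threshold."""

    def __init__(self, proxies, check_interval=60, down_threshold=3, dead_threshold=5):
        super().__init__(proxies, check_interval)
        self.down_threshold = down_threshold
        self.dead_threshold = dead_threshold
        self.dead = set()  # proxy URLs considered dead

    def _record_failure(self, data):
        data.consecutive_failures += 1
        if data.consecutive_failures >= self.down_threshold:
            if data.is_healthy:
                self.alerts.append(f"Proxy DOWN: {data.url} ({data.consecutive_failures} failures)")
            data.is_healthy = False
        if data.consecutive_failures >= self.dead_threshold and data.url not in self.dead:
            self.dead.add(data.url)
            self.alerts.append(f"Proxy DEAD, remove from pool: {data.url}")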
Can I monitor proxies from multiple providers?
Yes. Tag proxies by provider in your configuration and generate per-provider health reports. This helps compare proxy providers based on real uptime data.
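A minimal sketch of a per-provider rollup, assuming the ProxyHealthMonitor class above; the PROVIDERS mapping and the per_provider_report helper are illustrative additions, not part of the monitor:

from collections import defaultdict

# Illustrative provider map: proxy URL -> provider name (adjust to your configuration)
PROVIDERS = {
    "http://user:pass@proxy1:8080": "provider_a",
    "http://user:pass@proxy2:8080": "provider_a",
    "http://user:pass@proxy3:8080": "provider_b",
}

def per_provider_report(monitor: ProxyHealthMonitor, providers: dict) -> dict:
    """Groups the monitor's per-proxy health data by provider and reports uptime."""
    grouped = defaultdict(list)
    for url, data in monitor.proxies.items():
        grouped[providers.get(url, "unknown")].append(data)
    report = {}
    for provider, entries in grouped.items():
        checks = sum(d.total_checks for d in entries)
        successes = sum(d.successful_checks for d in entries)
        report[provider] = {
            "proxies": len(entries),
            "healthy": sum(1 for d in entries if d.is_healthy),
            "uptime_pct": round(successes / checks * 100, 1) if checks else None,
        }
    return report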
Implementation Best Practices
Error Handling and Retry Logic
Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)

Logging Configuration
Set up structured logging for debugging and monitoring:
import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Configuration Management
Use environment variables and config files for flexibility:
import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})

Rate Limiting
Implement token bucket rate limiting to respect target sites:
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens: sleep until one has accrued, then spend it
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.last_refill = time.time()  # the refill earned while sleeping is consumed here
            self.tokens = 0

Data Validation
Validate scraped data before storage:
from typing import Optional, List
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors

Deployment
Running as a Service
# Using systemd
sudo tee /etc/systemd/system/scraper.service << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# Assumes the scraper exposes a /health endpoint on port 8000
HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"

CMD ["python", "main.py"]

Testing
Write tests for your scraping tools:
import pytest
import asyncio

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        import requests
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        import requests
        ips = set()
        for _ in range(5):
            # The https key is required so requests routes HTTPS traffic through the proxy
            proxy = {
                "http": "http://user:pass@rotating-proxy:8080",
                "https": "http://user:pass@rotating-proxy:8080",
            }
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)