Build a Proxy Pool Manager: Smart IP Management System
A proxy pool manager orchestrates a collection of proxy servers, handling health monitoring, rotation, geographic routing, and automatic failover. While basic proxy rotation simply cycles through a list, a proper pool manager makes intelligent decisions about which proxy to use based on real-time performance data.
Core Features
| Feature | Purpose |
|---|---|
| Health monitoring | Detect and remove dead proxies |
| Performance tracking | Measure latency and success rates |
| Geo-routing | Select proxies by country/city |
| Load balancing | Distribute traffic evenly |
| Cooldown management | Prevent proxy overuse |
| Auto-scaling | Add/remove proxies based on demand |
Implementation

```python
import time
import asyncio
import aiohttp
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from collections import defaultdict
from enum import Enum
import random

logger = logging.getLogger(__name__)


class ProxyHealth(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    DEAD = "dead"


@dataclass
class ManagedProxy:
    url: str
    country: str = ""
    city: str = ""
    provider: str = ""
    proxy_type: str = "residential"
    health: ProxyHealth = ProxyHealth.HEALTHY
    latency_ms: float = 0
    success_count: int = 0
    fail_count: int = 0
    consecutive_fails: int = 0
    last_used: float = 0
    last_checked: float = 0
    cooldown_until: float = 0
    tags: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        total = self.success_count + self.fail_count
        return self.success_count / total if total > 0 else 1.0

    @property
    def is_available(self) -> bool:
        return (
            self.health != ProxyHealth.DEAD
            and time.time() > self.cooldown_until
        )


class ProxyPoolManager:
    def __init__(self, config: Optional[dict] = None):
        self.proxies: List[ManagedProxy] = []
        self.config = config or {
            "max_consecutive_fails": 5,
            "cooldown_seconds": 3,
            "health_check_interval": 60,
            "min_success_rate": 0.5,
            "check_url": "https://httpbin.org/ip",
        }
        self._geo_index: Dict[str, List[ManagedProxy]] = defaultdict(list)

    def add_proxy(self, url: str, **kwargs):
        proxy = ManagedProxy(url=url, **kwargs)
        self.proxies.append(proxy)
        if proxy.country:
            self._geo_index[proxy.country.upper()].append(proxy)
        return proxy

    def add_proxies_from_file(self, filepath: str):
        # Expected CSV format: proxy_url[,country]
        with open(filepath) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    parts = line.split(",")
                    url = parts[0]
                    country = parts[1] if len(parts) > 1 else ""
                    self.add_proxy(url, country=country)

    def get_proxy(self, country: Optional[str] = None, proxy_type: Optional[str] = None) -> Optional[ManagedProxy]:
        candidates = [p for p in self.proxies if p.is_available]
        if country:
            geo_candidates = [p for p in candidates if p.country.upper() == country.upper()]
            if geo_candidates:
                candidates = geo_candidates
        if proxy_type:
            type_candidates = [p for p in candidates if p.proxy_type == proxy_type]
            if type_candidates:
                candidates = type_candidates
        if not candidates:
            self._revive_degraded()
            candidates = [p for p in self.proxies if p.is_available]
        if not candidates:
            return None
        # Weighted selection: prefer higher success rate and lower latency
        weights = []
        for p in candidates:
            w = p.success_rate * 10
            if p.latency_ms > 0:
                w += 1000 / max(p.latency_ms, 1)
            weights.append(max(w, 0.1))
        proxy = random.choices(candidates, weights=weights, k=1)[0]
        proxy.last_used = time.time()
        proxy.cooldown_until = time.time() + self.config["cooldown_seconds"]
        return proxy

    def report_success(self, proxy: ManagedProxy, latency_ms: float):
        proxy.success_count += 1
        proxy.consecutive_fails = 0
        # Exponential moving average; seed it with the first measurement
        if proxy.latency_ms == 0:
            proxy.latency_ms = latency_ms
        else:
            proxy.latency_ms = proxy.latency_ms * 0.8 + latency_ms * 0.2
        if proxy.health == ProxyHealth.DEGRADED:
            proxy.health = ProxyHealth.HEALTHY

    def report_failure(self, proxy: ManagedProxy):
        proxy.fail_count += 1
        proxy.consecutive_fails += 1
        if proxy.consecutive_fails >= self.config["max_consecutive_fails"]:
            proxy.health = ProxyHealth.DEAD
            logger.warning(f"Proxy marked dead: {proxy.url}")
        elif proxy.consecutive_fails >= 3:
            proxy.health = ProxyHealth.DEGRADED

    def _revive_degraded(self):
        # Last resort when the pool is exhausted: give dead proxies another chance
        for p in self.proxies:
            if p.health == ProxyHealth.DEAD:
                p.health = ProxyHealth.DEGRADED
                p.consecutive_fails = 0

    async def health_check_all(self):
        async with aiohttp.ClientSession() as session:
            tasks = [self._check_one(session, p) for p in self.proxies]
            await asyncio.gather(*tasks)
        stats = self.get_pool_stats()
        logger.info(f"Health check complete: {stats}")

    async def _check_one(self, session: aiohttp.ClientSession, proxy: ManagedProxy):
        try:
            start = time.time()
            async with session.get(
                self.config["check_url"],
                proxy=proxy.url,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    latency = (time.time() - start) * 1000
                    self.report_success(proxy, latency)
                else:
                    self.report_failure(proxy)
        except Exception:
            self.report_failure(proxy)
        proxy.last_checked = time.time()

    def get_pool_stats(self) -> dict:
        measured = [p for p in self.proxies if p.latency_ms > 0]
        return {
            "total": len(self.proxies),
            "healthy": sum(1 for p in self.proxies if p.health == ProxyHealth.HEALTHY),
            "degraded": sum(1 for p in self.proxies if p.health == ProxyHealth.DEGRADED),
            "dead": sum(1 for p in self.proxies if p.health == ProxyHealth.DEAD),
            "avg_latency_ms": sum(p.latency_ms for p in measured) / max(1, len(measured)),
            "avg_success_rate": sum(p.success_rate for p in self.proxies) / max(1, len(self.proxies)),
            "countries": list(set(p.country for p in self.proxies if p.country)),
        }

    def export_working(self, filepath: str):
        working = [p for p in self.proxies if p.health != ProxyHealth.DEAD]
        with open(filepath, "w") as f:
            for p in working:
                f.write(f"{p.url},{p.country},{p.latency_ms:.0f},{p.success_rate:.2f}\n")
```
Usage Example

```python
import asyncio
import requests
import time

# Initialize pool
pool = ProxyPoolManager()
pool.add_proxies_from_file("proxies.csv")

# Run initial health check
asyncio.run(pool.health_check_all())
print(pool.get_pool_stats())

# Scrape with managed proxies
urls = ["https://example.com/page/{}".format(i) for i in range(100)]

for url in urls:
    proxy = pool.get_proxy(country="US")
    if not proxy:
        print("No proxies available!")
        break
    start = time.time()
    try:
        resp = requests.get(url, proxies={"http": proxy.url, "https": proxy.url}, timeout=15)
        resp.raise_for_status()  # count HTTP error responses as failures too
        latency = (time.time() - start) * 1000
        pool.report_success(proxy, latency)
    except Exception:
        pool.report_failure(proxy)
```

FAQ
How is this different from a proxy rotator?
A proxy rotator selects the next proxy from a list. A pool manager adds health monitoring, geographic routing, performance tracking, and automatic failover on top of rotation. Think of the rotator as a component within the pool manager.
How large should my proxy pool be?
Size depends on your scraping volume and target sites. As a guideline: pool_size >= (requests_per_minute * avg_cooldown_seconds / 60). For residential proxies, providers typically give access to millions of IPs through a gateway.
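As a quick worked example of that guideline (the numbers are illustrative):

```python
# Target throughput and the per-proxy cooldown used by the pool manager
requests_per_minute = 300
avg_cooldown_seconds = 3  # matches the default cooldown_seconds above

# Each proxy can serve at most 60 / cooldown requests per minute,
# so the pool needs at least this many proxies to keep up:
min_pool_size = requests_per_minute * avg_cooldown_seconds / 60
print(min_pool_size)  # 15.0 -> provision ~15-20 proxies plus headroom for failures
```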
Should I build this or use a commercial solution?
Commercial providers like those compared in our proxy provider comparisons include pool management in their gateway infrastructure. Build your own when using self-sourced proxies, needing custom routing logic, or managing proxies from multiple providers.
Implementation Best Practices
Error Handling and Retry Logic
Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
```
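As a usage sketch, the helper can wrap a proxied request so transient errors are retried before the proxy is written off. This assumes the pool manager instance and retry_with_backoff defined earlier; the URL is illustrative:

```python
import requests

proxy = pool.get_proxy(country="US")  # assumes the pool has available proxies

def fetch():
    # Raise on HTTP errors so they count as failed attempts
    resp = requests.get(
        "https://example.com/page/1",
        proxies={"http": proxy.url, "https": proxy.url},
        timeout=15,
    )
    resp.raise_for_status()
    return resp

try:
    response = retry_with_backoff(fetch, max_retries=3)
    # elapsed measures time to first response headers; good enough as a latency signal
    pool.report_success(proxy, latency_ms=response.elapsed.total_seconds() * 1000)
except Exception:
    pool.report_failure(proxy)
```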
Logging Configuration

Set up structured logging for debugging and monitoring:

```python
import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```
Configuration Management

Use environment variables and config files for flexibility:

```python
import os
from dataclasses import dataclass, fields

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        # Only pass keys that correspond to dataclass fields
        field_names = {f.name for f in fields(cls)}
        return cls(**{k: v for k, v in config.items() if k in field_names})
```
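A brief usage sketch; the file name and the YAML keys shown in the comments are illustrative, and any keys present must match the dataclass fields:

```python
# Defaults come from environment variables...
config = ScraperConfig()

# ...or load overrides from a YAML file, e.g. a config.yaml containing:
#   concurrent_workers: 20
#   rate_limit_per_second: 2.5
config = ScraperConfig.from_yaml("config.yaml")
print(config.concurrent_workers, config.rate_limit_per_second)
```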
Rate Limiting

Implement token bucket rate limiting to respect target sites:

```python
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst  # maximum tokens the bucket can hold
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            # Refill tokens based on elapsed time, capped at the burst size
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            else:
                # Sleep until one full token has accrued, then consume it
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0
```
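Combined with the pool manager, the limiter gates each request before a proxy is checked out. A minimal async sketch, assuming the ProxyPoolManager and RateLimiter classes above; the URL list is illustrative:

```python
import time
import asyncio
import aiohttp

async def fetch_all(pool: ProxyPoolManager, urls: list):
    limiter = RateLimiter(rate=5, burst=5)  # ~5 requests per second
    async with aiohttp.ClientSession() as session:
        for url in urls:
            await limiter.acquire()  # wait for a token before each request
            proxy = pool.get_proxy()
            if proxy is None:
                break
            start = time.time()
            try:
                async with session.get(
                    url, proxy=proxy.url, timeout=aiohttp.ClientTimeout(total=15)
                ) as resp:
                    await resp.read()
                    pool.report_success(proxy, (time.time() - start) * 1000)
            except Exception:
                pool.report_failure(proxy)

# asyncio.run(fetch_all(pool, ["https://example.com/page/1"]))
```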
Data Validation

Validate scraped data before storage:

```python
from typing import Optional
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
```
Deployment

Running as a Service

```bash
# Using systemd (tee runs the file write with root privileges; a plain redirect would not)
sudo tee /etc/systemd/system/scraper.service > /dev/null << 'EOF'
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper
```
Docker Deployment

```dockerfile
FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .

# Assumes the app exposes a /health endpoint on port 8000
HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health').raise_for_status()"

CMD ["python", "main.py"]
```
Testing

Write tests for your scraping tools:

```python
import requests

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        ips = set()
        for _ in range(5):
            # Map both schemes so the HTTPS test URL actually goes through the proxy
            proxy = {
                "http": "http://user:pass@rotating-proxy:8080",
                "https": "http://user:pass@rotating-proxy:8080",
            }
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0
```

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)