Build a Proxy Pool Manager: Smart IP Management System

A proxy pool manager orchestrates a collection of proxy servers, handling health monitoring, rotation, geographic routing, and automatic failover. While basic proxy rotation simply cycles through a list, a proper pool manager makes intelligent decisions about which proxy to use based on real-time performance data.

Core Features

Health monitoring: Detect and remove dead proxies
Performance tracking: Measure latency and success rates
Geo-routing: Select proxies by country/city
Load balancing: Distribute traffic evenly
Cooldown management: Prevent proxy overuse
Auto-scaling: Add/remove proxies based on demand

Implementation

import time
import asyncio
import aiohttp
import json
import logging
from dataclasses import dataclass, field
from typing import List, Optional, Dict
from collections import defaultdict
from enum import Enum
import random

logger = logging.getLogger(__name__)

class ProxyHealth(Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    DEAD = "dead"

@dataclass
class ManagedProxy:
    url: str
    country: str = ""
    city: str = ""
    provider: str = ""
    proxy_type: str = "residential"
    health: ProxyHealth = ProxyHealth.HEALTHY
    latency_ms: float = 0
    success_count: int = 0
    fail_count: int = 0
    consecutive_fails: int = 0
    last_used: float = 0
    last_checked: float = 0
    cooldown_until: float = 0
    tags: List[str] = field(default_factory=list)

    @property
    def success_rate(self) -> float:
        total = self.success_count + self.fail_count
        return self.success_count / total if total > 0 else 1.0

    @property
    def is_available(self) -> bool:
        return (
            self.health != ProxyHealth.DEAD
            and time.time() > self.cooldown_until
        )

class ProxyPoolManager:
    def __init__(self, config: Optional[dict] = None):
        self.proxies: List[ManagedProxy] = []
        self.config = config or {
            "max_consecutive_fails": 5,
            "cooldown_seconds": 3,
            "health_check_interval": 60,
            "min_success_rate": 0.5,
            "check_url": "https://httpbin.org/ip",
        }
        self._geo_index: Dict[str, List[ManagedProxy]] = defaultdict(list)

    def add_proxy(self, url: str, **kwargs):
        proxy = ManagedProxy(url=url, **kwargs)
        self.proxies.append(proxy)
        if proxy.country:
            self._geo_index[proxy.country.upper()].append(proxy)
        return proxy

    def add_proxies_from_file(self, filepath: str):
        with open(filepath) as f:
            for line in f:
                line = line.strip()
                if line and not line.startswith("#"):
                    parts = line.split(",")
                    url = parts[0]
                    country = parts[1] if len(parts) > 1 else ""
                    self.add_proxy(url, country=country)

    def get_proxy(self, country: Optional[str] = None, proxy_type: Optional[str] = None) -> Optional[ManagedProxy]:
        candidates = [p for p in self.proxies if p.is_available]

        if country:
            geo_candidates = [p for p in candidates if p.country.upper() == country.upper()]
            if geo_candidates:
                candidates = geo_candidates

        if proxy_type:
            type_candidates = [p for p in candidates if p.proxy_type == proxy_type]
            if type_candidates:
                candidates = type_candidates

        if not candidates:
            self._revive_degraded()
            candidates = [p for p in self.proxies if p.is_available]

        if not candidates:
            return None

        # Weighted selection: prefer higher success rate and lower latency
        weights = []
        for p in candidates:
            w = p.success_rate * 10
            if p.latency_ms > 0:
                w += 1000 / max(p.latency_ms, 1)
            weights.append(max(w, 0.1))

        proxy = random.choices(candidates, weights=weights, k=1)[0]
        proxy.last_used = time.time()
        proxy.cooldown_until = time.time() + self.config["cooldown_seconds"]
        return proxy

    def report_success(self, proxy: ManagedProxy, latency_ms: float):
        proxy.success_count += 1
        proxy.consecutive_fails = 0
        # Exponential moving average; seed with the first measurement instead of 0
        if proxy.latency_ms == 0:
            proxy.latency_ms = latency_ms
        else:
            proxy.latency_ms = proxy.latency_ms * 0.8 + latency_ms * 0.2
        if proxy.health == ProxyHealth.DEGRADED:
            proxy.health = ProxyHealth.HEALTHY

    def report_failure(self, proxy: ManagedProxy):
        proxy.fail_count += 1
        proxy.consecutive_fails += 1
        if proxy.consecutive_fails >= self.config["max_consecutive_fails"]:
            proxy.health = ProxyHealth.DEAD
            logger.warning(f"Proxy marked dead: {proxy.url}")
        elif proxy.consecutive_fails >= 3:
            proxy.health = ProxyHealth.DEGRADED

    def _revive_degraded(self):
        # When the pool is exhausted, give dead proxies another chance as degraded
        for p in self.proxies:
            if p.health == ProxyHealth.DEAD:
                p.health = ProxyHealth.DEGRADED
                p.consecutive_fails = 0

    async def health_check_all(self):
        async with aiohttp.ClientSession() as session:
            tasks = [self._check_one(session, p) for p in self.proxies]
            await asyncio.gather(*tasks)

        stats = self.get_pool_stats()
        logger.info(f"Health check complete: {stats}")

    async def _check_one(self, session: aiohttp.ClientSession, proxy: ManagedProxy):
        try:
            start = time.time()
            async with session.get(
                self.config["check_url"],
                proxy=proxy.url,
                timeout=aiohttp.ClientTimeout(total=10),
            ) as resp:
                if resp.status == 200:
                    latency = (time.time() - start) * 1000
                    self.report_success(proxy, latency)
                else:
                    self.report_failure(proxy)
        except Exception:
            self.report_failure(proxy)
        proxy.last_checked = time.time()

    def get_pool_stats(self) -> dict:
        with_latency = [p for p in self.proxies if p.latency_ms > 0]
        return {
            "total": len(self.proxies),
            "healthy": sum(1 for p in self.proxies if p.health == ProxyHealth.HEALTHY),
            "degraded": sum(1 for p in self.proxies if p.health == ProxyHealth.DEGRADED),
            "dead": sum(1 for p in self.proxies if p.health == ProxyHealth.DEAD),
            "avg_latency_ms": sum(p.latency_ms for p in with_latency) / max(1, len(with_latency)),
            "avg_success_rate": sum(p.success_rate for p in self.proxies) / max(1, len(self.proxies)),
            "countries": list(set(p.country for p in self.proxies if p.country)),
        }

    def export_working(self, filepath: str):
        working = [p for p in self.proxies if p.health != ProxyHealth.DEAD]
        with open(filepath, "w") as f:
            for p in working:
                f.write(f"{p.url},{p.country},{p.latency_ms:.0f},{p.success_rate:.2f}\n")

Usage Example

import asyncio
import requests
import time

# Initialize pool
pool = ProxyPoolManager()
pool.add_proxies_from_file("proxies.csv")

# Run initial health check
asyncio.run(pool.health_check_all())
print(pool.get_pool_stats())

# Scrape with managed proxies
urls = ["https://example.com/page/{}".format(i) for i in range(100)]
for url in urls:
    proxy = pool.get_proxy(country="US")
    if not proxy:
        print("No proxies available!")
        break

    start = time.time()
    try:
        resp = requests.get(url, proxies={"http": proxy.url, "https": proxy.url}, timeout=15)
        latency = (time.time() - start) * 1000
        pool.report_success(proxy, latency)
    except Exception:
        pool.report_failure(proxy)
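The config above defines health_check_interval, but the class leaves scheduling to the caller. A minimal sketch of a periodic re-check, assuming an asyncio-based scraper (the run_periodic_health_checks helper is illustrative and not part of the class):

import asyncio

async def run_periodic_health_checks(pool: ProxyPoolManager):
    # Re-check every proxy on the configured interval, forever
    interval = pool.config["health_check_interval"]
    while True:
        await pool.health_check_all()
        await asyncio.sleep(interval)

# Inside an async scraper, start it alongside your crawl tasks:
# asyncio.create_task(run_periodic_health_checks(pool))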

FAQ

How is this different from a proxy rotator?

A proxy rotator selects the next proxy from a list. A pool manager adds health monitoring, geographic routing, performance tracking, and automatic failover on top of rotation. Think of the rotator as a component within the pool manager.

How large should my proxy pool be?

Size depends on your scraping volume and target sites. As a guideline: pool_size >= (requests_per_minute * avg_cooldown_seconds / 60). For residential proxies, providers typically give access to millions of IPs through a gateway.
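For example, plugging numbers into that guideline (the throughput figure is illustrative; the cooldown matches the manager's default above):

import math

requests_per_minute = 600    # illustrative target throughput
avg_cooldown_seconds = 3     # default cooldown_seconds in ProxyPoolManager
pool_size = math.ceil(requests_per_minute * avg_cooldown_seconds / 60)
print(pool_size)  # 30, before adding headroom for dead or cooling-down proxies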

Should I build this or use a commercial solution?

Commercial providers like those compared in our proxy provider comparisons include pool management in their gateway infrastructure. Build your own when using self-sourced proxies, needing custom routing logic, or managing proxies from multiple providers.

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
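As a usage sketch, the helper can wrap a proxied request so failures trigger backoff; fetch_with_proxy and the example URL are placeholders:

import requests

def fetch_with_proxy(url: str, proxy: ManagedProxy) -> requests.Response:
    # raise_for_status() turns HTTP error codes into exceptions the retry helper can catch
    resp = requests.get(url, proxies={"http": proxy.url, "https": proxy.url}, timeout=15)
    resp.raise_for_status()
    return resp

# response = retry_with_backoff(lambda: fetch_with_proxy("https://example.com", proxy))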

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})
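A brief usage sketch; config.yaml is a hypothetical file whose keys match the dataclass fields:

# Defaults (including the os.getenv calls) are evaluated when the class is defined,
# so set environment variables before importing this module.
config = ScraperConfig()
print(config.concurrent_workers, config.rate_limit_per_second)

# Or load overrides from a YAML file with matching keys:
# config = ScraperConfig.from_yaml("config.yaml")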

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            while True:
                now = time.time()
                elapsed = now - self.last_refill
                self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
                self.last_refill = now

                if self.tokens >= 1:
                    self.tokens -= 1
                    return

                # Not enough tokens yet: wait for the bucket to refill, then re-check
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
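A short usage sketch with the limiter guarding concurrent aiohttp fetches; the URL list and the 5-requests-per-second figure are illustrative:

import asyncio
import aiohttp

async def fetch_all(urls):
    limiter = RateLimiter(rate=5, burst=5)  # roughly 5 requests per second
    async with aiohttp.ClientSession() as session:

        async def fetch(url):
            await limiter.acquire()  # waits here if the bucket is empty
            async with session.get(url) as resp:
                return await resp.text()

        return await asyncio.gather(*(fetch(u) for u in urls))

# pages = asyncio.run(fetch_all(["https://example.com/page/1", "https://example.com/page/2"]))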

Data Validation

Validate scraped data before storage:

from typing import Optional, List
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors

Deployment

Running as a Service

# Using systemd (write the unit file with tee, since redirection does not inherit sudo)
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]

Testing

Write tests for your scraping tools:

import pytest
import requests

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        # Both schemes must point at the proxy, or HTTPS requests bypass it entirely
        proxy = {"http": "http://user:pass@rotating-proxy:8080", "https": "http://user:pass@rotating-proxy:8080"}
        ips = set()
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

