Proxy Pool Manager: Open Source Project Guide

A proxy pool manager is the central service that stores, validates, rotates, and serves proxies to your scrapers. Instead of hardcoding proxy lists in every scraper, a pool manager provides a single API endpoint that returns the best available proxy on each request.

This guide walks through building a production-quality, open-source proxy pool manager in Python.

What the Pool Manager Does

  • Ingests proxies from multiple sources (files, APIs, scraped lists)
  • Validates proxies continuously with async health checks
  • Scores proxies based on latency, success rate, and anonymity
  • Serves proxies via REST API with filtering (country, protocol, anonymity)
  • Rotates intelligently using weighted random selection
  • Provides a dashboard to monitor pool health

Project Structure

proxy-pool-manager/
├── proxy_pool/
│   ├── __init__.py
│   ├── app.py               # FastAPI application
│   ├── models.py            # Data models
│   ├── pool.py              # Core pool logic
│   ├── checker.py           # Health checker
│   ├── fetchers/
│   │   ├── __init__.py
│   │   ├── base.py           # Base fetcher class
│   │   ├── file_fetcher.py   # Load from file
│   │   ├── api_fetcher.py    # Load from provider API
│   │   └── scrape_fetcher.py # Scrape free proxy lists
│   ├── storage/
│   │   ├── __init__.py
│   │   ├── memory.py        # In-memory storage
│   │   └── sqlite.py        # SQLite persistence
│   └── dashboard/
│       └── index.html       # Web dashboard
├── tests/
│   ├── test_pool.py
│   ├── test_checker.py
│   └── test_api.py
├── config.yaml
├── pyproject.toml
├── Dockerfile
└── docker-compose.yml

Data Models

# proxy_pool/models.py
from dataclasses import dataclass, field
from typing import Optional
from enum import Enum
import time

class Protocol(str, Enum):
    HTTP = "http"
    HTTPS = "https"
    SOCKS4 = "socks4"
    SOCKS5 = "socks5"

class Anonymity(str, Enum):
    TRANSPARENT = "transparent"
    ANONYMOUS = "anonymous"
    ELITE = "elite"
    UNKNOWN = "unknown"

@dataclass
class Proxy:
    host: str
    port: int
    protocol: Protocol = Protocol.HTTP
    username: Optional[str] = None
    password: Optional[str] = None
    country: str = "unknown"
    anonymity: Anonymity = Anonymity.UNKNOWN
    alive: bool = False
    latency_ms: float = 0
    success_rate: float = 0
    score: float = 0
    total_served: int = 0
    total_checks: int = 0
    successful_checks: int = 0
    last_check: float = 0
    last_served: float = 0
    added_at: float = field(default_factory=time.time)
    source: str = "manual"
    tags: list = field(default_factory=list)

    @property
    def url(self) -> str:
        auth = ""
        if self.username and self.password:
            auth = f"{self.username}:{self.password}@"
        return f"{self.protocol.value}://{auth}{self.host}:{self.port}"

    @property
    def id(self) -> str:
        return f"{self.host}:{self.port}"

    def update_score(self):
        """Calculate composite score from latency, success rate, and freshness."""
        latency_score = max(0, 1 - (self.latency_ms / 5000))
        freshness = max(0, 1 - (time.time() - self.last_check) / 3600)
        self.score = (
            self.success_rate * 0.5 +
            latency_score * 0.3 +
            freshness * 0.2
        )
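
Plugging representative numbers into update_score shows how the weights trade off. For a proxy with a 90% success rate, 500 ms latency, last checked 10 minutes ago:

```python
# Worked example of the update_score weights above:
# 90% success rate, 500 ms latency, last checked 10 minutes (600 s) ago.
success_rate = 0.9
latency_score = max(0, 1 - 500 / 5000)   # 0.9
freshness = max(0, 1 - 600 / 3600)       # ~0.833
score = success_rate * 0.5 + latency_score * 0.3 + freshness * 0.2
print(round(score, 3))  # 0.887
```

Success rate dominates by design: a fast proxy that fails half its requests still scores poorly.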

Core Pool Logic

# proxy_pool/pool.py
import random
import threading
import time
from typing import List, Optional, Dict
from .models import Proxy, Protocol, Anonymity

class ProxyPool:
    def __init__(self):
        self._proxies: Dict[str, Proxy] = {}
        self._lock = threading.RLock()

    def add(self, proxy: Proxy) -> bool:
        with self._lock:
            if proxy.id in self._proxies:
                return False
            self._proxies[proxy.id] = proxy
            return True

    def remove(self, proxy_id: str):
        with self._lock:
            self._proxies.pop(proxy_id, None)

    def get(
        self,
        protocol: Optional[Protocol] = None,
        country: Optional[str] = None,
        anonymity: Optional[Anonymity] = None,
        min_score: float = 0,
    ) -> Optional[Proxy]:
        with self._lock:
            candidates = self._filter(
                protocol, country, anonymity, min_score
            )
            if not candidates:
                return None

            # Weighted random selection by score
            weights = [max(p.score, 0.01) for p in candidates]
            proxy = random.choices(candidates, weights=weights, k=1)[0]
            proxy.total_served += 1
            proxy.last_served = time.time()
            return proxy

    def get_many(
        self,
        count: int = 10,
        protocol: Optional[Protocol] = None,
        country: Optional[str] = None,
    ) -> List[Proxy]:
        with self._lock:
            candidates = self._filter(protocol, country)
            candidates.sort(key=lambda p: p.score, reverse=True)
            return candidates[:count]

    def _filter(
        self,
        protocol=None,
        country=None,
        anonymity=None,
        min_score=0,
    ) -> List[Proxy]:
        result = [p for p in self._proxies.values() if p.alive]

        if protocol:
            result = [p for p in result if p.protocol == protocol]
        if country:
            result = [p for p in result if p.country.lower() == country.lower()]
        if anonymity:
            result = [p for p in result if p.anonymity == anonymity]
        if min_score > 0:
            result = [p for p in result if p.score >= min_score]

        return result

    @property
    def stats(self) -> dict:
        with self._lock:
            proxies = list(self._proxies.values())
        alive = [p for p in proxies if p.alive]
        return {
            "total": len(proxies),
            "alive": len(alive),
            "dead": len(proxies) - len(alive),
            "countries": len(set(p.country for p in alive)),
            "avg_score": (
                sum(p.score for p in alive) / len(alive)
                if alive else 0
            ),
            "avg_latency_ms": (
                sum(p.latency_ms for p in alive) / len(alive)
                if alive else 0
            ),
            "by_protocol": {
                proto.value: len([p for p in alive if p.protocol == proto])
                for proto in Protocol
            },
        }

    def all_proxies(self) -> List[Proxy]:
        with self._lock:
            return list(self._proxies.values())
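
The weighted random.choices call in get means a proxy's share of traffic tracks its score. A standalone simulation of that selection, with hypothetical scores:

```python
import random
from collections import Counter

# Simulate the weighted selection used by ProxyPool.get: scores act as
# weights, so higher-scoring proxies are served proportionally more often.
scores = {"1.1.1.1:8080": 0.9, "2.2.2.2:8080": 0.3, "3.3.3.3:8080": 0.6}
ids, weights = zip(*scores.items())

served = Counter(
    random.choices(ids, weights=weights, k=1)[0] for _ in range(10_000)
)
print(served.most_common())  # highest-scoring proxy dominates
```

This avoids the pitfall of always returning the single best proxy, which would concentrate all traffic on one IP and burn it.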

Proxy Fetchers

# proxy_pool/fetchers/base.py
from abc import ABC, abstractmethod
from typing import List
from ..models import Proxy

class ProxyFetcher(ABC):
    @abstractmethod
    async def fetch(self) -> List[Proxy]:
        pass

# proxy_pool/fetchers/file_fetcher.py
from typing import List, Optional
from urllib.parse import urlparse

from .base import ProxyFetcher
from ..models import Proxy, Protocol

class FileFetcher(ProxyFetcher):
    def __init__(self, filepath: str):
        self.filepath = filepath

    async def fetch(self) -> List[Proxy]:
        proxies = []
        with open(self.filepath) as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith('#'):
                    continue
                proxy = self._parse_line(line)
                if proxy:
                    proxies.append(proxy)
        return proxies

    def _parse_line(self, line: str) -> Optional[Proxy]:
        # Supports: host:port, protocol://host:port,
        # protocol://user:pass@host:port
        try:
            if '://' in line:
                parsed = urlparse(line)
                return Proxy(
                    host=parsed.hostname,
                    port=parsed.port,
                    protocol=Protocol(parsed.scheme),
                    username=parsed.username,
                    password=parsed.password,
                    source="file",
                )
            else:
                host, port = line.split(':')
                return Proxy(host=host, port=int(port), source="file")
        except Exception:
            return None
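
The REST API below imports HealthChecker from proxy_pool/checker.py, which the project structure lists but this guide does not show in full. A minimal sketch of that module, using a plain TCP connect as the liveness probe — a production checker would also issue real HTTP requests through the proxy and classify anonymity:

```python
# proxy_pool/checker.py -- minimal sketch; the real module may differ.
import asyncio
import time

class HealthChecker:
    def __init__(self, pool, interval=60, concurrency=100, timeout=5.0):
        self.pool = pool
        self.interval = interval
        self.timeout = timeout
        self._sem = asyncio.Semaphore(concurrency)

    async def start(self):
        # Re-check the whole pool forever, pausing between sweeps.
        while True:
            await self.run_check()
            await asyncio.sleep(self.interval)

    async def run_check(self):
        await asyncio.gather(
            *(self._check(p) for p in self.pool.all_proxies())
        )

    async def _check(self, proxy):
        async with self._sem:  # cap concurrent probes
            start = time.monotonic()
            try:
                # Cheap liveness probe: can we open a TCP connection?
                _, writer = await asyncio.wait_for(
                    asyncio.open_connection(proxy.host, proxy.port),
                    timeout=self.timeout,
                )
                writer.close()
                await writer.wait_closed()
                self._record(proxy, True, (time.monotonic() - start) * 1000)
            except Exception:
                self._record(proxy, False, 0.0)

    def _record(self, proxy, ok: bool, latency_ms: float):
        proxy.total_checks += 1
        if ok:
            proxy.successful_checks += 1
            proxy.latency_ms = latency_ms
        proxy.alive = ok
        proxy.success_rate = proxy.successful_checks / proxy.total_checks
        proxy.last_check = time.time()
        proxy.update_score()
```

The semaphore bounds concurrent checks so a large pool does not exhaust file descriptors; interval and concurrency match the defaults assumed elsewhere in this guide.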

REST API

# proxy_pool/app.py
from fastapi import FastAPI, HTTPException
from fastapi.responses import HTMLResponse
from typing import Literal, Optional
from .pool import ProxyPool
from .checker import HealthChecker
from .models import Protocol, Anonymity
import asyncio

app = FastAPI(title="Proxy Pool Manager", version="1.0.0")
pool = ProxyPool()
checker = None

@app.on_event("startup")
async def startup():
    global checker
    checker = HealthChecker(pool, interval=60)
    asyncio.create_task(checker.start())

@app.get("/api/proxy")
async def get_proxy(
    protocol: Optional[Protocol] = None,
    country: Optional[str] = None,
    anonymity: Optional[Anonymity] = None,
    format: Literal["url", "json"] = "url",
):
):
    proxy = pool.get(protocol=protocol, country=country, anonymity=anonymity)
    if not proxy:
        raise HTTPException(404, "No matching proxy available")

    if format == "url":
        return {"proxy": proxy.url}
    return {
        "proxy": proxy.url,
        "host": proxy.host,
        "port": proxy.port,
        "protocol": proxy.protocol.value,
        "country": proxy.country,
        "anonymity": proxy.anonymity.value,
        "latency_ms": proxy.latency_ms,
        "score": round(proxy.score, 3),
    }

@app.get("/api/proxies")
async def list_proxies(
    protocol: Optional[Protocol] = None,
    country: Optional[str] = None,
    alive_only: bool = True,
    limit: int = 50,
):
    if alive_only:
        proxies = pool.get_many(count=limit, protocol=protocol, country=country)
    else:
        proxies = pool.all_proxies()[:limit]

    return [
        {
            "proxy": p.url,
            "country": p.country,
            "anonymity": p.anonymity.value,
            "latency_ms": p.latency_ms,
            "score": round(p.score, 3),
            "alive": p.alive,
        }
        for p in proxies
    ]

@app.get("/api/stats")
async def get_stats():
    return pool.stats

@app.post("/api/proxies")
async def add_proxy(host: str, port: int, protocol: str = "http"):
    from .models import Proxy
    proxy = Proxy(host=host, port=port, protocol=Protocol(protocol))
    if pool.add(proxy):
        return {"message": "Proxy added", "id": proxy.id}
    return {"message": "Proxy already exists"}

@app.delete("/api/proxies/{proxy_id}")
async def remove_proxy(proxy_id: str):
    pool.remove(proxy_id)
    return {"message": "Proxy removed"}

@app.post("/api/check")
async def trigger_check():
    await checker.run_check()
    return pool.stats

Docker Deployment

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
# Copy the full source before installing: pip install . needs the package code
COPY . .
RUN pip install .
EXPOSE 8000
CMD ["uvicorn", "proxy_pool.app:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
services:
  proxy-pool:
    build: .
    ports:
      - "8000:8000"
    volumes:
      - ./config.yaml:/app/config.yaml
      - ./data:/app/data
    restart: unless-stopped

Client Usage

Integrating the pool manager into a scraper takes just a few lines — each proxy fetch is a single HTTP request:

import httpx

async def get_proxy() -> str:
    async with httpx.AsyncClient() as client:
        resp = await client.get("http://localhost:8000/api/proxy?country=US")
        resp.raise_for_status()
        return resp.json()["proxy"]

# Use in scraper (inside an async function)
proxy_url = await get_proxy()
async with httpx.AsyncClient(proxy=proxy_url) as client:
    response = await client.get("https://target-site.com")

FAQ

How is this different from a rotating proxy service?

A rotating proxy service provides proxies and handles rotation for you. A pool manager is the software that manages your own proxies — whether purchased from multiple providers, scraped from free lists, or self-hosted. You control rotation strategy, health checking, and which proxies serve which domains.

How many proxies can the pool manager handle?

The in-memory pool handles 10,000+ proxies easily. Health checking is the bottleneck — checking 10,000 proxies at 100 concurrent checks takes about 2 minutes. For larger pools, increase check intervals or shard checks across multiple workers.
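
The back-of-envelope math behind that estimate, assuming each check averages about 1.2 s (a mix of fast successes and slower timeouts):

```python
pool_size = 10_000
concurrency = 100
avg_check_seconds = 1.2   # assumed average across successes and timeouts

waves = pool_size / concurrency             # 100 waves of concurrent checks
total_minutes = waves * avg_check_seconds / 60
print(total_minutes)  # ~2 minutes
```

Doubling concurrency halves the sweep time, at the cost of more open sockets and bandwidth during checks.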

Should I persist the proxy pool to disk?

Yes, for production use. The SQLite storage backend saves proxy metadata, health history, and scores. On restart, the pool loads from SQLite instead of re-checking everything from scratch. Use in-memory storage only for development.
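
One plausible schema for that backend, mirroring the Proxy dataclass fields — the actual proxy_pool/storage/sqlite.py layout may differ:

```python
import sqlite3

# Hypothetical table for the SQLite backend; one row per proxy.
conn = sqlite3.connect(":memory:")  # production would use a file path
conn.execute("""
    CREATE TABLE IF NOT EXISTS proxies (
        id            TEXT PRIMARY KEY,   -- "host:port"
        host          TEXT NOT NULL,
        port          INTEGER NOT NULL,
        protocol      TEXT DEFAULT 'http',
        country       TEXT DEFAULT 'unknown',
        anonymity     TEXT DEFAULT 'unknown',
        alive         INTEGER DEFAULT 0,
        latency_ms    REAL DEFAULT 0,
        success_rate  REAL DEFAULT 0,
        score         REAL DEFAULT 0,
        last_check    REAL DEFAULT 0,
        added_at      REAL,
        source        TEXT DEFAULT 'manual'
    )
""")
conn.execute(
    "INSERT INTO proxies (id, host, port) VALUES (?, ?, ?)",
    ("1.2.3.4:8080", "1.2.3.4", 8080),
)
row = conn.execute("SELECT protocol, alive FROM proxies").fetchone()
print(row)  # ('http', 0) -- column defaults applied
```

On startup the pool would SELECT all rows, rebuild Proxy objects, and resume health checks from the stored scores instead of starting cold.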

How do I add proxies from multiple providers?

Create a fetcher for each provider. The API fetcher calls provider endpoints to get proxy lists. Schedule fetchers to run periodically (every 30 minutes) to pick up new proxies. The pool deduplicates by host:port, so multiple fetchers can add overlapping proxies safely.

Can I run multiple pool manager instances?

For high availability, run multiple instances behind a load balancer. Without shared state, each instance maintains its own pool and health checks. For shared state, point all instances at the same Redis or PostgreSQL backend instead of in-memory storage.

