Build a Headless Browser Farm: Scalable Browser Pool

A headless browser farm manages a pool of browser instances for JavaScript-heavy web scraping. Instead of launching and closing browsers for each request, a farm keeps browsers warm, distributes work across instances, and handles crashes gracefully.

Architecture

Browser Farm Manager
├── Browser Pool (10-50 instances)
│   ├── Chromium Instance 1 (5 tabs)
│   ├── Chromium Instance 2 (5 tabs)
│   └── Chromium Instance N (5 tabs)
├── Request Queue (Redis)
├── Health Monitor
└── Auto-scaler

Implementation with Playwright

import asyncio
from playwright.async_api import async_playwright, Browser, BrowserContext, Page
from typing import List, Optional, Callable
import time
import logging

logger = logging.getLogger(__name__)

class BrowserInstance:
    def __init__(self, browser: Browser, proxy_url: str = ""):
        self.browser = browser
        self.proxy_url = proxy_url
        self.active_pages = 0
        self.max_pages = 5
        self.total_requests = 0
        self.created_at = time.time()

    @property
    def is_available(self):
        return self.active_pages < self.max_pages

    @property
    def age_minutes(self):
        return (time.time() - self.created_at) / 60

class BrowserFarm:
    def __init__(self, pool_size: int = 10, proxy_urls: Optional[List[str]] = None):
        self.pool_size = pool_size
        self.proxy_urls = proxy_urls or []
        self.instances: List[BrowserInstance] = []
        self.playwright = None
        self._lock = asyncio.Lock()

    async def start(self):
        self.playwright = await async_playwright().start()
        for i in range(self.pool_size):
            await self._create_instance(i)
        logger.info(f"Browser farm started with {self.pool_size} instances")

    async def _create_instance(self, index: int):
        proxy_config = {}
        if self.proxy_urls:
            proxy_url = self.proxy_urls[index % len(self.proxy_urls)]
            proxy_config = {"proxy": {"server": proxy_url}}

        browser = await self.playwright.chromium.launch(
            headless=True,
            args=[
                "--disable-dev-shm-usage",
                "--disable-gpu",
                "--no-sandbox",
                "--disable-setuid-sandbox",
            ],
            **proxy_config,  # apply the per-instance proxy, if one is configured
        )
        instance = BrowserInstance(browser, proxy_config.get("proxy", {}).get("server", ""))
        self.instances.append(instance)

    async def get_page(self) -> tuple:
        async with self._lock:
            for instance in self.instances:
                if instance.is_available:
                    context = await instance.browser.new_context(
                        user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
                        viewport={"width": 1920, "height": 1080},
                    )
                    page = await context.new_page()
                    instance.active_pages += 1
                    instance.total_requests += 1
                    return page, instance
        return None, None

    async def release_page(self, page: Page, instance: BrowserInstance):
        try:
            await page.context.close()
        except Exception:
            pass
        instance.active_pages -= 1

    async def execute(self, action: Callable, *args, **kwargs):
        # Wait for a free page slot instead of failing when the pool is saturated
        page, instance = await self.get_page()
        while page is None:
            await asyncio.sleep(0.1)
            page, instance = await self.get_page()
        try:
            return await action(page, *args, **kwargs)
        finally:
            await self.release_page(page, instance)

    async def shutdown(self):
        for instance in self.instances:
            await instance.browser.close()
        if self.playwright:
            await self.playwright.stop()

# Usage
async def scrape_with_js(page: Page, url: str):
    await page.goto(url, wait_until="networkidle", timeout=30000)
    title = await page.title()
    content = await page.content()
    return {"url": url, "title": title, "html_length": len(content)}

async def main():
    farm = BrowserFarm(pool_size=5, proxy_urls=[
        "http://user:pass@proxy1:8080",
        "http://user:pass@proxy2:8080",
    ])
    await farm.start()

    urls = [f"https://example.com/page/{i}" for i in range(50)]
    tasks = [farm.execute(scrape_with_js, url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    success = sum(1 for r in results if not isinstance(r, Exception))
    print(f"Scraped {success}/{len(urls)} pages")

    await farm.shutdown()

if __name__ == "__main__":
    asyncio.run(main())
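
The architecture diagram above also lists a Redis request queue, which the code so far does not cover. The sketch below shows how producers and a worker coroutine could feed the farm through Redis; it assumes redis-py 4.2+ (for redis.asyncio) and a reachable Redis server, and the queue key and job format are illustrative.

import json
import redis.asyncio as redis

QUEUE_KEY = "scrape:requests"  # illustrative key name

async def enqueue_urls(redis_url: str, urls: List[str]):
    # Producers push JSON jobs onto a Redis list
    client = redis.from_url(redis_url)
    for url in urls:
        await client.lpush(QUEUE_KEY, json.dumps({"url": url}))

async def queue_worker(farm: BrowserFarm, redis_url: str):
    # Pops jobs one at a time and runs them through the farm defined above
    client = redis.from_url(redis_url)
    while True:
        _, raw = await client.brpop(QUEUE_KEY)
        job = json.loads(raw)
        try:
            result = await farm.execute(scrape_with_js, job["url"])
            logger.info(f"Scraped {result['url']} ({result['html_length']} bytes)")
        except Exception as exc:
            logger.warning(f"Job failed for {job['url']}: {exc}")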

FAQ

How many browser instances should I run?

Each Chromium instance typically uses 100-300MB of RAM, so a machine with 16GB of RAM can comfortably run 20-40 instances. For larger scale, deploy on Kubernetes (see the container deployment guide).
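
If you would rather derive the pool size from the machine than hard-code it, a rough helper like the following works. This is a sketch that assumes psutil is installed; it uses the conservative 300MB-per-instance figure from above and keeps 25% of memory as headroom.

import psutil

def suggested_pool_size(per_instance_mb: int = 300, headroom: float = 0.25) -> int:
    # Divide usable memory by a per-instance estimate; never suggest fewer than 1
    available_mb = psutil.virtual_memory().available / (1024 * 1024)
    return max(1, int(available_mb * (1 - headroom) / per_instance_mb))

# e.g. BrowserFarm(pool_size=suggested_pool_size())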

How do I prevent memory leaks?

Recycle browser instances every 30-60 minutes. Close all contexts and pages after use. Monitor memory usage and restart instances that exceed thresholds.
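
A minimal recycling sketch, written as extra methods on the BrowserFarm class above; the 45-minute threshold and 60-second check interval are illustrative, and only idle instances are swapped so in-flight pages are never interrupted.

    async def recycle_old_instances(self, max_age_minutes: float = 45):
        # Replace idle instances that have exceeded the age threshold
        async with self._lock:
            fresh, stale = [], []
            for instance in self.instances:
                if instance.age_minutes >= max_age_minutes and instance.active_pages == 0:
                    stale.append(instance)
                else:
                    fresh.append(instance)
            self.instances = fresh
            for i, instance in enumerate(stale):
                await instance.browser.close()
                await self._create_instance(i)  # index only selects the proxy

    async def recycle_loop(self, interval_seconds: float = 60):
        # Run as a background task: asyncio.create_task(farm.recycle_loop())
        while True:
            await asyncio.sleep(interval_seconds)
            await self.recycle_old_instances()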

Can I use different proxies for each browser instance?

Yes. Assign proxies at the browser or context level. Each BrowserInstance in the farm can use a different proxy, providing natural IP rotation across your browser pool.
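
Playwright accepts a proxy setting on new_context() as well as on launch(), so you can also rotate proxies per request rather than per instance. A minimal sketch (the proxy URL is a placeholder; note that Chromium on Windows needs a global proxy at launch for per-context proxies to take effect):

context = await instance.browser.new_context(
    proxy={"server": "http://user:pass@proxy3:8080"},  # placeholder proxy
    viewport={"width": 1920, "height": 1080},
)
page = await context.new_page()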

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
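
Because the farm's API is async, a coroutine-friendly variant is also useful. The sketch below takes a zero-argument coroutine factory so that each attempt awaits a fresh coroutine.

import asyncio
import random

async def retry_async(coro_factory, max_retries=3, base_delay=1.0):
    for attempt in range(max_retries):
        try:
            return await coro_factory()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            await asyncio.sleep(delay)

# e.g. await retry_async(lambda: farm.execute(scrape_with_js, url))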

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})
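
Usage is the same either way: instantiate directly to pick up environment variables, or load a YAML file whose keys match the field names. A short sketch (config.yaml is a placeholder path):

# config.yaml might contain, for example:
#   concurrent_workers: 20
#   rate_limit_per_second: 2.5
#   output_format: csv
config = ScraperConfig()                          # env vars / defaults
config = ScraperConfig.from_yaml("config.yaml")   # file-based overrides
print(config.concurrent_workers, config.request_timeout)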

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens: wait for the deficit to refill, then consume it.
            # Reset last_refill after sleeping so the wait is not counted twice.
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.last_refill = time.time()
            self.tokens = 0
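
To combine the limiter with the browser farm, acquire a token before each execute() call. The rate and burst values below are illustrative.

limiter = RateLimiter(rate=2, burst=5)

async def polite_scrape(farm: BrowserFarm, url: str):
    # Blocks until a token is available, then runs the page action
    await limiter.acquire()
    return await farm.execute(scrape_with_js, url)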

Data Validation

Validate scraped data before storage:

from typing import Optional, List
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors

Deployment

Running as a Service

# Using systemd
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
# Install Python dependencies plus the Chromium build Playwright drives
# (assumes playwright is listed in requirements.txt)
RUN pip install --no-cache-dir -r requirements.txt \
    && playwright install --with-deps chromium

COPY . .

HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]
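
The HEALTHCHECK above assumes main.py exposes a /health endpoint on port 8000. A minimal standard-library sketch that satisfies it, started from a background thread during startup:

import threading
from http.server import BaseHTTPRequestHandler, HTTPServer

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/health":
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"ok")
        else:
            self.send_response(404)
            self.end_headers()

    def log_message(self, *args):
        pass  # keep health-check noise out of the logs

def start_health_server(port: int = 8000):
    server = HTTPServer(("0.0.0.0", port), HealthHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()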

Testing

Write tests for your scraping tools:

import pytest
import requests

# DataValidator, BrowserFarm and scrape_with_js are defined earlier in this
# guide; import them from your own modules in a real test suite.

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        proxy = {
            "http": "http://user:pass@rotating-proxy:8080",
            "https": "http://user:pass@rotating-proxy:8080",
        }
        ips = set()
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0
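
An end-to-end farm test fits alongside these as well. The sketch below assumes pytest-asyncio is installed and that a single browser instance is enough for the round trip.

@pytest.mark.asyncio
async def test_browser_farm_roundtrip():
    farm = BrowserFarm(pool_size=1)
    await farm.start()
    try:
        result = await farm.execute(scrape_with_js, "https://example.com")
        assert result["title"]
    finally:
        await farm.shutdown()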

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

