Build a Scraping Job Scheduler: Task Queue System

Build a Scraping Job Scheduler: Task Queue System

A scraping job scheduler manages the lifecycle of scraping tasks — queuing URLs, distributing work across workers, handling retries, and ensuring rate limits are respected. Using Celery with Redis provides a battle-tested foundation for distributed scraping at any scale.

Celery + Redis Setup

# tasks.py
from celery import Celery
import requests
import time
from bs4 import BeautifulSoup

app = Celery('scraper', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

app.conf.update(
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    task_default_rate_limit='10/m',
    task_time_limit=60,
    task_soft_time_limit=45,
)

PROXY_URL = "http://user:pass@proxy.example.com:8080"

@app.task(bind=True, max_retries=3, default_retry_delay=30)
def scrape_page(self, url: str, selectors: dict = None):
    try:
        proxies = {"http": PROXY_URL, "https": PROXY_URL}
        response = requests.get(url, proxies=proxies, timeout=15,
                              headers={"User-Agent": "Mozilla/5.0"})

        if response.status_code == 429:
            raise self.retry(countdown=60)
        if response.status_code == 403:
            raise self.retry(countdown=30)

        response.raise_for_status()
        soup = BeautifulSoup(response.text, "lxml")

        result = {"url": url, "title": soup.title.string if soup.title else ""}
        if selectors:
            for key, sel in selectors.items():
                result[key] = [el.text.strip() for el in soup.select(sel)]

        return result

    except requests.exceptions.ProxyError:
        raise self.retry(countdown=10)
    except Exception as exc:
        raise self.retry(exc=exc)

@app.task
def scrape_batch(urls: list, selectors: dict = None):
    from celery import group
    job = group(scrape_page.s(url, selectors) for url in urls)
    result = job.apply_async()
    return result.id

Enqueuing Jobs

from tasks import scrape_page, scrape_batch

# Single page
result = scrape_page.delay("https://example.com")
print(result.get(timeout=30))

# Batch of URLs
urls = [f"https://example.com/page/{i}" for i in range(100)]
batch_id = scrape_batch.delay(urls, {"headings": "h2", "links": "a"})

# Priority queue
scrape_page.apply_async(
    args=["https://important-page.com"],
    queue="high_priority",
    priority=9,
)

Running Workers

# Start workers
celery -A tasks worker --concurrency=10 --loglevel=info

# Start with multiple queues
celery -A tasks worker -Q default,high_priority --concurrency=20

# Monitor with Flower
celery -A tasks flower --port=5555

FAQ

How many Celery workers should I run?

Match worker concurrency to your proxy pool capacity. If you have 100 residential proxies each handling 5 requests per minute, you can run 500 concurrent workers (though 100-200 is more practical to avoid overwhelming targets).

How do I handle rate limiting per domain?

Use Celery’s rate_limit parameter per task or implement domain-specific rate limiting with Redis. Create separate queues for different target domains with different rate limits.

Can I use this with Airflow?

Yes. Use Airflow to schedule when batches are enqueued, and Celery workers to execute individual scraping tasks. Airflow handles the “when and what” while Celery handles the “how” of distributed execution.

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            else:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                self.tokens = 0

Data Validation

Validate scraped data before storage:

from typing import Optional, List
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors

Deployment

Running as a Service

# Using systemd
sudo cat > /etc/systemd/system/scraper.service << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDir=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health')"

CMD ["python", "main.py"]

Testing

Write tests for your scraping tools:

import pytest
import asyncio

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        import requests
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        ips = set()
        for _ in range(5):
            import requests
            proxy = {"http": "http://user:pass@rotating-proxy:8080"}
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.


Related Reading

Scroll to Top