Building a Distributed Scraping System with Redis

Single-machine scrapers hit limits quickly. CPU, bandwidth, and IP restrictions cap throughput. A distributed scraping system spreads work across multiple workers, uses Redis as the coordination layer, and scales horizontally by adding more machines.

This guide builds a production-grade distributed scraper with Redis for task queues, URL deduplication, rate limiting, and result aggregation.

Architecture

                    ┌──────────┐
                    │  Redis   │
                    │  Server  │
                    └────┬─────┘
         ┌───────────────┼───────────────┐
         │               │               │
    ┌────┴────┐    ┌─────┴────┐    ┌─────┴────┐
    │ Worker 1│    │ Worker 2 │    │ Worker 3 │
    │ + Proxy │    │ + Proxy  │    │ + Proxy  │
    └─────────┘    └──────────┘    └──────────┘

Redis handles five responsibilities, each backed by a different Redis primitive (sketched just after this list):

  1. Task queue — URLs waiting to be scraped
  2. Deduplication — prevents scraping the same URL twice
  3. Rate limiting — controls requests per domain
  4. Result storage — temporary storage before database insert
  5. Coordination — worker heartbeats and status tracking
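
The sketch below maps each responsibility to the primitive used for it; the key names are illustrative and mirror the prefixes used in the code later in this guide:

# Illustrative mapping of responsibilities to Redis primitives
import time
import redis

r = redis.from_url('redis://localhost:6379', decode_responses=True)

r.zadd('scraper:queue', {'task-json': 0})                 # 1. task queue: sorted set, score = priority
r.sadd('scraper:seen', 'md5-of-url')                      # 2. deduplication: set of URL hashes
r.set('ratelimit:example.com', str(time.time()), ex=60)   # 3. rate limiting: per-domain key with TTL
r.rpush('scraper:results', 'result-json')                 # 4. result storage: list drained by a collector
r.hset('scraper:workers', 'worker-1', str(time.time()))   # 5. coordination: hash of worker heartbeats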

Setup

pip install redis httpx selectolax

Ensure Redis is running:

docker run -d -p 6379:6379 redis:7-alpine

Task Queue

# task_queue.py  (named to avoid shadowing Python's built-in "queue" module)
import redis
import json
import time
import hashlib
from typing import Optional, Dict, List

class ScrapingQueue:
    def __init__(self, redis_url='redis://localhost:6379', prefix='scraper'):
        self.r = redis.from_url(redis_url, decode_responses=True)
        self.prefix = prefix
        self.queue_key = f'{prefix}:queue'
        self.processing_key = f'{prefix}:processing'
        self.seen_key = f'{prefix}:seen'
        self.results_key = f'{prefix}:results'
        self.stats_key = f'{prefix}:stats'

    def enqueue(self, url: str, metadata: Optional[dict] = None, priority: int = 0):
        url_hash = hashlib.md5(url.encode()).hexdigest()

        # Skip if already seen
        if self.r.sismember(self.seen_key, url_hash):
            return False

        task = json.dumps({
            'url': url,
            'metadata': metadata or {},
            'enqueued_at': time.time(),
            'attempts': 0,
            'url_hash': url_hash,
        })

        # Use sorted set for priority queue (lower score = higher priority)
        self.r.zadd(self.queue_key, {task: priority})
        self.r.sadd(self.seen_key, url_hash)
        self.r.hincrby(self.stats_key, 'enqueued', 1)
        return True

    def enqueue_many(self, urls: List[str], priority: int = 0):
        pipe = self.r.pipeline()
        added = 0
        for url in urls:
            url_hash = hashlib.md5(url.encode()).hexdigest()
            if not self.r.sismember(self.seen_key, url_hash):
                task = json.dumps({
                    'url': url,
                    'metadata': {},
                    'enqueued_at': time.time(),
                    'attempts': 0,
                    'url_hash': url_hash,
                })
                pipe.zadd(self.queue_key, {task: priority})
                pipe.sadd(self.seen_key, url_hash)
                added += 1
        pipe.hincrby(self.stats_key, 'enqueued', added)
        pipe.execute()
        return added

    def dequeue(self, worker_id: str) -> Optional[Dict]:
        # Pop highest priority task (lowest score)
        result = self.r.zpopmin(self.queue_key, count=1)
        if not result:
            return None

        task_json, score = result[0]
        task = json.loads(task_json)
        task['attempts'] += 1
        task['worker_id'] = worker_id
        task['started_at'] = time.time()

        # Track in the processing hash; recover_stale_tasks() re-enqueues
        # entries left here past the timeout
        self.r.hset(
            self.processing_key,
            task['url_hash'],
            json.dumps(task)
        )

        return task

    def complete(self, task: Dict, result: dict):
        self.r.hdel(self.processing_key, task['url_hash'])
        self.r.rpush(self.results_key, json.dumps({
            'url': task['url'],
            'data': result,
            'completed_at': time.time(),
            'worker_id': task.get('worker_id'),
        }))
        self.r.hincrby(self.stats_key, 'completed', 1)

    def fail(self, task: Dict, error: str, max_retries: int = 3):
        self.r.hdel(self.processing_key, task['url_hash'])
        self.r.hincrby(self.stats_key, 'failed', 1)

        if task['attempts'] < max_retries:
            # Re-enqueue with lower priority
            task_json = json.dumps({
                'url': task['url'],
                'metadata': task['metadata'],
                'enqueued_at': time.time(),
                'attempts': task['attempts'],
                'url_hash': task['url_hash'],
                'last_error': error,
            })
            self.r.zadd(self.queue_key, {task_json: task['attempts'] * 10})
            self.r.hincrby(self.stats_key, 'retried', 1)

    def get_results(self, batch_size: int = 100) -> List[dict]:
        results = []
        for _ in range(batch_size):
            item = self.r.lpop(self.results_key)
            if not item:
                break
            results.append(json.loads(item))
        return results

    def get_stats(self) -> dict:
        stats = self.r.hgetall(self.stats_key)
        return {
            'queue_size': self.r.zcard(self.queue_key),
            'processing': self.r.hlen(self.processing_key),
            'results_pending': self.r.llen(self.results_key),
            'total_seen': self.r.scard(self.seen_key),
            'enqueued': int(stats.get('enqueued', 0)),
            'completed': int(stats.get('completed', 0)),
            'failed': int(stats.get('failed', 0)),
            'retried': int(stats.get('retried', 0)),
        }

    def recover_stale_tasks(self, timeout_seconds: int = 300):
        """Re-enqueue tasks stuck in processing state."""
        processing = self.r.hgetall(self.processing_key)
        now = time.time()
        recovered = 0

        for url_hash, task_json in processing.items():
            task = json.loads(task_json)
            if now - task.get('started_at', 0) > timeout_seconds:
                self.r.hdel(self.processing_key, url_hash)
                self.fail(task, 'timeout_recovery')
                recovered += 1

        return recovered
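
A quick round trip, assuming Redis is running locally at the default URL (the URL and metadata values here are just examples):

# usage sketch: enqueue, dequeue, complete
from task_queue import ScrapingQueue

q = ScrapingQueue()
q.enqueue('https://example.com/page/1', metadata={'category': 'demo'})
task = q.dequeue('worker-local')
if task:
    q.complete(task, {'title': 'Example Domain'})
print(q.get_stats())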

Rate Limiter

Prevent hitting target sites too aggressively:

# rate_limiter.py
import asyncio
import redis
import time
from urllib.parse import urlparse

class DomainRateLimiter:
    def __init__(self, redis_client, default_rps=2):
        self.r = redis_client
        self.default_rps = default_rps
        self.domain_limits = {}

    def set_limit(self, domain: str, requests_per_second: float):
        self.domain_limits[domain] = requests_per_second

    async def wait_if_needed(self, url: str):
        domain = urlparse(url).netloc
        rps = self.domain_limits.get(domain, self.default_rps)
        min_interval = 1.0 / rps

        key = f'ratelimit:{domain}'

        # Best-effort pacing: the read-then-set below can race across workers.
        # See the atomic variant sketched after this block for stricter limits.
        while True:
            now = time.time()
            last_request = self.r.get(key)

            if last_request is None:
                self.r.setex(key, 60, str(now))
                return

            elapsed = now - float(last_request)
            if elapsed >= min_interval:
                self.r.setex(key, 60, str(now))
                return

            wait_time = min_interval - elapsed
            await asyncio.sleep(wait_time)
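
If two workers read the same timestamp at once, both will proceed. When you need a stricter guarantee, one option is to claim the domain with a single atomic SET NX whose TTL equals the interval. A sketch, where the key name and fallback sleep are assumptions:

# sketch: atomic per-domain slot using SET NX with a TTL of one interval
import asyncio

async def acquire_slot(r, domain: str, min_interval: float):
    key = f'ratelimit:{domain}'
    while True:
        # Only one worker can create the key per interval; the TTL releases it
        if r.set(key, '1', nx=True, px=int(min_interval * 1000)):
            return
        ttl_ms = r.pttl(key)  # remaining time on the current slot, in ms
        await asyncio.sleep(max(ttl_ms, 10) / 1000)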

Worker Process

Each worker runs independently, pulling tasks from Redis:

# worker.py
import asyncio
import httpx
import signal
import uuid
import time
from task_queue import ScrapingQueue
from rate_limiter import DomainRateLimiter

class ScrapingWorker:
    def __init__(self, proxies: list, concurrency: int = 5):
        self.worker_id = f'worker-{uuid.uuid4().hex[:8]}'
        self.queue = ScrapingQueue()
        self.proxies = proxies
        self.proxy_index = 0
        self.concurrency = concurrency
        self.running = False
        self.rate_limiter = DomainRateLimiter(
            self.queue.r, default_rps=2
        )

    def get_next_proxy(self) -> str:
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    async def process_task(self, task: dict):
        url = task['url']
        proxy = self.get_next_proxy()

        await self.rate_limiter.wait_if_needed(url)

        try:
            async with httpx.AsyncClient(
                proxy=proxy,
                timeout=30,
                follow_redirects=True,
            ) as client:
                response = await client.get(url, headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; Bot/1.0)',
                })

            if response.status_code == 200:
                data = self.parse(response.text, url)
                self.queue.complete(task, data)
                return True
            elif response.status_code in (403, 429):
                self.queue.fail(task, f'blocked:{response.status_code}')
            else:
                self.queue.fail(task, f'http:{response.status_code}')

        except Exception as e:
            self.queue.fail(task, str(e)[:200])

        return False

    def parse(self, html: str, url: str) -> dict:
        # Implement your parsing logic
        return {
            'url': url,
            'title': '',
            'content': html[:500],
            'scraped_at': time.time(),
        }

    async def run(self):
        self.running = True
        print(f'[{self.worker_id}] Starting with {len(self.proxies)} proxies')

        async def worker_loop():
            # Concurrency comes from running several of these loops in
            # parallel; each loop handles one task at a time
            while self.running:
                task = self.queue.dequeue(self.worker_id)
                if task is None:
                    await asyncio.sleep(1)
                    continue
                await self.process_task(task)

        # Handle shutdown signals
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, self.stop)

        tasks = [asyncio.create_task(worker_loop())
                 for _ in range(self.concurrency)]
        await asyncio.gather(*tasks)

    def stop(self):
        print(f'[{self.worker_id}] Shutting down...')
        self.running = False


if __name__ == '__main__':
    import os

    # Proxies can be supplied as a comma-separated PROXIES env var,
    # as in the multi-worker example under "Running the System"
    env_proxies = os.environ.get('PROXIES')
    proxies = env_proxies.split(',') if env_proxies else [
        'http://user:pass@proxy1.example.com:8080',
        'http://user:pass@proxy2.example.com:8080',
    ]

    worker = ScrapingWorker(proxies, concurrency=10)
    asyncio.run(worker.run())
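
The architecture lists worker heartbeats under coordination, but the worker above does not implement them. A minimal sketch of a heartbeat method you could add to ScrapingWorker (the key name and interval are assumptions, and worker.py would also need import json):

    # sketch: periodic heartbeat so a supervisor can spot dead or stalled workers
    async def heartbeat_loop(self, interval: int = 15):
        key = f'{self.queue.prefix}:workers'
        while self.running:
            self.queue.r.hset(key, self.worker_id, json.dumps({
                'last_seen': time.time(),
                'concurrency': self.concurrency,
            }))
            await asyncio.sleep(interval)

You could start it from run() with asyncio.create_task(self.heartbeat_loop()) alongside the worker loops.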

Result Collector

A separate process drains results from Redis and writes them to durable storage (a JSONL file here; swap in a database insert for production):

# collector.py
import json
import time
from task_queue import ScrapingQueue

class ResultCollector:
    def __init__(self, output_file='results.jsonl'):
        self.queue = ScrapingQueue()
        self.output_file = output_file

    def run(self, batch_size=100, poll_interval=5):
        print(f'Collecting results to {self.output_file}')

        while True:
            results = self.queue.get_results(batch_size)

            if results:
                with open(self.output_file, 'a') as f:
                    for result in results:
                        f.write(json.dumps(result) + '\n')

                stats = self.queue.get_stats()
                print(
                    f'Collected {len(results)} results | '
                    f'Queue: {stats["queue_size"]} | '
                    f'Processing: {stats["processing"]} | '
                    f'Completed: {stats["completed"]}'
                )

            time.sleep(poll_interval)

if __name__ == '__main__':
    collector = ResultCollector()
    collector.run()

Running the System

Start all components:

# Terminal 1: Redis
docker run -d -p 6379:6379 redis:7-alpine

# Terminal 2: Seed the queue
python -c "
from task_queue import ScrapingQueue
q = ScrapingQueue()
urls = [f'https://example.com/page/{i}' for i in range(1000)]
added = q.enqueue_many(urls)
print(f'Added {added} URLs')
"

# Terminal 3: Worker 1
python worker.py

# Terminal 4: Worker 2 (different proxies)
PROXIES=http://proxy3:8080,http://proxy4:8080 python worker.py

# Terminal 5: Result collector
python collector.py

Scaling Considerations

You can run workers on separate machines as long as they can reach Redis. For multi-machine deployments, consider using Redis Sentinel or Redis Cluster for high availability. Each worker should have its own set of proxies to avoid IP conflicts.

For very large jobs (millions of URLs), consider Redis Streams instead of sorted sets for the task queue. Streams support consumer groups and per-message acknowledgment with a pending-entries list, which makes distributed consumption and crash recovery more robust; a sketch follows.
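
A rough sketch of the same queue on a Redis Stream with a consumer group (the stream, group, and consumer names are illustrative):

# sketch: stream-based task queue with a consumer group
import redis

r = redis.from_url('redis://localhost:6379', decode_responses=True)

# Create the group once; ignore the error if it already exists
try:
    r.xgroup_create('scraper:stream', 'workers', id='0', mkstream=True)
except redis.ResponseError:
    pass

r.xadd('scraper:stream', {'url': 'https://example.com/page/1'})

# Each worker reads unassigned entries under its own consumer name
entries = r.xreadgroup('workers', 'worker-1', {'scraper:stream': '>'},
                       count=10, block=5000)
for stream, messages in entries:
    for msg_id, fields in messages:
        # ... scrape fields['url'] ...
        r.xack('scraper:stream', 'workers', msg_id)  # acknowledge when done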

FAQ

Why Redis instead of RabbitMQ or Kafka?

Redis is simpler to set up and provides all the primitives you need: sorted sets for priority queues, sets for deduplication, and key-value storage for rate limiting. RabbitMQ is better for complex routing, and Kafka is better for event streaming. For scraping task queues, Redis is the pragmatic choice.

How do I handle duplicate URLs across workers?

The deduplication set (seen_key) is checked atomically in Redis before enqueuing. Since all workers share the same Redis instance, no URL is scraped twice. Use MD5 hashes of URLs to save memory in the deduplication set.

What happens if a worker crashes mid-task?

Tasks move from the queue to the processing set when dequeued. The recover_stale_tasks() method checks for tasks stuck in processing longer than a timeout (default 5 minutes) and re-enqueues them. Run this recovery check periodically in a supervisor process.
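
A minimal supervisor is just a loop that calls recover_stale_tasks() on a schedule, for example:

# supervisor.py (sketch): re-enqueue tasks stuck in processing
import time
from task_queue import ScrapingQueue

q = ScrapingQueue()
while True:
    recovered = q.recover_stale_tasks(timeout_seconds=300)
    if recovered:
        print(f'Recovered {recovered} stale tasks')
    time.sleep(60)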

How many workers can share one Redis instance?

A single Redis instance handles 50,000+ operations per second. With 10-20 workers at moderate concurrency, Redis is never the bottleneck. If you need more throughput, use Redis Cluster to shard across multiple nodes.

How do I prioritize certain URLs?

The queue uses Redis sorted sets where the score represents priority. Lower scores are dequeued first. Assign high-priority URLs a score of 0, normal URLs a score of 5, and retries a score proportional to their attempt count. This ensures important pages are scraped before retries.
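
For example, with the ScrapingQueue above:

q = ScrapingQueue()
q.enqueue('https://example.com/sitemap.xml', priority=0)   # scraped first
q.enqueue('https://example.com/listing/42', priority=5)    # normal priority
# retries re-enter at attempts * 10, so they queue behind fresh URLs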
