Building a Web Scraping Queue with Redis + Python

Redis is the perfect backbone for web scraping infrastructure — it is fast, supports multiple data structures (lists, sets, sorted sets), and handles the core patterns you need: FIFO queues, URL deduplication, priority scheduling, and distributed locking.

Basic Queue Implementation

import redis
import json
import time
import hashlib
from dataclasses import dataclass, asdict
from typing import Optional

@dataclass
class ScrapeJob:
    url: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
    metadata: Optional[dict] = None
    created_at: Optional[float] = None
    
    def __post_init__(self):
        self.created_at = self.created_at or time.time()
        self.metadata = self.metadata or {}

class RedisScrapingQueue:
    def __init__(self, redis_url='redis://localhost:6379', prefix='scraper'):
        self.redis = redis.from_url(redis_url)
        self.prefix = prefix
    
    def _key(self, name):
        return f"{self.prefix}:{name}"
    
    # --- URL Queue ---
    
    def enqueue(self, job: ScrapeJob):
        url_hash = hashlib.sha256(job.url.encode()).hexdigest()
        
        # SADD returns 0 if the hash was already present, so the dedup check
        # and the insert happen in one atomic step (no race between producers).
        if not self.redis.sadd(self._key('seen'), url_hash):
            return False
        
        if job.priority > 0:
            self.redis.zadd(self._key('priority_queue'),
                          {json.dumps(asdict(job)): -job.priority})
        else:
            self.redis.rpush(self._key('queue'), json.dumps(asdict(job)))
        
        self.redis.incr(self._key('stats:enqueued'))
        return True
    
    def dequeue(self, timeout=5) -> Optional[ScrapeJob]:
        # Check the priority queue first (priorities are stored negated,
        # so the lowest score is the highest priority).
        result = self.redis.zpopmin(self._key('priority_queue'))
        if result:
            job_data = json.loads(result[0][0])
            return ScrapeJob(**job_data)
        
        # Then drain retries before new work; BLPOP accepts multiple keys
        # and pops from the first non-empty list.
        result = self.redis.blpop(
            [self._key('retry_queue'), self._key('queue')], timeout=timeout)
        if result:
            job_data = json.loads(result[1])
            return ScrapeJob(**job_data)
        
        return None
    
    def retry(self, job: ScrapeJob):
        job.retries += 1
        if job.retries <= job.max_retries:
            self.redis.rpush(self._key('retry_queue'), json.dumps(asdict(job)))
            return True
        else:
            self.redis.rpush(self._key('dead_letter'), json.dumps(asdict(job)))
            return False
    
    def complete(self, job: ScrapeJob, result: dict):
        self.redis.rpush(self._key('results'), json.dumps({
            'url': job.url,
            'result': result,
            'completed_at': time.time(),
        }))
        self.redis.incr(self._key('stats:completed'))
    
    # --- Stats ---
    
    def stats(self):
        return {
            'queued': self.redis.llen(self._key('queue')),
            'priority_queued': self.redis.zcard(self._key('priority_queue')),
            'retry_queued': self.redis.llen(self._key('retry_queue')),
            'dead_letter': self.redis.llen(self._key('dead_letter')),
            'seen_urls': self.redis.scard(self._key('seen')),
            'total_enqueued': int(self.redis.get(self._key('stats:enqueued')) or 0),
            'total_completed': int(self.redis.get(self._key('stats:completed')) or 0),
            'results': self.redis.llen(self._key('results')),
        }
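
A quick usage sketch, assuming a local Redis on the default port (the example URLs are placeholders):

queue = RedisScrapingQueue()

# A normal job goes to the FIFO list; priority > 0 goes to the sorted set.
queue.enqueue(ScrapeJob(url='https://example.com/sitemap'))
queue.enqueue(ScrapeJob(url='https://example.com/pricing', priority=5))

job = queue.dequeue()                      # the priority job comes back first
queue.complete(job, {'title': 'Pricing'})

print(queue.stats())                       # e.g. total_enqueued=2, total_completed=1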

Worker Implementation

import asyncio
import httpx
from bs4 import BeautifulSoup

class ScrapingWorker:
    def __init__(self, queue: RedisScrapingQueue, proxy: str, worker_id: int = 0):
        self.queue = queue
        self.proxy = proxy
        self.worker_id = worker_id
        self.running = True
    
    async def run(self):
        async with httpx.AsyncClient(proxy=self.proxy, timeout=30) as client:
            while self.running:
                # dequeue() blocks on BLPOP, so run it in a thread to avoid
                # stalling the event loop shared by all workers.
                job = await asyncio.to_thread(self.queue.dequeue, 2)
                if not job:
                    continue
                
                try:
                    response = await client.get(job.url)
                    result = self.parse(response)
                    self.queue.complete(job, result)
                except Exception as e:
                    print(f"Worker {self.worker_id}: Error on {job.url}: {e}")
                    self.queue.retry(job)
    
    def parse(self, response):
        soup = BeautifulSoup(response.text, 'lxml')
        return {
            'title': soup.title.string if soup.title else '',
            'status': response.status_code,
        }

# Run multiple workers
async def run_workers(num_workers=10, proxy='http://user:pass@proxy.com:8080'):
    queue = RedisScrapingQueue()
    
    # Seed URLs
    for i in range(1000):
        queue.enqueue(ScrapeJob(url=f'https://example.com/page/{i}'))
    
    workers = [
        ScrapingWorker(queue, proxy, worker_id=i)
        for i in range(num_workers)
    ]
    
    await asyncio.gather(*[w.run() for w in workers])

if __name__ == '__main__':
    asyncio.run(run_workers())

Rate Limiting with Redis

class RateLimiter:
    def __init__(self, redis_client, key_prefix='ratelimit'):
        self.redis = redis_client
        self.prefix = key_prefix
    
    async def acquire(self, domain, max_requests=10, window=1):
        # Fixed-window counter: one key per domain, reset every `window` seconds.
        key = f"{self.prefix}:{domain}"
        current = self.redis.incr(key)
        if current == 1:
            self.redis.expire(key, window)
        
        if current > max_requests:
            # Over the limit: wait out the rest of the window, then try again.
            ttl = self.redis.ttl(key)
            await asyncio.sleep(max(ttl, 0.1))
            return await self.acquire(domain, max_requests, window)
        
        return True
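
To wire this into a worker, throttle the domain just before each request. A minimal sketch (the urlparse-based domain extraction and the fetch helper are illustrative, not part of the classes above):

from urllib.parse import urlparse

limiter = RateLimiter(queue.redis)

async def fetch(client, job):
    # Wait until this domain is under 10 requests in the current 1-second window.
    await limiter.acquire(urlparse(job.url).netloc, max_requests=10, window=1)
    return await client.get(job.url)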

FAQ

Why Redis over other message queues for scraping?

Redis is simpler to set up, extremely fast (100K+ operations/second), and its data structures (sets for dedup, sorted sets for priority, lists for queues) map perfectly to scraping needs. RabbitMQ or Kafka add unnecessary complexity for most scraping jobs.

How do I handle Redis memory limits?

Monitor memory usage with redis-cli info memory and set maxmemory with an eviction policy that suits your data; for a job queue, noeviction plus alerting is usually safer than allkeys-lru, which can silently evict jobs. For the seen-URL set, a Bloom filter (RedisBloom module) cuts per-URL memory to a few bytes at the cost of a small false-positive rate. Archive completed results to disk or a database rather than leaving them in the results list.
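
If the RedisBloom module is loaded, the dedup check can use raw module commands. A sketch under that assumption (the key name and sizing are illustrative):

def seen_before(r, url):
    # Probabilistic dedup: BF.ADD returns 0 when the item was (probably) seen before.
    url_hash = hashlib.sha256(url.encode()).hexdigest()
    if not r.exists('scraper:seen_bloom'):
        # One-time setup: sized for ~10M URLs at a 0.1% false-positive rate.
        r.execute_command('BF.RESERVE', 'scraper:seen_bloom', 0.001, 10_000_000)
    return r.execute_command('BF.ADD', 'scraper:seen_bloom', url_hash) == 0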

Can I run workers on different machines?

Yes. Redis is network-accessible, so workers on different machines can connect to the same Redis instance. This is the simplest way to distribute scraping workload. Each worker pulls jobs independently.
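
On a second machine, the only change is the connection URL (host, password, and proxy below are placeholders):

queue = RedisScrapingQueue(redis_url='redis://:secret@redis.internal:6379/0')
worker = ScrapingWorker(queue, proxy='http://user:pass@proxy.com:8080', worker_id=0)
asyncio.run(worker.run())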

How do I prioritize certain URLs?

Use Redis sorted sets with negative priority scores (ZADD with negative values so higher priority pops first). Check the priority queue before the regular queue in the dequeue method.

What happens if a worker crashes mid-job?

Use BRPOPLPUSH (or BLMOVE on Redis 6.2+) to atomically move each job onto a processing list as it is dequeued. If the worker crashes, a watchdog can move jobs that have sat in the processing list past a timeout back onto the main queue. This prevents lost jobs.
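
A minimal sketch of that pattern; the processing and started keys and the 300-second timeout are illustrative, and BLMOVE requires Redis 6.2+ (older servers can use BRPOPLPUSH for the same move):

MAIN, PROCESSING, STARTED = 'scraper:queue', 'scraper:processing', 'scraper:started'

def reliable_dequeue(r, timeout=5):
    # Atomically move the next job from the main queue onto a processing list.
    raw = r.blmove(MAIN, PROCESSING, timeout, src='LEFT', dest='RIGHT')
    if raw:
        r.hset(STARTED, raw, time.time())  # record when work began
    return raw

def acknowledge(r, raw):
    # Worker finished: remove the job from the processing list.
    r.lrem(PROCESSING, 1, raw)
    r.hdel(STARTED, raw)

def requeue_stale(r, max_age=300):
    # Watchdog: put back any job whose worker has been silent too long.
    for raw in r.lrange(PROCESSING, 0, -1):
        if time.time() - float(r.hget(STARTED, raw) or 0) > max_age:
            r.lrem(PROCESSING, 1, raw)
            r.hdel(STARTED, raw)
            r.rpush(MAIN, raw)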

