Building a Web Scraping Queue with Redis + Python
Redis is the perfect backbone for web scraping infrastructure — it is fast, supports multiple data structures (lists, sets, sorted sets), and handles the core patterns you need: FIFO queues, URL deduplication, priority scheduling, and distributed locking.
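Before building the full queue class, here is a rough sketch of how each of those patterns maps onto plain Redis commands via redis-py. Key names are illustrative and the snippet assumes a local Redis instance:

import redis

r = redis.Redis()  # assumes Redis running on localhost:6379

# FIFO queue: push on the right, pop (blocking) from the left
r.rpush('queue', 'https://example.com/a')
item = r.blpop('queue', timeout=5)

# URL deduplication: SADD returns 0 if the member is already in the set
is_new = r.sadd('seen', 'https://example.com/a')

# Priority scheduling: sorted set, lowest score pops first
r.zadd('priority_queue', {'https://example.com/b': -10})
top = r.zpopmin('priority_queue')

# Distributed lock: SET NX with a TTL so a crashed holder cannot block forever
acquired = r.set('lock:example.com', 'worker-1', nx=True, ex=30)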
Basic Queue Implementation
import redis
import json
import time
import hashlib
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class ScrapeJob:
    url: str
    priority: int = 0
    retries: int = 0
    max_retries: int = 3
    metadata: Optional[dict] = None
    created_at: Optional[float] = None

    def __post_init__(self):
        self.created_at = self.created_at or time.time()
        self.metadata = self.metadata or {}
class RedisScrapingQueue:
    def __init__(self, redis_url='redis://localhost:6379', prefix='scraper'):
        self.redis = redis.from_url(redis_url)
        self.prefix = prefix

    def _key(self, name):
        return f"{self.prefix}:{name}"

    # --- URL Queue ---
    def enqueue(self, job: ScrapeJob):
        url_hash = hashlib.sha256(job.url.encode()).hexdigest()
        # SADD returns 0 if the hash is already in the set, so checking and
        # marking a URL as seen is a single atomic operation
        if not self.redis.sadd(self._key('seen'), url_hash):
            return False
        if job.priority > 0:
            self.redis.zadd(self._key('priority_queue'),
                            {json.dumps(asdict(job)): -job.priority})
        else:
            self.redis.rpush(self._key('queue'), json.dumps(asdict(job)))
        self.redis.incr(self._key('stats:enqueued'))
        return True
    def dequeue(self, timeout=5) -> Optional[ScrapeJob]:
        # Check priority queue first
        result = self.redis.zpopmin(self._key('priority_queue'))
        if result:
            job_data = json.loads(result[0][0])
            return ScrapeJob(**job_data)
        # Then regular queue
        result = self.redis.blpop(self._key('queue'), timeout=timeout)
        if result:
            job_data = json.loads(result[1])
            return ScrapeJob(**job_data)
        return None
    def retry(self, job: ScrapeJob):
        job.retries += 1
        if job.retries <= job.max_retries:
            # Note: a separate process (or an extended dequeue) should move
            # jobs from the retry queue back onto the main queue
            self.redis.rpush(self._key('retry_queue'), json.dumps(asdict(job)))
            return True
        else:
            self.redis.rpush(self._key('dead_letter'), json.dumps(asdict(job)))
            return False
    def complete(self, job: ScrapeJob, result: dict):
        self.redis.rpush(self._key('results'), json.dumps({
            'url': job.url,
            'result': result,
            'completed_at': time.time(),
        }))
        self.redis.incr(self._key('stats:completed'))
    # --- Stats ---
    def stats(self):
        return {
            'queued': self.redis.llen(self._key('queue')),
            'priority_queued': self.redis.zcard(self._key('priority_queue')),
            'retry_queued': self.redis.llen(self._key('retry_queue')),
            'dead_letter': self.redis.llen(self._key('dead_letter')),
            'seen_urls': self.redis.scard(self._key('seen')),
            'total_enqueued': int(self.redis.get(self._key('stats:enqueued')) or 0),
            'total_completed': int(self.redis.get(self._key('stats:completed')) or 0),
            'results': self.redis.llen(self._key('results')),
        }
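A quick way to sanity-check the queue, assuming Redis is running locally (URLs are placeholders):

queue = RedisScrapingQueue()

# Duplicate URLs are silently skipped; priority jobs jump the line
queue.enqueue(ScrapeJob(url='https://example.com/sitemap'))
queue.enqueue(ScrapeJob(url='https://example.com/pricing', priority=10))
queue.enqueue(ScrapeJob(url='https://example.com/sitemap'))  # returns False, already seen

job = queue.dequeue()            # the priority job comes out first
queue.complete(job, {'ok': True})
print(queue.stats())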
Worker Implementation

Each worker pulls jobs from the shared queue and fetches pages through a proxy with httpx, sending failures back through the queue's retry path.

import asyncio
import httpx
from bs4 import BeautifulSoup
class ScrapingWorker:
    def __init__(self, queue: RedisScrapingQueue, proxy: str, worker_id: int = 0):
        self.queue = queue
        self.proxy = proxy
        self.worker_id = worker_id
        self.running = True

    async def run(self):
        async with httpx.AsyncClient(proxy=self.proxy, timeout=30) as client:
            while self.running:
                # dequeue() blocks on BLPOP, so run it in a thread to avoid
                # stalling the event loop for the other workers
                job = await asyncio.to_thread(self.queue.dequeue, timeout=2)
                if not job:
                    continue
                try:
                    response = await client.get(job.url)
                    result = self.parse(response)
                    self.queue.complete(job, result)
                except Exception as e:
                    print(f"Worker {self.worker_id}: Error on {job.url}: {e}")
                    self.queue.retry(job)
    def parse(self, response):
        soup = BeautifulSoup(response.text, 'lxml')
        return {
            'title': soup.title.string if soup.title else '',
            'status': response.status_code,
        }
# Run multiple workers
async def run_workers(num_workers=10, proxy='http://user:pass@proxy.com:8080'):
    queue = RedisScrapingQueue()
    # Seed URLs
    for i in range(1000):
        queue.enqueue(ScrapeJob(url=f'https://example.com/page/{i}'))
    workers = [
        ScrapingWorker(queue, proxy, worker_id=i)
        for i in range(num_workers)
    ]
    await asyncio.gather(*[w.run() for w in workers])
asyncio.run(run_workers())
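The stats() method also makes it easy to watch progress while workers run. A minimal monitor loop you could add to the asyncio.gather call in run_workers (the interval and function name are illustrative):

async def monitor(queue: RedisScrapingQueue, interval=5):
    # Print queue depth and throughput counters every few seconds
    while True:
        print(queue.stats())
        await asyncio.sleep(interval)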
Rate Limiting with Redis

A fixed-window counter per domain keeps workers polite: increment a per-domain key, set its TTL to the window on first use, and sleep out the remainder of the window whenever the limit is exceeded.

class RateLimiter:
    def __init__(self, redis_client, key_prefix='ratelimit'):
        self.redis = redis_client
        self.prefix = key_prefix

    async def acquire(self, domain, max_requests=10, window=1):
        key = f"{self.prefix}:{domain}"
        current = self.redis.incr(key)
        if current == 1:
            # First request of the window: start the countdown
            self.redis.expire(key, window)
        if current > max_requests:
            # Over budget: wait for the window to reset, then try again
            ttl = self.redis.ttl(key)
            await asyncio.sleep(max(ttl, 0.1))
            return await self.acquire(domain, max_requests, window)
        return True
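A sketch of how the limiter plugs into the worker's fetch step, assuming it shares the queue's Redis client and the domain is taken from the job URL:

from urllib.parse import urlparse

limiter = RateLimiter(queue.redis)

async def fetch(client, job):
    # Block until this domain is under its per-window budget, then request
    await limiter.acquire(urlparse(job.url).netloc, max_requests=10, window=1)
    return await client.get(job.url)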
Internal Links

- Web Scraping Architecture — broader architecture patterns
- Distributed Web Scraping — scale across machines
- Proxy Load Balancing — manage proxies in worker pools
- Data Deduplication — Redis-based URL dedup
- Monitoring Web Scrapers — track queue metrics
FAQ
Why Redis over other message queues for scraping?
Redis is simpler to set up, extremely fast (100K+ operations/second), and its data structures (sets for dedup, sorted sets for priority, lists for queues) map perfectly to scraping needs. RabbitMQ or Kafka add unnecessary complexity for most scraping jobs.
How do I handle Redis memory limits?
Monitor memory usage with redis-cli info memory. Set maxmemory and an eviction policy such as allkeys-lru. For the seen-URL set, a Bloom filter (RedisBloom module) cuts memory dramatically, storing a few bits per URL instead of a full 64-character hash, at the cost of a small false-positive rate. Archive completed results to disk or a database instead of letting the results list grow unbounded.
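A hedged sketch of both ideas, issuing raw RedisBloom commands through execute_command (the module must be loaded; key names and sizes are illustrative):

r = redis.Redis()

# Cap memory at 2 GB and evict least-recently-used keys past that point
r.config_set('maxmemory', '2gb')
r.config_set('maxmemory-policy', 'allkeys-lru')

# Reserve a Bloom filter sized for ~100M URLs at a 0.1% false-positive rate
r.execute_command('BF.RESERVE', 'scraper:seen_bloom', 0.001, 100_000_000)

def is_new_url(url: str) -> bool:
    # BF.ADD returns 1 only if the item was (probably) not present before
    return bool(r.execute_command('BF.ADD', 'scraper:seen_bloom', url))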
Can I run workers on different machines?
Yes. Redis is network-accessible, so workers on different machines can connect to the same Redis instance. This is the simplest way to distribute scraping workload. Each worker pulls jobs independently.
How do I prioritize certain URLs?
Use Redis sorted sets with negative priority scores (ZADD with negative values so higher priority pops first). Check the priority queue before the regular queue in the dequeue method.
What happens if a worker crashes mid-job?
Use Redis BRPOPLPUSH (or BLMOVE on Redis 6.2+, which supersedes it) to atomically move each job onto a processing list as it is dequeued. If the worker crashes, a watchdog can move unacknowledged jobs back to the main queue after a timeout, so no job is silently lost; see the sketch below.
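A minimal sketch of that pattern, assuming jobs are enqueued with LPUSH so BRPOPLPUSH (which pops from the tail) preserves FIFO order; list names are illustrative:

def dequeue_reliable(r, timeout=5):
    # Atomically pop a job and park it on a processing list in one step
    raw = r.brpoplpush('scraper:queue', 'scraper:processing', timeout=timeout)
    return raw  # decode with json.loads(raw) before scraping

def acknowledge(r, raw):
    # Drop the exact payload from the processing list once the job is done;
    # a watchdog requeues anything that lingers here past a timeout
    r.lrem('scraper:processing', 1, raw)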
Related Reading
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)