Building a Distributed Scraping System with Redis
Single-machine scrapers hit limits quickly. CPU, bandwidth, and IP restrictions cap throughput. A distributed scraping system spreads work across multiple workers, uses Redis as the coordination layer, and scales horizontally by adding more machines.
This guide builds a production-grade distributed scraper with Redis for task queues, URL deduplication, rate limiting, and result aggregation.
Architecture
┌──────────┐
│ Redis │
│ Server │
└────┬─────┘
┌───────────────┼───────────────┐
│ │ │
┌────┴────┐ ┌─────┴────┐ ┌─────┴────┐
│ Worker 1│ │ Worker 2 │ │ Worker 3 │
│ + Proxy │ │ + Proxy │ │ + Proxy │
└─────────┘ └──────────┘ └──────────┘

Redis handles five responsibilities:
- Task queue — URLs waiting to be scraped
- Deduplication — prevents scraping the same URL twice
- Rate limiting — controls requests per domain
- Result storage — temporary storage before database insert
- Coordination — worker heartbeats and status tracking
Setup
pip install redis httpx selectolax

Ensure Redis is running:
docker run -d -p 6379:6379 redis:7-alpine

Task Queue
# queue.py
import redis
import json
import time
import hashlib
from typing import Optional, Dict, List
class ScrapingQueue:
def __init__(self, redis_url='redis://localhost:6379', prefix='scraper'):
self.r = redis.from_url(redis_url, decode_responses=True)
self.prefix = prefix
self.queue_key = f'{prefix}:queue'
self.processing_key = f'{prefix}:processing'
self.seen_key = f'{prefix}:seen'
self.results_key = f'{prefix}:results'
self.stats_key = f'{prefix}:stats'
def enqueue(self, url: str, metadata: dict = None, priority: int = 0):
url_hash = hashlib.md5(url.encode()).hexdigest()
# Skip if already seen
if self.r.sismember(self.seen_key, url_hash):
return False
task = json.dumps({
'url': url,
'metadata': metadata or {},
'enqueued_at': time.time(),
'attempts': 0,
'url_hash': url_hash,
})
# Use sorted set for priority queue (lower score = higher priority)
self.r.zadd(self.queue_key, {task: priority})
self.r.sadd(self.seen_key, url_hash)
self.r.hincrby(self.stats_key, 'enqueued', 1)
return True
def enqueue_many(self, urls: List[str], priority: int = 0):
pipe = self.r.pipeline()
added = 0
for url in urls:
url_hash = hashlib.md5(url.encode()).hexdigest()
if not self.r.sismember(self.seen_key, url_hash):
task = json.dumps({
'url': url,
'metadata': {},
'enqueued_at': time.time(),
'attempts': 0,
'url_hash': url_hash,
})
pipe.zadd(self.queue_key, {task: priority})
pipe.sadd(self.seen_key, url_hash)
added += 1
pipe.hincrby(self.stats_key, 'enqueued', added)
pipe.execute()
return added
def dequeue(self, worker_id: str) -> Optional[Dict]:
# Pop highest priority task (lowest score)
result = self.r.zpopmin(self.queue_key, count=1)
if not result:
return None
task_json, score = result[0]
task = json.loads(task_json)
task['attempts'] += 1
task['worker_id'] = worker_id
task['started_at'] = time.time()
# Track in processing set with timeout
self.r.hset(
self.processing_key,
task['url_hash'],
json.dumps(task)
)
return task
def complete(self, task: Dict, result: dict):
self.r.hdel(self.processing_key, task['url_hash'])
self.r.rpush(self.results_key, json.dumps({
'url': task['url'],
'data': result,
'completed_at': time.time(),
'worker_id': task.get('worker_id'),
}))
self.r.hincrby(self.stats_key, 'completed', 1)
def fail(self, task: Dict, error: str, max_retries: int = 3):
self.r.hdel(self.processing_key, task['url_hash'])
self.r.hincrby(self.stats_key, 'failed', 1)
if task['attempts'] < max_retries:
# Re-enqueue with lower priority
task_json = json.dumps({
'url': task['url'],
'metadata': task['metadata'],
'enqueued_at': time.time(),
'attempts': task['attempts'],
'url_hash': task['url_hash'],
'last_error': error,
})
self.r.zadd(self.queue_key, {task_json: task['attempts'] * 10})
self.r.hincrby(self.stats_key, 'retried', 1)
def get_results(self, batch_size: int = 100) -> List[dict]:
results = []
for _ in range(batch_size):
item = self.r.lpop(self.results_key)
if not item:
break
results.append(json.loads(item))
return results
def get_stats(self) -> dict:
stats = self.r.hgetall(self.stats_key)
return {
'queue_size': self.r.zcard(self.queue_key),
'processing': self.r.hlen(self.processing_key),
'results_pending': self.r.llen(self.results_key),
'total_seen': self.r.scard(self.seen_key),
'enqueued': int(stats.get('enqueued', 0)),
'completed': int(stats.get('completed', 0)),
'failed': int(stats.get('failed', 0)),
'retried': int(stats.get('retried', 0)),
}
def recover_stale_tasks(self, timeout_seconds: int = 300):
"""Re-enqueue tasks stuck in processing state."""
processing = self.r.hgetall(self.processing_key)
now = time.time()
recovered = 0
for url_hash, task_json in processing.items():
task = json.loads(task_json)
if now - task.get('started_at', 0) > timeout_seconds:
self.r.hdel(self.processing_key, url_hash)
self.fail(task, 'timeout_recovery')
recovered += 1
return recoveredRate Limiter
Prevent hitting target sites too aggressively:
# rate_limiter.py
import asyncio
import time
from urllib.parse import urlparse

import redis
class DomainRateLimiter:
def __init__(self, redis_client, default_rps=2):
self.r = redis_client
self.default_rps = default_rps
self.domain_limits = {}
def set_limit(self, domain: str, requests_per_second: float):
self.domain_limits[domain] = requests_per_second
async def wait_if_needed(self, url: str):
domain = urlparse(url).netloc
rps = self.domain_limits.get(domain, self.default_rps)
min_interval = 1.0 / rps
key = f'ratelimit:{domain}'
while True:
now = time.time()
last_request = self.r.get(key)
if last_request is None:
self.r.setex(key, 60, str(now))
return
elapsed = now - float(last_request)
if elapsed >= min_interval:
self.r.setex(key, 60, str(now))
return
wait_time = min_interval - elapsed
await asyncio.sleep(wait_time)Worker Process
Each worker runs independently, pulling tasks from Redis:
# worker.py
import asyncio
import httpx
import signal
import uuid
import time
from queue import ScrapingQueue
from rate_limiter import DomainRateLimiter
class ScrapingWorker:
    """Pulls tasks from the shared Redis queue and scrapes them through
    a rotating pool of proxies, with per-domain rate limiting."""

    def __init__(self, proxies: list, concurrency: int = 5):
        self.worker_id = f'worker-{uuid.uuid4().hex[:8]}'
        self.queue = ScrapingQueue()
        self.proxies = proxies
        self.proxy_index = 0
        self.concurrency = concurrency
        self.running = False
        self.rate_limiter = DomainRateLimiter(
            self.queue.r, default_rps=2
        )

    def get_next_proxy(self) -> str:
        """Round-robin over the configured proxy pool."""
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    async def process_task(self, task: dict) -> bool:
        """Fetch one URL and report success or failure back to the queue.

        Returns True on a 200 response, False otherwise (the original
        fell off the end and returned None for non-200 statuses).
        """
        url = task['url']
        proxy = self.get_next_proxy()
        await self.rate_limiter.wait_if_needed(url)
        try:
            # `proxy=` requires httpx >= 0.26; older versions spell it `proxies=`.
            async with httpx.AsyncClient(
                proxy=proxy,
                timeout=30,
                follow_redirects=True,
            ) as client:
                response = await client.get(url, headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; Bot/1.0)',
                })
            if response.status_code == 200:
                data = self.parse(response.text, url)
                self.queue.complete(task, data)
                return True
            if response.status_code in (403, 429):
                self.queue.fail(task, f'blocked:{response.status_code}')
            else:
                self.queue.fail(task, f'http:{response.status_code}')
        except Exception as e:
            # Any network/parse error fails the task; queue.fail decides retries.
            self.queue.fail(task, str(e)[:200])
        return False

    def parse(self, html: str, url: str) -> dict:
        """Placeholder extraction — replace with real parsing for the target site."""
        return {
            'url': url,
            'title': '',
            'content': html[:500],
            'scraped_at': time.time(),
        }

    async def run(self):
        """Run `concurrency` polling loops until stop() is called."""
        self.running = True
        print(f'[{self.worker_id}] Starting with {len(self.proxies)} proxies')

        # get_event_loop() is deprecated inside coroutines; use the running loop.
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, self.stop)
            except NotImplementedError:
                pass  # add_signal_handler is unavailable on Windows event loops

        async def worker_loop():
            # The fixed number of loops bounds concurrency by itself, so no
            # extra semaphore is needed around each task.
            while self.running:
                # NOTE(review): dequeue() is a blocking Redis call; on a
                # high-latency link consider asyncio.to_thread() to keep
                # the event loop responsive.
                task = self.queue.dequeue(self.worker_id)
                if task is None:
                    await asyncio.sleep(1)  # queue empty; poll again shortly
                    continue
                await self.process_task(task)

        tasks = [asyncio.create_task(worker_loop())
                 for _ in range(self.concurrency)]
        await asyncio.gather(*tasks)

    def stop(self):
        """Ask every worker loop to exit after its current task."""
        print(f'[{self.worker_id}] Shutting down...')
        self.running = False
if __name__ == '__main__':
proxies = [
'http://user:pass@proxy1.example.com:8080',
'http://user:pass@proxy2.example.com:8080',
]
worker = ScrapingWorker(proxies, concurrency=10)
asyncio.run(worker.run())Result Collector
A separate process collects results and writes them to a database:
# collector.py
import json
import time
from queue import ScrapingQueue
class ResultCollector:
    """Drains scraped results from Redis and appends them to a JSONL file.

    NOTE(review): results are popped from Redis before the file write, so
    a crash in between can drop a batch — acceptable for a tutorial.
    """

    def __init__(self, output_file='results.jsonl'):
        self.queue = ScrapingQueue()
        self.output_file = output_file

    def run(self, batch_size=100, poll_interval=5):
        """Poll forever, appending up to *batch_size* results per iteration."""
        print(f'Collecting results to {self.output_file}')
        while True:
            batch = self.queue.get_results(batch_size)
            if batch:
                lines = [json.dumps(item) + '\n' for item in batch]
                with open(self.output_file, 'a') as sink:
                    sink.writelines(lines)
                stats = self.queue.get_stats()
                print(
                    f'Collected {len(batch)} results | '
                    f'Queue: {stats["queue_size"]} | '
                    f'Processing: {stats["processing"]} | '
                    f'Completed: {stats["completed"]}'
                )
            time.sleep(poll_interval)
if __name__ == '__main__':
collector = ResultCollector()
collector.run()Running the System
Start all components:
# Terminal 1: Redis
docker run -d -p 6379:6379 redis:7-alpine
# Terminal 2: Seed the queue
python -c "
from queue import ScrapingQueue
q = ScrapingQueue()
urls = [f'https://example.com/page/{i}' for i in range(1000)]
added = q.enqueue_many(urls)
print(f'Added {added} URLs')
"
# Terminal 3: Worker 1
python worker.py
# Terminal 4: Worker 2 (different proxies)
PROXIES=http://proxy3:8080,http://proxy4:8080 python worker.py
# Terminal 5: Result collector
python collector.py

Scaling Considerations
You can run workers on separate machines as long as they can reach Redis. For multi-machine deployments, consider using Redis Sentinel or Redis Cluster for high availability. Each worker should have its own set of proxies to avoid IP conflicts.
For very large jobs (millions of URLs), use Redis streams instead of sorted sets for the task queue. Streams support consumer groups and automatic acknowledgment, which makes them better suited for distributed processing.
Internal Links
- Building a Proxy Rotation Library in Python — proxy rotation for workers
- Web Scraping ETL Pipeline with Airflow — orchestrate scraping workflows
- Creating a Scraping API with FastAPI — expose scraping as an API
- Building a Rate-Limited Scraper with Asyncio — rate limiting techniques
- Distributed Web Scraping Guide — architecture patterns
FAQ
Why Redis instead of RabbitMQ or Kafka?
Redis is simpler to set up and provides all the primitives you need: sorted sets for priority queues, sets for deduplication, and key-value storage for rate limiting. RabbitMQ is better for complex routing, and Kafka is better for event streaming. For scraping task queues, Redis is the pragmatic choice.
How do I handle duplicate URLs across workers?
The deduplication set (seen_key) is checked atomically in Redis before enqueuing. Since all workers share the same Redis instance, no URL is scraped twice. Use MD5 hashes of URLs to save memory in the deduplication set.
What happens if a worker crashes mid-task?
Tasks move from the queue to the processing set when dequeued. The recover_stale_tasks() method checks for tasks stuck in processing longer than a timeout (default 5 minutes) and re-enqueues them. Run this recovery check periodically in a supervisor process.
How many workers can share one Redis instance?
A single Redis instance handles 50,000+ operations per second. With 10-20 workers at moderate concurrency, Redis is never the bottleneck. If you need more throughput, use Redis Cluster to shard across multiple nodes.
How do I prioritize certain URLs?
The queue uses Redis sorted sets where the score represents priority. Lower scores are dequeued first. Assign high-priority URLs a score of 0, normal URLs a score of 5, and retries a score proportional to their attempt count. This ensures important pages are scraped before retries.
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)