Building a Rate-Limited Scraper with Asyncio
Aggressive scraping gets you blocked. Rate limiting is the difference between a scraper that runs for hours and one that gets banned in minutes. Python’s asyncio provides the perfect foundation for building a fast yet respectful scraper that throttles requests per domain, backs off on errors, and rotates proxies automatically.
Why Rate Limiting Matters
Every website has request thresholds. Exceed them and you trigger CAPTCHAs, 429 responses, or IP bans. A good rate limiter:
- Controls requests per second per domain
- Implements exponential backoff on errors
- Respects robots.txt crawl-delay directives
- Distributes load across proxies
- Adapts dynamically to server responses
Token Bucket Rate Limiter
The token bucket algorithm is the standard for rate limiting. Tokens accumulate at a fixed rate. Each request consumes one token. If no tokens are available, the request waits.
import asyncio
import time
from collections import defaultdict


class TokenBucket:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate              # tokens per second
        self.burst = burst            # max tokens (burst capacity)
        self.tokens = burst           # current tokens
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.burst,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return 0  # no wait needed
            wait_time = (1 - self.tokens) / self.rate
            # Go negative instead of resetting to zero: the token that accrues
            # during the wait pays for this request, so the configured rate
            # holds even under sustained load.
            self.tokens -= 1
            return wait_time

    async def wait(self):
        wait_time = await self.acquire()
        if wait_time > 0:
            await asyncio.sleep(wait_time)

Per-Domain Rate Limiter
Different domains have different tolerance levels. A per-domain limiter lets you configure rates individually.
from urllib.parse import urlparse


class DomainRateLimiter:
    def __init__(self, default_rps: float = 2.0, default_burst: int = 5):
        self.default_rps = default_rps
        self.default_burst = default_burst
        self.buckets: dict[str, TokenBucket] = {}
        self.domain_configs: dict[str, tuple] = {}

    def configure_domain(self, domain: str, rps: float, burst: int = 5):
        self.domain_configs[domain] = (rps, burst)

    def _get_bucket(self, domain: str) -> TokenBucket:
        if domain not in self.buckets:
            rps, burst = self.domain_configs.get(
                domain,
                (self.default_rps, self.default_burst)
            )
            self.buckets[domain] = TokenBucket(rps, burst)
        return self.buckets[domain]

    async def wait(self, url: str):
        domain = urlparse(url).netloc
        bucket = self._get_bucket(domain)
        await bucket.wait()

Backoff Strategy
When the server responds with 429 or 503, back off exponentially. This prevents hammering a struggling server and gives it time to recover.
import random


class BackoffManager:
    def __init__(
        self,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        multiplier: float = 2.0,
        jitter: bool = True,
    ):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.multiplier = multiplier
        self.jitter = jitter
        self._failures: dict[str, int] = defaultdict(int)

    def record_failure(self, domain: str):
        self._failures[domain] += 1

    def record_success(self, domain: str):
        self._failures[domain] = 0

    async def wait_if_needed(self, domain: str):
        failures = self._failures.get(domain, 0)
        if failures == 0:
            return
        delay = min(
            self.initial_delay * (self.multiplier ** (failures - 1)),
            self.max_delay
        )
        if self.jitter:
            delay = delay * (0.5 + random.random())
        await asyncio.sleep(delay)

    def get_delay(self, domain: str) -> float:
        failures = self._failures.get(domain, 0)
        if failures == 0:
            return 0
        return min(
            self.initial_delay * (self.multiplier ** (failures - 1)),
            self.max_delay
        )

Complete Rate-Limited Scraper
import httpx
import asyncio
import time
import logging
from dataclasses import dataclass
from typing import List, Optional, Callable, Any
from urllib.parse import urlparse

logger = logging.getLogger(__name__)


@dataclass
class ScrapeResult:
    url: str
    status_code: int = 0
    html: str = ""
    data: Any = None
    error: str = ""
    latency_ms: int = 0
    proxy_used: str = ""
    attempts: int = 0


class RateLimitedScraper:
    def __init__(
        self,
        proxies: Optional[List[str]] = None,
        default_rps: float = 2.0,
        max_retries: int = 3,
        timeout: int = 30,
        max_concurrency: int = 20,
    ):
        self.proxies = proxies or []
        self.proxy_index = 0
        self.rate_limiter = DomainRateLimiter(default_rps=default_rps)
        self.backoff = BackoffManager()
        self.max_retries = max_retries
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrency)
        # Statistics
        self.stats = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'retries': 0,
            'total_wait_time': 0,
        }

    def configure_domain(self, domain: str, rps: float, burst: int = 5):
        self.rate_limiter.configure_domain(domain, rps, burst)

    def _get_proxy(self) -> Optional[str]:
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        headers: Optional[dict] = None,
        parse_fn: Optional[Callable] = None,
    ) -> ScrapeResult:
        result = ScrapeResult(url=url)
        domain = urlparse(url).netloc
        for attempt in range(1, self.max_retries + 1):
            result.attempts = attempt
            # Wait for rate limiter
            await self.rate_limiter.wait(url)
            # Wait for backoff if domain had recent failures
            await self.backoff.wait_if_needed(domain)
            proxy = self._get_proxy()
            result.proxy_used = proxy or "direct"
            try:
                start = time.monotonic()
                # Note: the single-URL `proxy=` argument is available in newer
                # httpx releases; older versions use `proxies=` instead.
                async with httpx.AsyncClient(
                    proxy=proxy,
                    timeout=self.timeout,
                    follow_redirects=True,
                ) as client:
                    default_headers = {
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/120.0.0.0 Safari/537.36',
                    }
                    if headers:
                        default_headers.update(headers)
                    response = await client.request(
                        method, url, headers=default_headers
                    )
                    result.latency_ms = int(
                        (time.monotonic() - start) * 1000
                    )
                    result.status_code = response.status_code
                    self.stats['total_requests'] += 1

                    if response.status_code == 200:
                        result.html = response.text
                        if parse_fn:
                            result.data = parse_fn(response.text)
                        self.backoff.record_success(domain)
                        self.stats['successful'] += 1
                        return result
                    elif response.status_code == 429:
                        # Rate limited: back off
                        retry_after = response.headers.get('Retry-After')
                        if retry_after and retry_after.isdigit():
                            await asyncio.sleep(int(retry_after))
                        self.backoff.record_failure(domain)
                        self.stats['retries'] += 1
                        logger.warning(
                            f"Rate limited on {domain}, backing off "
                            f"(attempt {attempt})"
                        )
                        continue
                    elif response.status_code == 403:
                        # Blocked: try a different proxy on the next attempt
                        self.backoff.record_failure(domain)
                        self.stats['retries'] += 1
                        continue
                    elif response.status_code >= 500:
                        # Server error: retry
                        self.backoff.record_failure(domain)
                        self.stats['retries'] += 1
                        continue
                    else:
                        result.error = f"HTTP {response.status_code}"
                        self.stats['failed'] += 1
                        return result
            except httpx.TimeoutException:
                result.error = "timeout"
                self.stats['retries'] += 1
                continue
            except Exception as e:
                result.error = str(e)[:200]
                self.stats['retries'] += 1
                continue
        self.stats['failed'] += 1
        if not result.error:
            result.error = f"Max retries ({self.max_retries}) exceeded"
        return result

    async def fetch_many(
        self,
        urls: List[str],
        parse_fn: Optional[Callable] = None,
        progress_fn: Optional[Callable] = None,
    ) -> List[ScrapeResult]:
        completed = 0

        async def fetch_one(url):
            nonlocal completed
            async with self.semaphore:
                result = await self.fetch(url, parse_fn=parse_fn)
                completed += 1
                if progress_fn:
                    progress_fn(completed, len(urls), result)
                return result

        tasks = [fetch_one(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return list(results)

    def print_stats(self):
        s = self.stats
        total = s['total_requests']
        if total == 0:
            print("No requests made")
            return
        print("\n--- Scraping Statistics ---")
        print(f"Total requests: {total}")
        print(f"Successful: {s['successful']} ({s['successful']/total*100:.1f}%)")
        print(f"Failed: {s['failed']}")
        print(f"Retries: {s['retries']}")

Usage Example
async def main():
    scraper = RateLimitedScraper(
        proxies=[
            "http://user:pass@proxy1.example.com:8080",
            "http://user:pass@proxy2.example.com:8080",
        ],
        default_rps=2.0,
        max_retries=3,
        max_concurrency=10,
    )

    # Configure specific domains
    scraper.configure_domain("api.example.com", rps=5.0, burst=10)
    scraper.configure_domain("slow-site.com", rps=0.5, burst=1)

    # Scrape URLs
    urls = [f"https://example.com/page/{i}" for i in range(100)]

    def progress(done, total, result):
        status = "OK" if result.status_code == 200 else "FAIL"
        print(f"[{done}/{total}] {status} {result.url} ({result.latency_ms}ms)")

    results = await scraper.fetch_many(
        urls,
        progress_fn=progress,
    )
    scraper.print_stats()

    # Process results
    successful = [r for r in results if r.status_code == 200]
    print(f"\nGot {len(successful)} pages to parse")

asyncio.run(main())

Robots.txt Integration
Respect crawl-delay directives automatically:
import httpx
from typing import Optional
from urllib.robotparser import RobotFileParser


class RobotsRespector:
    def __init__(self):
        self._parsers: dict[str, RobotFileParser] = {}

    async def get_crawl_delay(self, domain: str) -> Optional[float]:
        if domain not in self._parsers:
            parser = RobotFileParser()
            try:
                async with httpx.AsyncClient(timeout=10) as client:
                    resp = await client.get(f"https://{domain}/robots.txt")
                    if resp.status_code == 200:
                        parser.parse(resp.text.splitlines())
                    self._parsers[domain] = parser
            except Exception:
                return None
        parser = self._parsers.get(domain)
        if parser:
            delay = parser.crawl_delay("*")
            return delay
        return None
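The class above only looks up the delay; it still has to be fed into the limiter. One way to wire it in is to translate each crawl-delay into a per-domain rate before scraping. A minimal sketch, assuming RobotsRespector, RateLimitedScraper, and urlparse from the earlier sections are in scope (apply_crawl_delays is a hypothetical helper, not part of the scraper):

async def apply_crawl_delays(scraper: RateLimitedScraper, urls: list[str]):
    """Configure per-domain rates from robots.txt crawl-delay before scraping."""
    robots = RobotsRespector()
    for domain in {urlparse(u).netloc for u in urls}:
        delay = await robots.get_crawl_delay(domain)
        if delay:
            # A crawl-delay of N seconds means at most 1/N requests per second;
            # burst=1 keeps requests evenly spaced rather than clustered.
            scraper.configure_domain(domain, rps=1.0 / float(delay), burst=1)

Call it once before fetch_many so the stricter rates are in place when the buckets are created.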
Internal Links
- Building a Proxy Rotation Library in Python — proxy rotation for scrapers
- Building a Distributed Scraping System with Redis — scale with Redis
- Creating a Scraping API with FastAPI — expose scraping as an API
- Web Scraping with Python Guide — fundamentals of web scraping
- How to Avoid Getting Blocked While Scraping — anti-detection strategies
FAQ
What is the safest default rate limit?
Start with 1-2 requests per second per domain. Most websites tolerate this without triggering anti-bot systems. Increase gradually while monitoring for 429 responses. Some sites allow 10+ RPS while others block at 0.5 RPS.
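If you want to automate that tuning, a small helper can halve a domain's rate whenever a 429 comes back and raise it slowly after a run of clean responses. The sketch below is only an illustration built on the DomainRateLimiter above; AdaptiveRps, its thresholds, and its step factors are assumptions, not measured values.

class AdaptiveRps:
    """Hypothetical helper: nudge per-domain rates based on observed status codes."""
    def __init__(self, limiter: DomainRateLimiter,
                 min_rps: float = 0.5, max_rps: float = 10.0):
        self.limiter = limiter
        self.min_rps = min_rps
        self.max_rps = max_rps
        self.rates: dict[str, float] = {}
        self.streak: dict[str, int] = {}

    def record(self, domain: str, status_code: int, start_rps: float = 1.0):
        rps = self.rates.get(domain, start_rps)
        if status_code == 429:
            rps = max(self.min_rps, rps / 2)        # back off hard on rate limiting
            self.streak[domain] = 0
        elif 200 <= status_code < 300:
            self.streak[domain] = self.streak.get(domain, 0) + 1
            if self.streak[domain] >= 50:           # arbitrary: ~50 clean responses before raising
                rps = min(self.max_rps, rps * 1.2)
                self.streak[domain] = 0
        self.rates[domain] = rps
        self.limiter.configure_domain(domain, rps)
        # configure_domain only affects buckets created afterwards, so drop
        # the cached bucket to pick up the new rate on the next request.
        self.limiter.buckets.pop(domain, None)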
How does the token bucket differ from a simple sleep?
A simple sleep(1) between requests wastes time when you scrape multiple domains. Token bucket allows bursts — if you have not requested from a domain recently, accumulated tokens allow several rapid requests. This maximizes throughput while respecting per-domain limits.
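You can see the burst behaviour by timing a few calls against the TokenBucket class from earlier; a minimal sketch, assuming that class is in scope:

import asyncio
import time

async def demo():
    bucket = TokenBucket(rate=2.0, burst=5)   # 2 tokens/second, up to 5 saved up
    start = time.monotonic()
    for i in range(8):
        await bucket.wait()
        print(f"request {i} at t={time.monotonic() - start:.2f}s")
    # The first five requests fire almost immediately (the burst);
    # the remaining three are spaced roughly 0.5 s apart (1 / rate).

asyncio.run(demo())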
Should I rate limit per proxy or per domain?
Per domain is more important because the target site tracks incoming request rates regardless of which proxy you use. However, adding per-proxy limits prevents individual proxies from being overloaded, which helps with proxy longevity.
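If you want both layers, one option is a second set of token buckets keyed by proxy URL, waited on in addition to the per-domain limit before each request. A rough sketch reusing the TokenBucket class above; the 5 RPS per-proxy figure is an arbitrary assumption:

class ProxyRateLimiter:
    """Illustrative per-proxy limiter, layered on top of the per-domain one."""
    def __init__(self, proxy_rps: float = 5.0, proxy_burst: int = 5):
        self.proxy_rps = proxy_rps
        self.proxy_burst = proxy_burst
        self.buckets: dict[str, TokenBucket] = {}

    async def wait(self, proxy: str):
        if proxy not in self.buckets:
            self.buckets[proxy] = TokenBucket(self.proxy_rps, self.proxy_burst)
        await self.buckets[proxy].wait()

# Inside fetch(), after the per-domain wait, you could add something like:
#     await proxy_limiter.wait(proxy or "direct")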
How do I handle sites with very strict rate limits?
For sites that block at 0.5 RPS, combine rate limiting with longer delays between sessions. Scrape in bursts of 10-20 requests, then wait 5-10 minutes. This mimics human browsing patterns better than constant low-rate traffic.
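One way to express that pattern on top of the scraper above is a wrapper that fetches a small batch, then sleeps for several minutes before the next one. A sketch using the same rough numbers as above; the batch size and pause range are assumptions to tune per site:

import asyncio
import random

async def scrape_in_sessions(scraper: RateLimitedScraper, urls: list[str],
                             batch_size: int = 15,
                             pause_range: tuple = (300, 600)):
    """Fetch URLs in short bursts with long, jittered pauses between sessions."""
    results = []
    for i in range(0, len(urls), batch_size):
        results.extend(await scraper.fetch_many(urls[i:i + batch_size]))
        if i + batch_size < len(urls):
            await asyncio.sleep(random.uniform(*pause_range))  # 5-10 minutes
    return results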
Can I use this with residential proxies?
Yes. Residential proxies are less likely to be blocked, so you can often increase the rate limit to 5-10 RPS per domain. The rate limiter works identically regardless of proxy type — it controls the request rate from your application, not from the proxy infrastructure.
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)