Building a Rate-Limited Scraper with Asyncio

Aggressive scraping gets you blocked. Rate limiting is the difference between a scraper that runs for hours and one that gets banned in minutes. Python’s asyncio provides the perfect foundation for building a fast yet respectful scraper that throttles requests per domain, backs off on errors, and rotates proxies automatically.

Why Rate Limiting Matters

Every website has request thresholds. Exceed them and you trigger CAPTCHAs, 429 responses, or IP bans. A good rate limiter:

  • Controls requests per second per domain
  • Implements exponential backoff on errors
  • Respects robots.txt crawl-delay directives
  • Distributes load across proxies
  • Adapts dynamically to server responses

Token Bucket Rate Limiter

The token bucket algorithm is the standard for rate limiting. Tokens accumulate at a fixed rate. Each request consumes one token. If no tokens are available, the request waits.

import asyncio
import time
from collections import defaultdict

class TokenBucket:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate          # tokens per second
        self.burst = burst        # max tokens (burst capacity)
        self.tokens = burst       # current tokens
        self.last_refill = time.monotonic()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.monotonic()
            elapsed = now - self.last_refill
            self.tokens = min(
                self.burst,
                self.tokens + elapsed * self.rate
            )
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return 0  # no wait needed

            wait_time = (1 - self.tokens) / self.rate
            # Go into deficit instead of clamping to zero, so the token that
            # accrues during the wait is counted as spent by this caller and
            # the sustained rate stays at `rate` under continuous load.
            self.tokens -= 1
            return wait_time

    async def wait(self):
        wait_time = await self.acquire()
        if wait_time > 0:
            await asyncio.sleep(wait_time)
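
A quick way to see the limiter in action (a minimal sketch; the rate and burst values are arbitrary): the first few calls consume the burst immediately, after which requests pace out at roughly 1/rate seconds apart.

async def demo():
    bucket = TokenBucket(rate=2.0, burst=3)
    start = time.monotonic()

    # The first three calls drain the burst and return immediately;
    # the rest pace out at roughly 1 / rate = 0.5s intervals.
    for i in range(6):
        await bucket.wait()
        print(f"request {i} at t={time.monotonic() - start:.2f}s")

asyncio.run(demo())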

Per-Domain Rate Limiter

Different domains have different tolerance levels. A per-domain limiter lets you configure rates individually.

from urllib.parse import urlparse

class DomainRateLimiter:
    def __init__(self, default_rps: float = 2.0, default_burst: int = 5):
        self.default_rps = default_rps
        self.default_burst = default_burst
        self.buckets: dict[str, TokenBucket] = {}
        self.domain_configs: dict[str, tuple] = {}

    def configure_domain(self, domain: str, rps: float, burst: int = 5):
        self.domain_configs[domain] = (rps, burst)

    def _get_bucket(self, domain: str) -> TokenBucket:
        if domain not in self.buckets:
            rps, burst = self.domain_configs.get(
                domain,
                (self.default_rps, self.default_burst)
            )
            self.buckets[domain] = TokenBucket(rps, burst)
        return self.buckets[domain]

    async def wait(self, url: str):
        domain = urlparse(url).netloc
        bucket = self._get_bucket(domain)
        await bucket.wait()

Backoff Strategy

When the server responds with 429 or 503, back off exponentially. This prevents hammering a struggling server and gives it time to recover.

import random

class BackoffManager:
    def __init__(
        self,
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        multiplier: float = 2.0,
        jitter: bool = True,
    ):
        self.initial_delay = initial_delay
        self.max_delay = max_delay
        self.multiplier = multiplier
        self.jitter = jitter
        self._failures: dict[str, int] = defaultdict(int)

    def record_failure(self, domain: str):
        self._failures[domain] += 1

    def record_success(self, domain: str):
        self._failures[domain] = 0

    async def wait_if_needed(self, domain: str):
        failures = self._failures.get(domain, 0)
        if failures == 0:
            return

        delay = min(
            self.initial_delay * (self.multiplier ** (failures - 1)),
            self.max_delay
        )

        if self.jitter:
            delay = delay * (0.5 + random.random())

        await asyncio.sleep(delay)

    def get_delay(self, domain: str) -> float:
        failures = self._failures.get(domain, 0)
        if failures == 0:
            return 0
        return min(
            self.initial_delay * (self.multiplier ** (failures - 1)),
            self.max_delay
        )
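
With the defaults, the raw delay doubles with each consecutive failure and caps at 60 seconds; jitter then scales each delay by a random factor between 0.5 and 1.5. A quick check of the progression:

backoff = BackoffManager(jitter=False)
for _ in range(7):
    backoff.record_failure("example.com")
    print(backoff.get_delay("example.com"))
# 1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0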

Complete Rate-Limited Scraper

import httpx
import asyncio
import time
import logging
from dataclasses import dataclass
from typing import List, Optional, Callable, Any
from urllib.parse import urlparse

logger = logging.getLogger(__name__)

@dataclass
class ScrapeResult:
    url: str
    status_code: int = 0
    html: str = ""
    data: Any = None
    error: str = ""
    latency_ms: int = 0
    proxy_used: str = ""
    attempts: int = 0

class RateLimitedScraper:
    def __init__(
        self,
        proxies: Optional[List[str]] = None,
        default_rps: float = 2.0,
        max_retries: int = 3,
        timeout: int = 30,
        max_concurrency: int = 20,
    ):
        self.proxies = proxies or []
        self.proxy_index = 0
        self.rate_limiter = DomainRateLimiter(default_rps=default_rps)
        self.backoff = BackoffManager()
        self.max_retries = max_retries
        self.timeout = timeout
        self.semaphore = asyncio.Semaphore(max_concurrency)

        # Statistics
        self.stats = {
            'total_requests': 0,
            'successful': 0,
            'failed': 0,
            'retries': 0,
            'total_wait_time': 0,
        }

    def configure_domain(self, domain: str, rps: float, burst: int = 5):
        self.rate_limiter.configure_domain(domain, rps, burst)

    def _get_proxy(self) -> Optional[str]:
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    async def fetch(
        self,
        url: str,
        method: str = "GET",
        headers: dict = None,
        parse_fn: Callable = None,
    ) -> ScrapeResult:
        result = ScrapeResult(url=url)
        domain = urlparse(url).netloc

        for attempt in range(1, self.max_retries + 1):
            result.attempts = attempt

            # Wait for rate limiter
            await self.rate_limiter.wait(url)

            # Wait for backoff if domain had recent failures
            await self.backoff.wait_if_needed(domain)

            proxy = self._get_proxy()
            result.proxy_used = proxy or "direct"

            try:
                start = time.monotonic()

                async with httpx.AsyncClient(
                    proxy=proxy,
                    timeout=self.timeout,
                    follow_redirects=True,
                ) as client:
                    default_headers = {
                        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                                      'AppleWebKit/537.36 (KHTML, like Gecko) '
                                      'Chrome/120.0.0.0 Safari/537.36',
                    }
                    if headers:
                        default_headers.update(headers)

                    response = await client.request(
                        method, url, headers=default_headers
                    )

                result.latency_ms = int(
                    (time.monotonic() - start) * 1000
                )
                result.status_code = response.status_code
                self.stats['total_requests'] += 1

                if response.status_code == 200:
                    result.html = response.text
                    if parse_fn:
                        result.data = parse_fn(response.text)
                    self.backoff.record_success(domain)
                    self.stats['successful'] += 1
                    return result

                elif response.status_code == 429:
                    # Rate limited — back off
                    # Retry-After may be seconds or an HTTP date; honour only the numeric form
                    retry_after = response.headers.get('Retry-After')
                    if retry_after and retry_after.isdigit():
                        await asyncio.sleep(int(retry_after))
                    self.backoff.record_failure(domain)
                    self.stats['retries'] += 1
                    logger.warning(
                        f"Rate limited on {domain}, backing off "
                        f"(attempt {attempt})"
                    )
                    continue

                elif response.status_code == 403:
                    # Blocked — try different proxy
                    self.backoff.record_failure(domain)
                    self.stats['retries'] += 1
                    continue

                elif response.status_code >= 500:
                    # Server error — retry
                    self.backoff.record_failure(domain)
                    self.stats['retries'] += 1
                    continue

                else:
                    result.error = f"HTTP {response.status_code}"
                    self.stats['failed'] += 1
                    return result

            except httpx.TimeoutException:
                result.error = "timeout"
                self.stats['retries'] += 1
                continue
            except Exception as e:
                result.error = str(e)[:200]
                self.stats['retries'] += 1
                continue

        self.stats['failed'] += 1
        if not result.error:
            result.error = f"Max retries ({self.max_retries}) exceeded"
        return result

    async def fetch_many(
        self,
        urls: List[str],
        parse_fn: Callable = None,
        progress_fn: Callable = None,
    ) -> List[ScrapeResult]:
        results = []
        completed = 0

        async def fetch_one(url):
            nonlocal completed
            async with self.semaphore:
                result = await self.fetch(url, parse_fn=parse_fn)
                completed += 1
                if progress_fn:
                    progress_fn(completed, len(urls), result)
                return result

        tasks = [fetch_one(url) for url in urls]
        results = await asyncio.gather(*tasks)
        return list(results)

    def print_stats(self):
        s = self.stats
        total = s['total_requests']
        if total == 0:
            print("No requests made")
            return

        print(f"\n--- Scraping Statistics ---")
        print(f"Total requests: {total}")
        print(f"Successful: {s['successful']} ({s['successful']/total*100:.1f}%)")
        print(f"Failed: {s['failed']}")
        print(f"Retries: {s['retries']}")

Usage Example

async def main():
    scraper = RateLimitedScraper(
        proxies=[
            "http://user:pass@proxy1.example.com:8080",
            "http://user:pass@proxy2.example.com:8080",
        ],
        default_rps=2.0,
        max_retries=3,
        max_concurrency=10,
    )

    # Configure specific domains
    scraper.configure_domain("api.example.com", rps=5.0, burst=10)
    scraper.configure_domain("slow-site.com", rps=0.5, burst=1)

    # Scrape URLs
    urls = [f"https://example.com/page/{i}" for i in range(100)]

    def progress(done, total, result):
        status = "OK" if result.status_code == 200 else "FAIL"
        print(f"[{done}/{total}] {status} {result.url} ({result.latency_ms}ms)")

    results = await scraper.fetch_many(
        urls,
        progress_fn=progress,
    )

    scraper.print_stats()

    # Process results
    successful = [r for r in results if r.status_code == 200]
    print(f"\nGot {len(successful)} pages to parse")

asyncio.run(main())

Robots.txt Integration

Respect crawl-delay directives automatically:

import httpx
from typing import Optional
from urllib.robotparser import RobotFileParser

class RobotsRespector:
    def __init__(self):
        # One cache entry per domain; None marks a domain whose robots.txt
        # could not be fetched, so we don't re-request it on every call.
        self._parsers: dict[str, Optional[RobotFileParser]] = {}

    async def get_crawl_delay(self, domain: str) -> Optional[float]:
        if domain not in self._parsers:
            self._parsers[domain] = None
            try:
                async with httpx.AsyncClient(timeout=10) as client:
                    resp = await client.get(f"https://{domain}/robots.txt")
                    if resp.status_code == 200:
                        parser = RobotFileParser()
                        parser.parse(resp.text.splitlines())
                        self._parsers[domain] = parser
            except Exception:
                pass

        parser = self._parsers.get(domain)
        if parser:
            return parser.crawl_delay("*")
        return None
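
One way to wire this into the scraper is to translate a crawl-delay of N seconds into a per-domain rate of 1/N requests per second before fetching that domain. A minimal sketch, assuming the classes above are in scope (the apply_crawl_delay helper is illustrative, not part of any library):

async def apply_crawl_delay(scraper: RateLimitedScraper,
                            robots: RobotsRespector,
                            url: str):
    # Convert a crawl-delay of N seconds into an equivalent
    # requests-per-second limit for this domain (N seconds -> 1/N rps).
    domain = urlparse(url).netloc
    delay = await robots.get_crawl_delay(domain)
    if delay and delay > 0:
        scraper.configure_domain(domain, rps=1.0 / delay, burst=1)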

FAQ

What is the safest default rate limit?

Start with 1-2 requests per second per domain. Most websites tolerate this without triggering anti-bot systems. Increase gradually while monitoring for 429 responses. Some sites allow 10+ RPS while others block at 0.5 RPS.

How does the token bucket differ from a simple sleep?

A simple sleep(1) between requests wastes time when you scrape multiple domains. A token bucket also allows bursts: if you have not requested from a domain recently, accumulated tokens let several requests through back to back. This maximizes throughput while still respecting per-domain limits.

Should I rate limit per proxy or per domain?

Per domain is more important because the target site tracks incoming request rates regardless of which proxy you use. However, adding per-proxy limits prevents individual proxies from being overloaded, which helps with proxy longevity.
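
A sketch of layering the two, reusing TokenBucket and DomainRateLimiter from above (the ProxyAwareLimiter name and the 10 RPS per-proxy default are illustrative assumptions):

class ProxyAwareLimiter:
    def __init__(self, domain_limiter: DomainRateLimiter, proxy_rps: float = 10.0):
        self.domain_limiter = domain_limiter
        self.proxy_rps = proxy_rps
        self.proxy_buckets: dict[str, TokenBucket] = {}

    async def wait(self, url: str, proxy: Optional[str] = None):
        # The per-domain limit protects the target site; the per-proxy
        # limit keeps any single proxy from being saturated.
        await self.domain_limiter.wait(url)
        if proxy:
            if proxy not in self.proxy_buckets:
                self.proxy_buckets[proxy] = TokenBucket(self.proxy_rps, burst=5)
            await self.proxy_buckets[proxy].wait()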

How do I handle sites with very strict rate limits?

For sites that block at 0.5 RPS, combine rate limiting with longer delays between sessions. Scrape in bursts of 10-20 requests, then wait 5-10 minutes. This mimics human browsing patterns better than constant low-rate traffic.
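
A rough sketch of that session pattern, reusing the scraper from above (the batch size and the 5-10 minute pause are illustrative values):

async def scrape_in_sessions(scraper: RateLimitedScraper, urls: List[str],
                             batch_size: int = 15):
    # Scrape in short bursts, then idle for 5-10 minutes to mimic a human
    # browsing session rather than steady machine-like traffic.
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        results.extend(await scraper.fetch_many(batch))
        if i + batch_size < len(urls):
            await asyncio.sleep(random.uniform(300, 600))
    return results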

Can I use this with residential proxies?

Yes. Residential proxies are less likely to be blocked, so you can often increase the rate limit to 5-10 RPS per domain. The rate limiter works identically regardless of proxy type — it controls the request rate from your application, not from the proxy infrastructure.

