Creating a Scraping API with FastAPI and Proxies

Instead of embedding scraping logic in every application, expose it as an API. A scraping API accepts URLs, fetches them through proxies, extracts data, and returns structured JSON. Your frontend, mobile app, or automation pipeline calls one endpoint instead of managing proxies, retries, and parsing directly.

FastAPI is the ideal framework: async by default, automatic API docs, built-in validation, and high performance.

What We Will Build

  • REST API that scrapes any URL and returns the HTML or extracted data
  • Automatic proxy rotation with failover
  • Redis caching to avoid duplicate fetches
  • API key authentication
  • Rate limiting per client
  • Structured extraction endpoints (title, links, text, metadata)
  • Batch endpoint that scrapes multiple URLs concurrently

Setup

pip install fastapi uvicorn httpx selectolax redis pydantic

API Design

POST /api/scrape          — Scrape a single URL
POST /api/scrape/batch    — Scrape multiple URLs
GET  /api/scrape/extract  — Extract structured data from URL
GET  /api/cache/{url_hash} — Get cached result
GET  /api/stats           — API usage statistics
GET  /api/health          — Health check

Core Implementation

# app.py
from fastapi import FastAPI, HTTPException, Depends, Header, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import List, Optional, Dict, Any
import httpx
import asyncio
import hashlib
import json
import os
import time
import redis.asyncio as redis
from contextlib import asynccontextmanager

# Proxy pool
PROXIES = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
    "http://user:pass@proxy3.example.com:8080",
]

proxy_index = 0
redis_client = None

@asynccontextmanager
async def lifespan(app: FastAPI):
    global redis_client
    # Read REDIS_URL from the environment so the Docker setup below works unchanged
    redis_client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379"), decode_responses=True)
    yield
    await redis_client.close()

app = FastAPI(
    title="Scraping API",
    version="1.0.0",
    lifespan=lifespan,
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# --- Models ---

class ScrapeRequest(BaseModel):
    url: HttpUrl
    headers: Optional[Dict[str, str]] = None
    timeout: int = 30
    use_cache: bool = True
    extract: Optional[List[str]] = None  # ["title", "links", "text", "meta"]
    render_js: bool = False

class BatchScrapeRequest(BaseModel):
    urls: List[HttpUrl]
    concurrency: int = 5
    headers: Optional[Dict[str, str]] = None

class ScrapeResponse(BaseModel):
    url: str
    status_code: int
    html: Optional[str] = None
    extracted: Optional[Dict[str, Any]] = None
    cached: bool = False
    latency_ms: int = 0
    proxy_used: str = ""

# --- Auth ---

API_KEYS = {"demo-key-123": {"name": "demo", "rate_limit": 100}}

async def verify_api_key(x_api_key: str = Header()):
    if x_api_key not in API_KEYS:
        raise HTTPException(401, "Invalid API key")
    return API_KEYS[x_api_key]

# --- Rate Limiting ---

async def check_rate_limit(
    x_api_key: str = Header(),
):
    key = f"ratelimit:{x_api_key}"
    current = await redis_client.incr(key)
    if current == 1:
        await redis_client.expire(key, 3600)  # 1 hour window

    limit = API_KEYS.get(x_api_key, {}).get("rate_limit", 100)
    if current > limit:
        raise HTTPException(429, f"Rate limit exceeded ({limit}/hour)")

# --- Proxy Rotation ---

def get_proxy() -> str:
    global proxy_index
    proxy = PROXIES[proxy_index % len(PROXIES)]
    proxy_index += 1
    return proxy
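
# The feature list at the top promises failover as well as rotation, but the
# endpoints below do not retry when a proxy fails. A hedged sketch of how that
# could look (illustrative helper, not wired into /api/scrape):
async def fetch_with_failover(
    url: str, headers: dict, timeout: int, attempts: int = 3
) -> httpx.Response:
    last_error: Exception = RuntimeError("no proxies attempted")
    for _ in range(attempts):
        proxy = get_proxy()  # rotate to the next proxy on every attempt
        try:
            async with httpx.AsyncClient(
                proxy=proxy, timeout=timeout, follow_redirects=True
            ) as client:
                return await client.get(url, headers=headers)
        except httpx.HTTPError as exc:
            last_error = exc  # connection or timeout error: fall through to the next proxy
    raise last_error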

# --- Caching ---

def url_hash(url: str) -> str:
    return hashlib.md5(url.encode()).hexdigest()

async def get_cached(url: str) -> Optional[dict]:
    cached = await redis_client.get(f"cache:{url_hash(url)}")
    if cached:
        return json.loads(cached)
    return None

async def set_cached(url: str, data: dict, ttl: int = 3600):
    await redis_client.setex(
        f"cache:{url_hash(url)}", ttl, json.dumps(data)
    )

# --- Extraction ---

def extract_data(html: str, fields: List[str]) -> Dict[str, Any]:
    from selectolax.parser import HTMLParser
    tree = HTMLParser(html)
    result = {}

    if "title" in fields:
        title_node = tree.css_first("title")
        result["title"] = title_node.text() if title_node else ""

    if "meta" in fields:
        meta = {}
        for tag in tree.css("meta"):
            name = tag.attributes.get("name", tag.attributes.get("property", ""))
            content = tag.attributes.get("content", "")
            if name and content:
                meta[name] = content
        result["meta"] = meta

    if "links" in fields:
        result["links"] = [
            a.attributes.get("href", "")
            for a in tree.css("a[href]")
        ]

    if "text" in fields:
        body = tree.css_first("body")
        if body:
            for tag in body.css("script, style, nav, footer, header"):
                tag.decompose()
            result["text"] = body.text(separator="\n", strip=True)

    if "images" in fields:
        result["images"] = [
            img.attributes.get("src", "")
            for img in tree.css("img[src]")
        ]

    if "headings" in fields:
        result["headings"] = []
        for h in tree.css("h1, h2, h3, h4, h5, h6"):
            result["headings"].append({
                "level": int(h.tag[1]),
                "text": h.text(strip=True),
            })

    return result

# --- Endpoints ---

@app.post("/api/scrape", response_model=ScrapeResponse)
async def scrape(
    request: ScrapeRequest,
    auth: dict = Depends(verify_api_key),
    _rate: None = Depends(check_rate_limit),
):
    url = str(request.url)

    # Check cache
    if request.use_cache:
        cached = await get_cached(url)
        if cached:
            return ScrapeResponse(
                url=url,
                status_code=cached["status_code"],
                html=cached.get("html"),
                extracted=cached.get("extracted"),
                cached=True,
            )

    # Fetch
    proxy = get_proxy()
    start = time.monotonic()

    try:
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                         "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
        }
        if request.headers:
            headers.update(request.headers)

        async with httpx.AsyncClient(
            proxy=proxy,
            timeout=request.timeout,
            follow_redirects=True,
        ) as client:
            response = await client.get(url, headers=headers)

        latency = int((time.monotonic() - start) * 1000)

        result = ScrapeResponse(
            url=url,
            status_code=response.status_code,
            latency_ms=latency,
            proxy_used=proxy.split("@")[-1] if "@" in proxy else proxy,
        )

        if response.status_code == 200:
            result.html = response.text

            if request.extract:
                result.extracted = extract_data(response.text, request.extract)

            # Cache the result
            await set_cached(url, {
                "status_code": response.status_code,
                "html": response.text[:50000],  # Limit cache size
                "extracted": result.extracted,
            })

        return result

    except httpx.TimeoutException:
        raise HTTPException(504, "Target URL timed out")
    except Exception as e:
        raise HTTPException(502, f"Scraping failed: {str(e)[:200]}")


@app.post("/api/scrape/batch")
async def scrape_batch(
    request: BatchScrapeRequest,
    auth: dict = Depends(verify_api_key),
    _rate: None = Depends(check_rate_limit),
):
    semaphore = asyncio.Semaphore(request.concurrency)

    async def fetch_one(url):
        async with semaphore:
            single = ScrapeRequest(
                url=url,
                headers=request.headers,
            )
            try:
                result = await scrape(single, auth, None)
                return result
            except HTTPException as e:
                return ScrapeResponse(
                    url=str(url),
                    status_code=e.status_code,
                )

    tasks = [fetch_one(url) for url in request.urls]
    results = await asyncio.gather(*tasks)

    return {
        "total": len(results),
        "successful": sum(1 for r in results if r.status_code == 200),
        "results": results,
    }


@app.get("/api/scrape/extract")
async def extract_endpoint(
    url: HttpUrl = Query(...),
    fields: str = Query("title,meta,links"),
    auth: dict = Depends(verify_api_key),
):
    field_list = [f.strip() for f in fields.split(",")]
    request = ScrapeRequest(url=url, extract=field_list)
    return await scrape(request, auth, None)


@app.get("/api/stats")
async def get_stats(auth: dict = Depends(verify_api_key)):
    key = f"ratelimit:{auth['name']}"
    usage = await redis_client.get(key) or 0
    return {
        "client": auth["name"],
        "requests_this_hour": int(usage),
        "rate_limit": auth["rate_limit"],
    }


@app.get("/api/health")
async def health():
    return {"status": "ok", "proxies": len(PROXIES)}

Running the API

uvicorn app:app --host 0.0.0.0 --port 8000

# With auto-reload for development
uvicorn app:app --reload

Access the auto-generated docs at http://localhost:8000/docs.

Client Usage

import httpx

API_URL = "http://localhost:8000"
API_KEY = "demo-key-123"

async def scrape_url(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{API_URL}/api/scrape",
            json={
                "url": url,
                "extract": ["title", "meta", "headings"],
            },
            headers={"X-API-Key": API_KEY},
        )
        return response.json()

# Batch scraping
async def scrape_many(urls: list):
    async with httpx.AsyncClient() as client:
        response = await client.post(
            f"{API_URL}/api/scrape/batch",
            json={"urls": urls, "concurrency": 5},
            headers={"X-API-Key": API_KEY},
        )
        return response.json()

Docker Deployment

# docker-compose.yml
version: '3.8'
services:
  api:
    build: .
    ports:
      - "8000:8000"
    depends_on:
      - redis
    environment:
      - REDIS_URL=redis://redis:6379

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"


FAQ

Why FastAPI instead of Flask or Django?

FastAPI is async-native, which matters for a scraping API whose main job is waiting on outbound HTTP requests. Flask can declare async views, but it still serves requests through WSGI workers, so getting real event-loop concurrency requires extra machinery (gevent, asyncio bridges). FastAPI also generates OpenAPI documentation automatically, making the API self-documenting.

How do I handle JavaScript-rendered pages?

Add a render_js parameter that routes requests through a headless browser (Playwright or Selenium). Run headless browsers as a separate service and proxy requests through them when JS rendering is needed. This keeps the main API lightweight for static pages.
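
A minimal sketch of such a branch, assuming Playwright is installed (pip install playwright, then playwright install chromium). The fetch_rendered name is illustrative, and proxies with embedded credentials may need to be passed through Playwright's separate username/password fields:

from playwright.async_api import async_playwright

async def fetch_rendered(url: str, proxy: str, timeout: int = 30) -> str:
    # Render the page in headless Chromium through a proxy and return the final HTML
    async with async_playwright() as p:
        browser = await p.chromium.launch(proxy={"server": proxy})
        try:
            page = await browser.new_page()
            await page.goto(url, timeout=timeout * 1000, wait_until="networkidle")
            return await page.content()
        finally:
            await browser.close()

Inside the scrape endpoint, the render_js flag would select this path instead of the plain httpx request.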

What caching strategy should I use?

Cache by URL hash with a 1-hour TTL for most pages. For rapidly changing pages (prices, stock), reduce TTL to 5-10 minutes. For static content (product descriptions, articles), increase TTL to 24 hours. Let clients control caching with the use_cache parameter.
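
One way to encode that policy is a small helper whose return value is passed to set_cached(); the URL patterns below are purely illustrative:

def ttl_for(url: str) -> int:
    # Illustrative TTL policy: short for volatile pages, long for static content
    if any(token in url for token in ("/price", "/stock", "/cart")):
        return 600        # 10 minutes for rapidly changing data
    if any(token in url for token in ("/article/", "/blog/", "/docs/")):
        return 86400      # 24 hours for mostly static content
    return 3600           # 1-hour default, same as set_cached()

The scrape endpoint would then call set_cached(url, data, ttl=ttl_for(url)).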

How do I secure the API for production?

Beyond API keys, add IP whitelisting, request signing (HMAC), and HTTPS. Rate limit aggressively — 100-1,000 requests per hour per key depending on the plan. Log all requests for audit trails. Use environment variables for API keys instead of hardcoding them.
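
Request signing can be a plain HMAC of the raw body, checked in an extra dependency. A minimal sketch, assuming the client sends the signature in an X-Signature header and that per-key secrets come from your environment or secret store (the CLIENT_SECRETS mapping and header name are illustrative):

import hashlib
import hmac

from fastapi import Header, HTTPException, Request

CLIENT_SECRETS = {"demo-key-123": "demo-shared-secret"}  # load from env in practice

async def verify_signature(
    request: Request,
    x_api_key: str = Header(),
    x_signature: str = Header(),
):
    # Recompute HMAC-SHA256 over the raw request body and compare in constant time
    secret = CLIENT_SECRETS.get(x_api_key, "")
    body = await request.body()
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    if not hmac.compare_digest(expected, x_signature):
        raise HTTPException(401, "Invalid request signature")

Attach it alongside verify_api_key with Depends on the endpoints that should require signed requests.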

Can the API handle thousands of concurrent requests?

FastAPI with uvicorn handles 1,000+ concurrent connections. The bottleneck is proxy capacity and target site rate limits, not the API framework. For higher throughput, run multiple API instances behind a load balancer and increase the proxy pool size.
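
For example, a single host can already run several worker processes before you reach for a load balancer (the worker count is just a starting point):

uvicorn app:app --host 0.0.0.0 --port 8000 --workers 4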

