Creating a Scraping API with FastAPI and Proxies
Instead of embedding scraping logic in every application, expose it as an API. A scraping API accepts URLs, fetches them through proxies, extracts data, and returns structured JSON. Your frontend, mobile app, or automation pipeline calls one endpoint instead of managing proxies, retries, and parsing directly.
FastAPI is the ideal framework: async by default, automatic API docs, built-in validation, and high performance.
What We Will Build
- REST API that scrapes any URL and returns the HTML or extracted data
- Automatic proxy rotation with failover
- Redis caching to avoid duplicate fetches
- API key authentication
- Rate limiting per client
- Structured extraction endpoints (title, links, text, metadata)
- Background job queue for batch scraping
Setup
pip install fastapi uvicorn httpx selectolax redis pydantic
API Design
POST /api/scrape — Scrape a single URL
POST /api/scrape/batch — Scrape multiple URLs
GET /api/scrape/extract — Extract structured data from URL
GET /api/cache/{url_hash} — Get cached result
GET /api/stats — API usage statistics
GET /api/health — Health check
Core Implementation
# app.py
from fastapi import FastAPI, HTTPException, Depends, Header, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, HttpUrl
from typing import List, Optional, Dict, Any
import httpx
import asyncio
import hashlib
import json
import os
import time
import redis.asyncio as redis
from contextlib import asynccontextmanager
# Proxy pool
PROXIES = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
"http://user:pass@proxy3.example.com:8080",
]
proxy_index = 0
redis_client = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global redis_client
    # Honor the REDIS_URL environment variable so the Docker setup below works
    redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379")
    redis_client = redis.from_url(redis_url, decode_responses=True)
yield
await redis_client.close()
app = FastAPI(
title="Scraping API",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# --- Models ---
class ScrapeRequest(BaseModel):
url: HttpUrl
headers: Optional[Dict[str, str]] = None
timeout: int = 30
use_cache: bool = True
extract: Optional[List[str]] = None # ["title", "links", "text", "meta"]
render_js: bool = False
class BatchScrapeRequest(BaseModel):
urls: List[HttpUrl]
concurrency: int = 5
headers: Optional[Dict[str, str]] = None
class ScrapeResponse(BaseModel):
url: str
status_code: int
html: Optional[str] = None
extracted: Optional[Dict[str, Any]] = None
cached: bool = False
latency_ms: int = 0
proxy_used: str = ""
# --- Auth ---
API_KEYS = {"demo-key-123": {"name": "demo", "rate_limit": 100}}
async def verify_api_key(x_api_key: str = Header()):
if x_api_key not in API_KEYS:
raise HTTPException(401, "Invalid API key")
return API_KEYS[x_api_key]
# --- Rate Limiting ---
async def check_rate_limit(
x_api_key: str = Header(),
):
key = f"ratelimit:{x_api_key}"
current = await redis_client.incr(key)
if current == 1:
await redis_client.expire(key, 3600) # 1 hour window
limit = API_KEYS.get(x_api_key, {}).get("rate_limit", 100)
if current > limit:
raise HTTPException(429, f"Rate limit exceeded ({limit}/hour)")
# --- Proxy Rotation ---
def get_proxy() -> str:
global proxy_index
proxy = PROXIES[proxy_index % len(PROXIES)]
proxy_index += 1
return proxy
# --- Caching ---
def url_hash(url: str) -> str:
return hashlib.md5(url.encode()).hexdigest()
async def get_cached(url: str) -> Optional[dict]:
cached = await redis_client.get(f"cache:{url_hash(url)}")
if cached:
return json.loads(cached)
return None
async def set_cached(url: str, data: dict, ttl: int = 3600):
await redis_client.setex(
f"cache:{url_hash(url)}", ttl, json.dumps(data)
)
# --- Extraction ---
def extract_data(html: str, fields: List[str]) -> Dict[str, Any]:
from selectolax.parser import HTMLParser
tree = HTMLParser(html)
result = {}
if "title" in fields:
title_node = tree.css_first("title")
result["title"] = title_node.text() if title_node else ""
if "meta" in fields:
meta = {}
for tag in tree.css("meta"):
name = tag.attributes.get("name", tag.attributes.get("property", ""))
content = tag.attributes.get("content", "")
if name and content:
meta[name] = content
result["meta"] = meta
if "links" in fields:
result["links"] = [
a.attributes.get("href", "")
for a in tree.css("a[href]")
]
if "text" in fields:
body = tree.css_first("body")
if body:
for tag in body.css("script, style, nav, footer, header"):
tag.decompose()
result["text"] = body.text(separator="\n", strip=True)
if "images" in fields:
result["images"] = [
img.attributes.get("src", "")
for img in tree.css("img[src]")
]
if "headings" in fields:
result["headings"] = []
for h in tree.css("h1, h2, h3, h4, h5, h6"):
result["headings"].append({
"level": int(h.tag[1]),
"text": h.text(strip=True),
})
return result
# --- Endpoints ---
@app.post("/api/scrape", response_model=ScrapeResponse)
async def scrape(
request: ScrapeRequest,
auth: dict = Depends(verify_api_key),
_rate: None = Depends(check_rate_limit),
):
url = str(request.url)
# Check cache
if request.use_cache:
cached = await get_cached(url)
if cached:
return ScrapeResponse(
url=url,
status_code=cached["status_code"],
html=cached.get("html"),
extracted=cached.get("extracted"),
cached=True,
)
# Fetch
proxy = get_proxy()
start = time.monotonic()
try:
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
}
if request.headers:
headers.update(request.headers)
async with httpx.AsyncClient(
proxy=proxy,
timeout=request.timeout,
follow_redirects=True,
) as client:
response = await client.get(url, headers=headers)
latency = int((time.monotonic() - start) * 1000)
result = ScrapeResponse(
url=url,
status_code=response.status_code,
latency_ms=latency,
proxy_used=proxy.split("@")[-1] if "@" in proxy else proxy,
)
if response.status_code == 200:
result.html = response.text
if request.extract:
result.extracted = extract_data(response.text, request.extract)
# Cache the result
await set_cached(url, {
"status_code": response.status_code,
"html": response.text[:50000], # Limit cache size
"extracted": result.extracted,
})
return result
except httpx.TimeoutException:
raise HTTPException(504, "Target URL timed out")
except Exception as e:
raise HTTPException(502, f"Scraping failed: {str(e)[:200]}")
@app.post("/api/scrape/batch")
async def scrape_batch(
request: BatchScrapeRequest,
auth: dict = Depends(verify_api_key),
_rate: None = Depends(check_rate_limit),
):
semaphore = asyncio.Semaphore(request.concurrency)
results = []
async def fetch_one(url):
async with semaphore:
single = ScrapeRequest(
url=url,
headers=request.headers,
)
try:
result = await scrape(single, auth, None)
return result
except HTTPException as e:
return ScrapeResponse(
url=str(url),
status_code=e.status_code,
)
tasks = [fetch_one(url) for url in request.urls]
results = await asyncio.gather(*tasks)
return {
"total": len(results),
"successful": sum(1 for r in results if r.status_code == 200),
"results": results,
}
@app.get("/api/scrape/extract")
async def extract_endpoint(
url: HttpUrl = Query(...),
fields: str = Query("title,meta,links"),
auth: dict = Depends(verify_api_key),
):
field_list = [f.strip() for f in fields.split(",")]
request = ScrapeRequest(url=url, extract=field_list)
return await scrape(request, auth, None)
@app.get("/api/stats")
async def get_stats(
    x_api_key: str = Header(),
    auth: dict = Depends(verify_api_key),
):
    # Read the same counter that check_rate_limit increments
    key = f"ratelimit:{x_api_key}"
    usage = await redis_client.get(key) or 0
return {
"client": auth["name"],
"requests_this_hour": int(usage),
"rate_limit": auth["rate_limit"],
}
@app.get("/api/health")
async def health():
    return {"status": "ok", "proxies": len(PROXIES)}
Running the API
uvicorn app:app --host 0.0.0.0 --port 8000
# With auto-reload for development
uvicorn app:app --reload
Access the auto-generated docs at http://localhost:8000/docs.
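For a quick smoke test, you can call the scrape endpoint with curl, using the hardcoded demo-key-123 key from the example configuration:
curl -X POST http://localhost:8000/api/scrape \
  -H "X-API-Key: demo-key-123" \
  -H "Content-Type: application/json" \
  -d '{"url": "https://example.com", "extract": ["title", "links"]}'
The response is the ScrapeResponse JSON defined above: status code, latency, the proxy used, and any extracted fields.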
Client Usage
import httpx
API_URL = "http://localhost:8000"
API_KEY = "demo-key-123"
async def scrape_url(url: str):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{API_URL}/api/scrape",
json={
"url": url,
"extract": ["title", "meta", "headings"],
},
headers={"X-API-Key": API_KEY},
)
return response.json()
# Batch scraping
async def scrape_many(urls: list):
async with httpx.AsyncClient() as client:
response = await client.post(
f"{API_URL}/api/scrape/batch",
json={"urls": urls, "concurrency": 5},
headers={"X-API-Key": API_KEY},
)
        return response.json()
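A small driver for the client helpers above, assuming they live in the same module:
import asyncio

if __name__ == "__main__":
    data = asyncio.run(scrape_url("https://example.com"))
    title = (data.get("extracted") or {}).get("title")
    print(data["status_code"], title)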
Docker Deployment
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
depends_on:
- redis
environment:
- REDIS_URL=redis://redis:6379
redis:
image: redis:7-alpine
ports:
      - "6379:6379"
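The compose file references build: ., so the api service needs a Dockerfile next to it. A minimal sketch, assuming a requirements.txt that lists the packages from the Setup section:
# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY app.py .
EXPOSE 8000
CMD ["uvicorn", "app:app", "--host", "0.0.0.0", "--port", "8000"]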
Internal Links
- Building a Proxy Rotation Library in Python — rotation logic for the API
- Building a Rate-Limited Scraper with Asyncio — rate limiting strategies
- Building a Distributed Scraping System with Redis — scale the API with Redis
- Web Scraping ETL Pipeline with Airflow — pipe API results into ETL
- Best Web Scraping APIs 2026 — compare with commercial APIs
FAQ
Why FastAPI instead of Flask or Django?
FastAPI is async-native, which is essential for a scraping API that makes outbound HTTP requests. Flask requires additional libraries (gevent, asyncio bridges) for async support. FastAPI also generates OpenAPI documentation automatically, making the API self-documenting.
How do I handle JavaScript-rendered pages?
The ScrapeRequest model above already declares a render_js flag; wire it up by routing requests with render_js=True through a headless browser (Playwright or Selenium). Run the headless browsers as a separate service and forward requests to it only when JS rendering is needed. This keeps the main API lightweight for static pages.
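A minimal sketch of such a helper, assuming Playwright is installed (pip install playwright && playwright install chromium) and that the scrape endpoint calls it when request.render_js is true:
from playwright.async_api import async_playwright

async def render_page(url: str, proxy: str, timeout: int = 30) -> str:
    """Fetch a page with headless Chromium and return the rendered HTML."""
    async with async_playwright() as p:
        # Credentials, if any, go in separate "username"/"password" keys, not the proxy URL
        browser = await p.chromium.launch(proxy={"server": proxy})
        try:
            page = await browser.new_page()
            await page.goto(url, timeout=timeout * 1000)  # Playwright timeouts are in milliseconds
            return await page.content()
        finally:
            await browser.close()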
What caching strategy should I use?
Cache by URL hash with a 1-hour TTL for most pages. For rapidly changing pages (prices, stock), reduce TTL to 5-10 minutes. For static content (product descriptions, articles), increase TTL to 24 hours. Let clients control caching with the use_cache parameter.
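One way to express that policy in code (a sketch only; the URL heuristics and TTL values are illustrative, not part of the API above):
VOLATILE_TTL = 300    # prices, stock levels: 5 minutes
DEFAULT_TTL = 3600    # most pages: 1 hour
STATIC_TTL = 86400    # articles, product descriptions: 24 hours

def choose_ttl(url: str) -> int:
    """Pick a cache TTL from simple URL heuristics."""
    if any(token in url for token in ("/price", "/stock", "/cart")):
        return VOLATILE_TTL
    if any(token in url for token in ("/blog/", "/article/", "/product/")):
        return STATIC_TTL
    return DEFAULT_TTL

# Inside the scrape endpoint, replace the fixed TTL with:
#   await set_cached(url, payload, ttl=choose_ttl(url))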
How do I secure the API for production?
Beyond API keys, add IP whitelisting, request signing (HMAC), and HTTPS. Rate limit aggressively — 100-1,000 requests per hour per key depending on the plan. Log all requests for audit trails. Use environment variables for API keys instead of hardcoding them.
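For example, keys can be loaded from an environment variable instead of the hardcoded API_KEYS dict, and request signatures can be verified with a shared secret. The SCRAPER_API_KEYS variable name and the key:name:limit format below are assumptions, not part of the code above:
import hashlib
import hmac
import os

def load_api_keys() -> dict:
    """Parse keys from SCRAPER_API_KEYS, e.g. 'abc123:demo:100,def456:partner:1000'."""
    keys = {}
    for entry in os.environ.get("SCRAPER_API_KEYS", "").split(","):
        if entry.count(":") == 2:
            key, name, limit = entry.split(":")
            keys[key] = {"name": name, "rate_limit": int(limit)}
    return keys

def verify_signature(body: bytes, signature: str, secret: str) -> bool:
    """Check an HMAC-SHA256 signature sent by the client (e.g. in an X-Signature header)."""
    expected = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)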
Can the API handle thousands of concurrent requests?
FastAPI with uvicorn handles 1,000+ concurrent connections. The bottleneck is proxy capacity and target site rate limits, not the API framework. For higher throughput, run multiple API instances behind a load balancer and increase the proxy pool size.
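For example, gunicorn (if installed) can supervise several uvicorn worker processes on one machine, and compose can run multiple containers behind a load balancer; for the latter, drop the fixed host port mapping from the compose file so replicas do not collide:
# Several worker processes on one machine
gunicorn app:app -k uvicorn.workers.UvicornWorker -w 4 --bind 0.0.0.0:8000
# Several containers behind a reverse proxy or load balancer
docker compose up --scale api=4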
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)