Build a Scraping Results Database: Store and Query Data
Storing web scraping results efficiently requires careful database design. The right schema handles millions of records, supports fast queries, manages deduplication, and accommodates the semi-structured nature of scraped data.
PostgreSQL Schema Design
-- Core tables for scraping results
CREATE TABLE domains (
    id SERIAL PRIMARY KEY,
    name TEXT UNIQUE NOT NULL,
    category TEXT,
    scrape_config JSONB DEFAULT '{}',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE scrape_jobs (
    id SERIAL PRIMARY KEY,
    domain_id INTEGER REFERENCES domains(id),
    status TEXT DEFAULT 'pending', -- pending, running, completed, failed
    started_at TIMESTAMP,
    completed_at TIMESTAMP,
    pages_scraped INTEGER DEFAULT 0,
    pages_failed INTEGER DEFAULT 0,
    proxy_used TEXT,
    config JSONB DEFAULT '{}',
    error_message TEXT
);

CREATE TABLE scraped_pages (
    id BIGSERIAL PRIMARY KEY,
    job_id INTEGER REFERENCES scrape_jobs(id),
    url TEXT NOT NULL,
    domain_id INTEGER REFERENCES domains(id),
    status_code INTEGER,
    content_hash TEXT,
    html_size INTEGER,
    data JSONB,
    metadata JSONB DEFAULT '{}',
    scraped_at TIMESTAMP DEFAULT NOW(),
    proxy_ip TEXT,
    latency_ms FLOAT
);

-- Indexes for common queries
CREATE INDEX idx_pages_domain ON scraped_pages(domain_id);
CREATE INDEX idx_pages_date ON scraped_pages(scraped_at);
-- NOTE(review): a btree on raw url can exceed the index-row size limit for
-- very long URLs; consider indexing md5(url) if that becomes a problem.
CREATE INDEX idx_pages_url ON scraped_pages(url);
CREATE INDEX idx_pages_hash ON scraped_pages(content_hash);
CREATE INDEX idx_pages_data ON scraped_pages USING GIN(data);
CREATE INDEX idx_jobs_status ON scrape_jobs(status);

-- Deduplication view: latest row per (url, content_hash).
CREATE MATERIALIZED VIEW unique_pages AS
SELECT DISTINCT ON (url, content_hash)
    id, url, domain_id, data, scraped_at
FROM scraped_pages
ORDER BY url, content_hash, scraped_at DESC;

-- A unique index is required to allow REFRESH MATERIALIZED VIEW CONCURRENTLY.
CREATE UNIQUE INDEX idx_unique_pages_id ON unique_pages(id);

-- Partitioning by date (for large datasets).
-- LIKE ... INCLUDING ALL would copy the PRIMARY KEY on (id), which is invalid
-- here: a unique constraint on a partitioned table must include the partition
-- key. Copy column defaults only and declare a composite key explicitly.
CREATE TABLE scraped_pages_partitioned (
    LIKE scraped_pages INCLUDING DEFAULTS,
    PRIMARY KEY (id, scraped_at)
) PARTITION BY RANGE (scraped_at);
CREATE TABLE scraped_pages_2026_03 PARTITION OF scraped_pages_partitioned
FOR VALUES FROM ('2026-03-01') TO ('2026-04-01');

Python Data Access Layer
import psycopg2
from psycopg2.extras import RealDictCursor, execute_values
import json
from datetime import datetime
from typing import List, Dict, Optional
class ScrapingDatabase:
    """Thin data-access layer over the scraping-results schema.

    Holds a single long-lived psycopg2 connection; reuse one instance
    per process. Call close() at shutdown.
    """

    def __init__(self, dsn: str):
        self.conn = psycopg2.connect(dsn)

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()

    def save_results(self, job_id: int, results: List[Dict]):
        """Bulk-insert scraped page records for *job_id*.

        Missing per-record fields fall back to defaults; ``data`` and
        ``metadata`` dicts are serialized to JSON for the JSONB columns.
        Rolls back and re-raises on any insert failure so the connection
        stays usable.
        """
        if not results:
            return  # nothing to insert; avoids an empty VALUES clause
        rows = [
            (
                job_id,
                r["url"],  # url is mandatory — a KeyError here is deliberate
                r.get("domain_id"),
                r.get("status_code", 200),
                r.get("content_hash", ""),
                r.get("html_size", 0),
                json.dumps(r.get("data", {})),
                json.dumps(r.get("metadata", {})),
                # naive local timestamp, matching the TIMESTAMP column default
                r.get("scraped_at", datetime.now()),
                r.get("proxy_ip", ""),
                r.get("latency_ms", 0),
            )
            for r in results
        ]
        try:
            # Context manager guarantees the cursor is closed (the original
            # leaked a cursor per call).
            with self.conn.cursor() as cursor:
                execute_values(cursor, """
                    INSERT INTO scraped_pages
                        (job_id, url, domain_id, status_code, content_hash,
                         html_size, data, metadata, scraped_at, proxy_ip, latency_ms)
                    VALUES %s
                """, rows)
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise

    def query_by_domain(self, domain: str, limit: int = 100) -> List[Dict]:
        """Return the most recent pages scraped for *domain* as dict rows."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT sp.url, sp.data, sp.scraped_at, sp.latency_ms
                FROM scraped_pages sp
                JOIN domains d ON sp.domain_id = d.id
                WHERE d.name = %s
                ORDER BY sp.scraped_at DESC
                LIMIT %s
            """, (domain, limit))
            return cursor.fetchall()

    def get_scraping_stats(self) -> Dict:
        """Return aggregate statistics over all scraped pages (one dict row)."""
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    COUNT(*) as total_pages,
                    COUNT(DISTINCT url) as unique_urls,
                    COUNT(DISTINCT domain_id) as domains,
                    AVG(latency_ms) as avg_latency,
                    MIN(scraped_at) as first_scrape,
                    MAX(scraped_at) as last_scrape
                FROM scraped_pages
            """)
            return cursor.fetchone()
Should I use PostgreSQL or MongoDB for scraped data?
PostgreSQL with JSONB columns gives you the best of both worlds: structured queries for metadata and flexible JSON storage for variable scraped data. MongoDB is viable for purely unstructured data but lacks PostgreSQL’s query power and ACID guarantees.
How do I handle schema changes when target sites change?
Store scraped data in JSONB columns. When a site changes its structure, your schema stays the same — only your parsing code needs updating. Add versioning to track which parser version produced each record.
How much storage does scraping data require?
Text-only scraped data: ~1-5 KB per page. With HTML: ~50-200 KB per page. At 1 million pages, expect roughly 1-5 GB for text-only data or 50-200 GB with full HTML, depending on what you store. Use table partitioning and archival strategies for datasets over 10 million rows, and minimize proxy bandwidth to keep transfer costs down.
Implementation Best Practices
Error Handling and Retry Logic
Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:
import random
import time
def retry_with_backoff(func, max_retries=3, base_delay=1, jitter=1.0):
    """Call *func*, retrying with exponential backoff plus random jitter.

    Args:
        func: zero-argument callable to invoke.
        max_retries: total attempts (must be >= 1).
        base_delay: backoff base in seconds; attempt i waits
            base_delay * 2**i plus up to *jitter* seconds.
        jitter: upper bound of the random jitter (default 1.0 preserves
            the original behavior; pass 0 for deterministic delays).

    Returns:
        Whatever func() returns on the first success.

    Raises:
        ValueError: if max_retries < 1 (the original silently returned None).
        The last exception from func() once all attempts are exhausted.
    """
    if max_retries < 1:
        raise ValueError("max_retries must be >= 1")
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # out of attempts — propagate the final failure
            delay = base_delay * (2 ** attempt) + random.uniform(0, jitter)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
Set up structured logging for debugging and monitoring:
import json
import logging
from datetime import datetime, timezone
class JSONFormatter(logging.Formatter):
    """Render each log record as a single-line JSON object."""

    def format(self, record):
        """Return the record as a JSON string with a UTC, tz-aware timestamp.

        Uses the record's own creation time (record.created) instead of
        datetime.utcnow(), which is deprecated since Python 3.12 and
        returned a naive timestamp with no offset information.
        """
        log_entry = {
            "timestamp": datetime.fromtimestamp(
                record.created, tz=timezone.utc
            ).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)
# Setup: route the "scraper" logger through a stream handler that emits
# one JSON object per record.
logger = logging.getLogger("scraper")
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)
Use environment variables and config files for flexibility:
import os
from dataclasses import dataclass
@dataclass
class ScraperConfig:
    """Scraper settings, defaulting from environment variables.

    NOTE: the env vars are read once, when this class body executes at
    import time — later changes to the environment are not picked up.
    """
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        """Build a config from a YAML file, ignoring unknown keys.

        Fixes two defects of the original hasattr() filter:
        - an empty YAML file makes safe_load return None (guarded with
          ``or {}`` before .items());
        - hasattr(cls, k) is also true for methods such as "from_yaml",
          which would crash the constructor — filter on the actual
          dataclass field names instead.
        """
        import yaml
        from dataclasses import fields
        with open(filepath) as fh:
            raw = yaml.safe_load(fh) or {}
        allowed = {fld.name for fld in fields(cls)}
        return cls(**{k: v for k, v in raw.items() if k in allowed})
Implement token bucket rate limiting to respect target sites:
import asyncio
import time
class RateLimiter:
def __init__(self, rate: float, burst: int = 1):
self.rate = rate # requests per second
self.burst = burst
self.tokens = burst
self.last_refill = time.time()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.time()
elapsed = now - self.last_refill
self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
self.last_refill = now
if self.tokens >= 1:
self.tokens -= 1
return
else:
wait_time = (1 - self.tokens) / self.rate
await asyncio.sleep(wait_time)
self.tokens = 0Data Validation
Validate scraped data before storage:
from typing import Optional, List
import re
class DataValidator:
    """Sanity checks for scraped records before they are persisted."""

    # Compiled once at class definition instead of on every call.
    # Matches http(s) URLs with a dotted hostname, "localhost", or a
    # dotted-quad IP, an optional port, and an optional path/query.
    _URL_RE = re.compile(
        r'^https?://'
        r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
        r'localhost|'
        r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
        r'(?::\d+)?'
        r'(?:/?|[/?]\S+)$', re.IGNORECASE)

    @staticmethod
    def validate_url(url: str) -> bool:
        """Return True if *url* looks like a well-formed http(s) URL."""
        return bool(DataValidator._URL_RE.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        """Accept a missing price, or a positive price below 1,000,000."""
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        """Check the stripped length of *text* against [min_length, max_length]."""
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        """Validate a scraped record; return (is_valid, list_of_errors).

        Only keys present in the record are checked, so partial records
        pass as long as their present fields are valid.
        """
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        # `or ""` guards an explicit None title, which previously crashed
        # on len(None.strip()); it now reports a length error instead.
        if "title" in record and not self.validate_text(record.get("title") or "", 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
Running as a Service
# Using systemd.
# Write the unit as root via tee: `sudo cat > file` performs the redirect
# as the invoking (non-root) user and fails on /etc. The quoted 'EOF'
# prevents the shell from expanding anything inside the heredoc.
sudo tee /etc/systemd/system/scraper.service > /dev/null << 'EOF'
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
# The systemd directive is WorkingDirectory (WorkingDir is not recognized).
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

# Reload unit files so systemd sees the new service, then enable + start it.
sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper
FROM python:3.12-slim
WORKDIR /app

# Install dependencies first so this layer is cached across code-only changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# raise_for_status() makes a non-2xx response raise (non-zero exit), so the
# container is actually marked unhealthy; a bare GET exits 0 even on HTTP 500.
# The request timeout keeps a hung endpoint from relying solely on --timeout.
HEALTHCHECK --interval=30s --timeout=10s \
    CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]
Write tests for your scraping tools:
import pytest
import asyncio
class TestProxyIntegration:
    """Exercises proxy connectivity, IP rotation, and record validation."""

    def test_proxy_connectivity(self):
        # A single request through the proxy should succeed and echo our IP.
        import requests
        proxies = {
            "http": "http://user:pass@proxy:8080",
            "https": "http://user:pass@proxy:8080",
        }
        resp = requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10)
        assert resp.status_code == 200
        assert "origin" in resp.json()

    def test_proxy_rotation(self):
        # Five requests through a rotating endpoint should surface >1 exit IP.
        import requests  # hoisted out of the loop (imports are idempotent)
        proxies = {"http": "http://user:pass@rotating-proxy:8080"}
        seen_ips = {
            requests.get("https://httpbin.org/ip", proxies=proxies, timeout=10).json()["origin"]
            for _ in range(5)
        }
        assert len(seen_ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        # A fully well-formed record must validate cleanly.
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)