Build a Web Scraping Dashboard with Python and Grafana
Monitoring web scraping operations in real time is critical for production environments. When you are processing thousands of pages per hour through rotating proxies, you need visibility into success rates, proxy performance, error patterns, and data quality. A purpose-built dashboard transforms raw metrics into actionable insights.
This tutorial builds a complete monitoring dashboard using Python for metrics collection, Prometheus for storage, and Grafana for visualization.
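Before writing any code, install the two Python dependencies the tutorial relies on (package names as published on PyPI):

```bash
pip install prometheus-client requests
```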
Architecture Overview
```
Scraping Jobs (Python)
         |
         v
Prometheus Client (metrics export)
         |
         v
Prometheus Server (time-series storage)
         |
         v
Grafana (visualization dashboards)
```

Setting Up Prometheus Metrics
```python
from urllib.parse import urlparse
import time

import requests
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
REQUESTS_TOTAL = Counter(
    'scraper_requests_total',
    'Total scraping requests',
    ['status', 'target', 'proxy_country']
)

REQUEST_DURATION = Histogram(
    'scraper_request_duration_seconds',
    'Request duration in seconds',
    ['target'],
    buckets=[0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

PROXY_POOL_SIZE = Gauge(
    'scraper_proxy_pool_size',
    'Number of proxies in pool',
    ['status']
)

PAGES_SCRAPED = Counter(
    'scraper_pages_scraped_total',
    'Total pages successfully scraped',
    ['target']
)

DATA_ITEMS_EXTRACTED = Counter(
    'scraper_items_extracted_total',
    'Total data items extracted',
    ['item_type']
)

ACTIVE_JOBS = Gauge(
    'scraper_active_jobs',
    'Currently running scraping jobs'
)


class MonitoredScraper:
    def __init__(self, metrics_port=8000):
        # Serves metrics at http://localhost:<metrics_port>/metrics
        start_http_server(metrics_port)
        print(f"Metrics server started on port {metrics_port}")

    def scrape(self, url, proxy):
        ACTIVE_JOBS.inc()
        target = self._get_domain(url)
        start = time.time()
        try:
            response = requests.get(
                url, proxies={"http": proxy, "https": proxy}, timeout=15
            )
            REQUEST_DURATION.labels(target=target).observe(time.time() - start)
            if response.status_code == 200:
                # proxy_country is hardcoded here; wire in your pool's metadata
                REQUESTS_TOTAL.labels(status='success', target=target,
                                      proxy_country='US').inc()
                PAGES_SCRAPED.labels(target=target).inc()
                return response
            REQUESTS_TOTAL.labels(status='error', target=target,
                                  proxy_country='US').inc()
            return None
        except requests.RequestException:
            # Timeouts, connection errors, and bad proxies land here
            REQUESTS_TOTAL.labels(status='failed', target=target,
                                  proxy_country='US').inc()
            return None
        finally:
            ACTIVE_JOBS.dec()

    def _get_domain(self, url):
        return urlparse(url).netloc
```
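A minimal driver loop to exercise the class and keep the process alive for Prometheus to scrape (the URLs and proxy endpoint are placeholders):

```python
import time

scraper = MonitoredScraper(metrics_port=8000)  # class defined above
urls = ["https://example.com/page-1", "https://example.com/page-2"]

while True:
    for url in urls:
        scraper.scrape(url, proxy="http://user:pass@proxy:8080")
    time.sleep(60)  # idle between passes; /metrics stays available throughout
```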
Docker Compose Setup

```yaml
version: '3.8'

services:
  prometheus:
    image: prom/prometheus:latest
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus.yml:/etc/prometheus/prometheus.yml
      # Mount the alert rules defined later in this tutorial
      - ./prometheus-alerts.yml:/etc/prometheus/prometheus-alerts.yml

  grafana:
    image: grafana/grafana:latest
    ports:
      - "3000:3000"
    environment:
      GF_SECURITY_ADMIN_PASSWORD: admin
    volumes:
      - grafana-data:/var/lib/grafana

  scraper:
    build: ./scraper
    ports:
      - "8000:8000"

volumes:
  grafana-data:
```
Prometheus Configuration

```yaml
# prometheus.yml
global:
  scrape_interval: 15s

# Alert rules (defined later in this tutorial) are only evaluated if listed here
rule_files:
  - /etc/prometheus/prometheus-alerts.yml

scrape_configs:
  - job_name: 'scraper'
    static_configs:
      - targets: ['scraper:8000']
```

Grafana Dashboard Panels
Create these panels in Grafana for a comprehensive view:
| Panel | Query | Type |
|---|---|---|
| Request Rate | `rate(scraper_requests_total[5m])` | Time Series |
| Success Rate | `sum(rate(scraper_requests_total{status="success"}[5m])) / sum(rate(scraper_requests_total[5m]))` | Gauge |
| P95 Latency | `histogram_quantile(0.95, sum by (le) (rate(scraper_request_duration_seconds_bucket[5m])))` | Stat |
| Active Jobs | `scraper_active_jobs` | Stat |
| Error Breakdown | `sum by (status) (rate(scraper_requests_total[5m]))` | Pie Chart |
| Proxy Pool Health | `scraper_proxy_pool_size` | Bar Gauge |
| Pages Scraped | `increase(scraper_pages_scraped_total[1h])` | Stat |

Note the `sum()` aggregation in the Success Rate query: PromQL matches vectors on their full label sets, so without aggregation the success series would only ever divide by itself.
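Rather than adding the data source by hand, you can let Grafana provision it at startup. A minimal sketch, assuming the file is mounted into the Grafana container at /etc/grafana/provisioning/datasources/:

```yaml
# datasource.yml
apiVersion: 1
datasources:
  - name: Prometheus
    type: prometheus
    access: proxy
    url: http://prometheus:9090   # service name from the compose file
    isDefault: true
```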
Alerting Rules
```yaml
# prometheus-alerts.yml
groups:
  - name: scraper_alerts
    rules:
      - alert: HighErrorRate
        expr: sum(rate(scraper_requests_total{status="failed"}[5m])) / sum(rate(scraper_requests_total[5m])) > 0.3
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Scraper error rate above 30%"

      - alert: ProxyPoolDepleted
        expr: scraper_proxy_pool_size{status="healthy"} < 5
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Healthy proxy pool below 5"

      - alert: HighLatency
        expr: histogram_quantile(0.95, sum by (le) (rate(scraper_request_duration_seconds_bucket[5m]))) > 10
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "P95 latency exceeds 10 seconds"
```

FAQ
What metrics should I prioritize on my dashboard?
Focus on success rate, P95 latency, error distribution, and proxy pool health. These four metrics give you immediate visibility into whether your scraping operation is healthy. Add data quality metrics (items extracted per page) for deeper insight.
Can I use this with cloud scraping infrastructure?
Yes. Deploy Prometheus and Grafana on your cloud infrastructure and point them at your scraper instances. For Kubernetes deployments, use the Prometheus Operator for automatic service discovery.
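For example, with the Prometheus Operator installed, a ServiceMonitor can discover scraper pods automatically. This is a sketch; the labels and port name must match your own Service definition:

```yaml
apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  name: scraper
  labels:
    release: prometheus   # must match your Prometheus Operator's selector
spec:
  selector:
    matchLabels:
      app: scraper        # label on the scraper's Service
  endpoints:
    - port: metrics       # named Service port exposing :8000
      interval: 15s
```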
How much storage does Prometheus need?
Prometheus stores roughly 1-2 bytes per sample after compression. At a 15-second scrape interval, 50 time series produce about 288,000 samples per day, well under 1 MB, so even thousands of series remain cheap to retain. Configure retention with `--storage.tsdb.retention.time=30d`.
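In the Docker Compose setup above, that flag goes under a `command` key. Note that overriding `command` replaces the image's default arguments, so `--config.file` must be restated:

```yaml
  prometheus:
    image: prom/prometheus:latest
    command:
      - --config.file=/etc/prometheus/prometheus.yml
      - --storage.tsdb.retention.time=30d
```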
Should I monitor individual proxy performance?
For large proxy pools, aggregate metrics by country or provider rather than individual IPs. Track individual proxy performance in your proxy pool manager and export summarized metrics to Prometheus.
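As a sketch, the PROXY_POOL_SIZE gauge defined earlier can be refreshed from your pool manager on a timer (the pool interface here is hypothetical):

```python
def export_pool_metrics(pool):
    # 'pool.proxies' and '.healthy' are assumed attributes of your pool manager
    healthy = sum(1 for p in pool.proxies if p.healthy)
    PROXY_POOL_SIZE.labels(status='healthy').set(healthy)
    PROXY_POOL_SIZE.labels(status='unhealthy').set(len(pool.proxies) - healthy)
```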
Implementation Best Practices
Error Handling and Retry Logic
Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:
```python
import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the last error
            # Exponential backoff plus up to 1s of jitter to avoid thundering herds
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
```
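For example, a fetch that treats HTTP error statuses as retryable failures (the URL is a placeholder):

```python
import requests

def fetch():
    response = requests.get("https://example.com", timeout=15)
    response.raise_for_status()  # convert HTTP 4xx/5xx into retryable exceptions
    return response

response = retry_with_backoff(fetch, max_retries=3, base_delay=1)
```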
Logging Configuration

Set up structured logging for debugging and monitoring:
```python
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        # Emit one JSON object per log line for easy ingestion by log pipelines
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
```

Configuration Management
Use environment variables and config files for flexibility:
```python
import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    # Environment variables win over the hardcoded fallbacks
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml  # requires PyYAML
        with open(filepath) as f:
            config = yaml.safe_load(f)
        # Ignore unknown keys so extra YAML entries don't crash the loader
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})
```
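A matching config file might look like this (values are illustrative); keys not defined on the dataclass are silently dropped by from_yaml:

```yaml
# config.yml
concurrent_workers: 20
rate_limit_per_second: 2.5
output_format: csv
```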
Rate Limiting

Implement token bucket rate limiting to respect target sites:
```python
import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate    # tokens added per second
        self.burst = burst  # maximum bucket size
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            # Refill tokens accrued since the last call
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens: sleep until one is available, then consume it
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.last_refill = time.time()  # avoid double-counting the wait
            self.tokens = 0
```
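A quick usage sketch with several workers sharing one limiter (the URLs are placeholders):

```python
import asyncio

async def worker(limiter: RateLimiter, urls):
    for url in urls:
        await limiter.acquire()   # waits for a token before each request
        print(f"fetching {url}")  # substitute the real request here

async def main():
    limiter = RateLimiter(rate=5, burst=5)  # 5 requests/second across all workers
    jobs = [worker(limiter, [f"https://example.com/{i}"]) for i in range(10)]
    await asyncio.gather(*jobs)

asyncio.run(main())
```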
Data Validation

Validate scraped data before storage:
```python
import re
from typing import Optional

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        # Django-style URL regex: scheme, host (domain, localhost, or IPv4),
        # optional port and path
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True  # a missing price is allowed; a present one must be sane
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
```

Deployment
Running as a Service
```bash
# Using systemd (tee runs with root privileges; a plain "sudo cat >" would not)
sudo tee /etc/systemd/system/scraper.service > /dev/null << 'EOF'
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl enable scraper
sudo systemctl start scraper
```

Docker Deployment
```dockerfile
FROM python:3.12-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# The prometheus_client server from MonitoredScraper doubles as a liveness probe;
# raise_for_status() makes non-200 responses fail the check
HEALTHCHECK --interval=30s --timeout=10s \
    CMD python -c "import requests; requests.get('http://localhost:8000/metrics', timeout=5).raise_for_status()"

CMD ["python", "main.py"]
```

Testing
Write tests for your scraping tools:
```python
import requests

# Proxy endpoints and credentials below are placeholders for your provider's values
# from validators import DataValidator  # adjust the import to where DataValidator lives

PROXY = {
    "http": "http://user:pass@proxy:8080",
    "https": "http://user:pass@proxy:8080",
}

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        response = requests.get("https://httpbin.org/ip", proxies=PROXY, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        # Both schemes must route through the gateway, or HTTPS requests bypass it
        rotating = {
            "http": "http://user:pass@rotating-proxy:8080",
            "https": "http://user:pass@rotating-proxy:8080",
        }
        ips = set()
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=rotating, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0
```

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)