Proxy Log Analyzer: Build and Deploy
Proxy logs contain a wealth of operational intelligence. Every request through your proxy infrastructure generates data about success rates, latency, error patterns, and blocking behavior. A log analyzer transforms this raw data into actionable insights — which proxies perform best, which domains block most aggressively, and where your scraping infrastructure needs improvement.
What the Analyzer Reveals
- Proxy performance ranking — success rate and latency per proxy
- Domain blocking patterns — which sites block which proxy types
- Error distribution — timeouts vs. connection errors vs. HTTP errors
- Traffic patterns — request volume over time
- Cost analysis — bandwidth usage per proxy
- Anomaly detection — sudden changes in success rates
Log Format
The analyzer expects a structured JSON Lines (JSONL) log format: one JSON object per line. Instrument your scrapers to write entries like this:
```json
{
  "timestamp": "2026-03-11T10:30:45.123Z",
  "proxy": "http://proxy1.example.com:8080",
  "target_url": "https://amazon.com/dp/B0EXAMPLE",
  "target_domain": "amazon.com",
  "method": "GET",
  "status_code": 200,
  "latency_ms": 845,
  "bytes_received": 52340,
  "error": null,
  "proxy_type": "residential",
  "country": "US",
  "retry_count": 0,
  "session_id": "abc123"
}
```

Implementation
```python
import json
import gzip
import csv
import statistics
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime


@dataclass
class LogEntry:
    timestamp: datetime
    proxy: str
    target_url: str
    target_domain: str
    method: str
    status_code: int
    latency_ms: float
    bytes_received: int
    error: Optional[str]
    proxy_type: str
    country: str
    retry_count: int
    session_id: str


@dataclass
class ProxyStats:
    proxy: str
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    success_rate: float = 0
    avg_latency_ms: float = 0
    median_latency_ms: float = 0
    p95_latency_ms: float = 0
    total_bytes: int = 0
    errors: Dict[str, int] = field(default_factory=dict)
    status_codes: Dict[int, int] = field(default_factory=dict)


@dataclass
class DomainStats:
    domain: str
    total_requests: int = 0
    success_rate: float = 0
    block_rate: float = 0
    avg_latency_ms: float = 0
    most_common_error: str = ""
    best_proxy_type: str = ""


@dataclass
class AnalysisReport:
    period_start: str = ""
    period_end: str = ""
    total_requests: int = 0
    overall_success_rate: float = 0
    total_bandwidth_mb: float = 0
    proxy_stats: List[ProxyStats] = field(default_factory=list)
    domain_stats: List[DomainStats] = field(default_factory=list)
    hourly_traffic: Dict[int, int] = field(default_factory=dict)
    error_breakdown: Dict[str, int] = field(default_factory=dict)
    recommendations: List[str] = field(default_factory=list)


class ProxyLogAnalyzer:
    def __init__(self):
        self.entries: List[LogEntry] = []

    def load_file(self, filepath: str):
        """Load log entries from a JSONL file (plain or gzip-compressed)."""
        path = Path(filepath)
        opener = gzip.open if path.suffix == '.gz' else open
        loaded = 0
        with opener(filepath, 'rt') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    entry = LogEntry(
                        timestamp=datetime.fromisoformat(
                            # fromisoformat() rejects a trailing 'Z'
                            # before Python 3.11
                            data['timestamp'].replace('Z', '+00:00')
                        ),
                        proxy=data.get('proxy', ''),
                        target_url=data.get('target_url', ''),
                        target_domain=data.get('target_domain', ''),
                        method=data.get('method', 'GET'),
                        status_code=data.get('status_code', 0),
                        latency_ms=data.get('latency_ms', 0),
                        bytes_received=data.get('bytes_received', 0),
                        error=data.get('error'),
                        proxy_type=data.get('proxy_type', 'unknown'),
                        country=data.get('country', 'unknown'),
                        retry_count=data.get('retry_count', 0),
                        session_id=data.get('session_id', ''),
                    )
                    self.entries.append(entry)
                    loaded += 1
                except (json.JSONDecodeError, KeyError):
                    continue  # skip malformed lines
        print(f"Loaded {loaded} log entries from {filepath}")

    def load_directory(self, dirpath: str, pattern: str = "*.jsonl*"):
        """Load all matching log files from a directory."""
        path = Path(dirpath)
        for f in sorted(path.glob(pattern)):
            self.load_file(str(f))

    def analyze(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> AnalysisReport:
        """Build a report, optionally restricted to [start, end]."""
        entries = self.entries
        if start:
            entries = [e for e in entries if e.timestamp >= start]
        if end:
            entries = [e for e in entries if e.timestamp <= end]
        if not entries:
            return AnalysisReport()

        report = AnalysisReport(
            period_start=str(entries[0].timestamp),
            period_end=str(entries[-1].timestamp),
            total_requests=len(entries),
        )

        # Overall stats (only HTTP 200 counts as success)
        successful = sum(1 for e in entries if e.status_code == 200)
        report.overall_success_rate = round(
            successful / len(entries) * 100, 1
        )
        report.total_bandwidth_mb = round(
            sum(e.bytes_received for e in entries) / 1024 / 1024, 1
        )

        # Per-proxy analysis
        report.proxy_stats = self._analyze_proxies(entries)

        # Per-domain analysis
        report.domain_stats = self._analyze_domains(entries)

        # Hourly traffic
        for entry in entries:
            hour = entry.timestamp.hour
            report.hourly_traffic[hour] = (
                report.hourly_traffic.get(hour, 0) + 1
            )

        # Error breakdown
        for entry in entries:
            if entry.error:
                error_type = entry.error.split(':')[0][:50]
                report.error_breakdown[error_type] = (
                    report.error_breakdown.get(error_type, 0) + 1
                )
            elif entry.status_code >= 400:
                key = f"HTTP_{entry.status_code}"
                report.error_breakdown[key] = (
                    report.error_breakdown.get(key, 0) + 1
                )

        # Generate recommendations
        report.recommendations = self._generate_recommendations(report)
        return report

    def _analyze_proxies(self, entries: List[LogEntry]) -> List[ProxyStats]:
        """Aggregate per-proxy success, latency, and error stats."""
        proxy_data = defaultdict(list)
        for entry in entries:
            proxy_data[entry.proxy].append(entry)

        stats = []
        for proxy, proxy_entries in proxy_data.items():
            s = ProxyStats(proxy=proxy)
            s.total_requests = len(proxy_entries)
            s.successful = sum(
                1 for e in proxy_entries if e.status_code == 200
            )
            s.failed = s.total_requests - s.successful
            s.success_rate = round(
                s.successful / s.total_requests * 100, 1
            )

            latencies = [
                e.latency_ms for e in proxy_entries if e.latency_ms > 0
            ]
            if latencies:
                s.avg_latency_ms = round(statistics.mean(latencies))
                s.median_latency_ms = round(statistics.median(latencies))
                sorted_lat = sorted(latencies)
                # Clamp the index so small samples stay in range
                idx = min(int(len(sorted_lat) * 0.95), len(sorted_lat) - 1)
                s.p95_latency_ms = round(sorted_lat[idx])

            s.total_bytes = sum(e.bytes_received for e in proxy_entries)
            for e in proxy_entries:
                if e.error:
                    err = e.error.split(':')[0][:30]
                    s.errors[err] = s.errors.get(err, 0) + 1
                code = e.status_code
                s.status_codes[code] = s.status_codes.get(code, 0) + 1
            stats.append(s)

        stats.sort(key=lambda x: x.success_rate, reverse=True)
        return stats

    def _analyze_domains(self, entries: List[LogEntry]) -> List[DomainStats]:
        """Aggregate per-domain success, block rate, and best proxy type."""
        domain_data = defaultdict(list)
        for entry in entries:
            domain_data[entry.target_domain].append(entry)

        stats = []
        for domain, domain_entries in domain_data.items():
            s = DomainStats(domain=domain)
            s.total_requests = len(domain_entries)
            successful = sum(
                1 for e in domain_entries if e.status_code == 200
            )
            s.success_rate = round(
                successful / s.total_requests * 100, 1
            )

            # 403/429/503 are the typical blocking responses
            blocked = sum(
                1 for e in domain_entries
                if e.status_code in (403, 429, 503)
            )
            s.block_rate = round(
                blocked / s.total_requests * 100, 1
            )

            latencies = [
                e.latency_ms for e in domain_entries if e.latency_ms > 0
            ]
            s.avg_latency_ms = (
                round(statistics.mean(latencies)) if latencies else 0
            )

            # Find most common error
            errors = defaultdict(int)
            for e in domain_entries:
                if e.error:
                    errors[e.error.split(':')[0]] += 1
                elif e.status_code >= 400:
                    errors[f"HTTP_{e.status_code}"] += 1
            if errors:
                s.most_common_error = max(errors, key=errors.get)

            # Best proxy type for this domain: [total, successful] per type
            type_success = defaultdict(lambda: [0, 0])
            for e in domain_entries:
                type_success[e.proxy_type][0] += 1
                if e.status_code == 200:
                    type_success[e.proxy_type][1] += 1
            best_type = ""
            best_rate = 0
            for ptype, (total, ok) in type_success.items():
                rate = ok / total if total else 0
                if rate > best_rate:
                    best_rate = rate
                    best_type = ptype
            s.best_proxy_type = best_type
            stats.append(s)

        stats.sort(key=lambda x: x.total_requests, reverse=True)
        return stats

    def _generate_recommendations(
        self, report: AnalysisReport
    ) -> List[str]:
        """Turn the aggregated stats into actionable suggestions."""
        recs = []

        # Low-performing proxies
        for ps in report.proxy_stats:
            if ps.success_rate < 50 and ps.total_requests > 10:
                recs.append(
                    f"Remove proxy {ps.proxy} — "
                    f"{ps.success_rate}% success rate "
                    f"({ps.total_requests} requests)"
                )

        # High block-rate domains
        for ds in report.domain_stats:
            if ds.block_rate > 30 and ds.total_requests > 10:
                rec = (
                    f"Domain {ds.domain} has {ds.block_rate}% block rate"
                )
                if ds.best_proxy_type:
                    rec += f" — try using {ds.best_proxy_type} proxies"
                recs.append(rec)

        # Traffic distribution
        if report.hourly_traffic:
            peak_hour = max(
                report.hourly_traffic,
                key=report.hourly_traffic.get
            )
            recs.append(
                f"Peak traffic at hour {peak_hour}:00 — "
                f"consider distributing load more evenly"
            )
        return recs

    def print_report(self, report: AnalysisReport):
        """Print the report as a plain-text summary."""
        print("\n" + "=" * 70)
        print("PROXY LOG ANALYSIS REPORT")
        print("=" * 70)
        print(f"\nPeriod: {report.period_start} to {report.period_end}")
        print(f"Total requests: {report.total_requests:,}")
        print(f"Success rate: {report.overall_success_rate}%")
        print(f"Bandwidth: {report.total_bandwidth_mb} MB")

        print("\n--- Top Proxies ---")
        print(f"{'Proxy':<40} {'Requests':<10} {'Success%':<10} {'Avg ms':<8}")
        for ps in report.proxy_stats[:10]:
            print(
                f"{ps.proxy[:38]:<40} {ps.total_requests:<10} "
                f"{ps.success_rate:<10} {ps.avg_latency_ms:<8}"
            )

        print("\n--- Domain Analysis ---")
        print(
            f"{'Domain':<30} {'Requests':<10} {'Success%':<10} "
            f"{'Block%':<8} {'Best Type':<12}"
        )
        for ds in report.domain_stats[:10]:
            print(
                f"{ds.domain[:28]:<30} {ds.total_requests:<10} "
                f"{ds.success_rate:<10} {ds.block_rate:<8} "
                f"{ds.best_proxy_type:<12}"
            )

        if report.error_breakdown:
            print("\n--- Error Breakdown ---")
            sorted_errors = sorted(
                report.error_breakdown.items(),
                key=lambda x: x[1], reverse=True,
            )
            for error, count in sorted_errors[:10]:
                print(f"  {error}: {count:,}")

        if report.recommendations:
            print("\n--- Recommendations ---")
            for i, rec in enumerate(report.recommendations, 1):
                print(f"  {i}. {rec}")

    def export_csv(self, report: AnalysisReport, filepath: str):
        """Write per-proxy stats to a CSV file."""
        with open(filepath, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                'proxy', 'requests', 'success_rate',
                'avg_latency_ms', 'p95_latency_ms', 'bandwidth_bytes',
            ])
            for ps in report.proxy_stats:
                writer.writerow([
                    ps.proxy, ps.total_requests, ps.success_rate,
                    ps.avg_latency_ms, ps.p95_latency_ms, ps.total_bytes,
                ])
        print(f"Exported to {filepath}")


# Usage
if __name__ == '__main__':
    analyzer = ProxyLogAnalyzer()
    analyzer.load_directory('/var/log/scraper/', pattern='*.jsonl*')
    report = analyzer.analyze()
    analyzer.print_report(report)
    analyzer.export_csv(report, 'proxy_analysis.csv')
```

Internal Links
- Creating a Web Scraping Dashboard with Grafana — real-time monitoring
- Proxy Health Monitor with Node.js — live health monitoring
- Building a Proxy Benchmarking Suite — structured benchmarks
- Proxy Pool Manager — manage proxy pools
- Building a Proxy Rotation Library — optimize rotation
FAQ
What log format should I use?
JSONL (JSON Lines) is the most flexible format — one JSON object per line. It is easy to parse, supports nested data, and compresses well with gzip. For high-volume logging, write to a buffered file and rotate logs daily.
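On the logging side, a minimal sketch of a buffered, daily-rotated JSONL writer using Python's standard TimedRotatingFileHandler; the field names match the log format above, and the logger name and file path are illustrative:

```python
import json
import logging
from logging.handlers import TimedRotatingFileHandler

# Dedicated logger that rotates at midnight and keeps 30 days of history
# (adjust backupCount to your retention policy).
logger = logging.getLogger("proxy_request_logger")
handler = TimedRotatingFileHandler(
    "/var/log/scraper/requests.jsonl", when="midnight", backupCount=30
)
handler.setFormatter(logging.Formatter("%(message)s"))  # raw JSON lines only
logger.addHandler(handler)
logger.setLevel(logging.INFO)

def log_request(**fields):
    """Serialize one request record as a single JSONL line."""
    logger.info(json.dumps(fields))

log_request(
    timestamp="2026-03-11T10:30:45.123Z",
    proxy="http://proxy1.example.com:8080",
    target_domain="amazon.com",
    status_code=200,
    latency_ms=845,
)
```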
How much disk space do proxy logs consume?
A typical log entry is 300-500 bytes. At 100,000 requests per day, that is roughly 30-50 MB per day uncompressed, or 5-10 MB with gzip. Retain raw logs for 7-30 days, then archive compressed logs for long-term trend analysis.
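As a back-of-envelope check, assuming a 400-byte average entry and roughly 7:1 gzip compression on repetitive JSON:

```python
entry_bytes = 400                 # mid-range of the 300-500 byte estimate
requests_per_day = 100_000
raw_mb = entry_bytes * requests_per_day / 1024 / 1024
print(f"~{raw_mb:.0f} MB/day raw, ~{raw_mb / 7:.0f} MB/day gzipped")
# -> ~38 MB/day raw, ~5 MB/day gzipped
```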
How often should I run the analysis?
Run hourly analysis for operational monitoring (detecting sudden drops in success rate). Run daily analysis for optimization decisions (removing bad proxies, adjusting domain-specific strategies). Run weekly analysis for cost and performance trends.
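Since analyze() accepts start and end datetimes, one loaded analyzer can serve all of these cadences; schedule the script itself with cron or a task runner. A rough sketch of the hourly and daily windows, assuming the class above lives in a module named proxy_log_analyzer (hypothetical) and that a 70% alert threshold suits your traffic:

```python
from datetime import datetime, timedelta, timezone

from proxy_log_analyzer import ProxyLogAnalyzer  # hypothetical module name

analyzer = ProxyLogAnalyzer()
analyzer.load_directory('/var/log/scraper/')

now = datetime.now(timezone.utc)

# Hourly operational check: only the last hour of traffic.
hourly = analyzer.analyze(start=now - timedelta(hours=1), end=now)
if hourly.total_requests and hourly.overall_success_rate < 70:
    print(f"ALERT: success rate dropped to {hourly.overall_success_rate}%")

# Daily optimization pass: full last-24h report with recommendations.
daily = analyzer.analyze(start=now - timedelta(days=1), end=now)
analyzer.print_report(daily)
```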
Can I correlate proxy logs with application logs?
Yes. Include a session_id or request_id in both proxy logs and application logs. Join on this ID to trace a business event (product price check) to the underlying proxy request and identify which proxy infrastructure issues affect business outcomes.
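A minimal join sketch, assuming both logs are JSONL and share a session_id field (the file names and the 'event' field are hypothetical):

```python
import json

# Index proxy log records by session_id. This assumes one request per
# session for simplicity; use a list per key if sessions span requests.
proxy_by_session = {}
with open('proxy_requests.jsonl') as f:       # hypothetical file name
    for line in f:
        rec = json.loads(line)
        proxy_by_session[rec['session_id']] = rec

# Walk application events and attach the proxy-side view of each one.
with open('app_events.jsonl') as f:           # hypothetical file name
    for line in f:
        event = json.loads(line)
        rec = proxy_by_session.get(event.get('session_id'))
        if rec and rec.get('status_code') != 200:
            print(
                f"{event.get('event')}: failed via {rec.get('proxy')} "
                f"(HTTP {rec.get('status_code')})"
            )
```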
How do I detect when a proxy gets blocked?
Monitor the rolling success rate per proxy per domain. A sudden drop from 90%+ to below 50% indicates blocking. The analyzer’s per-domain stats show which proxy types work best for each site, guiding your rotation strategy.
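One way to make that concrete: keep a fixed-size window of recent outcomes per (proxy, domain) pair and flag when the window's success rate crosses a threshold. A sketch reusing the LogEntry records loaded by the analyzer; the window size and 50% threshold are illustrative:

```python
from collections import defaultdict, deque

from proxy_log_analyzer import ProxyLogAnalyzer  # hypothetical module name

analyzer = ProxyLogAnalyzer()
analyzer.load_directory('/var/log/scraper/')

WINDOW = 50        # most recent requests per (proxy, domain) pair
THRESHOLD = 0.5    # alert when the rolling success rate falls below this

windows = defaultdict(lambda: deque(maxlen=WINDOW))

for entry in analyzer.entries:  # file order is roughly chronological
    key = (entry.proxy, entry.target_domain)
    windows[key].append(1 if entry.status_code == 200 else 0)
    window = windows[key]
    if len(window) == WINDOW and sum(window) / WINDOW < THRESHOLD:
        print(
            f"Possible block: {entry.proxy} on {entry.target_domain} "
            f"({sum(window)}/{WINDOW} recent successes)"
        )
        window.clear()  # reset to avoid repeated alerts for the same streak
```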
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)