Proxy Log Analyzer: Build and Deploy

Proxy logs contain a wealth of operational intelligence. Every request through your proxy infrastructure generates data about success rates, latency, error patterns, and blocking behavior. A log analyzer transforms this raw data into actionable insights — which proxies perform best, which domains block most aggressively, and where your scraping infrastructure needs improvement.

What the Analyzer Reveals

  • Proxy performance ranking — success rate and latency per proxy
  • Domain blocking patterns — which sites block which proxy types
  • Error distribution — timeouts vs. connection errors vs. HTTP errors
  • Traffic patterns — request volume over time
  • Cost analysis — bandwidth usage per proxy
  • Anomaly detection — sudden changes in success rates

Log Format

The analyzer supports a structured JSON log format. Instrument your scrapers to write logs like this:

{
    "timestamp": "2026-03-11T10:30:45.123Z",
    "proxy": "http://proxy1.example.com:8080",
    "target_url": "https://amazon.com/dp/B0EXAMPLE",
    "target_domain": "amazon.com",
    "method": "GET",
    "status_code": 200,
    "latency_ms": 845,
    "bytes_received": 52340,
    "error": null,
    "proxy_type": "residential",
    "country": "US",
    "retry_count": 0,
    "session_id": "abc123"
}
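
The helper below shows one way to emit this format from a scraper. It is a minimal sketch: the log_request function, the LOG_PATH constant, and the field defaults are illustrative assumptions, not part of the analyzer itself.

import json
from datetime import datetime, timezone
from urllib.parse import urlparse

LOG_PATH = "/var/log/scraper/requests.jsonl"  # hypothetical path

def log_request(proxy, target_url, status_code, latency_ms,
                bytes_received=0, error=None, proxy_type="unknown",
                country="unknown", retry_count=0, session_id=""):
    """Append one request record in the analyzer's JSONL format."""
    entry = {
        # isoformat() on an aware UTC datetime yields a "+00:00" offset,
        # which the analyzer's timestamp parser accepts as-is.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "proxy": proxy,
        "target_url": target_url,
        "target_domain": urlparse(target_url).netloc,
        "method": "GET",
        "status_code": status_code,
        "latency_ms": latency_ms,
        "bytes_received": bytes_received,
        "error": error,
        "proxy_type": proxy_type,
        "country": country,
        "retry_count": retry_count,
        "session_id": session_id,
    }
    with open(LOG_PATH, "a") as f:
        f.write(json.dumps(entry) + "\n")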

Implementation

import json
import gzip
from pathlib import Path
from collections import defaultdict
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime
import statistics
import csv

@dataclass
class LogEntry:
    timestamp: datetime
    proxy: str
    target_url: str
    target_domain: str
    method: str
    status_code: int
    latency_ms: float
    bytes_received: int
    error: Optional[str]
    proxy_type: str
    country: str
    retry_count: int
    session_id: str

@dataclass
class ProxyStats:
    proxy: str
    total_requests: int = 0
    successful: int = 0
    failed: int = 0
    success_rate: float = 0
    avg_latency_ms: float = 0
    median_latency_ms: float = 0
    p95_latency_ms: float = 0
    total_bytes: int = 0
    errors: Dict[str, int] = field(default_factory=dict)
    status_codes: Dict[int, int] = field(default_factory=dict)

@dataclass
class DomainStats:
    domain: str
    total_requests: int = 0
    success_rate: float = 0
    block_rate: float = 0
    avg_latency_ms: float = 0
    most_common_error: str = ""
    best_proxy_type: str = ""

@dataclass
class AnalysisReport:
    period_start: str = ""
    period_end: str = ""
    total_requests: int = 0
    overall_success_rate: float = 0
    total_bandwidth_mb: float = 0
    proxy_stats: List[ProxyStats] = field(default_factory=list)
    domain_stats: List[DomainStats] = field(default_factory=list)
    hourly_traffic: Dict[int, int] = field(default_factory=dict)
    error_breakdown: Dict[str, int] = field(default_factory=dict)
    recommendations: List[str] = field(default_factory=list)


class ProxyLogAnalyzer:
    def __init__(self):
        self.entries: List[LogEntry] = []

    def load_file(self, filepath: str):
        """Load log entries from a JSONL file."""
        path = Path(filepath)
        loaded_before = len(self.entries)

        if path.suffix == '.gz':
            opener = gzip.open
        else:
            opener = open

        with opener(filepath, 'rt') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    data = json.loads(line)
                    entry = LogEntry(
                        timestamp=datetime.fromisoformat(
                            data['timestamp'].replace('Z', '+00:00')
                        ),
                        proxy=data.get('proxy', ''),
                        target_url=data.get('target_url', ''),
                        target_domain=data.get('target_domain', ''),
                        method=data.get('method', 'GET'),
                        status_code=data.get('status_code', 0),
                        latency_ms=data.get('latency_ms', 0),
                        bytes_received=data.get('bytes_received', 0),
                        error=data.get('error'),
                        proxy_type=data.get('proxy_type', 'unknown'),
                        country=data.get('country', 'unknown'),
                        retry_count=data.get('retry_count', 0),
                        session_id=data.get('session_id', ''),
                    )
                    self.entries.append(entry)
                except (json.JSONDecodeError, ValueError, KeyError):
                    # ValueError covers malformed timestamps from fromisoformat
                    continue

        print(
            f"Loaded {len(self.entries) - loaded_before} "
            f"log entries from {filepath}"
        )

    def load_directory(self, dirpath: str, pattern: str = "*.jsonl*"):
        """Load all matching log files from a directory."""
        path = Path(dirpath)
        for f in sorted(path.glob(pattern)):
            self.load_file(str(f))

    def analyze(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> AnalysisReport:
        entries = self.entries

        if start:
            entries = [e for e in entries if e.timestamp >= start]
        if end:
            entries = [e for e in entries if e.timestamp <= end]

        if not entries:
            return AnalysisReport()

        # Entries may come from multiple files, so don't assume
        # chronological order when computing the reporting period.
        report = AnalysisReport(
            period_start=str(min(e.timestamp for e in entries)),
            period_end=str(max(e.timestamp for e in entries)),
            total_requests=len(entries),
        )

        # Overall stats
        successful = sum(
            1 for e in entries
            if e.status_code == 200
        )
        report.overall_success_rate = round(
            successful / len(entries) * 100, 1
        )
        report.total_bandwidth_mb = round(
            sum(e.bytes_received for e in entries) / 1024 / 1024, 1
        )

        # Per-proxy analysis
        report.proxy_stats = self._analyze_proxies(entries)

        # Per-domain analysis
        report.domain_stats = self._analyze_domains(entries)

        # Hourly traffic
        for entry in entries:
            hour = entry.timestamp.hour
            report.hourly_traffic[hour] = (
                report.hourly_traffic.get(hour, 0) + 1
            )

        # Error breakdown
        for entry in entries:
            if entry.error:
                error_type = entry.error.split(':')[0][:50]
                report.error_breakdown[error_type] = (
                    report.error_breakdown.get(error_type, 0) + 1
                )
            elif entry.status_code >= 400:
                key = f"HTTP_{entry.status_code}"
                report.error_breakdown[key] = (
                    report.error_breakdown.get(key, 0) + 1
                )

        # Generate recommendations
        report.recommendations = self._generate_recommendations(report)

        return report

    def _analyze_proxies(self, entries: List[LogEntry]) -> List[ProxyStats]:
        proxy_data = defaultdict(list)
        for entry in entries:
            proxy_data[entry.proxy].append(entry)

        stats = []
        for proxy, proxy_entries in proxy_data.items():
            s = ProxyStats(proxy=proxy)
            s.total_requests = len(proxy_entries)
            s.successful = sum(
                1 for e in proxy_entries if e.status_code == 200
            )
            s.failed = s.total_requests - s.successful
            s.success_rate = round(
                s.successful / s.total_requests * 100, 1
            )

            latencies = [
                e.latency_ms for e in proxy_entries
                if e.latency_ms > 0
            ]
            if latencies:
                s.avg_latency_ms = round(statistics.mean(latencies))
                s.median_latency_ms = round(statistics.median(latencies))
                # Nearest-rank p95; int(n * 0.95) < n, so the index
                # is always in bounds.
                sorted_lat = sorted(latencies)
                s.p95_latency_ms = round(
                    sorted_lat[int(len(sorted_lat) * 0.95)]
                )

            s.total_bytes = sum(e.bytes_received for e in proxy_entries)

            for e in proxy_entries:
                if e.error:
                    err = e.error.split(':')[0][:30]
                    s.errors[err] = s.errors.get(err, 0) + 1
                code = e.status_code
                s.status_codes[code] = s.status_codes.get(code, 0) + 1

            stats.append(s)

        stats.sort(key=lambda x: x.success_rate, reverse=True)
        return stats

    def _analyze_domains(self, entries: List[LogEntry]) -> List[DomainStats]:
        domain_data = defaultdict(list)
        for entry in entries:
            domain_data[entry.target_domain].append(entry)

        stats = []
        for domain, domain_entries in domain_data.items():
            s = DomainStats(domain=domain)
            s.total_requests = len(domain_entries)

            successful = sum(
                1 for e in domain_entries if e.status_code == 200
            )
            s.success_rate = round(
                successful / s.total_requests * 100, 1
            )

            blocked = sum(
                1 for e in domain_entries
                if e.status_code in (403, 429, 503)
            )
            s.block_rate = round(
                blocked / s.total_requests * 100, 1
            )

            latencies = [
                e.latency_ms for e in domain_entries
                if e.latency_ms > 0
            ]
            s.avg_latency_ms = (
                round(statistics.mean(latencies)) if latencies else 0
            )

            # Find most common error
            errors = defaultdict(int)
            for e in domain_entries:
                if e.error:
                    errors[e.error.split(':')[0]] += 1
                elif e.status_code >= 400:
                    errors[f"HTTP_{e.status_code}"] += 1
            if errors:
                s.most_common_error = max(errors, key=errors.get)

            # Best proxy type for this domain
            type_success = defaultdict(lambda: [0, 0])
            for e in domain_entries:
                type_success[e.proxy_type][0] += 1
                if e.status_code == 200:
                    type_success[e.proxy_type][1] += 1

            best_type = ""
            best_rate = 0
            for ptype, (total, ok) in type_success.items():
                rate = ok / total if total else 0
                if rate > best_rate:
                    best_rate = rate
                    best_type = ptype
            s.best_proxy_type = best_type

            stats.append(s)

        stats.sort(key=lambda x: x.total_requests, reverse=True)
        return stats

    def _generate_recommendations(
        self, report: AnalysisReport
    ) -> List[str]:
        recs = []

        # Low-performing proxies
        for ps in report.proxy_stats:
            if ps.success_rate < 50 and ps.total_requests > 10:
                recs.append(
                    f"Remove proxy {ps.proxy} — "
                    f"{ps.success_rate}% success rate "
                    f"({ps.total_requests} requests)"
                )

        # High block-rate domains
        for ds in report.domain_stats:
            if ds.block_rate > 30 and ds.total_requests > 10:
                rec = (
                    f"Domain {ds.domain} has {ds.block_rate}% block rate"
                )
                if ds.best_proxy_type:
                    rec += f" — try using {ds.best_proxy_type} proxies"
                recs.append(rec)

        # Traffic distribution
        if report.hourly_traffic:
            peak_hour = max(
                report.hourly_traffic,
                key=report.hourly_traffic.get
            )
            recs.append(
                f"Peak traffic at hour {peak_hour}:00 — "
                f"consider distributing load more evenly"
            )

        return recs

    def print_report(self, report: AnalysisReport):
        print("\n" + "=" * 70)
        print("PROXY LOG ANALYSIS REPORT")
        print("=" * 70)

        print(f"\nPeriod: {report.period_start} to {report.period_end}")
        print(f"Total requests: {report.total_requests:,}")
        print(f"Success rate: {report.overall_success_rate}%")
        print(f"Bandwidth: {report.total_bandwidth_mb} MB")

        print(f"\n--- Top Proxies ---")
        print(f"{'Proxy':<40} {'Requests':<10} {'Success%':<10} {'Avg ms':<8}")
        for ps in report.proxy_stats[:10]:
            print(
                f"{ps.proxy[:38]:<40} {ps.total_requests:<10} "
                f"{ps.success_rate:<10} {ps.avg_latency_ms:<8}"
            )

        print(f"\n--- Domain Analysis ---")
        print(f"{'Domain':<30} {'Requests':<10} {'Success%':<10} {'Block%':<8} {'Best Type':<12}")
        for ds in report.domain_stats[:10]:
            print(
                f"{ds.domain[:28]:<30} {ds.total_requests:<10} "
                f"{ds.success_rate:<10} {ds.block_rate:<8} "
                f"{ds.best_proxy_type:<12}"
            )

        if report.error_breakdown:
            print(f"\n--- Error Breakdown ---")
            sorted_errors = sorted(
                report.error_breakdown.items(),
                key=lambda x: x[1], reverse=True,
            )
            for error, count in sorted_errors[:10]:
                print(f"  {error}: {count:,}")

        if report.recommendations:
            print(f"\n--- Recommendations ---")
            for i, rec in enumerate(report.recommendations, 1):
                print(f"  {i}. {rec}")

    def export_csv(self, report: AnalysisReport, filepath: str):
        with open(filepath, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow([
                'proxy', 'requests', 'success_rate',
                'avg_latency_ms', 'p95_latency_ms', 'bandwidth_bytes',
            ])
            for ps in report.proxy_stats:
                writer.writerow([
                    ps.proxy, ps.total_requests, ps.success_rate,
                    ps.avg_latency_ms, ps.p95_latency_ms, ps.total_bytes,
                ])
        print(f"Exported to {filepath}")


# Usage
if __name__ == '__main__':
    analyzer = ProxyLogAnalyzer()
    analyzer.load_directory('/var/log/scraper/', pattern='*.jsonl*')

    report = analyzer.analyze()
    analyzer.print_report(report)
    analyzer.export_csv(report, 'proxy_analysis.csv')

FAQ

What log format should I use?

JSONL (JSON Lines) is the most flexible format — one JSON object per line. It is easy to parse, supports nested data, and compresses well with gzip. For high-volume logging, write to a buffered file and rotate logs daily.
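
As a concrete example of daily rotation with gzip compression, the sketch below uses the standard library's TimedRotatingFileHandler with a custom rotator; the path and retention count are assumptions.

import gzip
import logging
import logging.handlers
import os
import shutil

def gzip_rotator(source, dest):
    """Compress a rotated log file and remove the uncompressed original."""
    with open(source, "rb") as f_in, gzip.open(dest + ".gz", "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    os.remove(source)

handler = logging.handlers.TimedRotatingFileHandler(
    "/var/log/scraper/requests.jsonl",  # hypothetical path
    when="midnight",
    backupCount=30,
)
handler.rotator = gzip_rotator

logger = logging.getLogger("proxy_requests")
logger.setLevel(logging.INFO)
logger.addHandler(handler)
# Emit one pre-serialized JSON object per line, e.g.:
# logger.info(json.dumps(entry))

Rotated files come out as requests.jsonl.2026-03-11.gz, which still matches the analyzer's default *.jsonl* glob.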

How much disk space do proxy logs consume?

A typical log entry is 300-500 bytes. At 100,000 requests per day, that is roughly 30-50 MB per day uncompressed, or 5-10 MB with gzip. Retain raw logs for 7-30 days, then archive compressed logs for long-term trend analysis.

How often should I run the analysis?

Run hourly analysis for operational monitoring (detecting sudden drops in success rate). Run daily analysis for optimization decisions (removing bad proxies, adjusting domain-specific strategies). Run weekly analysis for cost and performance trends.
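
For the hourly case, the analyzer's start/end filter makes a windowed check straightforward. A minimal sketch, assuming an arbitrary 80% alert threshold:

from datetime import datetime, timedelta, timezone

analyzer = ProxyLogAnalyzer()
analyzer.load_directory('/var/log/scraper/')

# Analyze only the last hour of traffic.
now = datetime.now(timezone.utc)
report = analyzer.analyze(start=now - timedelta(hours=1), end=now)

if report.total_requests and report.overall_success_rate < 80:
    print(f"ALERT: success rate fell to {report.overall_success_rate}%")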

Can I correlate proxy logs with application logs?

Yes. Include a session_id or request_id in both proxy logs and application logs. Joining on this ID lets you trace a business event (for example, a product price check) back to the underlying proxy request, so you can identify which infrastructure issues affect business outcomes.
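
A minimal join might look like the sketch below; the shape of the application events (dicts with a session_id key) is an assumption.

from collections import defaultdict

def join_on_session(entries, app_events):
    """Attach each app event to the proxy requests sharing its session_id."""
    by_session = defaultdict(list)
    for e in entries:  # LogEntry objects from ProxyLogAnalyzer
        by_session[e.session_id].append(e)

    return [
        (event, by_session.get(event["session_id"], []))
        for event in app_events
    ]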

How do I detect when a proxy gets blocked?

Monitor the rolling success rate per proxy per domain. A sudden drop from 90%+ to below 50% indicates blocking. The analyzer’s per-domain stats show which proxy types work best for each site, guiding your rotation strategy.
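
One way to implement the rolling check is a fixed-size window per (proxy, domain) pair, as sketched below; the window size and thresholds are assumptions to tune for your traffic.

from collections import defaultdict, deque

WINDOW = 100       # most recent outcomes kept per (proxy, domain)
MIN_SAMPLES = 20   # don't alert until the window has enough data

windows = defaultdict(lambda: deque(maxlen=WINDOW))

def record_outcome(proxy, domain, status_code):
    """Record one request outcome; return True if the pair looks blocked."""
    w = windows[(proxy, domain)]
    w.append(status_code == 200)
    success_rate = sum(w) / len(w)
    return len(w) >= MIN_SAMPLES and success_rate < 0.5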

