Proxy Failover Strategies: High Availability & Automatic Recovery
A single proxy failure should never stop your scraping pipeline. Whether it is a dead proxy, a rate-limited IP, or an entire provider outage, your system needs automatic detection and seamless failover. This guide covers production-tested failover patterns that keep your data collection running 24/7.
Why Proxy Failover Matters
Without failover:
- One proxy goes down → entire pipeline stops
- Rate limit on one IP → wasted retries on a blocked proxy
- Provider outage → hours of downtime
- Geo-specific proxy dies → missing data for that region
With proper failover:
- Dead proxy detected in seconds → traffic rerouted
- Rate-limited IP → automatically switched to fresh IP
- Provider outage → fallback to secondary provider
- Regional failure → geo-aware routing to backup
Failover Architecture Patterns
Pattern 1: Simple Fallback Chain
import httpx
import asyncio
from typing import List, Optional
class FallbackProxyChain:
"""Try proxies in order, fall back on failure."""
def __init__(self, proxy_chain: List[str]):
self.proxy_chain = proxy_chain
async def request(self, url: str, **kwargs) -> Optional[httpx.Response]:
last_error = None
for proxy in self.proxy_chain:
try:
async with httpx.AsyncClient(
proxy=proxy, timeout=15
) as client:
response = await client.get(url, **kwargs)
                    # 4xx means the target answered; only 5xx responses
                    # and connection errors fall through to the next proxy
                    if response.status_code < 500:
                        return response
except Exception as e:
last_error = e
continue
raise Exception(f"All proxies failed. Last error: {last_error}")
# Usage
chain = FallbackProxyChain([
"http://user:pass@primary-proxy.com:8080",
"http://user:pass@secondary-proxy.com:8080",
"http://user:pass@tertiary-proxy.com:8080",
])
response = asyncio.run(chain.request("https://example.com"))
Pattern 2: Circuit Breaker
The circuit breaker pattern prevents hammering a failed proxy:
import time
from enum import Enum
class CircuitState(Enum):
CLOSED = "closed" # Normal operation
OPEN = "open" # Proxy failed, skip it
HALF_OPEN = "half_open" # Testing if proxy recovered
class CircuitBreaker:
"""Circuit breaker for proxy health management."""
def __init__(
self,
failure_threshold: int = 5,
recovery_timeout: int = 60,
success_threshold: int = 3,
):
self.failure_threshold = failure_threshold
self.recovery_timeout = recovery_timeout
self.success_threshold = success_threshold
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
self.last_failure_time = 0
def can_execute(self) -> bool:
if self.state == CircuitState.CLOSED:
return True
elif self.state == CircuitState.OPEN:
if time.time() - self.last_failure_time > self.recovery_timeout:
self.state = CircuitState.HALF_OPEN
return True
return False
elif self.state == CircuitState.HALF_OPEN:
return True
return False
def record_success(self):
if self.state == CircuitState.HALF_OPEN:
self.success_count += 1
if self.success_count >= self.success_threshold:
self.state = CircuitState.CLOSED
self.failure_count = 0
self.success_count = 0
elif self.state == CircuitState.CLOSED:
self.failure_count = 0
def record_failure(self):
self.failure_count += 1
self.last_failure_time = time.time()
if self.state == CircuitState.HALF_OPEN:
self.state = CircuitState.OPEN
self.success_count = 0
elif self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPEN
class ResilientProxyPool:
"""Proxy pool with circuit breakers per proxy."""
def __init__(self, proxies: List[str]):
self.proxies = {
proxy: CircuitBreaker() for proxy in proxies
}
    def get_healthy_proxy(self) -> Optional[str]:
        # Return the first proxy whose circuit allows traffic; open
        # circuits are skipped until their recovery timeout elapses
        for proxy, breaker in self.proxies.items():
            if breaker.can_execute():
                return proxy
        return None
    async def request(self, url: str, attempts: int = 0) -> httpx.Response:
        # Bound the recursion so a fully degraded pool fails fast
        # instead of retrying forever
        if attempts >= 3 * len(self.proxies):
            raise Exception("Max failover attempts exhausted")
        proxy = self.get_healthy_proxy()
        if not proxy:
            raise Exception("No healthy proxies available")
        breaker = self.proxies[proxy]
        try:
            async with httpx.AsyncClient(
                proxy=proxy, timeout=15
            ) as client:
                response = await client.get(url)
                if response.status_code == 429:
                    breaker.record_failure()
                    # Retry; once this breaker opens, the pool skips this proxy
                    return await self.request(url, attempts + 1)
                breaker.record_success()
                return response
        except Exception:
            breaker.record_failure()
            return await self.request(url, attempts + 1)
# Usage
pool = ResilientProxyPool([
"http://user:pass@proxy1.com:8080",
"http://user:pass@proxy2.com:8080",
"http://user:pass@proxy3.com:8080",
])
response = asyncio.run(pool.request("https://example.com"))
Pattern 3: Health Check Monitor
Rather than discovering failures on live traffic, probe every proxy on a schedule and keep per-proxy health statistics:
import asyncio
from dataclasses import dataclass
@dataclass
class ProxyHealth:
url: str
healthy: bool = True
latency_ms: float = 0
consecutive_failures: int = 0
last_check: float = 0
total_requests: int = 0
failed_requests: int = 0
@property
def error_rate(self):
if self.total_requests == 0:
return 0
return self.failed_requests / self.total_requests
class HealthCheckMonitor:
"""Continuously monitor proxy health."""
def __init__(self, proxies: List[str], check_interval: int = 30):
self.proxies = {
url: ProxyHealth(url=url) for url in proxies
}
self.check_interval = check_interval
self.check_url = "https://httpbin.org/ip"
async def check_proxy(self, proxy_url: str) -> bool:
health = self.proxies[proxy_url]
try:
start = time.time()
async with httpx.AsyncClient(
proxy=proxy_url, timeout=10
) as client:
response = await client.get(self.check_url)
            latency = (time.time() - start) * 1000
            health.healthy = response.status_code == 200
            health.latency_ms = latency
            health.last_check = time.time()
            health.total_requests += 1
            # A non-200 answer counts as a failure, not a pass
            if health.healthy:
                health.consecutive_failures = 0
            else:
                health.consecutive_failures += 1
                health.failed_requests += 1
            return health.healthy
except Exception:
health.healthy = False
health.consecutive_failures += 1
health.last_check = time.time()
health.total_requests += 1
health.failed_requests += 1
return False
async def monitor_loop(self):
while True:
tasks = [
self.check_proxy(url)
for url in self.proxies
]
await asyncio.gather(*tasks)
healthy = sum(1 for p in self.proxies.values() if p.healthy)
total = len(self.proxies)
print(f"Proxy health: {healthy}/{total} healthy")
await asyncio.sleep(self.check_interval)
def get_healthy_proxies(self) -> List[str]:
return [
url for url, health in self.proxies.items()
if health.healthy
]
def get_fastest_proxy(self) -> Optional[str]:
healthy = [
(url, h.latency_ms)
for url, h in self.proxies.items()
if h.healthy
]
if not healthy:
return None
        return min(healthy, key=lambda x: x[1])[0]
Pattern 4: Multi-Provider Failover
A provider-level outage takes down every proxy from that vendor at once, so route across at least two independent providers in priority order:
class MultiProviderFailover:
"""Failover across multiple proxy providers."""
def __init__(self):
self.providers = {}
self.priority_order = []
def add_provider(self, name: str, proxies: List[str], priority: int = 0):
self.providers[name] = {
"proxies": ResilientProxyPool(proxies),
"priority": priority,
"healthy": True,
}
self.priority_order = sorted(
self.providers.keys(),
key=lambda n: self.providers[n]["priority"]
)
async def request(self, url: str) -> httpx.Response:
for provider_name in self.priority_order:
provider = self.providers[provider_name]
if not provider["healthy"]:
continue
try:
response = await provider["proxies"].request(url)
return response
            except Exception:
                # The provider stays marked unhealthy; pair this with a
                # periodic re-check so a recovered provider can rejoin
                provider["healthy"] = False
                continue
raise Exception("All providers exhausted")
# Usage
failover = MultiProviderFailover()
failover.add_provider("primary_brightdata", [
"http://user:pass@brd1.com:8080",
"http://user:pass@brd2.com:8080",
], priority=0)
failover.add_provider("secondary_oxylabs", [
"http://user:pass@oxy1.com:8080",
], priority=1)
failover.add_provider("fallback_datacenter", [
"http://user:pass@dc1.com:8080",
], priority=2)
response = asyncio.run(failover.request("https://example.com"))
Retry Strategies
Failover decides where the next attempt goes; backoff decides when. Jitter on the delay keeps retries from synchronizing into bursts:
import random
class RetryStrategy:
"""Configurable retry with backoff."""
@staticmethod
def exponential_backoff(attempt, base=1, max_delay=60):
delay = min(base * (2 ** attempt), max_delay)
jitter = random.uniform(0, delay * 0.1)
return delay + jitter
@staticmethod
def linear_backoff(attempt, increment=2, max_delay=30):
return min(increment * attempt, max_delay)
@staticmethod
async def retry_with_backoff(
func,
max_retries=3,
backoff='exponential',
):
for attempt in range(max_retries + 1):
try:
return await func()
except Exception as e:
if attempt == max_retries:
raise
if backoff == 'exponential':
delay = RetryStrategy.exponential_backoff(attempt)
else:
delay = RetryStrategy.linear_backoff(attempt)
                await asyncio.sleep(delay)
Monitoring and Alerting
Failover that happens silently hides systemic problems. Record every failover event and alert when they cluster:
class FailoverMetrics:
"""Track failover events for monitoring."""
def __init__(self):
self.events = []
self.alert_threshold = 3
def record_failover(self, from_proxy, to_proxy, reason):
event = {
"timestamp": time.time(),
"from": from_proxy,
"to": to_proxy,
"reason": reason,
}
self.events.append(event)
# Check for alert conditions
recent = [
e for e in self.events
if time.time() - e["timestamp"] < 300 # Last 5 min
]
if len(recent) >= self.alert_threshold:
self.send_alert(recent)
def send_alert(self, events):
print(f"ALERT: {len(events)} failover events in 5 minutes")
        # Integrate with Slack, PagerDuty, etc.
Internal Links
- Proxy Load Balancing — distribute traffic before failover is needed
- Proxy Performance Benchmarks — identify underperforming proxies
- Building a Proxy Pool Manager — implement pool management with failover
- Web Scraping Architecture — design resilient scraping systems
- Proxy Troubleshooting Guide — diagnose common proxy failures
FAQ
How quickly should failover detect a dead proxy?
Aim for detection within 10-30 seconds. Use active health checks every 15-30 seconds combined with passive detection (tracking failed requests). The circuit breaker pattern can detect failures within 3-5 failed requests (seconds in high-throughput scenarios).
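For example, the HealthCheckMonitor from Pattern 3 can run as a background task while the scraper keeps pulling the current best proxy. A rough sketch, where the proxy URLs, target URLs, and the 15-second interval are illustrative:
# Sketch: run active checks in the background while scraping continues
async def main():
    monitor = HealthCheckMonitor(
        [
            "http://user:pass@proxy1.com:8080",
            "http://user:pass@proxy2.com:8080",
        ],
        check_interval=15,  # probe every 15 s for fast detection
    )
    checker = asyncio.create_task(monitor.monitor_loop())
    for url in ["https://example.com/a", "https://example.com/b"]:
        proxy = monitor.get_fastest_proxy()
        if proxy is None:
            await asyncio.sleep(15)  # wait for the next health sweep
            continue
        async with httpx.AsyncClient(proxy=proxy, timeout=15) as client:
            response = await client.get(url)
            print(proxy, response.status_code)
    checker.cancel()
asyncio.run(main())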
Should I use the same proxy provider for primary and backup?
No. True high availability requires multiple providers. If your primary provider has an outage, a backup on the same provider will also be down. Use at least two different providers, ideally with different infrastructure.
How do I handle partial proxy failures (slow but not dead)?
Set latency thresholds in addition to error detection. If a proxy responds but takes over 5 seconds, treat it as degraded and route traffic to faster alternatives. The weighted round-robin pattern can deprioritize slow proxies without fully removing them.
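As a minimal sketch of that deprioritization, reusing the ProxyHealth records from Pattern 3 (the pick_weighted_proxy helper and the 5-second cutoff are illustrative assumptions):
import random
from typing import Dict, Optional
# Hypothetical helper: weight healthy proxies by inverse latency so slow
# proxies stay in rotation but receive proportionally less traffic
def pick_weighted_proxy(
    proxies: Dict[str, ProxyHealth], cutoff_ms: float = 5000
) -> Optional[str]:
    candidates = [
        (url, h.latency_ms) for url, h in proxies.items()
        if h.healthy and h.latency_ms < cutoff_ms  # drop degraded proxies
    ]
    if not candidates:
        return None
    weights = [1.0 / (lat + 1.0) for _, lat in candidates]  # +1 avoids /0
    return random.choices([u for u, _ in candidates], weights=weights)[0]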
What is the difference between active and passive health checks?
Active health checks send periodic test requests to each proxy (e.g., every 30 seconds). Passive health checks monitor real traffic and mark proxies unhealthy based on actual failures. The best approach combines both for fast detection and accurate status.
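As a sketch of combining the two, a passive hook can feed real-request outcomes into the same ProxyHealth records that the active monitor updates (record_result and the threshold of three failures are illustrative):
# Hypothetical passive hook: call after every real request so live traffic
# updates the same stats the active monitor maintains
def record_result(monitor: HealthCheckMonitor, proxy_url: str, ok: bool) -> None:
    health = monitor.proxies[proxy_url]
    health.total_requests += 1
    if ok:
        health.consecutive_failures = 0
    else:
        health.failed_requests += 1
        health.consecutive_failures += 1
        # Three straight live failures mark the proxy down immediately,
        # without waiting for the next active sweep
        if health.consecutive_failures >= 3:
            health.healthy = False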
How many backup proxies do I need?
A good rule of thumb is a 3:1 ratio — for every 3 primary proxies, have 1 backup ready. For critical operations, maintain a 1:1 ratio. Always have at least one alternative proxy provider configured, even if you rarely need it.
Related Reading
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)