How to Build a Crypto Price Alert Bot with Proxies and Python

How to Build a Crypto Price Alert Bot with Proxies and Python

A crypto price alert bot monitors token prices across exchanges and notifies you instantly when prices hit your targets. Whether you are tracking breakout levels, watching for flash crashes, or monitoring arbitrage spreads, a custom bot gives you control that commercial alert services cannot match.

The challenge is that polling multiple exchanges at high frequency burns through API rate limits quickly. Proxies solve this by distributing requests across multiple IP addresses, letting your bot monitor hundreds of pairs simultaneously without getting blocked.

This tutorial walks through building a production-ready price alert bot in Python with integrated proxy rotation.

Architecture Overview

┌─────────────────────────────────────┐
│          Price Alert Bot            │
│                                     │
│  ┌───────────┐  ┌────────────────┐  │
│  │  Poller   │──│  Proxy Router  │──┼──→ Exchange APIs
│  └─────┬─────┘  └────────────────┘  │
│        │                            │
│  ┌─────▼─────┐  ┌────────────────┐  │
│  │  Alert    │──│  Notification  │──┼──→ Telegram / Discord / Email
│  │  Engine   │  │  Service       │  │
│  └───────────┘  └────────────────┘  │
└─────────────────────────────────────┘

Prerequisites

pip install aiohttp asyncio python-telegram-bot requests

Step 1: Proxy Manager

import aiohttp
import asyncio
import time
import random
from typing import List, Dict, Optional
from dataclasses import dataclass, field

@dataclass
class ProxyStatus:
    url: str
    healthy: bool = True
    last_used: float = 0
    failures: int = 0
    avg_latency_ms: float = 0
    request_count: int = 0

class AlertBotProxyManager:
    def __init__(self, proxies: List[str]):
        self.proxies = {
            p: ProxyStatus(url=p) for p in proxies
        }
        self.lock = asyncio.Lock()

    async def get_proxy(self) -> str:
        """Get the best available proxy."""
        async with self.lock:
            healthy = [
                p for p in self.proxies.values()
                if p.healthy and p.failures < 5
            ]
            if not healthy:
                # Reset all proxies if none are healthy
                for p in self.proxies.values():
                    p.healthy = True
                    p.failures = 0
                healthy = list(self.proxies.values())

            # Select proxy with lowest recent usage
            selected = min(healthy, key=lambda p: p.last_used)
            selected.last_used = time.time()
            selected.request_count += 1
            return selected.url

    async def report_success(self, proxy_url: str, latency_ms: float):
        async with self.lock:
            p = self.proxies[proxy_url]
            p.failures = max(0, p.failures - 1)
            p.avg_latency_ms = (
                p.avg_latency_ms * 0.9 + latency_ms * 0.1
            )

    async def report_failure(self, proxy_url: str):
        async with self.lock:
            p = self.proxies[proxy_url]
            p.failures += 1
            if p.failures >= 5:
                p.healthy = False

    def get_stats(self) -> List[dict]:
        return [
            {
                "proxy": p.url.split("@")[1] if "@" in p.url else p.url,
                "healthy": p.healthy,
                "failures": p.failures,
                "avg_latency": f"{p.avg_latency_ms:.1f}ms",
                "requests": p.request_count
            }
            for p in self.proxies.values()
        ]

Step 2: Exchange Price Fetchers

class ExchangeFetcher:
    """Fetch prices from multiple exchanges through proxies."""

    EXCHANGE_URLS = {
        "binance": "https://api.binance.com/api/v3/ticker/price",
        "coinbase": "https://api.exchange.coinbase.com/products",
        "bybit": "https://api.bybit.com/v5/market/tickers",
        "kraken": "https://api.kraken.com/0/public/Ticker",
    }

    def __init__(self, proxy_manager: AlertBotProxyManager):
        self.proxy_manager = proxy_manager

    async def fetch_binance_price(self, session: aiohttp.ClientSession,
                                   symbol: str, proxy: str) -> Optional[float]:
        url = f"{self.EXCHANGE_URLS['binance']}?symbol={symbol}"
        start = time.time()
        try:
            async with session.get(
                url,
                proxy=f"http://{proxy}",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                latency = (time.time() - start) * 1000
                if resp.status == 200:
                    data = await resp.json()
                    await self.proxy_manager.report_success(proxy, latency)
                    return float(data["price"])
                elif resp.status == 429:
                    await self.proxy_manager.report_failure(proxy)
                    return None
        except Exception:
            await self.proxy_manager.report_failure(proxy)
            return None

    async def fetch_bybit_price(self, session, symbol, proxy) -> Optional[float]:
        url = f"{self.EXCHANGE_URLS['bybit']}?category=spot&symbol={symbol}"
        start = time.time()
        try:
            async with session.get(
                url,
                proxy=f"http://{proxy}",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                latency = (time.time() - start) * 1000
                if resp.status == 200:
                    data = await resp.json()
                    await self.proxy_manager.report_success(proxy, latency)
                    tickers = data.get("result", {}).get("list", [])
                    if tickers:
                        return float(tickers[0]["lastPrice"])
                return None
        except Exception:
            await self.proxy_manager.report_failure(proxy)
            return None

    async def fetch_price(self, symbol: str,
                           exchange: str = "binance") -> Optional[float]:
        proxy = await self.proxy_manager.get_proxy()
        async with aiohttp.ClientSession() as session:
            if exchange == "binance":
                return await self.fetch_binance_price(session, symbol, proxy)
            elif exchange == "bybit":
                return await self.fetch_bybit_price(session, symbol, proxy)
        return None

    async def fetch_all_exchanges(self, symbol_map: dict) -> dict:
        """Fetch price from all exchanges simultaneously."""
        proxy = await self.proxy_manager.get_proxy()
        results = {}

        async with aiohttp.ClientSession() as session:
            tasks = {}
            for exchange, symbol in symbol_map.items():
                if exchange == "binance":
                    tasks[exchange] = self.fetch_binance_price(
                        session, symbol, proxy
                    )
                elif exchange == "bybit":
                    tasks[exchange] = self.fetch_bybit_price(
                        session, symbol, proxy
                    )

            for exchange, task in tasks.items():
                try:
                    results[exchange] = await task
                except Exception:
                    results[exchange] = None

        return results

Step 3: Alert Engine

from enum import Enum

class AlertType(Enum):
    PRICE_ABOVE = "price_above"
    PRICE_BELOW = "price_below"
    PERCENT_CHANGE = "percent_change"
    SPREAD_ALERT = "spread_alert"

@dataclass
class Alert:
    alert_id: str
    symbol: str
    alert_type: AlertType
    threshold: float
    exchange: str = "binance"
    triggered: bool = False
    triggered_at: Optional[float] = None
    cooldown_seconds: int = 300  # Don't re-trigger for 5 minutes
    repeat: bool = True

class AlertEngine:
    def __init__(self):
        self.alerts: Dict[str, Alert] = {}
        self.price_history: Dict[str, List[tuple]] = {}

    def add_alert(self, alert: Alert):
        self.alerts[alert.alert_id] = alert

    def remove_alert(self, alert_id: str):
        self.alerts.pop(alert_id, None)

    def check_alerts(self, symbol: str, current_price: float,
                      exchange: str) -> List[Alert]:
        """Check all alerts against current price."""
        triggered = []
        now = time.time()

        # Store price history
        key = f"{exchange}:{symbol}"
        if key not in self.price_history:
            self.price_history[key] = []
        self.price_history[key].append((now, current_price))
        # Keep last hour of data
        self.price_history[key] = [
            (t, p) for t, p in self.price_history[key]
            if now - t < 3600
        ]

        for alert in self.alerts.values():
            if alert.symbol != symbol or alert.exchange != exchange:
                continue

            # Check cooldown
            if alert.triggered_at and not alert.repeat:
                continue
            if (alert.triggered_at and
                now - alert.triggered_at < alert.cooldown_seconds):
                continue

            should_trigger = False

            if alert.alert_type == AlertType.PRICE_ABOVE:
                should_trigger = current_price >= alert.threshold

            elif alert.alert_type == AlertType.PRICE_BELOW:
                should_trigger = current_price <= alert.threshold

            elif alert.alert_type == AlertType.PERCENT_CHANGE:
                history = self.price_history[key]
                if len(history) >= 2:
                    oldest_price = history[0][1]
                    pct_change = abs(
                        (current_price - oldest_price) / oldest_price * 100
                    )
                    should_trigger = pct_change >= alert.threshold

            if should_trigger:
                alert.triggered = True
                alert.triggered_at = now
                triggered.append(alert)

        return triggered

Step 4: Notification Service

import aiohttp

class NotificationService:
    def __init__(self, telegram_token: str = None,
                 telegram_chat_id: str = None,
                 discord_webhook: str = None):
        self.telegram_token = telegram_token
        self.telegram_chat_id = telegram_chat_id
        self.discord_webhook = discord_webhook

    async def send_alert(self, alert: Alert, current_price: float):
        message = self._format_message(alert, current_price)

        tasks = []
        if self.telegram_token:
            tasks.append(self._send_telegram(message))
        if self.discord_webhook:
            tasks.append(self._send_discord(message))

        await asyncio.gather(*tasks, return_exceptions=True)

    def _format_message(self, alert: Alert, price: float) -> str:
        direction = "above" if alert.alert_type == AlertType.PRICE_ABOVE else "below"
        return (
            f"Price Alert: {alert.symbol}\n"
            f"Exchange: {alert.exchange}\n"
            f"Current Price: ${price:,.2f}\n"
            f"Alert: Price {direction} ${alert.threshold:,.2f}\n"
            f"Time: {time.strftime('%Y-%m-%d %H:%M:%S UTC', time.gmtime())}"
        )

    async def _send_telegram(self, message: str):
        url = f"https://api.telegram.org/bot{self.telegram_token}/sendMessage"
        async with aiohttp.ClientSession() as session:
            await session.post(url, json={
                "chat_id": self.telegram_chat_id,
                "text": message,
                "parse_mode": "HTML"
            })

    async def _send_discord(self, message: str):
        async with aiohttp.ClientSession() as session:
            await session.post(self.discord_webhook, json={
                "content": message
            })

Step 5: Main Bot Loop

class CryptoPriceAlertBot:
    def __init__(self, proxies: list, telegram_token: str = None,
                 telegram_chat_id: str = None,
                 discord_webhook: str = None):
        self.proxy_manager = AlertBotProxyManager(proxies)
        self.fetcher = ExchangeFetcher(self.proxy_manager)
        self.alert_engine = AlertEngine()
        self.notifications = NotificationService(
            telegram_token=telegram_token,
            telegram_chat_id=telegram_chat_id,
            discord_webhook=discord_webhook
        )
        self.running = False

    def add_price_alert(self, symbol: str, target_price: float,
                         direction: str = "above",
                         exchange: str = "binance"):
        alert_type = (AlertType.PRICE_ABOVE if direction == "above"
                      else AlertType.PRICE_BELOW)
        alert = Alert(
            alert_id=f"{exchange}_{symbol}_{direction}_{target_price}",
            symbol=symbol,
            alert_type=alert_type,
            threshold=target_price,
            exchange=exchange
        )
        self.alert_engine.add_alert(alert)

    async def run(self, poll_interval: float = 2.0):
        """Main monitoring loop."""
        self.running = True
        print(f"Bot started. Monitoring {len(self.alert_engine.alerts)} alerts.")

        while self.running:
            # Collect all unique symbol/exchange pairs from alerts
            pairs = set()
            for alert in self.alert_engine.alerts.values():
                pairs.add((alert.symbol, alert.exchange))

            # Fetch prices for all pairs
            for symbol, exchange in pairs:
                try:
                    price = await self.fetcher.fetch_price(symbol, exchange)
                    if price is None:
                        continue

                    # Check alerts
                    triggered = self.alert_engine.check_alerts(
                        symbol, price, exchange
                    )

                    for alert in triggered:
                        await self.notifications.send_alert(alert, price)
                        print(f"ALERT TRIGGERED: {alert.symbol} @ ${price}")

                except Exception as e:
                    print(f"Error fetching {symbol}/{exchange}: {e}")

            await asyncio.sleep(poll_interval)

# Usage
async def main():
    bot = CryptoPriceAlertBot(
        proxies=[
            "user:pass@proxy1.example.com:8080",
            "user:pass@proxy2.example.com:8080",
            "user:pass@proxy3.example.com:8080",
            "user:pass@proxy4.example.com:8080",
        ],
        telegram_token="YOUR_BOT_TOKEN",
        telegram_chat_id="YOUR_CHAT_ID"
    )

    # Set up alerts
    bot.add_price_alert("BTCUSDT", 100000, "above", "binance")
    bot.add_price_alert("BTCUSDT", 90000, "below", "binance")
    bot.add_price_alert("ETHUSDT", 5000, "above", "binance")
    bot.add_price_alert("SOLUSDT", 300, "above", "bybit")
    bot.add_price_alert("BTCUSDT", 95000, "below", "bybit")

    await bot.run(poll_interval=2.0)

asyncio.run(main())

Proxy Sizing for Alert Bots

The number of proxies you need depends on your monitoring scope:

Monitoring ScopePairsPoll RateProxies
Basic (5 pairs, 1 exchange)55s2
Standard (20 pairs, 2 exchanges)402s4
Advanced (100 pairs, 3 exchanges)3001s8
Professional (500+ pairs)1500+500ms15+

Using mobile proxies for alert bots provides the best combination of reliability and trust. Exchange APIs treat mobile IPs as legitimate user traffic, resulting in fewer rate limit hits and more consistent data delivery. For a deeper technical understanding of rate limiting and how proxies distribute request load, the proxy glossary covers these concepts.

Enhancements and Next Steps

WebSocket Price Feeds

For faster alerts, replace HTTP polling with WebSocket streaming:

async def websocket_price_stream(self, symbol: str, callback):
    proxy = await self.proxy_manager.get_proxy()
    ws_url = f"wss://stream.binance.com:9443/ws/{symbol.lower()}@ticker"

    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(
            ws_url,
            proxy=f"http://{proxy}"
        ) as ws:
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    data = msg.json()
                    price = float(data["c"])  # Current price
                    await callback(symbol, price)

Persistent Alert Storage

Store alerts in a SQLite database so they survive bot restarts. Store price history in a time-series database like TimescaleDB for historical analysis.

Multi-Exchange Spread Alerts

Alert when the price difference between two exchanges exceeds a threshold — useful for identifying arbitrage opportunities before they close.

Conclusion

A custom crypto price alert bot with proxy infrastructure gives you monitoring capabilities that no commercial service can match — unlimited pairs, custom alert logic, and the speed of direct exchange API access. The proxy layer ensures your bot operates sustainably without hitting rate limits, even when monitoring hundreds of trading pairs across multiple exchanges. Start with the basic setup and add WebSocket feeds and advanced alert types as your needs grow.


Related Reading

Scroll to Top