How to Scrape Real-Time Crypto Prices from Multiple Exchanges

Real-time cryptocurrency price data is the foundation of trading algorithms, market analysis, and portfolio tracking tools. While exchanges offer APIs, their rate limits are restrictive, and many valuable data points — order book depth, historical trades, funding rates — require aggressive polling that quickly triggers blocks.

Scraping crypto prices from multiple exchanges simultaneously requires proxy infrastructure that handles rate limits, maintains session consistency, and delivers data with minimal latency. This guide covers the complete setup from architecture to production-ready Python code.

Why Scrape Instead of Using APIs Directly?

Exchange APIs are useful but limited:

  • Rate limits: Binance, for example, caps request weight at 1,200 per minute per IP. If you monitor 500 trading pairs across 5 exchanges, you exhaust limits within a single polling cycle.
  • Data gaps: Many exchanges do not expose all data through their APIs. Historical order book snapshots, liquidation data, and whale transaction alerts often require scraping the web interface.
  • Redundancy: API outages happen. A scraping-based fallback ensures continuous data collection.
  • Cross-exchange normalization: Scraping gives you raw data you can normalize across exchanges into a consistent format.

For a deeper dive into web scraping with proxies, including rotation strategies and anti-detection techniques, see the dedicated guide covering the fundamentals.

Architecture Overview

┌──────────────────┐     ┌──────────────┐     ┌──────────────┐
│  Price Scraper   │────▶│ Proxy Router │────▶│  Exchange 1  │
│  (Python/async)  │     │              │────▶│  Exchange 2  │
│                  │     │              │────▶│  Exchange 3  │
└────────┬─────────┘     └──────────────┘     └──────────────┘
         │
         ▼
┌──────────────────────┐
│      Data Store      │
│ (Redis/TimescaleDB)  │
└──────────────────────┘

Setting Up the Multi-Exchange Scraper

Step 1: Proxy Configuration

import aiohttp
import asyncio
import time
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class ExchangeProxy:
    exchange: str
    proxy_url: str
    last_request: float = 0
    min_interval: float = 0.05  # 50ms between requests
    failures: int = 0

class ProxyPool:
    def __init__(self):
        self.pools: Dict[str, List[ExchangeProxy]] = {}
        self.index: Dict[str, int] = {}

    def add_exchange_proxies(self, exchange: str, proxies: list,
                              min_interval: float = 0.05):
        self.pools[exchange] = [
            ExchangeProxy(
                exchange=exchange,
                proxy_url=p,
                min_interval=min_interval
            ) for p in proxies
        ]
        self.index[exchange] = 0

    def get_proxy(self, exchange: str) -> ExchangeProxy:
        pool = self.pools[exchange]
        # Round-robin with rate limit awareness
        for _ in range(len(pool)):
            idx = self.index[exchange] % len(pool)
            self.index[exchange] += 1
            proxy = pool[idx]

            elapsed = time.time() - proxy.last_request
            if elapsed >= proxy.min_interval and proxy.failures < 5:
                proxy.last_request = time.time()
                return proxy

        # All proxies busy or failing; fall back to the least recently used
        fallback = min(pool, key=lambda p: p.last_request)
        fallback.last_request = time.time()
        return fallback

# Initialize proxy pool
proxy_pool = ProxyPool()
proxy_pool.add_exchange_proxies("binance", [
    "user:pass@proxy1.example.com:8080",
    "user:pass@proxy2.example.com:8080",
    "user:pass@proxy3.example.com:8080",
], min_interval=0.05)

proxy_pool.add_exchange_proxies("coinbase", [
    "user:pass@proxy4.example.com:8080",
    "user:pass@proxy5.example.com:8080",
], min_interval=0.1)

proxy_pool.add_exchange_proxies("bybit", [
    "user:pass@proxy6.example.com:8080",
    "user:pass@proxy7.example.com:8080",
], min_interval=0.05)

Step 2: Exchange-Specific Scrapers

class BinanceScraper:
    BASE_URL = "https://api.binance.com"

    async def get_ticker(self, session: aiohttp.ClientSession,
                          symbol: str, proxy: ExchangeProxy) -> dict:
        url = f"{self.BASE_URL}/api/v3/ticker/24hr?symbol={symbol}"
        async with session.get(
            url,
            proxy=f"http://{proxy.proxy_url}",
            timeout=aiohttp.ClientTimeout(total=5)
        ) as resp:
            if resp.status == 429:
                proxy.failures += 1
                raise Exception("Rate limited")
            data = await resp.json()
            return {
                "exchange": "binance",
                "symbol": symbol,
                "price": float(data["lastPrice"]),
                "volume_24h": float(data["volume"]),
                "high_24h": float(data["highPrice"]),
                "low_24h": float(data["lowPrice"]),
                "timestamp": time.time()
            }

    async def get_order_book(self, session, symbol, proxy, depth=20):
        url = f"{self.BASE_URL}/api/v3/depth?symbol={symbol}&limit={depth}"
        async with session.get(
            url,
            proxy=f"http://{proxy.proxy_url}",
            timeout=aiohttp.ClientTimeout(total=5)
        ) as resp:
            data = await resp.json()
            return {
                "exchange": "binance",
                "symbol": symbol,
                "bids": [(float(p), float(q)) for p, q in data["bids"]],
                "asks": [(float(p), float(q)) for p, q in data["asks"]],
                "timestamp": time.time()
            }

class CoinbaseScraper:
    BASE_URL = "https://api.exchange.coinbase.com"

    async def get_ticker(self, session, symbol, proxy):
        # Coinbase uses BTC-USD format
        url = f"{self.BASE_URL}/products/{symbol}/ticker"
        async with session.get(
            url,
            proxy=f"http://{proxy.proxy_url}",
            timeout=aiohttp.ClientTimeout(total=5)
        ) as resp:
            data = await resp.json()
            return {
                "exchange": "coinbase",
                "symbol": symbol,
                "price": float(data["price"]),
                "volume_24h": float(data["volume"]),
                "bid": float(data["bid"]),
                "ask": float(data["ask"]),
                "timestamp": time.time()
            }

class BybitScraper:
    BASE_URL = "https://api.bybit.com"

    async def get_ticker(self, session, symbol, proxy):
        url = f"{self.BASE_URL}/v5/market/tickers?category=spot&symbol={symbol}"
        async with session.get(
            url,
            proxy=f"http://{proxy.proxy_url}",
            timeout=aiohttp.ClientTimeout(total=5)
        ) as resp:
            data = await resp.json()
            ticker = data["result"]["list"][0]
            return {
                "exchange": "bybit",
                "symbol": symbol,
                "price": float(ticker["lastPrice"]),
                "volume_24h": float(ticker["volume24h"]),
                "high_24h": float(ticker["highPrice24h"]),
                "low_24h": float(ticker["lowPrice24h"]),
                "timestamp": time.time()
            }

Step 3: Unified Price Collector

class CryptoPriceCollector:
    def __init__(self, proxy_pool: ProxyPool):
        self.proxy_pool = proxy_pool
        self.scrapers = {
            "binance": BinanceScraper(),
            "coinbase": CoinbaseScraper(),
            "bybit": BybitScraper(),
        }
        self.symbol_mapping = {
            "BTC": {
                "binance": "BTCUSDT",
                "coinbase": "BTC-USD",
                "bybit": "BTCUSDT"
            },
            "ETH": {
                "binance": "ETHUSDT",
                "coinbase": "ETH-USD",
                "bybit": "ETHUSDT"
            },
            "SOL": {
                "binance": "SOLUSDT",
                "coinbase": "SOL-USD",
                "bybit": "SOLUSDT"
            }
        }

    async def collect_prices(self, assets: list) -> list:
        """Collect prices for all assets across all exchanges."""
        async with aiohttp.ClientSession() as session:
            tasks = []
            for asset in assets:
                for exchange, symbol in self.symbol_mapping[asset].items():
                    proxy = self.proxy_pool.get_proxy(exchange)
                    scraper = self.scrapers[exchange]
                    task = self._safe_fetch(
                        scraper.get_ticker(session, symbol, proxy),
                        exchange, asset
                    )
                    tasks.append(task)

            results = await asyncio.gather(*tasks)
            return [r for r in results if r is not None]

    async def _safe_fetch(self, coro, exchange, asset):
        try:
            result = await coro
            result["asset"] = asset  # canonical name for cross-exchange grouping
            return result
        except Exception as e:
            print(f"Failed {exchange}/{asset}: {e}")
            return None

    def find_arbitrage(self, prices: list, min_spread: float = 0.001):
        """Identify arbitrage opportunities from collected prices."""
        # Group by canonical asset name, not the exchange-specific symbol
        # (Binance reports BTCUSDT while Coinbase reports BTC-USD)
        by_asset = {}
        for p in prices:
            by_asset.setdefault(p["asset"], []).append(p)

        opportunities = []
        for asset, exchange_prices in by_asset.items():
            if len(exchange_prices) < 2:
                continue
            sorted_prices = sorted(exchange_prices, key=lambda x: x["price"])
            lowest = sorted_prices[0]
            highest = sorted_prices[-1]
            spread = (highest["price"] - lowest["price"]) / lowest["price"]
            if spread > min_spread:
                opportunities.append({
                    "asset": symbol,
                    "buy_exchange": lowest["exchange"],
                    "buy_price": lowest["price"],
                    "sell_exchange": highest["exchange"],
                    "sell_price": highest["price"],
                    "spread_pct": round(spread * 100, 4)
                })
        return opportunities

# Run the collector
async def main():
    collector = CryptoPriceCollector(proxy_pool)

    while True:
        prices = await collector.collect_prices(["BTC", "ETH", "SOL"])
        arb_opps = collector.find_arbitrage(prices)

        for opp in arb_opps:
            print(f"[ARB] {opp['asset']}: Buy on {opp['buy_exchange']} "
                  f"@ {opp['buy_price']}, Sell on {opp['sell_exchange']} "
                  f"@ {opp['sell_price']} ({opp['spread_pct']}%)")

        await asyncio.sleep(1)  # Poll every second

asyncio.run(main())

Handling Anti-Scraping Measures

Exchange Rate Limit Headers

Most exchanges return rate limit information in response headers. Parse these to dynamically adjust your scraping speed:

async def adaptive_fetch(session, url, proxy):
    async with session.get(url, proxy=f"http://{proxy}") as resp:
        # Binance reports *consumed* request weight, not remaining quota
        used = int(resp.headers.get("X-MBX-USED-WEIGHT-1m", 0))
        limit = 1200  # Binance default weight limit per minute

        if used > limit * 0.95:
            # Critical — pause significantly
            await asyncio.sleep(10)
        elif used > limit * 0.8:
            # Approaching limit — slow down
            await asyncio.sleep(2)

        return await resp.json()

IP Rotation Strategy

For price scraping, use mobile proxies with automatic rotation. Mobile IPs are shared by many real users behind carrier-grade NAT, so anti-bot systems treat them as high trust; your scraping operations face fewer challenges and CAPTCHAs than with datacenter or residential alternatives.

Rotate proxies every 100-200 requests per exchange, or immediately upon receiving a 429 (Too Many Requests) response.
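
A rough sketch of the 429 half of that policy follows; fetch_with_rotation is an illustrative helper built on the ProxyPool above, and the retry count is arbitrary:

async def fetch_with_rotation(session: aiohttp.ClientSession, url: str,
                              pool: ProxyPool, exchange: str,
                              retries: int = 3):
    """Retry on 429, letting the round-robin pool hand out a fresh proxy."""
    for _ in range(retries):
        proxy = pool.get_proxy(exchange)
        try:
            async with session.get(
                url,
                proxy=f"http://{proxy.proxy_url}",
                timeout=aiohttp.ClientTimeout(total=5)
            ) as resp:
                if resp.status == 429:
                    proxy.failures += 1  # get_proxy() skips proxies with 5+ failures
                    continue
                return await resp.json()
        except aiohttp.ClientError:
            proxy.failures += 1
    return None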

User-Agent and Header Management

Exchanges inspect request headers to distinguish bots from legitimate users:

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/120.0.0.0 Safari/537.36",
    "Accept": "application/json",
    "Accept-Language": "en-US,en;q=0.9",
    "Accept-Encoding": "gzip, deflate, br",
}
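
These can be attached once at session creation so every request inherits them; a minimal sketch, assuming the HEADERS dict above:

# Attach the headers once; every request through this session inherits them
session = aiohttp.ClientSession(headers=HEADERS)

# Per-request headers still merge with (and override) the session defaults:
# session.get(url, headers={"Accept-Language": "de-DE,de;q=0.9"})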

Data Storage for Collected Prices

Redis for Real-Time Access

import redis
import json
import time

r = redis.Redis(host='localhost', port=6379, db=0)

def store_price(price_data: dict):
    key = f"price:{price_data['exchange']}:{price_data['symbol']}"
    r.set(key, json.dumps(price_data), ex=60)  # 60s expiry

    # Also store in sorted set for time-series queries
    ts_key = f"ts:{price_data['exchange']}:{price_data['symbol']}"
    r.zadd(ts_key, {json.dumps(price_data): price_data['timestamp']})
    # Keep only last hour of data
    cutoff = time.time() - 3600
    r.zremrangebyscore(ts_key, '-inf', cutoff)
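
Reading the data back is symmetrical. A minimal sketch, assuming the key layout used by store_price above:

def get_latest_price(exchange: str, symbol: str):
    """Most recent ticker, or None if the key expired (older than 60s)."""
    raw = r.get(f"price:{exchange}:{symbol}")
    return json.loads(raw) if raw else None

def get_price_history(exchange: str, symbol: str, seconds: int = 300):
    """All tickers from the last `seconds`, via the time-series sorted set."""
    cutoff = time.time() - seconds
    entries = r.zrangebyscore(f"ts:{exchange}:{symbol}", cutoff, '+inf')
    return [json.loads(e) for e in entries]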

Scaling Considerations

Metric                 Small Setup    Medium Setup    Large Setup
Exchanges monitored    3              5-8             15+
Trading pairs          20             100             500+
Polling interval       5s             1s              100ms
Proxies needed         5-8            15-25           50+
Bandwidth/day          2-5 GB         10-30 GB        100+ GB

Common Mistakes

Polling too aggressively without proxies: You will get banned within minutes. Always route through a proxy pool.

Not handling WebSocket disconnections: For real-time data, WebSocket connections drop regularly. Implement automatic reconnection with exponential backoff.
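
A minimal reconnection sketch using aiohttp's WebSocket client; the stream URL in the comment is just one example feed, and the backoff constants are arbitrary:

async def stream_with_backoff(url: str, handler, max_backoff: float = 60.0):
    """Consume a WebSocket feed, reconnecting with exponential backoff."""
    backoff = 1.0
    while True:
        try:
            async with aiohttp.ClientSession() as session:
                async with session.ws_connect(url, heartbeat=30) as ws:
                    backoff = 1.0  # connected successfully; reset the backoff
                    async for msg in ws:
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            handler(msg.json())
                        elif msg.type == aiohttp.WSMsgType.ERROR:
                            break
        except aiohttp.ClientError:
            pass  # fall through to the backoff sleep below
        await asyncio.sleep(backoff)
        backoff = min(backoff * 2, max_backoff)  # 1s, 2s, 4s, ... capped

# Example: await stream_with_backoff(
#     "wss://stream.binance.com:9443/ws/btcusdt@trade", print)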

Ignoring exchange maintenance windows: Exchanges schedule maintenance that returns error responses. Your scraper should detect maintenance mode and pause rather than burning through proxy IPs on failed requests.
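
One cheap way to approximate this, assuming maintenance shows up as sustained 5xx responses (the threshold and pause length are arbitrary, and Dict/time come from the imports at the top):

class MaintenanceGuard:
    """Pause an exchange after repeated server errors instead of burning proxies."""

    def __init__(self, threshold: int = 5, pause_seconds: float = 300.0):
        self.threshold = threshold
        self.pause_seconds = pause_seconds
        self.errors: Dict[str, int] = {}
        self.paused_until: Dict[str, float] = {}

    def record(self, exchange: str, status: int):
        if status >= 500:
            self.errors[exchange] = self.errors.get(exchange, 0) + 1
            if self.errors[exchange] >= self.threshold:
                self.paused_until[exchange] = time.time() + self.pause_seconds
        else:
            self.errors[exchange] = 0  # any success resets the error streak

    def is_paused(self, exchange: str) -> bool:
        return time.time() < self.paused_until.get(exchange, 0)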

Storing raw data without normalization: Each exchange uses different symbol formats, timestamp conventions, and response structures. Normalize immediately upon collection, not after; the collector above tags every ticker with a canonical asset name for exactly this reason.

Conclusion

Scraping real-time crypto prices from multiple exchanges is a foundational capability for any serious crypto operation. The combination of async Python, properly managed proxy pools, and exchange-specific scraping logic gives you reliable, high-frequency price data that powers trading algorithms, arbitrage detection, and market analysis. Start with three exchanges and the core trading pairs, then scale your proxy pool as you add more data sources.

