How to Scrape CoinGecko and CoinMarketCap Data at Scale

How to Scrape CoinGecko and CoinMarketCap Data at Scale

CoinGecko and CoinMarketCap are the two most comprehensive sources of cryptocurrency market data. They aggregate prices, volumes, market caps, and metadata for over 10,000 tokens each. For data analysts, trading bot developers, and crypto researchers, accessing this data at scale is essential — but both platforms enforce strict rate limits that make bulk data collection impossible without proxy infrastructure.

This guide covers practical strategies for scraping CoinGecko and CoinMarketCap data using Python and proxies, including API optimization, web scraping fallbacks, and data storage.

API vs Web Scraping: Choosing Your Approach

CoinGecko API

CoinGecko offers a free API tier and paid plans:

| Tier | Rate Limit | Monthly Cost | Best For |
|------|------------|--------------|----------|
| Free | 10-30 calls/min | $0 | Basic monitoring |
| Analyst | 500 calls/min | $14/mo | Small-scale analysis |
| Pro | 1,000 calls/min | $49/mo | Medium-scale projects |

Even on the Pro plan, 1,000 calls per minute is insufficient for scraping detailed data on 10,000+ tokens. Proxies multiply your effective rate limit by distributing requests across multiple API keys and IP addresses.

CoinMarketCap API

CoinMarketCap’s API has similar tiered pricing:

| Tier | Daily Credits | Monthly Cost |
|------|---------------|--------------|
| Basic | 10,000 | $0 |
| Hobbyist | 40,000 | $29/mo |
| Startup | 120,000 | $79/mo |
| Standard | 600,000 | $299/mo |

Each API call costs 1-5 credits depending on the endpoint. With proxies, you can use multiple free-tier keys to achieve higher throughput.

Setting Up the Scraping Infrastructure

Proxy Manager for Data Aggregator Scraping

import aiohttp
import asyncio
import time
from typing import List, Dict, Optional
from dataclasses import dataclass
import random

@dataclass
class APIKeyProxy:
    """An API key pinned to a dedicated proxy, with rate-limit bookkeeping."""
    api_key: str
    proxy: str
    calls_made: int = 0        # total requests issued with this key
    last_call: float = 0       # epoch seconds of the most recent request
    cooldown_until: float = 0  # epoch seconds until which the key is rate-limited

class CryptoDataProxyManager:
    """Rotates CoinGecko/CMC API keys (each pinned to a proxy) and web proxies.

    Key selection prefers keys that are not cooling down and have made the
    fewest calls, spreading load evenly across the pool.
    """

    def __init__(self):
        self.coingecko_keys: List[APIKeyProxy] = []
        self.cmc_keys: List[APIKeyProxy] = []
        self.web_proxies: List[str] = []
        self.web_proxy_index = 0

    def add_coingecko_key(self, api_key: str, proxy: str):
        """Register a CoinGecko API key together with its dedicated proxy."""
        self.coingecko_keys.append(APIKeyProxy(
            api_key=api_key, proxy=proxy
        ))

    def add_cmc_key(self, api_key: str, proxy: str):
        """Register a CoinMarketCap API key together with its dedicated proxy."""
        self.cmc_keys.append(APIKeyProxy(
            api_key=api_key, proxy=proxy
        ))

    def add_web_proxy(self, proxy: str):
        """Register a proxy used for plain web-scraping fallback requests."""
        self.web_proxies.append(proxy)

    @staticmethod
    def _pick_key(pool: List[APIKeyProxy]) -> APIKeyProxy:
        """Pick the least-used key not in cooldown.

        If every key is cooling down, return the one whose cooldown ends
        soonest so the caller can wait the minimum amount of time.
        """
        if not pool:
            # Fail loudly: min() on an empty pool would raise a cryptic
            # ValueError deep inside the selection logic.
            raise ValueError("no API keys registered")
        now = time.time()
        available = [k for k in pool if now > k.cooldown_until]
        if not available:
            return min(pool, key=lambda k: k.cooldown_until)
        return min(available, key=lambda k: k.calls_made)

    def get_coingecko_key(self) -> APIKeyProxy:
        return self._pick_key(self.coingecko_keys)

    def get_cmc_key(self) -> APIKeyProxy:
        return self._pick_key(self.cmc_keys)

    def get_web_proxy(self) -> str:
        """Return the next web proxy in round-robin order."""
        if not self.web_proxies:
            # Previously this crashed with an opaque ZeroDivisionError
            # from the `% len([])` below.
            raise ValueError("no web proxies registered")
        proxy = self.web_proxies[
            self.web_proxy_index % len(self.web_proxies)
        ]
        self.web_proxy_index += 1
        return proxy

# Initialize the shared proxy manager: three CoinGecko keys, two
# CoinMarketCap keys, and two plain web proxies for the scraping fallback.
pm = CryptoDataProxyManager()
for cg_key, cg_proxy in (
    ("CG-key-1", "user:pass@proxy1.example.com:8080"),
    ("CG-key-2", "user:pass@proxy2.example.com:8080"),
    ("CG-key-3", "user:pass@proxy3.example.com:8080"),
):
    pm.add_coingecko_key(cg_key, cg_proxy)
for cmc_key, cmc_proxy in (
    ("CMC-key-1", "user:pass@proxy4.example.com:8080"),
    ("CMC-key-2", "user:pass@proxy5.example.com:8080"),
):
    pm.add_cmc_key(cmc_key, cmc_proxy)
for web_proxy in (
    "user:pass@proxy6.example.com:8080",
    "user:pass@proxy7.example.com:8080",
):
    pm.add_web_proxy(web_proxy)

Scraping CoinGecko

API-Based Collection

class CoinGeckoScraper:
    """Collects CoinGecko market data through rotated API keys and proxies."""

    BASE_URL = "https://api.coingecko.com/api/v3"
    PRO_URL = "https://pro-api.coingecko.com/api/v3"

    def __init__(self, proxy_manager: CryptoDataProxyManager):
        self.pm = proxy_manager

    async def _api_call(self, session, endpoint: str,
                         params: dict = None) -> Optional[dict]:
        """Perform one GET against the Pro API via a rotated key/proxy pair.

        Returns the parsed JSON body, or None on rate limiting or any error.
        """
        key_proxy = self.pm.get_coingecko_key()

        # If the selected key is still cooling down (all keys busy), wait it
        # out instead of burning a request that would be rejected anyway.
        now = time.time()
        if now < key_proxy.cooldown_until:
            await asyncio.sleep(key_proxy.cooldown_until - now)

        url = f"{self.PRO_URL}{endpoint}"
        headers = {"x-cg-pro-api-key": key_proxy.api_key}

        try:
            async with session.get(
                url,
                headers=headers,
                params=params or {},
                proxy=f"http://{key_proxy.proxy}",
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                key_proxy.calls_made += 1
                key_proxy.last_call = time.time()

                if resp.status == 200:
                    return await resp.json()
                if resp.status == 429:
                    # Rate limited — honor Retry-After when numeric. The
                    # header may also be an HTTP-date, which int() would
                    # choke on; fall back to a 60s cooldown in that case.
                    try:
                        retry_after = int(resp.headers.get("Retry-After", 60))
                    except (TypeError, ValueError):
                        retry_after = 60
                    key_proxy.cooldown_until = time.time() + retry_after
                # Any non-200 response yields None.
                return None
        except Exception:
            # Network/proxy failure: brief cooldown so the rotation prefers
            # a different key/proxy on the next call.
            key_proxy.cooldown_until = time.time() + 5
            return None

    async def get_coins_list(self, session) -> list:
        """Get list of all coins with IDs."""
        return await self._api_call(session, "/coins/list") or []

    async def get_coin_market_data(self, session,
                                    vs_currency: str = "usd",
                                    page: int = 1,
                                    per_page: int = 250) -> list:
        """Get market data for coins (paginated, sorted by market cap)."""
        params = {
            "vs_currency": vs_currency,
            "order": "market_cap_desc",
            "per_page": per_page,
            "page": page,
            "sparkline": "false",
            "price_change_percentage": "1h,24h,7d,30d"
        }
        return await self._api_call(
            session, "/coins/markets", params
        ) or []

    async def get_all_market_data(self, session,
                                   vs_currency: str = "usd",
                                   max_coins: int = 5000) -> list:
        """Fetch market data for up to max_coins coins across pages."""
        all_data = []
        per_page = 250
        # Ceiling division: the old `(max_coins // per_page) + 1` requested
        # an extra page whenever max_coins was an exact multiple of 250.
        pages = -(-max_coins // per_page)

        for page in range(1, pages + 1):
            data = await self.get_coin_market_data(
                session, vs_currency, page, per_page
            )
            if not data:
                break
            all_data.extend(data)
            # Small delay between pages to stay under per-key rate limits.
            await asyncio.sleep(0.5)

        # Trim in case the final page overshot the requested count.
        return all_data[:max_coins]

    async def get_coin_details(self, session, coin_id: str) -> dict:
        """Get detailed data (tickers, market, community, dev) for one coin."""
        params = {
            "localization": "false",
            "tickers": "true",
            "market_data": "true",
            "community_data": "true",
            "developer_data": "true",
        }
        return await self._api_call(
            session, f"/coins/{coin_id}", params
        )

    async def get_historical_prices(self, session, coin_id: str,
                                     days: int = 365) -> dict:
        """Get historical price data for the last `days` days."""
        params = {
            "vs_currency": "usd",
            "days": str(days),
        }
        if days > 90:
            # Force daily granularity on long ranges. For shorter ranges the
            # key is omitted so the API picks its automatic granularity —
            # previously an empty-string interval was sent instead.
            params["interval"] = "daily"
        return await self._api_call(
            session, f"/coins/{coin_id}/market_chart", params
        )

    async def bulk_collect_details(self, session,
                                    coin_ids: list) -> list:
        """Collect detailed data for multiple coins with rate limiting."""
        results = []
        # At most 5 detail requests in flight at once.
        semaphore = asyncio.Semaphore(5)

        async def fetch_with_limit(coin_id):
            async with semaphore:
                data = await self.get_coin_details(session, coin_id)
                if data:
                    results.append(data)
                await asyncio.sleep(0.2)  # Respect rate limits

        tasks = [fetch_with_limit(cid) for cid in coin_ids]
        await asyncio.gather(*tasks)
        return results

Scraping CoinMarketCap

API-Based Collection

class CoinMarketCapScraper:
    """Collects CoinMarketCap data through rotated API keys and proxies."""

    BASE_URL = "https://pro-api.coinmarketcap.com"

    def __init__(self, proxy_manager: CryptoDataProxyManager):
        self.pm = proxy_manager

    async def _api_call(self, session, endpoint: str,
                         params: dict = None) -> Optional[dict]:
        """Perform one GET against the CMC Pro API via a rotated key/proxy.

        Returns the `data` payload of the JSON response, or None on any
        failure (rate limit, HTTP error, network/proxy error).
        """
        key_proxy = self.pm.get_cmc_key()
        url = f"{self.BASE_URL}{endpoint}"
        headers = {
            "X-CMC_PRO_API_KEY": key_proxy.api_key,
            "Accept": "application/json",
        }

        try:
            async with session.get(
                url,
                headers=headers,
                params=params or {},
                proxy=f"http://{key_proxy.proxy}",
                timeout=aiohttp.ClientTimeout(total=15)
            ) as resp:
                key_proxy.calls_made += 1
                if resp.status == 200:
                    data = await resp.json()
                    return data.get("data")
                if resp.status == 429:
                    # Rate limited — park this key for a minute.
                    key_proxy.cooldown_until = time.time() + 60
                # Other statuses (4xx/5xx) fall through to an explicit None
                # rather than the previous implicit fall-off-the-end return.
                return None
        except Exception:
            # Network/proxy failure: brief cooldown so the rotation prefers
            # another key next time (previously no cooldown was set here).
            key_proxy.cooldown_until = time.time() + 5
            return None

    async def get_latest_listings(self, session,
                                   start: int = 1,
                                   limit: int = 200) -> list:
        """Get latest market data sorted by market cap (descending)."""
        params = {
            "start": str(start),
            "limit": str(limit),
            "convert": "USD",
            "sort": "market_cap",
            "sort_dir": "desc",
        }
        return await self._api_call(
            session, "/v1/cryptocurrency/listings/latest", params
        ) or []

    async def get_all_listings(self, session,
                                max_coins: int = 5000) -> list:
        """Paginate through listings until max_coins or an empty page."""
        all_data = []
        per_page = 200

        for start in range(1, max_coins, per_page):
            data = await self.get_latest_listings(
                session, start, per_page
            )
            if not data:
                break
            all_data.extend(data)
            await asyncio.sleep(1)

        # Trim in case the final page overshot the requested count.
        return all_data[:max_coins]

    async def get_coin_metadata(self, session,
                                 coin_ids: list) -> dict:
        """Get metadata (description, links, logo) for coins.

        The info endpoint caps each request at 100 ids, so larger lists are
        fetched in 100-id chunks and merged — previously any ids beyond the
        first 100 were silently dropped. Returns None if every chunk fails.
        """
        merged = {}
        got_any = False
        for i in range(0, len(coin_ids), 100):
            chunk = coin_ids[i:i + 100]
            params = {"id": ",".join(str(c) for c in chunk)}
            data = await self._api_call(
                session, "/v2/cryptocurrency/info", params
            )
            if data:
                merged.update(data)
                got_any = True
        return merged if got_any else None

    async def get_global_metrics(self, session) -> dict:
        """Get global crypto market metrics (total cap, dominance, etc.)."""
        return await self._api_call(
            session, "/v1/global-metrics/quotes/latest"
        )

Web Scraping Fallback

When API limits are exhausted, fall back to web scraping:

class CoinGeckoWebScraper:
    """Scrape CoinGecko web pages when API limits are reached."""

    def __init__(self, proxy_manager: CryptoDataProxyManager):
        self.pm = proxy_manager

    async def scrape_coin_page(self, session, coin_slug: str) -> dict:
        """Fetch a coin's public page through a rotated proxy and parse it."""
        current_proxy = self.pm.get_web_proxy()
        page_url = f"https://www.coingecko.com/en/coins/{coin_slug}"

        request_headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/120.0.0.0",
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        }

        async with session.get(
            page_url,
            headers=request_headers,
            proxy=f"http://{current_proxy}",
            timeout=aiohttp.ClientTimeout(total=20)
        ) as resp:
            if resp.status != 200:
                return None
            body = await resp.text()
        return self._parse_coin_page(body, coin_slug)

    def _parse_coin_page(self, html: str, coin_slug: str) -> dict:
        """Pull price and market cap out of the rendered page HTML."""
        from bs4 import BeautifulSoup
        soup = BeautifulSoup(html, "html.parser")

        parsed = {"slug": coin_slug}

        # Each field lives in an element tagged with a data-target attribute.
        for field, selector in (
            ("price", '[data-target="price.price"]'),
            ("market_cap", '[data-target="price.market-cap"]'),
        ):
            node = soup.select_one(selector)
            if node:
                parsed[field] = node.text.strip()

        return parsed

Combining Both Sources

class CryptoDataAggregator:
    """Merges CoinGecko and CoinMarketCap listings into one dataset."""

    def __init__(self, proxy_manager: CryptoDataProxyManager):
        self.coingecko = CoinGeckoScraper(proxy_manager)
        self.cmc = CoinMarketCapScraper(proxy_manager)

    async def get_comprehensive_data(self, top_n: int = 1000) -> list:
        """Combine data from both sources for richer dataset."""
        async with aiohttp.ClientSession() as session:
            cg_data, cmc_data = await asyncio.gather(
                self.coingecko.get_all_market_data(session, max_coins=top_n),
                self.cmc.get_all_listings(session, max_coins=top_n),
            )

        merged = {}

        # Seed the merged view from CoinGecko rows, keyed by ticker symbol.
        for entry in cg_data:
            ticker = entry.get("symbol", "").upper()
            merged[ticker] = {
                "symbol": ticker,
                "name": entry.get("name"),
                "cg_price": entry.get("current_price"),
                "cg_market_cap": entry.get("market_cap"),
                "cg_volume": entry.get("total_volume"),
                "cg_price_change_24h": entry.get(
                    "price_change_percentage_24h"
                ),
            }

        # Fold in CoinMarketCap USD quotes: enrich existing rows, or add
        # CMC-only rows for symbols CoinGecko did not return.
        for entry in cmc_data:
            ticker = entry.get("symbol", "").upper()
            usd_quote = entry.get("quote", {}).get("USD", {})
            existing = merged.get(ticker)
            if existing is not None:
                existing["cmc_price"] = usd_quote.get("price")
                existing["cmc_market_cap"] = usd_quote.get("market_cap")
                existing["cmc_volume"] = usd_quote.get("volume_24h")
                existing["cmc_price_change_24h"] = usd_quote.get(
                    "percent_change_24h"
                )
            else:
                merged[ticker] = {
                    "symbol": ticker,
                    "name": entry.get("name"),
                    "cmc_price": usd_quote.get("price"),
                    "cmc_market_cap": usd_quote.get("market_cap"),
                    "cmc_volume": usd_quote.get("volume_24h"),
                }

        return list(merged.values())

Proxy Sizing

| Data Scope | CoinGecko Keys | CMC Keys | Proxies | Update Freq |
|------------|----------------|----------|---------|-------------|
| Top 100 tokens | 1 | 1 | 2 | 5 min |
| Top 1,000 tokens | 2 | 2 | 4 | 15 min |
| Top 5,000 tokens | 3-5 | 3-5 | 6-8 | 30 min |
| Full coverage | 5+ | 5+ | 10+ | 1 hour |

Mobile proxies provide the best reliability for sustained data collection from CoinGecko and CoinMarketCap, as both platforms use Cloudflare protection that flags datacenter IPs. For general web scraping strategies including rate limit management and anti-detection, the dedicated guide provides comprehensive coverage.

Data Storage

import sqlite3
import json

def store_market_data(db_path: str, data: list):
    """Persist merged market rows to a SQLite table.

    Creates the `market_data` table on first use and appends one row per
    coin, preferring CoinGecko values and falling back to CoinMarketCap.

    Args:
        db_path: Path of the SQLite database file (created if missing).
        data: List of merged coin dicts (see CryptoDataAggregator).
    """
    def _first_present(coin: dict, *keys):
        # Prefer the first key whose value is not None. The previous
        # `a or b` chains wrongly discarded legitimate zero values
        # (e.g. a 0.0 price change fell through to the other source).
        for key in keys:
            value = coin.get(key)
            if value is not None:
                return value
        return None

    conn = sqlite3.connect(db_path)
    try:
        conn.execute('''CREATE TABLE IF NOT EXISTS market_data (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            symbol TEXT,
            name TEXT,
            price REAL,
            market_cap REAL,
            volume_24h REAL,
            price_change_24h REAL,
            source TEXT,
            collected_at REAL
        )''')

        now = time.time()
        rows = [
            (
                coin.get("symbol"),
                coin.get("name"),
                _first_present(coin, "cg_price", "cmc_price"),
                _first_present(coin, "cg_market_cap", "cmc_market_cap"),
                _first_present(coin, "cg_volume", "cmc_volume"),
                _first_present(coin, "cg_price_change_24h",
                               "cmc_price_change_24h"),
                "combined",
                now,
            )
            for coin in data
        ]
        # executemany performs all inserts in a single C-level loop.
        conn.executemany(
            '''INSERT INTO market_data
               (symbol, name, price, market_cap, volume_24h,
                price_change_24h, source, collected_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)''',
            rows,
        )
        conn.commit()
    finally:
        # Close even when an insert fails, so the connection never leaks.
        conn.close()

Conclusion

Scraping CoinGecko and CoinMarketCap at scale requires a layered approach: start with API access distributed across multiple keys and proxies, fall back to web scraping when API limits are exhausted, and combine data from both sources for maximum coverage. The proxy infrastructure ensures you maintain consistent access regardless of rate limits, while proper data storage and caching minimize redundant requests. For any serious crypto data operation, dual-source collection backed by proxy distribution is standard practice.


Related Reading

Scroll to Top