Proxies for Blockchain Data Collection and On-Chain Analytics

On-chain analytics powers investment decisions, compliance monitoring, security audits, and market intelligence across the crypto industry. Platforms like Nansen, Dune Analytics, and Arkham Intelligence have built billion-dollar businesses on blockchain data analysis. Building your own on-chain analytics capability requires collecting massive amounts of blockchain data — transaction histories, token transfers, contract interactions, and state changes — which demands high-volume RPC access that quickly exceeds any single provider’s rate limits.

Proxies distribute your data collection across multiple IPs and RPC endpoints, enabling the throughput needed for comprehensive blockchain data analysis.

The Scale of Blockchain Data Collection

To illustrate the scale involved, consider what comprehensive Ethereum data collection requires:

  • Block data: ~7,200 blocks per day, each containing 100-400 transactions
  • Transaction receipts: ~1.2 million per day, each requiring a separate RPC call
  • Token transfers: ~2-5 million ERC-20 transfer events per day
  • Contract state: Millions of storage slot reads for DeFi protocol analysis
  • Historical data: 19+ million blocks since genesis

Collecting a single day of complete Ethereum data requires approximately 5 million RPC calls. At standard rate limits (25-100 calls per second per key), this takes 14-55 hours on a single endpoint. With proxy-distributed RPC access across multiple keys and providers, you can compress this to a few hours.
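The back-of-the-envelope numbers above can be checked directly (5 million calls is the assumed daily total; the pooled rate of 300 calls per second matches the benchmark table later in this article):

```python
DAILY_CALLS = 5_000_000  # assumed RPC calls for one full day of Ethereum data

def hours_to_collect(calls: int, calls_per_second: float) -> float:
    """Wall-clock hours to issue `calls` at a fixed sustained rate."""
    return calls / calls_per_second / 3600

slow = hours_to_collect(DAILY_CALLS, 25)     # single key, low limit: ~55.6 h
fast = hours_to_collect(DAILY_CALLS, 100)    # single key, high limit: ~13.9 h
pooled = hours_to_collect(DAILY_CALLS, 300)  # proxy-distributed pool: ~4.6 h
```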

Architecture for Blockchain Data Collection

┌──────────────────────────────────────────────┐
│           Data Collection Engine             │
│                                              │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐   │
│  │  Block   │  │  Event   │  │  State   │   │
│  │  Reader  │  │  Scanner │  │  Reader  │   │
│  └────┬─────┘  └────┬─────┘  └────┬─────┘   │
│       └──────────────┼──────────────┘        │
│              ┌───────▼────────┐              │
│              │ RPC Proxy Pool │              │
│              └───────┬────────┘              │
└──────────────────────┼───────────────────────┘
                       │
          ┌────────────┼────────────┐
          ▼            ▼            ▼
     Alchemy        Infura      QuickNode
    (via proxy 1)  (via proxy 2) (via proxy 3)

Setting Up the Data Collection Infrastructure

Step 1: RPC Pool with Proxy Distribution

import aiohttp
import asyncio
import time
from typing import List, Any
from dataclasses import dataclass

@dataclass
class RPCEndpoint:
    url: str
    proxy: str
    api_key: str
    requests_made: int = 0
    last_request: float = 0
    errors: int = 0
    avg_latency_ms: float = 0

class BlockchainDataProxyPool:
    def __init__(self):
        self.endpoints: List[RPCEndpoint] = []
        self.current_index = 0
        self.lock = asyncio.Lock()

    def add_endpoint(self, url: str, proxy: str, api_key: str = ""):
        self.endpoints.append(RPCEndpoint(
            url=url, proxy=proxy, api_key=api_key
        ))

    async def get_endpoint(self) -> RPCEndpoint:
        async with self.lock:
            # Round-robin with health awareness
            healthy = [
                e for e in self.endpoints if e.errors < 10
            ]
            if not healthy:
                # Reset error counts
                for e in self.endpoints:
                    e.errors = 0
                healthy = self.endpoints

            endpoint = healthy[self.current_index % len(healthy)]
            self.current_index += 1
            return endpoint

    async def rpc_call(self, method: str, params: list) -> Any:
        endpoint = await self.get_endpoint()
        start = time.time()

        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params,
            "id": 1
        }

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    endpoint.url,
                    json=payload,
                    proxy=f"http://{endpoint.proxy}",
                    timeout=aiohttp.ClientTimeout(total=10)
                ) as resp:
                    latency = (time.time() - start) * 1000
                    endpoint.avg_latency_ms = (
                        endpoint.avg_latency_ms * 0.9 + latency * 0.1
                    )
                    endpoint.requests_made += 1
                    endpoint.last_request = time.time()

                    data = await resp.json()
                    if "error" in data:
                        endpoint.errors += 1
                        raise Exception(data["error"].get("message", "RPC error"))
                    return data.get("result")
        except Exception as e:
            endpoint.errors += 1
            raise

    async def batch_rpc_call(self, calls: List[dict]) -> List[Any]:
        """Execute multiple RPC calls as a batch."""
        endpoint = await self.get_endpoint()

        batch = [
            {
                "jsonrpc": "2.0",
                "method": call["method"],
                "params": call["params"],
                "id": i
            }
            for i, call in enumerate(calls)
        ]

        async with aiohttp.ClientSession() as session:
            async with session.post(
                endpoint.url,
                json=batch,
                proxy=f"http://{endpoint.proxy}",
                timeout=aiohttp.ClientTimeout(total=30)
            ) as resp:
                results = await resp.json()
                endpoint.requests_made += 1
                return [r.get("result") for r in sorted(
                    results, key=lambda x: x["id"]
                )]

# Initialize pool
pool = BlockchainDataProxyPool()
pool.add_endpoint(
    "https://eth-mainnet.g.alchemy.com/v2/KEY1",
    "user:pass@proxy1.example.com:8080"
)
pool.add_endpoint(
    "https://eth-mainnet.g.alchemy.com/v2/KEY2",
    "user:pass@proxy2.example.com:8080"
)
pool.add_endpoint(
    "https://mainnet.infura.io/v3/KEY3",
    "user:pass@proxy3.example.com:8080"
)
pool.add_endpoint(
    "https://eth.quiknode.pro/KEY4/",
    "user:pass@proxy4.example.com:8080"
)
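batch_rpc_call above relies on the JSON-RPC convention that batch responses may arrive in any order but echo back each request's id, so sorting by id restores the original call order. A minimal illustration with a mocked, out-of-order server response:

```python
# Three hypothetical receipt lookups, ids assigned by position (0, 1, 2).
calls = [{"method": "eth_getTransactionReceipt", "params": [h]}
         for h in ("0xaaa", "0xbbb", "0xccc")]

# Simulated batch response, shuffled relative to the request order.
response = [
    {"jsonrpc": "2.0", "id": 2, "result": "receipt-ccc"},
    {"jsonrpc": "2.0", "id": 0, "result": "receipt-aaa"},
    {"jsonrpc": "2.0", "id": 1, "result": "receipt-bbb"},
]
assert len(response) == len(calls)

# The same re-ordering step batch_rpc_call performs:
ordered = [r.get("result") for r in sorted(response, key=lambda x: x["id"])]
# ordered is now back in request order: aaa, bbb, ccc
```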

Step 2: Block and Transaction Collector

class BlockCollector:
    def __init__(self, rpc_pool: BlockchainDataProxyPool):
        self.pool = rpc_pool

    async def get_block(self, block_number: int,
                         full_transactions: bool = True) -> dict:
        hex_block = hex(block_number)
        return await self.pool.rpc_call(
            "eth_getBlockByNumber",
            [hex_block, full_transactions]
        )

    async def get_block_range(self, start: int, end: int) -> list:
        """Fetch a range of blocks using concurrent requests."""
        semaphore = asyncio.Semaphore(20)  # Max concurrent requests

        async def fetch_with_semaphore(block_num):
            async with semaphore:
                return await self.get_block(block_num)

        tasks = [
            fetch_with_semaphore(n) for n in range(start, end + 1)
        ]
        return await asyncio.gather(*tasks, return_exceptions=True)

    async def get_transaction_receipt(self, tx_hash: str) -> dict:
        return await self.pool.rpc_call(
            "eth_getTransactionReceipt", [tx_hash]
        )

    async def get_receipts_batch(self, tx_hashes: list) -> list:
        """Fetch multiple transaction receipts using batch RPC."""
        batch_size = 50
        all_receipts = []

        for i in range(0, len(tx_hashes), batch_size):
            batch = tx_hashes[i:i + batch_size]
            calls = [
                {"method": "eth_getTransactionReceipt", "params": [h]}
                for h in batch
            ]
            receipts = await self.pool.batch_rpc_call(calls)
            all_receipts.extend(receipts)

        return all_receipts
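The batch size trades response size against round trips. At the ~1.2 million daily receipts cited earlier, get_receipts_batch's batch size of 50 cuts HTTP round trips by a factor of 50:

```python
import math

DAILY_RECEIPTS = 1_200_000  # approximate daily receipt count cited above
BATCH_SIZE = 50             # same default as get_receipts_batch

# One HTTP request per batch instead of one per receipt.
http_requests = math.ceil(DAILY_RECEIPTS / BATCH_SIZE)
# 24,000 HTTP requests instead of 1,200,000
```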

Step 3: Event Log Scanner

class EventScanner:
    """Scan for specific events across block ranges."""

    # Common event signatures
    ERC20_TRANSFER = (
        "0xddf252ad1be2c89b69c2b068fc378daa"
        "952ba7f163c4a11628f55a4df523b3ef"
    )
    UNISWAP_SWAP = (
        "0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
    )

    def __init__(self, rpc_pool: BlockchainDataProxyPool):
        self.pool = rpc_pool

    async def get_logs(self, from_block: int, to_block: int,
                        address: str = None,
                        topics: list = None) -> list:
        params = {
            "fromBlock": hex(from_block),
            "toBlock": hex(to_block),
        }
        if address:
            params["address"] = address
        if topics:
            params["topics"] = topics

        return await self.pool.rpc_call("eth_getLogs", [params])

    async def scan_erc20_transfers(self, token_address: str,
                                    from_block: int,
                                    to_block: int) -> list:
        """Get all ERC-20 transfers for a token."""
        # Scan in chunks of 2000 blocks to avoid response size limits
        chunk_size = 2000
        all_transfers = []

        for start in range(from_block, to_block, chunk_size):
            end = min(start + chunk_size - 1, to_block)
            logs = await self.get_logs(
                start, end,
                address=token_address,
                topics=[self.ERC20_TRANSFER]
            )
            if logs:
                for log in logs:
                    transfer = {
                        "block": int(log["blockNumber"], 16),
                        "tx_hash": log["transactionHash"],
                        "from": "0x" + log["topics"][1][26:],
                        "to": "0x" + log["topics"][2][26:],
                        "value": int(log["data"], 16),
                    }
                    all_transfers.append(transfer)

        return all_transfers

    async def scan_dex_swaps(self, pool_address: str,
                              from_block: int,
                              to_block: int) -> list:
        """Get all swap events from a Uniswap V3 pool."""
        chunk_size = 2000
        all_swaps = []

        for start in range(from_block, to_block, chunk_size):
            end = min(start + chunk_size - 1, to_block)
            logs = await self.get_logs(
                start, end,
                address=pool_address,
                topics=[self.UNISWAP_SWAP]
            )
            if logs:
                all_swaps.extend(logs)

        return all_swaps
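The address slicing in scan_erc20_transfers deserves a closer look: an indexed address topic is a full 32-byte word (66 hex characters including the 0x prefix) with the 20-byte address right-aligned, so the address is the last 40 characters, i.e. everything from index 26 on. A standalone check with a synthetic topic (the address bytes here are made up):

```python
# 32-byte topic layout: "0x" + 24 hex chars of zero padding + 40 address chars.
example_address = "00" * 5 + "de" * 15  # hypothetical 20-byte address (hex)
topic = "0x" + "00" * 12 + example_address
assert len(topic) == 66

# The same slice used in scan_erc20_transfers:
decoded = "0x" + topic[26:]
assert len(decoded) == 42  # standard 0x-prefixed Ethereum address length
```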

Step 4: Whale Tracker

class WhaleTracker:
    """Track large token movements and whale wallets."""

    def __init__(self, rpc_pool: BlockchainDataProxyPool):
        self.pool = rpc_pool
        self.scanner = EventScanner(rpc_pool)

    async def find_large_transfers(self, token_address: str,
                                    min_value: int,
                                    blocks_back: int = 1000) -> list:
        """Find token transfers above a minimum value."""
        latest = await self.pool.rpc_call("eth_blockNumber", [])
        latest_block = int(latest, 16)
        from_block = latest_block - blocks_back

        transfers = await self.scanner.scan_erc20_transfers(
            token_address, from_block, latest_block
        )

        whales = [t for t in transfers if t["value"] >= min_value]
        whales.sort(key=lambda x: x["value"], reverse=True)
        return whales

    async def get_wallet_token_balance(self, wallet: str,
                                        token_address: str) -> int:
        """Get ERC-20 token balance for a wallet."""
        # balanceOf(address) call data
        padded_address = wallet[2:].lower().zfill(64)
        data = f"0x70a08231{padded_address}"

        result = await self.pool.rpc_call(
            "eth_call",
            [{"to": token_address, "data": data}, "latest"]
        )
        return int(result, 16) if result else 0
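The 0x70a08231 constant in get_wallet_token_balance is the 4-byte function selector for balanceOf(address); the argument is the wallet address left-padded to a 32-byte ABI word. The construction can be verified standalone (the wallet address below is hypothetical):

```python
wallet = "0xAbCd000000000000000000000000000000001234"  # hypothetical wallet

# Strip "0x", lowercase, left-pad the 40-char address to 64 hex chars (32 bytes).
padded = wallet[2:].lower().zfill(64)
calldata = f"0x70a08231{padded}"

# 2 ("0x") + 8 (selector) + 64 (one ABI word) = 74 characters
assert len(calldata) == 74
assert calldata.startswith("0x70a08231")
```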

Scaling Data Collection with Proxies

Throughput Benchmarks

  Setup            RPC Endpoints   Proxies   Effective RPS   Daily Capacity
  Single key             1            0            25          2.1M calls
  Basic pool             3            3            75          6.5M calls
  Standard pool          6            6           150           13M calls
  Advanced pool         12           12           300           26M calls

With proxies distributing load across multiple RPC API keys, you can achieve 10-30x the throughput of a single endpoint, which is what makes high-volume blockchain data collection feasible.
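The Daily Capacity column in the table follows directly from sustaining the effective RPS over the 86,400 seconds in a day:

```python
# Effective requests-per-second figures from the benchmark table above.
setups = {
    "Single key": 25,
    "Basic pool": 75,
    "Standard pool": 150,
    "Advanced pool": 300,
}

# 86,400 seconds in a day; capacities round to the table's values.
daily = {name: rps * 86_400 for name, rps in setups.items()}
# e.g. 25 RPS -> 2,160,000 (~2.1M), 300 RPS -> 25,920,000 (~26M)
```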

Cost Optimization

Most RPC providers price calls in compute units (CUs). Optimize your proxy distribution to minimize costs:

  1. Use batch requests where possible — a batch of 50 calls counts as 50 CUs but only one HTTP request
  2. Cache responses for immutable data (historical blocks, finalized transactions)
  3. Use eth_getLogs instead of polling — event-based data collection is more efficient than block-by-block scanning
  4. Distribute across providers to stay within free tier limits on each
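Point 2 (caching immutable data) can be as simple as a dict keyed on the method and serialized parameters. The wrapper below is a minimal sketch, assuming any async callable with the (method, params) shape of the pool's rpc_call:

```python
import asyncio
import json

class CachedRPC:
    """Cache responses for immutable data (historical blocks, finalized txs)."""

    def __init__(self, rpc_call):
        self._rpc_call = rpc_call  # async callable: (method, params) -> result
        self._cache = {}

    async def call(self, method: str, params: list, immutable: bool = False):
        key = (method, json.dumps(params, sort_keys=True))
        if immutable and key in self._cache:
            return self._cache[key]
        result = await self._rpc_call(method, params)
        if immutable:
            self._cache[key] = result
        return result

# Demo with a stub backend that counts how often it is actually hit.
backend_calls = []

async def fake_rpc(method, params):
    backend_calls.append(method)
    return {"number": params[0]}

async def demo():
    rpc = CachedRPC(fake_rpc)
    a = await rpc.call("eth_getBlockByNumber", ["0x10", False], immutable=True)
    b = await rpc.call("eth_getBlockByNumber", ["0x10", False], immutable=True)
    return a, b

a, b = asyncio.run(demo())
# Second call is served from the cache: one backend hit, identical results.
```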

Data Storage for Analytics

import sqlite3

def init_analytics_db(db_path: str):
    conn = sqlite3.connect(db_path)
    conn.executescript('''
        CREATE TABLE IF NOT EXISTS blocks (
            number INTEGER PRIMARY KEY,
            hash TEXT,
            timestamp INTEGER,
            transaction_count INTEGER,
            gas_used INTEGER,
            gas_limit INTEGER
        );

        CREATE TABLE IF NOT EXISTS transfers (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            block_number INTEGER,
            tx_hash TEXT,
            token_address TEXT,
            from_address TEXT,
            to_address TEXT,
            value TEXT,
            FOREIGN KEY (block_number) REFERENCES blocks(number)
        );

        CREATE INDEX IF NOT EXISTS idx_transfers_token
            ON transfers(token_address);
        CREATE INDEX IF NOT EXISTS idx_transfers_from
            ON transfers(from_address);
        CREATE INDEX IF NOT EXISTS idx_transfers_to
            ON transfers(to_address);
    ''')
    return conn
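A quick round trip against this schema, with the tables recreated inline so the snippet runs standalone and made-up rows for illustration (value is stored as TEXT because token amounts routinely exceed 64-bit integers):

```python
import sqlite3

# Same schema as init_analytics_db above, minus the indexes.
conn = sqlite3.connect(":memory:")
conn.executescript('''
    CREATE TABLE blocks (
        number INTEGER PRIMARY KEY, hash TEXT, timestamp INTEGER,
        transaction_count INTEGER, gas_used INTEGER, gas_limit INTEGER
    );
    CREATE TABLE transfers (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        block_number INTEGER, tx_hash TEXT, token_address TEXT,
        from_address TEXT, to_address TEXT, value TEXT,
        FOREIGN KEY (block_number) REFERENCES blocks(number)
    );
''')

# Hypothetical rows standing in for collected data.
conn.execute("INSERT INTO blocks VALUES (?, ?, ?, ?, ?, ?)",
             (19_000_000, "0xabc", 1_700_000_000, 150,
              12_000_000, 30_000_000))
conn.execute(
    "INSERT INTO transfers (block_number, tx_hash, token_address, "
    "from_address, to_address, value) VALUES (?, ?, ?, ?, ?, ?)",
    (19_000_000, "0xdef", "0xtoken", "0xalice", "0xbob", "1000000"))
conn.commit()

row = conn.execute(
    "SELECT to_address, value FROM transfers WHERE token_address = ?",
    ("0xtoken",)).fetchone()
```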

Common Pitfalls

Hitting RPC provider limits despite proxies: Each API key has its own rate limit regardless of IP. Use separate API keys per proxy, not just separate IPs with the same key.

Not handling chain reorganizations: Recent blocks can be reorganized. Always wait for finalization before treating data as permanent.
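A minimal guard is to require a confirmation depth before persisting anything. The 64-block default below is an assumption, roughly matching Ethereum's two-epoch finality window; tune it per chain and risk tolerance:

```python
def is_safe_to_persist(block_number: int, latest_block: int,
                       confirmations: int = 64) -> bool:
    """Treat a block as reorg-safe once enough blocks are built on top.

    64 is an assumed conservative depth (~two epochs on Ethereum);
    adjust for other chains or stricter requirements.
    """
    return latest_block - block_number >= confirmations

# 100 blocks deep: safe to write. 10 blocks deep: keep it provisional.
assert is_safe_to_persist(19_000_000, 19_000_100)
assert not is_safe_to_persist(19_000_000, 19_000_010)
```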

Ignoring batch request capabilities: Many RPC providers support batch requests that process multiple calls in a single HTTP request. This dramatically reduces overhead.

Collecting more data than needed: Focus on the specific events, contracts, and time ranges relevant to your analysis rather than trying to index the entire blockchain.

Conclusion

Blockchain data collection at scale is fundamentally a distributed systems problem, and proxy infrastructure is the distribution layer that makes it feasible. By combining multiple RPC providers with proxy-distributed access, you can achieve the throughput needed for comprehensive on-chain analytics — from whale tracking to DEX analysis to protocol monitoring. Start with a focused data collection scope, scale your proxy pool as your analytics needs grow, and always implement proper caching to avoid redundant RPC calls.

