Proxies for Blockchain Data Collection and On-Chain Analytics
On-chain analytics powers investment decisions, compliance monitoring, security audits, and market intelligence across the crypto industry. Platforms like Nansen, Dune Analytics, and Arkham Intelligence have built billion-dollar businesses on blockchain data analysis. Building your own on-chain analytics capability requires collecting massive amounts of blockchain data — transaction histories, token transfers, contract interactions, and state changes — which demands high-volume RPC access that quickly exceeds any single provider’s rate limits.
Proxies distribute your data collection across multiple IPs and RPC endpoints, enabling the throughput needed for comprehensive blockchain data analysis.
The Scale of Blockchain Data Collection
To illustrate the scale involved, consider what comprehensive Ethereum data collection requires:
- Block data: ~7,200 blocks per day, each containing 100-400 transactions
- Transaction receipts: ~1.2 million per day, each requiring a separate RPC call
- Token transfers: ~2-5 million ERC-20 transfer events per day
- Contract state: Millions of storage slot reads for DeFi protocol analysis
- Historical data: 19+ million blocks since genesis
Collecting a single day of complete Ethereum data requires approximately 5 million RPC calls. At standard rate limits (25-100 calls per second per key), this takes 14-55 hours on a single endpoint. With proxy-distributed RPC access, the same job drops to a few hours, or under an hour with a large enough pool.
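The arithmetic behind those figures is simple to verify. A quick back-of-the-envelope sketch (the per-key rates are the illustrative ones above, not guarantees from any provider):

def collection_hours(calls: int, rps_per_key: int, keys: int = 1) -> float:
    """Wall-clock hours to issue `calls` at `rps_per_key` across `keys` keys."""
    return calls / (rps_per_key * keys) / 3600

CALLS_PER_DAY = 5_000_000  # approximate RPC calls for one day of Ethereum data
print(collection_hours(CALLS_PER_DAY, 25))       # ~55.6 hours on one slow key
print(collection_hours(CALLS_PER_DAY, 100))      # ~13.9 hours on one fast key
print(collection_hours(CALLS_PER_DAY, 100, 12))  # ~1.2 hours across 12 proxied keys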
Architecture for Blockchain Data Collection
┌──────────────────────────────────────────────┐
│ Data Collection Engine │
│ │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ Block │ │ Event │ │ State │ │
│ │ Reader │ │ Scanner │ │ Reader │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
│ └──────────────┼──────────────┘ │
│ ┌───────▼────────┐ │
│ │ RPC Proxy Pool │ │
│ └───────┬────────┘ │
└──────────────────────┼───────────────────────┘
│
┌────────────┼────────────┐
▼ ▼ ▼
Alchemy Infura QuickNode
(via proxy 1)   (via proxy 2)   (via proxy 3)
Setting Up the Data Collection Infrastructure
Step 1: RPC Pool with Proxy Distribution
import aiohttp
import asyncio
import time
from typing import List, Dict, Any
from dataclasses import dataclass, field
import random
@dataclass
class RPCEndpoint:
url: str
proxy: str
api_key: str
requests_made: int = 0
last_request: float = 0
errors: int = 0
avg_latency_ms: float = 0
class BlockchainDataProxyPool:
def __init__(self):
self.endpoints: List[RPCEndpoint] = []
self.current_index = 0
self.lock = asyncio.Lock()
def add_endpoint(self, url: str, proxy: str, api_key: str = ""):
self.endpoints.append(RPCEndpoint(
url=url, proxy=proxy, api_key=api_key
))
async def get_endpoint(self) -> RPCEndpoint:
async with self.lock:
# Round-robin with health awareness
healthy = [
e for e in self.endpoints if e.errors < 10
]
if not healthy:
# Reset error counts
for e in self.endpoints:
e.errors = 0
healthy = self.endpoints
endpoint = healthy[self.current_index % len(healthy)]
self.current_index += 1
return endpoint
async def rpc_call(self, method: str, params: list) -> Any:
endpoint = await self.get_endpoint()
start = time.time()
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params,
"id": 1
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint.url,
json=payload,
proxy=f"http://{endpoint.proxy}",
timeout=aiohttp.ClientTimeout(total=10)
) as resp:
latency = (time.time() - start) * 1000
endpoint.avg_latency_ms = (
endpoint.avg_latency_ms * 0.9 + latency * 0.1
)
endpoint.requests_made += 1
endpoint.last_request = time.time()
data = await resp.json()
if "error" in data:
endpoint.errors += 1
raise Exception(data["error"].get("message", "RPC error"))
return data.get("result")
except Exception as e:
endpoint.errors += 1
raise
async def batch_rpc_call(self, calls: List[dict]) -> List[Any]:
"""Execute multiple RPC calls as a batch."""
endpoint = await self.get_endpoint()
batch = [
{
"jsonrpc": "2.0",
"method": call["method"],
"params": call["params"],
"id": i
}
for i, call in enumerate(calls)
]
async with aiohttp.ClientSession() as session:
async with session.post(
endpoint.url,
json=batch,
proxy=f"http://{endpoint.proxy}",
timeout=aiohttp.ClientTimeout(total=30)
) as resp:
results = await resp.json()
endpoint.requests_made += 1
return [r.get("result") for r in sorted(
results, key=lambda x: x["id"]
)]
# Initialize pool
pool = BlockchainDataProxyPool()
pool.add_endpoint(
"https://eth-mainnet.g.alchemy.com/v2/KEY1",
"user:pass@proxy1.example.com:8080"
)
pool.add_endpoint(
"https://eth-mainnet.g.alchemy.com/v2/KEY2",
"user:pass@proxy2.example.com:8080"
)
pool.add_endpoint(
"https://mainnet.infura.io/v3/KEY3",
"user:pass@proxy3.example.com:8080"
)
pool.add_endpoint(
"https://eth.quiknode.pro/KEY4/",
"user:pass@proxy4.example.com:8080"
)
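With the pool initialized, a quick smoke test confirms calls are routing through the proxies. A minimal check, assuming the placeholder API keys and proxy credentials above have been replaced with real ones:

async def smoke_test():
    # One round trip through the pool: fetch the latest block number
    latest = await pool.rpc_call("eth_blockNumber", [])
    print(f"Latest block: {int(latest, 16)}")

asyncio.run(smoke_test())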
Step 2: Block and Transaction Collector
class BlockCollector:
def __init__(self, rpc_pool: BlockchainDataProxyPool):
self.pool = rpc_pool
async def get_block(self, block_number: int,
full_transactions: bool = True) -> dict:
hex_block = hex(block_number)
return await self.pool.rpc_call(
"eth_getBlockByNumber",
[hex_block, full_transactions]
)
async def get_block_range(self, start: int, end: int) -> list:
"""Fetch a range of blocks using concurrent requests."""
semaphore = asyncio.Semaphore(20) # Max concurrent requests
async def fetch_with_semaphore(block_num):
async with semaphore:
return await self.get_block(block_num)
tasks = [
fetch_with_semaphore(n) for n in range(start, end + 1)
]
return await asyncio.gather(*tasks, return_exceptions=True)
async def get_transaction_receipt(self, tx_hash: str) -> dict:
return await self.pool.rpc_call(
"eth_getTransactionReceipt", [tx_hash]
)
async def get_receipts_batch(self, tx_hashes: list) -> list:
"""Fetch multiple transaction receipts using batch RPC."""
batch_size = 50
all_receipts = []
for i in range(0, len(tx_hashes), batch_size):
batch = tx_hashes[i:i + batch_size]
calls = [
{"method": "eth_getTransactionReceipt", "params": [h]}
for h in batch
]
receipts = await self.pool.batch_rpc_call(calls)
all_receipts.extend(receipts)
        return all_receipts
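A short usage sketch that ties the two methods together: fetch one block, then batch-fetch its receipts. The block number here is an arbitrary historical example; in practice you would iterate over whatever range you are indexing.

async def collect_block(block_number: int):
    collector = BlockCollector(pool)
    block = await collector.get_block(block_number)  # full transaction objects
    tx_hashes = [tx["hash"] for tx in block["transactions"]]
    receipts = await collector.get_receipts_batch(tx_hashes)
    print(f"Block {block_number}: {len(receipts)} receipts")

asyncio.run(collect_block(19_000_000))  # arbitrary example block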
Step 3: Event Log Scanner
class EventScanner:
"""Scan for specific events across block ranges."""
# Common event signatures
ERC20_TRANSFER = (
"0xddf252ad1be2c89b69c2b068fc378daa"
"952ba7f163c4a11628f55a4df523b3ef"
)
UNISWAP_SWAP = (
"0xc42079f94a6350d7e6235f29174924f928cc2ac818eb64fed8004e115fbcca67"
)
def __init__(self, rpc_pool: BlockchainDataProxyPool):
self.pool = rpc_pool
async def get_logs(self, from_block: int, to_block: int,
address: str = None,
topics: list = None) -> list:
params = {
"fromBlock": hex(from_block),
"toBlock": hex(to_block),
}
if address:
params["address"] = address
if topics:
params["topics"] = topics
return await self.pool.rpc_call("eth_getLogs", [params])
async def scan_erc20_transfers(self, token_address: str,
from_block: int,
to_block: int) -> list:
"""Get all ERC-20 transfers for a token."""
# Scan in chunks of 2000 blocks to avoid response size limits
chunk_size = 2000
all_transfers = []
        for start in range(from_block, to_block + 1, chunk_size):  # inclusive of to_block
end = min(start + chunk_size - 1, to_block)
logs = await self.get_logs(
start, end,
address=token_address,
topics=[self.ERC20_TRANSFER]
)
if logs:
for log in logs:
transfer = {
"block": int(log["blockNumber"], 16),
"tx_hash": log["transactionHash"],
"from": "0x" + log["topics"][1][26:],
"to": "0x" + log["topics"][2][26:],
"value": int(log["data"], 16),
}
all_transfers.append(transfer)
return all_transfers
async def scan_dex_swaps(self, pool_address: str,
from_block: int,
to_block: int) -> list:
"""Get all swap events from a Uniswap V3 pool."""
chunk_size = 2000
all_swaps = []
        for start in range(from_block, to_block + 1, chunk_size):  # inclusive of to_block
end = min(start + chunk_size - 1, to_block)
logs = await self.get_logs(
start, end,
address=pool_address,
topics=[self.UNISWAP_SWAP]
)
if logs:
all_swaps.extend(logs)
        return all_swaps
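Usage follows the same pattern as the block collector. A sketch that scans the most recent 5,000 blocks for a token's transfers (the token address is a placeholder for whatever contract you actually track):

async def recent_transfers():
    scanner = EventScanner(pool)
    latest = int(await pool.rpc_call("eth_blockNumber", []), 16)
    transfers = await scanner.scan_erc20_transfers(
        "0xYourTokenAddress",  # placeholder: the ERC-20 contract to monitor
        latest - 5_000,
        latest
    )
    print(f"{len(transfers)} transfers in the last 5,000 blocks")

asyncio.run(recent_transfers())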
Step 4: Whale Tracker
class WhaleTracker:
"""Track large token movements and whale wallets."""
def __init__(self, rpc_pool: BlockchainDataProxyPool):
self.pool = rpc_pool
self.scanner = EventScanner(rpc_pool)
async def find_large_transfers(self, token_address: str,
min_value: int,
blocks_back: int = 1000) -> list:
"""Find token transfers above a minimum value."""
latest = await self.pool.rpc_call("eth_blockNumber", [])
latest_block = int(latest, 16)
from_block = latest_block - blocks_back
transfers = await self.scanner.scan_erc20_transfers(
token_address, from_block, latest_block
)
whales = [t for t in transfers if t["value"] >= min_value]
whales.sort(key=lambda x: x["value"], reverse=True)
return whales
async def get_wallet_token_balance(self, wallet: str,
token_address: str) -> int:
"""Get ERC-20 token balance for a wallet."""
# balanceOf(address) call data
padded_address = wallet[2:].lower().zfill(64)
data = f"0x70a08231{padded_address}"
result = await self.pool.rpc_call(
"eth_call",
[{"to": token_address, "data": data}, "latest"]
)
        return int(result, 16) if result else 0
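Tying the pieces together, a whale scan might look like the sketch below. The token address is again a placeholder, and min_value is denominated in the token's raw units, so account for its decimals:

async def whale_report():
    tracker = WhaleTracker(pool)
    whales = await tracker.find_large_transfers(
        "0xYourTokenAddress",          # placeholder token contract
        min_value=1_000_000 * 10**18,  # 1M tokens, assuming 18 decimals
        blocks_back=5_000
    )
    for t in whales[:10]:  # ten largest transfers
        print(t["tx_hash"], t["value"])

asyncio.run(whale_report())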
Scaling Data Collection with Proxies
Throughput Benchmarks
| Setup | RPC Endpoints | Proxies | Effective RPS | Daily Capacity |
|---|---|---|---|---|
| Single key | 1 | 0 | 25 | 2.1M calls |
| Basic pool | 3 | 3 | 75 | 6.5M calls |
| Standard pool | 6 | 6 | 150 | 13M calls |
| Advanced pool | 12 | 12 | 300 | 26M calls |
With mobile proxies distributing load across multiple RPC API keys, throughput scales roughly linearly with pool size: 12x with the twelve-endpoint setup in the table, and well beyond that with larger pools. That headroom is what turns a multi-day backfill into an overnight job.
Cost Optimization
Most RPC providers price calls in compute units (CUs). Optimize your proxy distribution to minimize costs:
- Use batch requests where possible: a batch of 50 calls still consumes the compute units of 50 individual calls, but incurs only one HTTP round trip of overhead
- Cache responses for immutable data (historical blocks, finalized transactions); see the caching sketch after this list
- Use eth_getLogs instead of polling — event-based data collection is more efficient than block-by-block scanning
- Distribute across providers to stay within the free tier limits on each
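Caching delivers the biggest savings on the immutable data in that list. Below is a minimal sketch of an in-memory cache layered over the proxy pool, assuming any block deeper than a fixed cutoff never changes (the cutoff depth is a policy choice, not a protocol constant):

import json

class CachedRPC:
    """In-memory cache over the proxy pool for immutable queries."""
    FINALITY_DEPTH = 64  # assumed safety margin; tune to your risk tolerance

    def __init__(self, pool: BlockchainDataProxyPool):
        self.pool = pool
        self.cache: Dict[str, Any] = {}

    async def get_block_cached(self, block_number: int,
                               latest_block: int) -> Any:
        key = json.dumps(["eth_getBlockByNumber", block_number])
        if key in self.cache:
            return self.cache[key]  # no RPC call, no compute units spent
        result = await self.pool.rpc_call(
            "eth_getBlockByNumber", [hex(block_number), True]
        )
        # Only cache blocks deep enough to be effectively final
        if latest_block - block_number >= self.FINALITY_DEPTH:
            self.cache[key] = result
        return result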
Data Storage for Analytics
import sqlite3
def init_analytics_db(db_path: str):
conn = sqlite3.connect(db_path)
conn.executescript('''
CREATE TABLE IF NOT EXISTS blocks (
number INTEGER PRIMARY KEY,
hash TEXT,
timestamp INTEGER,
transaction_count INTEGER,
gas_used INTEGER,
gas_limit INTEGER
);
CREATE TABLE IF NOT EXISTS transfers (
id INTEGER PRIMARY KEY AUTOINCREMENT,
block_number INTEGER,
tx_hash TEXT,
token_address TEXT,
from_address TEXT,
to_address TEXT,
value TEXT,
FOREIGN KEY (block_number) REFERENCES blocks(number)
);
CREATE INDEX IF NOT EXISTS idx_transfers_token
ON transfers(token_address);
CREATE INDEX IF NOT EXISTS idx_transfers_from
ON transfers(from_address);
CREATE INDEX IF NOT EXISTS idx_transfers_to
ON transfers(to_address);
''')
    return conn
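To connect the scanner output to this schema, an insert helper might look like the following sketch. Values are stored as text because ERC-20 amounts routinely overflow SQLite's 64-bit integers:

def store_transfers(conn: sqlite3.Connection, token_address: str,
                    transfers: list):
    """Persist transfer dicts produced by EventScanner.scan_erc20_transfers."""
    conn.executemany(
        '''INSERT INTO transfers
           (block_number, tx_hash, token_address,
            from_address, to_address, value)
           VALUES (?, ?, ?, ?, ?, ?)''',
        [
            (t["block"], t["tx_hash"], token_address,
             t["from"], t["to"], str(t["value"]))
            for t in transfers
        ]
    )
    conn.commit()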
Common Pitfalls
Hitting RPC provider limits despite proxies: Each API key has its own rate limit regardless of IP. Use separate API keys per proxy, not just separate IPs with the same key.
Not handling chain reorganizations: Recent blocks can be reorganized. Always wait for finalization before treating data as permanent, and apply the same rule to anything you cache; a depth-check sketch follows.
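A simple guard, assuming a fixed confirmation depth (the depth is a policy choice; Ethereum finalizes after roughly two epochs, about 13 minutes, but analytics pipelines often use deeper margins):

async def is_safe_to_persist(pool: BlockchainDataProxyPool,
                             block_number: int,
                             confirmations: int = 64) -> bool:
    """Treat a block as final only once enough blocks are built on top of it."""
    latest = int(await pool.rpc_call("eth_blockNumber", []), 16)
    return latest - block_number >= confirmations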
Ignoring batch request capabilities: Many RPC providers support batch requests that process multiple calls in a single HTTP request. This dramatically reduces overhead.
Collecting more data than needed: Focus on the specific events, contracts, and time ranges relevant to your analysis rather than trying to index the entire blockchain.
Conclusion
Blockchain data collection at scale is fundamentally a distributed systems problem, and proxy infrastructure is the distribution layer that makes it feasible. By combining multiple RPC providers with proxy-distributed access, you can achieve the throughput needed for comprehensive on-chain analytics — from whale tracking to DEX analysis to protocol monitoring. Start with a focused data collection scope, scale your proxy pool as your analytics needs grow, and always implement proper caching to avoid redundant RPC calls.
Related Reading
- How to Avoid IP-Based Sybil Detection in Crypto Protocols
- Best Proxies for Binance, Bybit, and OKX API Trading
- How to Collect Cryptocurrency Price Data Across Exchanges
- How to Scrape Stock Market Data with Mobile Proxies
- 403 Forbidden Error: What It Means & How to Fix It
- 403 Forbidden in Web Scraping: How to Fix It