How to Build an Automated Lead Scraping Pipeline with Proxies
Manual lead generation does not scale. A sales rep spending four hours per day researching prospects on LinkedIn, business directories, and company websites might find 30-50 qualified leads. An automated pipeline running 24/7 with mobile proxies can generate thousands of enriched leads per day without human intervention.
This guide covers the complete architecture for building a production-grade automated lead scraping pipeline — from URL discovery through data enrichment to CRM delivery.
Pipeline Architecture
A robust lead scraping pipeline has six stages:
[URL Discovery] → [Proxy Router] → [Scraper Workers] → [Data Parser] → [Enrichment] → [CRM Output]

Each stage runs independently and communicates through a message queue. This decoupled architecture means you can scale individual components without rebuilding the entire system.
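A queue message only needs the target URL plus routing hints. As a minimal sketch using Redis as the broker (the queue name and message fields here are illustrative, not a fixed schema):

```python
import json
import redis

queue = redis.Redis()

def enqueue_job(url, priority="normal"):
    """Push a scrape job onto the shared queue for the worker pool."""
    job = {"url": url, "priority": priority, "attempts": 0}
    queue.lpush("jobs:scrape", json.dumps(job))
```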
Technology Stack
| Component | Recommended Tool | Purpose |
|---|---|---|
| Message Queue | Redis / RabbitMQ | Job distribution |
| Scraper Framework | Scrapy / aiohttp | HTTP requests |
| Browser Automation | Playwright | JS-heavy sites |
| Proxy Layer | Mobile proxy gateway | IP rotation |
| Data Store | PostgreSQL | Lead storage |
| Scheduler | Celery / APScheduler | Task scheduling |
| CRM Integration | REST API | HubSpot / Salesforce |
Stage 1: URL Discovery
Before you can scrape leads, you need a list of target URLs. There are three primary discovery methods:
Search Engine Scraping
Use Google search to find business listing pages for your target niche:
```python
import requests
from bs4 import BeautifulSoup
import time
import random

def google_search_urls(query, proxy_url, num_results=100):
    """Scrape Google search results for target URLs."""
    urls = []
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }
    for start in range(0, num_results, 10):
        params = {
            "q": query,
            "start": start,
            "num": 10,
        }
        response = requests.get(
            "https://www.google.com/search",
            params=params,
            headers=headers,
            proxies={"http": proxy_url, "https": proxy_url},
            timeout=15,
        )
        soup = BeautifulSoup(response.text, 'lxml')
        for result in soup.select('div.g a[href]'):
            href = result.get('href', '')
            # Keep only external result links
            if href.startswith('http') and 'google.com' not in href:
                urls.append(href)
        time.sleep(random.uniform(5, 15))  # Respect rate limits
    return list(set(urls))
```
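A typical call against a niche query looks like this (the gateway URL below is a placeholder for your proxy provider's endpoint):

```python
# Hypothetical proxy gateway endpoint; substitute your provider's credentials
proxy = "http://user:pass@gateway.example.com:8000"
urls = google_search_urls('"dental clinic" site:yelp.com', proxy, num_results=50)
print(f"Discovered {len(urls)} candidate URLs")
```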
Sitemap Parsing
Many business directories publish XML sitemaps that list every business page:
```python
import requests
import xml.etree.ElementTree as ET

def parse_sitemap(sitemap_url, proxy_url):
    """Extract URLs from an XML sitemap."""
    response = requests.get(
        sitemap_url,
        proxies={"http": proxy_url, "https": proxy_url},
        timeout=15,
    )
    root = ET.fromstring(response.content)
    namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
    urls = []
    for url_element in root.findall('.//ns:url/ns:loc', namespace):
        urls.append(url_element.text)
    return urls
```
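Large directories often split their sitemaps into a sitemap index file that points at child sitemaps. A minimal recursive extension of parse_sitemap, using the same sitemap-protocol namespace, might look like:

```python
import requests
import xml.etree.ElementTree as ET

def parse_sitemap_recursive(sitemap_url, proxy_url, max_depth=2):
    """Follow <sitemapindex> entries down to the page-level sitemaps."""
    response = requests.get(
        sitemap_url,
        proxies={"http": proxy_url, "https": proxy_url},
        timeout=15,
    )
    root = ET.fromstring(response.content)
    namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}
    urls = []
    # An index file lists child sitemaps under <sitemap><loc>
    children = root.findall('.//ns:sitemap/ns:loc', namespace)
    if children and max_depth > 0:
        for child in children:
            urls.extend(parse_sitemap_recursive(child.text, proxy_url, max_depth - 1))
    else:
        for url_element in root.findall('.//ns:url/ns:loc', namespace):
            urls.append(url_element.text)
    return urls
```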
Seed List Expansion
Start with a known list of company domains and expand it by crawling their partner, customer, and competitor pages:
```python
from urllib.parse import urlparse

def expand_seed_list(seed_domains, proxy_url, session):
    """Find related companies from seed domain websites."""
    expanded = set(seed_domains)
    for domain in seed_domains:
        try:
            response = session.get(
                f"https://{domain}",
                proxies={"http": proxy_url, "https": proxy_url},
                timeout=15,
            )
            soup = BeautifulSoup(response.text, 'lxml')
            # Look for partner/customer logo sections
            for img in soup.find_all('img', alt=True):
                alt = img['alt'].lower()
                if any(kw in alt for kw in ['partner', 'customer', 'client', 'trusted by']):
                    # Extract the domain the logo links to
                    parent_link = img.find_parent('a', href=True)
                    if parent_link:
                        linked_domain = urlparse(parent_link['href']).netloc
                        if linked_domain:
                            expanded.add(linked_domain)
        except requests.exceptions.RequestException:
            continue
    return list(expanded)
```
Stage 2: Proxy Router
The proxy router assigns proxy sessions to scraping jobs based on the target website's difficulty level. For key terminology used in proxy configuration, see our proxy glossary.
```python
import redis
import json
import time

class ProxyRouter:
    """Route scraping jobs through appropriate proxy configurations."""

    def __init__(self, proxy_gateway, redis_client):
        self.gateway = proxy_gateway
        self.redis = redis_client
        self.profiles = {
            "low_protection": {
                "rotation": "per_request",
                "concurrency": 50,
                "delay": 0.5,
            },
            "medium_protection": {
                "rotation": "per_request",
                "concurrency": 10,
                "delay": 2,
            },
            "high_protection": {
                "rotation": "sticky_30min",
                "concurrency": 3,
                "delay": 5,
                "browser": True,
            },
        }

    def classify_target(self, domain):
        """Classify target website protection level."""
        high_protection = ['linkedin.com', 'zoominfo.com', 'apollo.io']
        medium_protection = ['yelp.com', 'yellowpages.com', 'bbb.org']
        if domain in high_protection:
            return "high_protection"
        elif domain in medium_protection:
            return "medium_protection"
        return "low_protection"

    def get_proxy_config(self, domain):
        """Return proxy configuration for target domain."""
        level = self.classify_target(domain)
        profile = self.profiles[level]
        if profile["rotation"] == "sticky_30min":
            session_id = f"sticky_{domain}_{int(time.time())}"
            proxy_url = f"http://user-session-{session_id}:pass@{self.gateway}"
        else:
            proxy_url = f"http://user:pass@{self.gateway}"
        return {
            "proxy_url": proxy_url,
            "concurrency": profile["concurrency"],
            "delay": profile["delay"],
            "use_browser": profile.get("browser", False),
        }
```
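Routing a job is then a single lookup (the gateway hostname below is a placeholder):

```python
router = ProxyRouter("gateway.example.com:8000", redis.Redis())
config = router.get_proxy_config("yelp.com")
# -> {'proxy_url': 'http://user:pass@gateway.example.com:8000',
#     'concurrency': 10, 'delay': 2, 'use_browser': False}
```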
Stage 3: Scraper Workers
Workers consume URLs from the queue, scrape content through the proxy router, and push raw HTML to the parsing stage:
```python
import requests
from celery import Celery

app = Celery('lead_pipeline', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def scrape_url(self, url, proxy_config):
    """Celery task to scrape a single URL."""
    try:
        response = requests.get(
            url,
            proxies={"http": proxy_config["proxy_url"], "https": proxy_config["proxy_url"]},
            timeout=30,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            },
        )
        if response.status_code == 200:
            # Push to parsing queue
            parse_html.delay(url, response.text)
        elif response.status_code == 429:
            # Rate limited - retry with exponential backoff
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        elif response.status_code == 403:
            # Blocked - fall back to browser-based scraping
            scrape_with_browser.delay(url, proxy_config)
    except requests.exceptions.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)

@app.task
def scrape_with_browser(url, proxy_config):
    """Fallback browser-based scraping for protected sites."""
    from playwright.sync_api import sync_playwright
    with sync_playwright() as p:
        browser = p.chromium.launch(proxy={
            "server": proxy_config["proxy_url"],
            # If your gateway requires auth, Playwright takes credentials
            # via separate "username" / "password" fields here
        })
        page = browser.new_page()
        page.goto(url, wait_until="networkidle", timeout=30000)
        html = page.content()
        browser.close()
        parse_html.delay(url, html)
```
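Tying Stages 1-3 together, a dispatcher can classify each discovered URL through the router and fan the work out to Celery. This is a minimal sketch assuming the router and tasks above are importable in one module:

```python
from urllib.parse import urlparse

def dispatch_urls(urls, router):
    """Fan discovered URLs out to the scraper workers."""
    for url in urls:
        domain = urlparse(url).netloc.removeprefix("www.")
        config = router.get_proxy_config(domain)
        if config["use_browser"]:
            scrape_with_browser.delay(url, config)
        else:
            scrape_url.delay(url, config)
```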
Stage 4: Data Parser
The parser extracts structured lead data from raw HTML:
```python
from bs4 import BeautifulSoup
import re

@app.task
def parse_html(url, html):
    """Extract structured lead data from HTML."""
    soup = BeautifulSoup(html, 'lxml')
    lead = {"source_url": url}
    # Extract company name from the <title> tag
    title = soup.find('title')
    lead['company_name'] = title.get_text().split('|')[0].strip() if title else None
    # Extract emails
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    emails = re.findall(email_pattern, html)
    lead['emails'] = list(set(e.lower() for e in emails if not e.endswith(('.png', '.jpg'))))
    # Extract phone numbers
    phone_pattern = r'[\+]?[(]?[0-9]{1,3}[)]?[-\s\.]?[(]?[0-9]{1,3}[)]?[-\s\.]?[0-9]{3,6}[-\s\.]?[0-9]{3,6}'
    phones = re.findall(phone_pattern, html)
    lead['phones'] = list(set(phones))[:3]
    # Extract social links
    social_patterns = {
        'linkedin': r'linkedin\.com/(?:company|in)/[a-zA-Z0-9_-]+',
        'twitter': r'twitter\.com/[a-zA-Z0-9_]+',
    }
    for platform, pattern in social_patterns.items():
        match = re.search(pattern, html)
        if match:
            lead[f'{platform}_url'] = f'https://{match.group()}'
    # Push to enrichment only if we found contact data
    if lead.get('emails') or lead.get('phones'):
        enrich_lead.delay(lead)
```
Stage 5: Enrichment
Cross-reference scraped data with additional sources to build complete prospect profiles:
```python
@app.task
def enrich_lead(lead):
    """Enrich lead with additional data sources."""
    enriched = lead.copy()
    # Enrich with company website data
    if lead.get('company_name'):
        # find_company_domain / detect_tech_stack are project-specific helpers
        domain = find_company_domain(lead['company_name'])
        if domain:
            enriched['website'] = domain
            enriched['tech_stack'] = detect_tech_stack(domain)
    # Classify lead quality
    enriched['quality_score'] = calculate_quality_score(enriched)
    # Store in database
    store_lead(enriched)
    # Push high-quality leads to CRM
    if enriched['quality_score'] >= 70:
        push_to_crm.delay(enriched)

def calculate_quality_score(lead):
    """Score lead completeness and quality (0-100)."""
    score = 0
    if lead.get('company_name'): score += 15
    if lead.get('emails'): score += 25
    if lead.get('phones'): score += 15
    if lead.get('website'): score += 10
    if lead.get('linkedin_url'): score += 15
    if lead.get('tech_stack'): score += 10
    if lead.get('company_name') and len(lead['company_name']) > 2: score += 10
    return score
```
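As a worked example, a lead with a company name, one email, a website, and a LinkedIn URL scores 15 + 25 + 10 + 15 + 10 (the name-length bonus) = 75, clearing the CRM threshold of 70:

```python
sample = {
    "company_name": "Acme Dental",          # illustrative lead
    "emails": ["info@acmedental.example"],
    "website": "acmedental.example",
    "linkedin_url": "https://linkedin.com/company/acme-dental",
}
assert calculate_quality_score(sample) == 75
```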
Stage 6: CRM Output
Deliver qualified leads directly to your sales team's CRM:
```python
@app.task
def push_to_crm(lead):
    """Push enriched lead to CRM."""
    crm_payload = {
        "properties": {
            "company": lead.get("company_name"),
            "email": lead["emails"][0] if lead.get("emails") else None,
            "phone": lead["phones"][0] if lead.get("phones") else None,
            "website": lead.get("website"),
            "lead_source": "automated_scraping",
            "lead_score": lead.get("quality_score"),
        }
    }
    # Example: HubSpot API (HUBSPOT_API_KEY loaded from environment/config)
    response = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts",
        headers={"Authorization": f"Bearer {HUBSPOT_API_KEY}"},
        json=crm_payload,
    )
    if response.status_code == 409:
        # Contact already exists - update instead
        update_crm_contact(lead)
```
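The update path is left open above; one possible sketch of update_crm_contact against HubSpot's v3 contacts API (search for the contact by email, then PATCH the existing record) looks like this:

```python
def update_crm_contact(lead):
    """Find the existing HubSpot contact by email and refresh its score."""
    email = lead["emails"][0]
    search = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts/search",
        headers={"Authorization": f"Bearer {HUBSPOT_API_KEY}"},
        json={"filterGroups": [{"filters": [
            {"propertyName": "email", "operator": "EQ", "value": email}
        ]}]},
    )
    results = search.json().get("results", [])
    if results:
        contact_id = results[0]["id"]
        requests.patch(
            f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}",
            headers={"Authorization": f"Bearer {HUBSPOT_API_KEY}"},
            json={"properties": {"lead_score": lead.get("quality_score")}},
        )
```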
Monitoring and Alerting
A production pipeline needs monitoring to catch failures early:
```python
import json
import time

class PipelineMonitor:
    """Monitor pipeline health metrics."""

    def __init__(self, redis_client):
        self.redis = redis_client

    def record_metric(self, stage, metric, value):
        key = f"pipeline:{stage}:{metric}"
        self.redis.lpush(key, json.dumps({
            "value": value,
            "timestamp": time.time()
        }))
        self.redis.ltrim(key, 0, 999)  # Keep last 1000 entries

    def check_health(self):
        """Check pipeline health and alert on issues."""
        metrics = {}
        # Check scraper success rate
        success = self.redis.llen("pipeline:scraper:success")
        failure = self.redis.llen("pipeline:scraper:failure")
        total = success + failure
        if total > 0:
            success_rate = success / total
            metrics["scraper_success_rate"] = success_rate
            if success_rate < 0.7:
                self.alert("Scraper success rate below 70%")
        # Check proxy block rate
        blocks = self.redis.llen("pipeline:proxy:blocked")
        requests_total = self.redis.llen("pipeline:proxy:requests")
        if requests_total > 0:
            block_rate = blocks / requests_total
            metrics["proxy_block_rate"] = block_rate
            if block_rate > 0.1:
                self.alert("Proxy block rate above 10%")
        return metrics

    def alert(self, message):
        """Hook for your alerting channel (Slack, PagerDuty, email)."""
        print(f"[ALERT] {message}")
```
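To run check_health on a schedule, one option is Celery's beat scheduler, which the stack table already lists. A minimal sketch (the task name is illustrative):

```python
from celery.schedules import crontab

app.conf.beat_schedule = {
    "pipeline-health-check": {
        "task": "lead_pipeline.run_health_check",
        "schedule": crontab(minute="*/15"),  # every 15 minutes
    },
}

@app.task(name="lead_pipeline.run_health_check")
def run_health_check():
    monitor = PipelineMonitor(redis.Redis())
    return monitor.check_health()
```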
Scaling Considerations
As your pipeline grows from hundreds to tens of thousands of leads per day, consider these scaling strategies. For teams already running web scraping infrastructure, many of these patterns will be familiar:
- Horizontal scaling — Add more Celery workers across multiple machines.
- Proxy pool expansion — Use multiple proxy providers to increase available IP diversity.
- Database sharding — Partition leads by industry or geography for faster queries.
- Caching layer — Cache frequently accessed pages to reduce proxy bandwidth costs (a sketch follows this list).
- Incremental updates — Re-scrape existing leads monthly rather than daily to conserve resources.
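A minimal cache-aside sketch for the fetch path, assuming Redis as the cache and a 24-hour TTL (both illustrative choices):

```python
import requests

def fetch_with_cache(url, proxy_url, redis_client, ttl=86400):
    """Serve repeat fetches from Redis instead of spending proxy bandwidth."""
    cache_key = f"page_cache:{url}"
    cached = redis_client.get(cache_key)
    if cached is not None:
        return cached.decode()
    response = requests.get(
        url,
        proxies={"http": proxy_url, "https": proxy_url},
        timeout=30,
    )
    response.raise_for_status()
    redis_client.setex(cache_key, ttl, response.text)
    return response.text
```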
Conclusion
An automated lead scraping pipeline transforms B2B prospecting from a manual research task into a scalable data operation. The architecture outlined here — URL discovery, proxy routing, distributed scraping, parsing, enrichment, and CRM integration — handles the full lifecycle from raw web data to sales-ready leads. Start with a single data source, prove the pipeline works end-to-end, and then add sources and scale horizontally. The mobile proxy layer is what makes the entire system reliable at scale, ensuring consistent access to target websites without the disruption of IP bans and rate limits.
Related Reading
- Building a B2B Contact Enrichment Pipeline with Mobile Proxies
- B2B Lead Enrichment: Using Proxies to Verify and Augment Contact Data
- How to Scrape Job Listings at Scale with Rotating Proxies
- Proxies for HR Tech: Salary Benchmarking & Talent Intelligence
- aiohttp + BeautifulSoup: Async Python Scraping
- How to Scrape AliExpress Product Data Without Getting Blocked