How to Build an Automated Lead Scraping Pipeline with Proxies

Manual lead generation does not scale. A sales rep spending four hours per day researching prospects on LinkedIn, business directories, and company websites might find 30-50 qualified leads. An automated pipeline running 24/7 with mobile proxies can generate thousands of enriched leads per day without human intervention.

This guide covers the complete architecture for building a production-grade automated lead scraping pipeline — from URL discovery through data enrichment to CRM delivery.

Pipeline Architecture

A robust lead scraping pipeline has six stages:

[URL Discovery] → [Proxy Router] → [Scraper Workers] → [Data Parser] → [Enrichment] → [CRM Output]

Each stage runs independently and communicates through a message queue. This decoupled architecture means you can scale individual components without rebuilding the entire system.
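
Because each stage is just a consumer on its own queue, you can scale the stages independently. A minimal sketch of that wiring using Celery task routing is below; the module path and queue names are illustrative, and the task functions themselves are defined in the stages that follow.

# celery_config.py - give each pipeline stage its own queue (illustrative names)
from celery import Celery

app = Celery('lead_pipeline', broker='redis://localhost:6379/0')
app.conf.task_routes = {
    'tasks.scrape_url': {'queue': 'scrape'},
    'tasks.parse_html': {'queue': 'parse'},
    'tasks.enrich_lead': {'queue': 'enrich'},
    'tasks.push_to_crm': {'queue': 'crm'},
}

# Scale a single stage by starting more workers on its queue, for example:
#   celery -A lead_pipeline worker -Q scrape --concurrency=20
#   celery -A lead_pipeline worker -Q parse,enrich --concurrency=4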

Technology Stack

Component | Recommended Tool | Purpose
Message Queue | Redis / RabbitMQ | Job distribution
Scraper Framework | Scrapy / aiohttp | HTTP requests
Browser Automation | Playwright | JS-heavy sites
Proxy Layer | Mobile proxy gateway | IP rotation
Data Store | PostgreSQL | Lead storage
Scheduler | Celery / APScheduler | Task scheduling
CRM Integration | REST API | HubSpot / Salesforce
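
The later code samples share a handful of settings: the Redis broker, the PostgreSQL connection string, the proxy gateway address, and the CRM API key. A small settings module keeps them in one place; this is a sketch, and the environment variable names and defaults are illustrative.

# settings.py - shared configuration read from environment variables (names are illustrative)
import os

REDIS_URL = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
POSTGRES_DSN = os.environ.get("POSTGRES_DSN", "postgresql://leads:leads@localhost:5432/leads")
PROXY_GATEWAY = os.environ.get("PROXY_GATEWAY", "gateway.example.com:7000")  # your provider's endpoint
HUBSPOT_API_KEY = os.environ.get("HUBSPOT_API_KEY", "")  # used by the CRM stage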

Stage 1: URL Discovery

Before you can scrape leads, you need a list of target URLs. There are three primary discovery methods:

Search Engine Scraping

Use Google search to find business listing pages for your target niche:

import requests
from bs4 import BeautifulSoup
import time
import random

def google_search_urls(query, proxy_url, num_results=100):
    """Scrape Google search results for target URLs"""
    urls = []
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
    }

    for start in range(0, num_results, 10):
        params = {
            "q": query,
            "start": start,
            "num": 10,
        }
        response = requests.get(
            "https://www.google.com/search",
            params=params,
            headers=headers,
            proxies={"https": proxy_url},
            timeout=15
        )
        soup = BeautifulSoup(response.text, 'lxml')

        for result in soup.select('div.g a[href]'):
            href = result.get('href', '')
            if href.startswith('http') and 'google.com' not in href:
                urls.append(href)

        time.sleep(random.uniform(5, 15))  # Respect rate limits

    return list(set(urls))

Sitemap Parsing

Many business directories publish XML sitemaps that list every business page:

import requests
import xml.etree.ElementTree as ET

def parse_sitemap(sitemap_url, proxy_url):
    """Extract URLs from XML sitemap"""
    response = requests.get(
        sitemap_url,
        proxies={"https": proxy_url},
        timeout=15
    )
    root = ET.fromstring(response.content)
    namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

    urls = []
    for url_element in root.findall('.//ns:url/ns:loc', namespace):
        urls.append(url_element.text)

    return urls
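
Large directories often split their sitemaps into an index file that points at child sitemaps. A recursive variant handles that case; this sketch assumes the standard sitemaps.org schema and reuses the imports above.

def parse_sitemap_recursive(sitemap_url, proxy_url, max_depth=2):
    """Extract page URLs, following <sitemapindex> entries up to max_depth levels"""
    response = requests.get(
        sitemap_url,
        proxies={"https": proxy_url},
        timeout=15
    )
    root = ET.fromstring(response.content)
    namespace = {'ns': 'http://www.sitemaps.org/schemas/sitemap/0.9'}

    urls = []
    # Child sitemaps listed in a sitemap index
    if max_depth > 0:
        for loc in root.findall('.//ns:sitemap/ns:loc', namespace):
            urls.extend(parse_sitemap_recursive(loc.text, proxy_url, max_depth - 1))
    # Regular page entries
    for loc in root.findall('.//ns:url/ns:loc', namespace):
        urls.append(loc.text)

    return urls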

Seed List Expansion

Start with a known list of company domains and expand by scraping their partners, customers, and competitors pages:

from urllib.parse import urlparse

def expand_seed_list(seed_domains, proxy_url, session):
    """Find related companies from seed domain websites"""
    expanded = set(seed_domains)

    for domain in seed_domains:
        try:
            response = session.get(
                f"https://{domain}",
                proxies={"https": proxy_url},
                timeout=15
            )
            soup = BeautifulSoup(response.text, 'lxml')

            # Look for partner/customer logo sections
            for img in soup.find_all('img', alt=True):
                alt = img['alt'].lower()
                if any(kw in alt for kw in ['partner', 'customer', 'client', 'trusted by']):
                    # Extract linked domain
                    parent_link = img.find_parent('a', href=True)
                    if parent_link:
                        linked_domain = urlparse(parent_link['href']).netloc
                        if linked_domain:
                            expanded.add(linked_domain)
        except Exception:
            continue

    return list(expanded)

Stage 2: Proxy Router

The proxy router assigns proxy sessions to scraping jobs based on the target website’s difficulty level. For key terminology used in proxy configuration, see our proxy glossary.

import time
import redis
import json

class ProxyRouter:
    """Route scraping jobs through appropriate proxy configurations"""

    def __init__(self, proxy_gateway, redis_client):
        self.gateway = proxy_gateway
        self.redis = redis_client
        self.profiles = {
            "low_protection": {
                "rotation": "per_request",
                "concurrency": 50,
                "delay": 0.5,
            },
            "medium_protection": {
                "rotation": "per_request",
                "concurrency": 10,
                "delay": 2,
            },
            "high_protection": {
                "rotation": "sticky_30min",
                "concurrency": 3,
                "delay": 5,
                "browser": True,
            },
        }

    def classify_target(self, domain):
        """Classify target website protection level"""
        high_protection = ['linkedin.com', 'zoominfo.com', 'apollo.io']
        medium_protection = ['yelp.com', 'yellowpages.com', 'bbb.org']

        if domain in high_protection:
            return "high_protection"
        elif domain in medium_protection:
            return "medium_protection"
        return "low_protection"

    def get_proxy_config(self, domain):
        """Return proxy configuration for target domain"""
        level = self.classify_target(domain)
        profile = self.profiles[level]

        if profile["rotation"] == "sticky_30min":
            # Sticky session: hold one mobile IP for the crawl of this domain.
            # The username format for session pinning varies by proxy provider.
            session_id = f"sticky_{domain}_{int(time.time())}"
            proxy_url = f"http://user-session-{session_id}:pass@{self.gateway}"
        else:
            # Rotating: the gateway hands out a fresh IP on each request.
            proxy_url = f"http://user:pass@{self.gateway}"

        return {
            "proxy_url": proxy_url,
            "concurrency": profile["concurrency"],
            "delay": profile["delay"],
            "use_browser": profile.get("browser", False),
        }
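
Connecting the router to the workers is a matter of resolving a proxy profile per domain and enqueueing each URL for Stage 3. A short sketch, assuming the gateway address and the discovered_urls list from Stage 1 are already in hand:

from urllib.parse import urlparse

def dispatch_urls(urls, router):
    """Resolve a proxy profile per domain and enqueue scraping jobs"""
    for url in urls:
        domain = urlparse(url).netloc.removeprefix("www.")
        config = router.get_proxy_config(domain)
        scrape_url.delay(url, config)  # scrape_url is the Celery task defined in Stage 3

router = ProxyRouter(proxy_gateway="gateway.example.com:7000", redis_client=redis.Redis())
dispatch_urls(discovered_urls, router)  # discovered_urls comes from Stage 1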

Stage 3: Scraper Workers

Workers consume URLs from the queue, scrape content through the proxy router, and push raw HTML to the parsing stage:

import requests
from celery import Celery

app = Celery('lead_pipeline', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def scrape_url(self, url, proxy_config):
    """Celery task to scrape a single URL"""
    try:
        response = requests.get(
            url,
            proxies={"http": proxy_config["proxy_url"], "https": proxy_config["proxy_url"]},
            timeout=30,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
            }
        )

        if response.status_code == 200:
            # Push to parsing queue
            parse_html.delay(url, response.text)
        elif response.status_code == 429:
            # Rate limited - retry with exponential backoff
            raise self.retry(countdown=60 * (2 ** self.request.retries))
        elif response.status_code == 403:
            # Blocked - retry with browser-based scraping
            scrape_with_browser.delay(url, proxy_config)

    except requests.exceptions.RequestException as exc:
        raise self.retry(exc=exc, countdown=30)


@app.task
def scrape_with_browser(url, proxy_config):
    """Fallback browser-based scraping for protected sites"""
    from playwright.sync_api import sync_playwright

    with sync_playwright() as p:
        # Note: Chromium will not accept credentials embedded in the proxy URL;
        # if your gateway requires auth, pass "username" and "password" fields separately.
        browser = p.chromium.launch(proxy={
            "server": proxy_config["proxy_url"]
        })
        page = browser.new_page()
        page.goto(url, wait_until="networkidle", timeout=30000)
        html = page.content()
        browser.close()

    parse_html.delay(url, html)

Stage 4: Data Parser

The parser extracts structured lead data from raw HTML:

from bs4 import BeautifulSoup
import re

@app.task
def parse_html(url, html):
    """Extract structured lead data from HTML"""
    soup = BeautifulSoup(html, 'lxml')
    lead = {"source_url": url}

    # Extract company name
    title = soup.find('title')
    lead['company_name'] = title.get_text().split('|')[0].strip() if title else None

    # Extract emails
    email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
    emails = re.findall(email_pattern, html)
    lead['emails'] = list(set(e.lower() for e in emails if not e.endswith(('.png', '.jpg'))))

    # Extract phone numbers
    phone_pattern = r'[\+]?[(]?[0-9]{1,3}[)]?[-\s\.]?[(]?[0-9]{1,3}[)]?[-\s\.]?[0-9]{3,6}[-\s\.]?[0-9]{3,6}'
    phones = re.findall(phone_pattern, html)
    lead['phones'] = list(set(phones))[:3]

    # Extract social links
    social_patterns = {
        'linkedin': r'linkedin\.com/(?:company|in)/[a-zA-Z0-9_-]+',
        'twitter': r'twitter\.com/[a-zA-Z0-9_]+',
    }
    for platform, pattern in social_patterns.items():
        match = re.search(pattern, html)
        if match:
            lead[f'{platform}_url'] = f'https://{match.group()}'

    # Push to enrichment
    if lead.get('emails') or lead.get('phones'):
        enrich_lead.delay(lead)

Stage 5: Enrichment

Cross-reference scraped data with additional sources to build complete prospect profiles:

@app.task
def enrich_lead(lead):
    """Enrich lead with additional data sources"""
    enriched = lead.copy()

    # Enrich with company website data
    if lead.get('company_name'):
        # Look up company domain
        domain = find_company_domain(lead['company_name'])
        if domain:
            enriched['website'] = domain
            enriched['tech_stack'] = detect_tech_stack(domain)

    # Classify lead quality
    enriched['quality_score'] = calculate_quality_score(enriched)

    # Store in database
    store_lead(enriched)

    # Push high-quality leads to CRM
    if enriched['quality_score'] >= 70:
        push_to_crm.delay(enriched)


def calculate_quality_score(lead):
    """Score lead completeness and quality"""
    score = 0
    if lead.get('company_name'): score += 15
    if lead.get('emails'): score += 25
    if lead.get('phones'): score += 15
    if lead.get('website'): score += 10
    if lead.get('linkedin_url'): score += 15
    if lead.get('tech_stack'): score += 10
    if lead.get('company_name') and len(lead['company_name']) > 2: score += 10
    return score
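
find_company_domain, detect_tech_stack, and store_lead are left as helpers above. Storage can be as simple as an upsert into PostgreSQL; here is a minimal sketch using psycopg2, with an illustrative table layout (source_url unique, raw lead stored as JSONB).

import json
import psycopg2

def store_lead(lead, dsn="postgresql://leads:leads@localhost:5432/leads"):
    """Upsert a lead keyed on its source URL (table schema is illustrative)"""
    conn = psycopg2.connect(dsn)
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                """
                INSERT INTO leads (source_url, company_name, data, quality_score)
                VALUES (%s, %s, %s, %s)
                ON CONFLICT (source_url) DO UPDATE
                    SET data = EXCLUDED.data,
                        quality_score = EXCLUDED.quality_score
                """,
                (
                    lead["source_url"],
                    lead.get("company_name"),
                    json.dumps(lead),
                    lead.get("quality_score", 0),
                ),
            )
    finally:
        conn.close()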

Stage 6: CRM Output

Deliver qualified leads directly to your sales team’s CRM:

@app.task
def push_to_crm(lead):
    """Push enriched lead to CRM"""
    crm_payload = {
        "properties": {
            "company": lead.get("company_name"),
            "email": lead["emails"][0] if lead.get("emails") else None,
            "phone": lead["phones"][0] if lead.get("phones") else None,
            "website": lead.get("website"),
            "lead_source": "automated_scraping",
            "lead_score": lead.get("quality_score"),
        }
    }

    # Example: HubSpot API
    response = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts",
        headers={"Authorization": f"Bearer {HUBSPOT_API_KEY}"},
        json=crm_payload
    )

    if response.status_code == 409:
        # Contact already exists - update instead
        update_crm_contact(lead)
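
When creation returns 409, the usual pattern is to look the contact up by email and patch it. The sketch below targets HubSpot's v3 search and update endpoints; adjust the property names to whatever your portal actually defines.

def update_crm_contact(lead):
    """Find an existing HubSpot contact by email and update its properties"""
    email = lead["emails"][0]
    headers = {"Authorization": f"Bearer {HUBSPOT_API_KEY}"}

    # Look up the contact ID by email
    search = requests.post(
        "https://api.hubapi.com/crm/v3/objects/contacts/search",
        headers=headers,
        json={"filterGroups": [{"filters": [
            {"propertyName": "email", "operator": "EQ", "value": email}
        ]}]}
    )
    results = search.json().get("results", [])
    if not results:
        return

    contact_id = results[0]["id"]
    requests.patch(
        f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}",
        headers=headers,
        json={"properties": {
            "company": lead.get("company_name"),
            "website": lead.get("website"),
            "lead_score": lead.get("quality_score"),
        }}
    )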

Monitoring and Alerting

A production pipeline needs monitoring to catch failures early:

class PipelineMonitor:
    """Monitor pipeline health metrics"""

    def __init__(self, redis_client):
        self.redis = redis_client

    def record_metric(self, stage, metric, value):
        key = f"pipeline:{stage}:{metric}"
        self.redis.lpush(key, json.dumps({
            "value": value,
            "timestamp": time.time()
        }))
        self.redis.ltrim(key, 0, 999)  # Keep last 1000 entries

    def check_health(self):
        """Check pipeline health and alert on issues"""
        metrics = {}

        # Check scraper success rate
        success = self.redis.llen("pipeline:scraper:success")
        failure = self.redis.llen("pipeline:scraper:failure")
        total = success + failure
        if total > 0:
            success_rate = success / total
            metrics["scraper_success_rate"] = success_rate
            if success_rate < 0.7:
                self.alert("Scraper success rate below 70%")

        # Check proxy block rate
        blocks = self.redis.llen("pipeline:proxy:blocked")
        requests_total = self.redis.llen("pipeline:proxy:requests")
        if requests_total > 0:
            block_rate = blocks / requests_total
            metrics["proxy_block_rate"] = block_rate
            if block_rate > 0.1:
                self.alert("Proxy block rate above 10%")

        return metrics
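
The alert method is left open above. One lightweight option is to post failures into a chat webhook so they surface where the team already works; this sketch assumes a Slack-style incoming webhook URL stored in an environment variable.

import os

def alert(self, message):
    """Send a pipeline alert to a chat webhook; add this method to PipelineMonitor"""
    webhook_url = os.environ.get("ALERT_WEBHOOK_URL")
    if not webhook_url:
        print(f"[ALERT] {message}")  # fall back to stdout/logs if no webhook is configured
        return
    requests.post(webhook_url, json={"text": f"Lead pipeline alert: {message}"}, timeout=10)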

Scaling Considerations

As your pipeline grows from hundreds to tens of thousands of leads per day, consider these scaling strategies. For teams already running web scraping infrastructure, many of these patterns will be familiar:

  • Horizontal scaling — Add more Celery workers across multiple machines.
  • Proxy pool expansion — Use multiple proxy providers to increase available IP diversity.
  • Database sharding — Partition leads by industry or geography for faster queries.
  • Caching layer — Cache frequently accessed pages to reduce proxy bandwidth costs (see the sketch after this list).
  • Incremental updates — Re-scrape existing leads monthly rather than daily to conserve resources.
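
For the caching layer, even a short-lived Redis cache in front of the fetch step avoids paying proxy bandwidth twice for the same page. A sketch, assuming a separate Redis database alongside the Celery broker and a 24-hour TTL:

import hashlib

cache = redis.Redis(host="localhost", port=6379, db=1)

def fetch_with_cache(url, proxy_url, ttl_seconds=86400):
    """Return cached HTML if this URL was fetched recently, otherwise go through the proxy"""
    key = "pagecache:" + hashlib.sha256(url.encode()).hexdigest()
    cached = cache.get(key)
    if cached is not None:
        return cached.decode("utf-8", errors="replace")

    response = requests.get(url, proxies={"https": proxy_url}, timeout=30)
    if response.status_code == 200:
        cache.setex(key, ttl_seconds, response.text)
    return response.text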

Conclusion

An automated lead scraping pipeline transforms B2B prospecting from a manual research task into a scalable data operation. The architecture outlined here — URL discovery, proxy routing, distributed scraping, parsing, enrichment, and CRM integration — handles the full lifecycle from raw web data to sales-ready leads. Start with a single data source, prove the pipeline works end-to-end, and then add sources and scale horizontally. The mobile proxy layer is what makes the entire system reliable at scale, ensuring consistent access to target websites without the disruption of IP bans and rate limits.

