How to Build a Multi-Source Lead Scoring System with Proxy-Collected Data
Lead scoring determines which prospects your sales team contacts first. A well-built scoring system separates the 5% of leads that are ready to buy from the 95% that need nurturing — or should be discarded entirely. The best scoring systems combine data from multiple sources, each providing a different signal about the prospect’s fit and buying readiness.
Mobile proxies enable the large-scale data collection that feeds multi-source scoring. By scraping company websites, social profiles, technology databases, hiring data, and intent signals, you build scoring models that dramatically outperform simple firmographic filters.
The Four Dimensions of Lead Scoring
Effective B2B lead scoring evaluates prospects across four dimensions:
| Dimension | What It Measures | Data Sources |
|---|---|---|
| Firmographic Fit | Does the company match your ICP? | LinkedIn, Crunchbase, company websites |
| Technographic Fit | Does their tech stack align with your product? | BuiltWith, company websites, job postings |
| Intent Signals | Are they actively looking for a solution? | Forums, G2 reviews, content engagement |
| Engagement | Have they interacted with your brand? | Website analytics, email opens, content downloads |
Each dimension contributes to a composite score, typically on a 0-100 scale.
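The caps used throughout this article are 25, 25, 30, and 20 points for the four dimensions, so the composite is simply the sum of the capped dimension scores. A minimal sketch of that arithmetic (the helper name is ours, for illustration only):

```python
DIMENSION_CAPS = {"firmographic": 25, "technographic": 25, "intent": 30, "engagement": 20}

def composite_score(dimension_scores):
    """Sum per-dimension scores, clamping each to its cap, for a 0-100 total."""
    return sum(
        min(dimension_scores.get(dim, 0), cap)
        for dim, cap in DIMENSION_CAPS.items()
    )

# A strong-fit lead with moderate intent and light engagement
print(composite_score({"firmographic": 22, "technographic": 18, "intent": 14, "engagement": 6}))  # 60
```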
Data Collection Architecture
Multi-Source Data Pipeline
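The collector below expects a `proxy_pool` object exposing a `get_next()` method that returns a proxy URL. A minimal round-robin implementation of that interface (the proxy URLs are placeholders):

```python
from itertools import cycle

class ProxyPool:
    """Round-robin proxy pool matching the get_next() interface the collector expects."""
    def __init__(self, proxy_urls):
        if not proxy_urls:
            raise ValueError("at least one proxy URL is required")
        self._cycle = cycle(proxy_urls)

    def get_next(self):
        # Hand out proxies in rotation, wrapping around at the end of the list
        return next(self._cycle)

pool = ProxyPool([
    "http://user:pass@mobile-proxy-1.example.com:8080",
    "http://user:pass@mobile-proxy-2.example.com:8080",
])
```

A production pool would typically also track per-proxy failures and remove dead endpoints, but rotation is the only behavior the pipeline below depends on.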
```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict
import asyncio
import re

import aiohttp


@dataclass
class LeadData:
    """Container for all collected lead data"""
    email: str
    company_domain: str
    raw_data: Dict[str, Any] = field(default_factory=dict)
    firmographic_data: Dict = field(default_factory=dict)
    technographic_data: Dict = field(default_factory=dict)
    intent_data: Dict = field(default_factory=dict)
    engagement_data: Dict = field(default_factory=dict)
    scores: Dict = field(default_factory=dict)
    total_score: float = 0.0
    collected_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


class DataCollector:
    """Collect lead data from multiple sources using proxies"""

    def __init__(self, proxy_pool):
        self.proxy_pool = proxy_pool

    async def collect_all_data(self, email, company_domain):
        """Collect data from all sources for a lead"""
        lead = LeadData(email=email, company_domain=company_domain)
        # Run all data collectors in parallel; return_exceptions keeps one
        # failing source from sinking the whole lead
        results = await asyncio.gather(
            self.collect_firmographic(company_domain),
            self.collect_technographic(company_domain),
            self.collect_intent(company_domain, email),
            return_exceptions=True,
        )
        if not isinstance(results[0], Exception):
            lead.firmographic_data = results[0]
        if not isinstance(results[1], Exception):
            lead.technographic_data = results[1]
        if not isinstance(results[2], Exception):
            lead.intent_data = results[2]
        return lead

    async def collect_firmographic(self, domain):
        """Collect firmographic data from the company website"""
        proxy = self.proxy_pool.get_next()
        data = {}
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"https://{domain}",
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=15),
                    headers={"User-Agent": "Mozilla/5.0"},
                ) as response:
                    html = await response.text()
            # Extract company description
            desc_match = re.search(
                r'<meta\s+name="description"\s+content="([^"]+)"',
                html, re.IGNORECASE,
            )
            if desc_match:
                data["description"] = desc_match.group(1)
            # Detect company size signals
            emp_match = re.search(
                r'(\d{1,5})\+?\s*(?:employees|team members|people)',
                html, re.IGNORECASE,
            )
            if emp_match:
                data["estimated_employees"] = int(emp_match.group(1))
            # Founded year
            founded_match = re.search(
                r'(?:founded|established|since)\s*(?:in\s*)?(\d{4})',
                html, re.IGNORECASE,
            )
            if founded_match:
                data["founded_year"] = int(founded_match.group(1))
            # Location
            loc_match = re.search(
                r'(?:headquartered|based|located)\s+in\s+([^.<]+)',
                html, re.IGNORECASE,
            )
            if loc_match:
                data["headquarters"] = loc_match.group(1).strip()
        except Exception as e:
            data["error"] = str(e)
        return data

    async def collect_technographic(self, domain):
        """Detect technology stack from the company website"""
        proxy = self.proxy_pool.get_next()
        data = {"technologies": []}
        tech_signatures = {
            "Salesforce": ["force.com", "salesforce.com", "pardot"],
            "HubSpot": ["hs-analytics", "hubspot.com", "hstc="],
            "Marketo": ["marketo.com", "mktoweb"],
            "Intercom": ["intercom.io", "widget.intercom"],
            "Drift": ["drift.com", "driftt"],
            "Segment": ["cdn.segment.com", "analytics.js"],
            "Google Analytics": ["google-analytics.com", "gtag"],
            "Stripe": ["js.stripe.com"],
            "Shopify": ["cdn.shopify.com"],
            "WordPress": ["wp-content", "wp-includes"],
            "React": ["react", "_reactRootContainer"],
            "AWS": ["amazonaws.com"],
            "Cloudflare": ["cloudflare"],
            "Zendesk": ["zendesk.com", "zdassets"],
            "Slack": ["slack.com"],
            "Jira": ["atlassian.com", "jira"],
        }
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"https://{domain}",
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=15),
                    headers={"User-Agent": "Mozilla/5.0"},
                ) as response:
                    html = (await response.text()).lower()
                    # aiohttp headers are case-insensitive, so this matches
                    # both "Server" and "server"
                    server_header = response.headers.get("Server", "")
            for tech, signatures in tech_signatures.items():
                if any(sig.lower() in html for sig in signatures):
                    data["technologies"].append(tech)
            # Response headers can reveal infrastructure the HTML does not
            if "cloudflare" in server_header.lower():
                if "Cloudflare" not in data["technologies"]:
                    data["technologies"].append("Cloudflare")
        except Exception as e:
            data["error"] = str(e)
        return data

    async def collect_intent(self, domain, email):
        """Collect intent signals from various sources"""
        proxy = self.proxy_pool.get_next()
        data = {"signals": []}
        # Check for recent job postings (active hiring suggests growth)
        try:
            async with aiohttp.ClientSession() as session:
                for careers_path in ["/careers", "/jobs", "/join-us", "/hiring"]:
                    try:
                        async with session.get(
                            f"https://{domain}{careers_path}",
                            proxy=proxy,
                            timeout=aiohttp.ClientTimeout(total=10),
                            headers={"User-Agent": "Mozilla/5.0"},
                            allow_redirects=True,
                        ) as response:
                            if response.status != 200:
                                continue
                            html = await response.text()
                        job_count = len(re.findall(
                            r'(?:job|position|role|opening)',
                            html, re.IGNORECASE,
                        ))
                        if job_count > 5:
                            data["signals"].append({
                                "type": "hiring_actively",
                                "strength": min(job_count / 10, 1.0),
                                "details": f"~{job_count} job-related mentions on careers page",
                            })
                        # First careers page that loads is enough
                        break
                    except Exception:
                        continue
        except Exception:
            pass
        return data
```

The Scoring Engine
Firmographic Scoring
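One detail worth previewing: the scorer below awards full size points inside the ICP range and partial credit proportional to distance from the nearest bound. That curve, pulled out into a standalone helper for illustration (the function name is ours, not part of the scorer):

```python
def size_points(employees, ideal_min=10, ideal_max=1000, max_points=10):
    """Partial-credit company-size scoring, mirroring the scorer's logic."""
    if employees <= 0:
        return 0  # no size signal, no points
    if ideal_min <= employees <= ideal_max:
        return max_points
    # Below range: scale by employees/min; above range: scale by max/employees
    ratio = employees / ideal_min if employees < ideal_min else ideal_max / employees
    return int(max_points * ratio)

print(size_points(5))     # 5  (half the minimum -> half credit)
print(size_points(500))   # 10 (inside the range -> full credit)
print(size_points(4000))  # 2  (4x over the maximum -> quarter credit)
```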
```python
class FirmographicScorer:
    """Score leads based on firmographic fit"""

    def __init__(self, icp_config):
        self.icp = icp_config

    def score(self, firmographic_data):
        """Score firmographic fit (0-25 points)"""
        score = 0
        details = []
        # Company size scoring
        employees = firmographic_data.get("estimated_employees", 0)
        ideal_min = self.icp.get("min_employees", 10)
        ideal_max = self.icp.get("max_employees", 1000)
        if ideal_min <= employees <= ideal_max:
            score += 10
            details.append(f"Company size ({employees}) within ICP range")
        elif employees > 0:
            # Partial credit for close matches
            if employees < ideal_min:
                ratio = employees / ideal_min
            else:
                ratio = ideal_max / employees
            score += int(10 * ratio)
            details.append(f"Company size ({employees}) partially matches ICP")
        # Industry match
        description = firmographic_data.get("description", "").lower()
        target_industries = self.icp.get("target_industries", [])
        for industry in target_industries:
            if industry.lower() in description:
                score += 8
                details.append(f"Industry match: {industry}")
                break
        # Geography match
        headquarters = firmographic_data.get("headquarters", "").lower()
        target_geos = self.icp.get("target_geographies", [])
        for geo in target_geos:
            if geo.lower() in headquarters:
                score += 7
                details.append(f"Geography match: {geo}")
                break
        return {"score": min(score, 25), "max": 25, "details": details}
```

Technographic Scoring
Understanding technology stack compatibility is fundamental to B2B sales. The proxy infrastructure for tech stack detection is explained in our proxy glossary.
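Detection itself is plain substring matching against known fingerprints, as in `collect_technographic` above. A self-contained illustration (the sample HTML is fabricated):

```python
tech_signatures = {
    "HubSpot": ["hs-analytics", "hubspot.com"],
    "WordPress": ["wp-content", "wp-includes"],
    "Stripe": ["js.stripe.com"],
}

html = """
<script src="https://js.hs-analytics.net/analytics/123/hs-analytics.js"></script>
<link rel="stylesheet" href="/wp-content/themes/site/style.css">
""".lower()

# A technology counts as detected if any of its fingerprints appears in the page
detected = [
    tech for tech, sigs in tech_signatures.items()
    if any(sig.lower() in html for sig in sigs)
]
print(detected)  # ['HubSpot', 'WordPress']
```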
```python
class TechnographicScorer:
    """Score leads based on technology stack fit"""

    def __init__(self, tech_config):
        self.config = tech_config

    def score(self, technographic_data):
        """Score technographic fit (0-25 points)"""
        score = 0
        details = []
        technologies = technographic_data.get("technologies", [])
        # Positive signals (technologies that indicate good fit)
        positive_tech = self.config.get("positive_technologies", [])
        for tech in positive_tech:
            if tech in technologies:
                score += 5
                details.append(f"Uses {tech} (positive signal)")
        # Negative signals (technologies that indicate poor fit)
        negative_tech = self.config.get("negative_technologies", [])
        for tech in negative_tech:
            if tech in technologies:
                score -= 5
                details.append(f"Uses {tech} (negative signal)")
        # Competitor technology (replacement opportunity)
        competitor_tech = self.config.get("competitor_technologies", [])
        for tech in competitor_tech:
            if tech in technologies:
                score += 8
                details.append(f"Uses competitor {tech} (replacement opportunity)")
        # Complementary technology (integration opportunity)
        complement_tech = self.config.get("complementary_technologies", [])
        for tech in complement_tech:
            if tech in technologies:
                score += 4
                details.append(f"Uses {tech} (integration opportunity)")
        return {"score": max(0, min(score, 25)), "max": 25, "details": details}
```

Intent Scoring
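Each signal contributes its base weight scaled by observed strength, then truncated to an integer. With the weights used below, for example, active hiring at strength 0.7 is worth int(8 * 0.7) = 5 points (the helper is illustrative, not part of the scorer class):

```python
signal_weights = {"hiring_actively": 8, "funding_recent": 10, "competitor_review": 12}

def signal_points(signal_type, strength, default_weight=5):
    """Base weight x strength, truncated to int, per signal."""
    return int(signal_weights.get(signal_type, default_weight) * strength)

print(signal_points("hiring_actively", 0.7))    # 5
print(signal_points("competitor_review", 1.0))  # 12
print(signal_points("unknown_signal", 1.0))     # 5 (falls back to the default weight)
```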
```python
class IntentScorer:
    """Score leads based on buying intent signals"""

    def score(self, intent_data):
        """Score intent signals (0-30 points)"""
        score = 0
        details = []
        signals = intent_data.get("signals", [])
        signal_weights = {
            "hiring_actively": 8,
            "funding_recent": 10,
            "competitor_review": 12,
            "content_engagement": 7,
            "forum_discussion": 9,
            "pricing_page_visit": 15,
            "demo_request": 25,
        }
        for signal in signals:
            signal_type = signal.get("type")
            weight = signal_weights.get(signal_type, 5)
            strength = signal.get("strength", 1.0)
            score += int(weight * strength)
            details.append(f"Intent signal: {signal_type} (strength: {strength:.1f})")
        return {"score": min(score, 30), "max": 30, "details": details}
```

Engagement Scoring
```python
class EngagementScorer:
    """Score leads based on engagement with your brand"""

    def score(self, engagement_data):
        """Score engagement (0-20 points)"""
        score = 0
        details = []
        # Website visits
        visits = engagement_data.get("website_visits", 0)
        if visits >= 5:
            score += 8
            details.append(f"High website engagement ({visits} visits)")
        elif visits >= 2:
            score += 4
            details.append(f"Some website engagement ({visits} visits)")
        # Email engagement
        opens = engagement_data.get("email_opens", 0)
        clicks = engagement_data.get("email_clicks", 0)
        if clicks >= 2:
            score += 6
            details.append(f"Strong email engagement ({clicks} clicks)")
        elif opens >= 3:
            score += 3
            details.append(f"Moderate email engagement ({opens} opens)")
        # Content downloads
        downloads = engagement_data.get("content_downloads", 0)
        if downloads >= 1:
            score += 6
            details.append(f"Downloaded {downloads} content piece(s)")
        return {"score": min(score, 20), "max": 20, "details": details}
```

Composite Scoring System
Combine all four dimensions into a final score:
```python
class LeadScoringEngine:
    """Composite lead scoring system"""

    def __init__(self, icp_config, tech_config):
        self.firmographic = FirmographicScorer(icp_config)
        self.technographic = TechnographicScorer(tech_config)
        self.intent = IntentScorer()
        self.engagement = EngagementScorer()

    def score_lead(self, lead_data: LeadData) -> LeadData:
        """Calculate composite lead score"""
        # Score each dimension
        firm_result = self.firmographic.score(lead_data.firmographic_data)
        tech_result = self.technographic.score(lead_data.technographic_data)
        intent_result = self.intent.score(lead_data.intent_data)
        engage_result = self.engagement.score(lead_data.engagement_data)
        lead_data.scores = {
            "firmographic": firm_result,
            "technographic": tech_result,
            "intent": intent_result,
            "engagement": engage_result,
        }
        # Calculate total (out of 100: 25 + 25 + 30 + 20)
        lead_data.total_score = (
            firm_result["score"]
            + tech_result["score"]
            + intent_result["score"]
            + engage_result["score"]
        )
        return lead_data

    def classify_lead(self, lead_data: LeadData):
        """Classify lead by score into action categories"""
        score = lead_data.total_score
        if score >= 80:
            return {
                "category": "hot",
                "action": "immediate_sales_contact",
                "priority": 1,
                "description": "High-priority lead - contact within 24 hours",
            }
        elif score >= 60:
            return {
                "category": "warm",
                "action": "sales_outreach",
                "priority": 2,
                "description": "Qualified lead - add to outreach sequence",
            }
        elif score >= 40:
            return {
                "category": "nurture",
                "action": "marketing_nurture",
                "priority": 3,
                "description": "Potential fit - add to nurture campaign",
            }
        else:
            return {
                "category": "cold",
                "action": "low_priority",
                "priority": 4,
                "description": "Low fit - monitor for changes",
            }
```

Batch Scoring Pipeline
Process large lead lists through the scoring system. For teams using proxies for web scraping at scale, this pipeline integrates directly with existing data collection infrastructure.
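The pipeline below walks the lead list in fixed-size slices so only `batch_size` collection tasks are in flight at once. The slicing pattern in isolation:

```python
# 45 hypothetical leads, processed 20 at a time
leads = [{"email": f"user{i}@example.com"} for i in range(45)]
batch_size = 20

batches = [leads[i:i + batch_size] for i in range(0, len(leads), batch_size)]
print([len(b) for b in batches])  # [20, 20, 5] - the final slice is simply shorter
```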
```python
class BatchScoringPipeline:
    """Score large batches of leads"""

    def __init__(self, collector, scorer, proxy_pool):
        self.collector = collector
        self.scorer = scorer
        self.proxy_pool = proxy_pool

    async def score_batch(self, leads, batch_size=20):
        """Score a batch of leads with data collection"""
        scored_leads = []
        for i in range(0, len(leads), batch_size):
            batch = leads[i:i + batch_size]
            # Collect data for the whole batch concurrently
            tasks = [
                self.collector.collect_all_data(lead["email"], lead.get("domain"))
                for lead in batch
            ]
            lead_data_list = await asyncio.gather(*tasks, return_exceptions=True)
            # Score each lead that collected successfully
            for lead_data in lead_data_list:
                if isinstance(lead_data, LeadData):
                    scored = self.scorer.score_lead(lead_data)
                    classification = self.scorer.classify_lead(scored)
                    scored_leads.append({
                        "lead": scored,
                        "classification": classification,
                    })
            print(f"Scored {min(i + batch_size, len(leads))}/{len(leads)} leads")
            # Brief pause between batches to avoid hammering target sites
            await asyncio.sleep(2)
        # Sort by score descending
        scored_leads.sort(key=lambda x: x["lead"].total_score, reverse=True)
        return scored_leads

    def generate_report(self, scored_leads):
        """Generate scoring distribution report"""
        categories = {"hot": 0, "warm": 0, "nurture": 0, "cold": 0}
        total_score = 0
        for item in scored_leads:
            categories[item["classification"]["category"]] += 1
            total_score += item["lead"].total_score
        avg_score = total_score / len(scored_leads) if scored_leads else 0
        return {
            "total_leads": len(scored_leads),
            "average_score": round(avg_score, 1),
            "distribution": categories,
            "top_10": [
                {
                    "email": item["lead"].email,
                    "domain": item["lead"].company_domain,
                    "score": item["lead"].total_score,
                    "category": item["classification"]["category"],
                }
                for item in scored_leads[:10]
            ],
        }
```

Score Calibration and Optimization
A/B Testing Scoring Models
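The evaluator below reports two numbers: the conversion rate among leads it called hot (higher is better) and the conversion rate among leads it wrote off as cold (lower is better, since any conversion there is a miss). The per-category calculation, computed standalone on fabricated predictions:

```python
predictions = [
    {"predicted_category": "hot",  "actually_converted": True},
    {"predicted_category": "hot",  "actually_converted": True},
    {"predicted_category": "hot",  "actually_converted": False},
    {"predicted_category": "cold", "actually_converted": False},
    {"predicted_category": "cold", "actually_converted": True},
]

def category_conversion_rate(predictions, category):
    """Share of leads in a category that actually converted."""
    subset = [p for p in predictions if p["predicted_category"] == category]
    if not subset:
        return 0.0
    return sum(p["actually_converted"] for p in subset) / len(subset)

print(category_conversion_rate(predictions, "hot"))   # ~0.667: hot leads mostly convert
print(category_conversion_rate(predictions, "cold"))  # 0.5: far too many cold misses
```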
```python
class ScoringModelTest:
    """A/B test different scoring configurations"""

    def __init__(self):
        self.models = {}
        self.results = {}

    def add_model(self, name, config):
        """Register a scoring model for testing"""
        self.models[name] = config

    def evaluate_model(self, model_name, historical_leads, conversion_data):
        """Evaluate model accuracy against historical conversion data"""
        model = self.models[model_name]
        scorer = LeadScoringEngine(
            model.get("icp_config", {}),
            model.get("tech_config", {}),
        )
        predictions = []
        for lead in historical_leads:
            scored = scorer.score_lead(lead)
            classification = scorer.classify_lead(scored)
            actual_converted = conversion_data.get(lead.email, False)
            predictions.append({
                "email": lead.email,
                "predicted_score": scored.total_score,
                "predicted_category": classification["category"],
                "actually_converted": actual_converted,
            })
        # Hot leads should convert at a high rate
        hot_leads = [p for p in predictions if p["predicted_category"] == "hot"]
        hot_conversion_rate = (
            sum(1 for p in hot_leads if p["actually_converted"]) / len(hot_leads)
            if hot_leads else 0
        )
        # Conversions among cold leads are misses the model should minimize
        cold_leads = [p for p in predictions if p["predicted_category"] == "cold"]
        cold_miss_rate = (
            sum(1 for p in cold_leads if p["actually_converted"]) / len(cold_leads)
            if cold_leads else 0
        )
        self.results[model_name] = {
            "hot_conversion_rate": hot_conversion_rate,
            "cold_miss_rate": cold_miss_rate,
            "total_evaluated": len(predictions),
        }
        return self.results[model_name]
```

CRM Integration
Push scored leads to your CRM with scoring context:
```python
import requests


def push_scored_leads_to_crm(scored_leads, crm_api_key):
    """Push scored leads to HubSpot with scoring data.

    Assumes the lead_score_* and detected_technologies custom properties
    have already been created in the HubSpot portal.
    """
    headers = {
        "Authorization": f"Bearer {crm_api_key}",
        "Content-Type": "application/json",
    }
    for item in scored_leads:
        lead = item["lead"]
        classification = item["classification"]
        properties = {
            "email": lead.email,
            "website": lead.company_domain,
            "lead_score_total": lead.total_score,
            "lead_score_firmographic": lead.scores.get("firmographic", {}).get("score", 0),
            "lead_score_technographic": lead.scores.get("technographic", {}).get("score", 0),
            "lead_score_intent": lead.scores.get("intent", {}).get("score", 0),
            "lead_score_engagement": lead.scores.get("engagement", {}).get("score", 0),
            "lead_category": classification["category"],
            "lead_priority": classification["priority"],
            "hs_lead_status": "NEW" if classification["category"] == "hot" else "OPEN",
        }
        # Add detected tech stack as a custom property
        tech_stack = lead.technographic_data.get("technologies", [])
        if tech_stack:
            properties["detected_technologies"] = "; ".join(tech_stack)
        response = requests.post(
            "https://api.hubapi.com/crm/v3/objects/contacts",
            headers=headers,
            json={"properties": properties},
        )
        if response.status_code == 409:
            # Contact already exists: look it up by email, then update in place
            search = requests.post(
                "https://api.hubapi.com/crm/v3/objects/contacts/search",
                headers=headers,
                json={"filterGroups": [{"filters": [
                    {"propertyName": "email", "operator": "EQ", "value": lead.email}
                ]}]},
            )
            results = search.json().get("results", [])
            if results:
                contact_id = results[0]["id"]
                requests.patch(
                    f"https://api.hubapi.com/crm/v3/objects/contacts/{contact_id}",
                    headers=headers,
                    json={"properties": properties},
                )
```

Conclusion
Multi-source lead scoring transforms raw prospect data into prioritized, actionable sales intelligence. By combining firmographic fit, technology stack analysis, intent signals, and engagement data — all collected through proxy-powered scraping — you build scoring models that reliably predict which leads will convert. The key is continuous calibration: track which scored leads actually convert, adjust your weights accordingly, and A/B test model variations. Start with simple scoring rules, validate against your conversion data, and gradually add complexity as you accumulate enough data to measure the impact of each scoring dimension. The companies that master lead scoring consistently outperform competitors in both sales efficiency and conversion rates.
Related Reading
- How to Build an Automated Lead Scraping Pipeline with Proxies
- Building a B2B Contact Enrichment Pipeline with Mobile Proxies
- How to Scrape Job Listings at Scale with Rotating Proxies
- Proxies for HR Tech: Salary Benchmarking & Talent Intelligence
- aiohttp + BeautifulSoup: Async Python Scraping
- How to Scrape AliExpress Product Data Without Getting Blocked
last updated: April 3, 2026