How to Build a Multi-Source Lead Scoring System with Proxy-Collected Data

Lead scoring determines which prospects your sales team contacts first. A well-built scoring system separates the small minority of leads that are ready to buy (5%, say) from the rest, which need nurturing or should be discarded entirely. The best scoring systems combine data from multiple sources, each providing a different signal about the prospect’s fit and buying readiness.

Mobile proxies enable the large-scale data collection that feeds multi-source scoring. By scraping company websites, social profiles, technology databases, hiring data, and intent signals, you can build scoring models that draw on far more evidence than a simple firmographic filter.

The Four Dimensions of Lead Scoring

Effective B2B lead scoring evaluates prospects across four dimensions:

| Dimension | What It Measures | Data Sources |
|---|---|---|
| Firmographic Fit | Does the company match your ICP? | LinkedIn, Crunchbase, company websites |
| Technographic Fit | Does their tech stack align with your product? | BuiltWith, company websites, job postings |
| Intent Signals | Are they actively looking for a solution? | Forums, G2 reviews, content engagement |
| Engagement | Have they interacted with your brand? | Website analytics, email opens, content downloads |

Each dimension contributes to a composite score, typically on a 0-100 scale.
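
As a rough sketch of how the 0-100 budget can be split, the snippet below uses the per-dimension caps that the scorer classes later in this article apply (25, 25, 30, and 20 points). The names `DIMENSION_MAX` and `composite_score` are illustrative only and do not appear in the article's own code.

```python
# Point budget per dimension. The values mirror the caps used by the
# scorer classes later in this article and sum to 100.
DIMENSION_MAX = {
    "firmographic": 25,
    "technographic": 25,
    "intent": 30,
    "engagement": 20,
}


def composite_score(dimension_scores):
    """Clamp each dimension to its budget, then add them up (0-100)."""
    total = 0
    for name, cap in DIMENSION_MAX.items():
        raw = dimension_scores.get(name, 0)  # a missing dimension counts as 0
        total += max(0, min(raw, cap))
    return total


# Technographic (40) is clamped to 25 and engagement is missing.
example = {"firmographic": 18, "technographic": 40, "intent": 12}
print(composite_score(example))  # 18 + 25 + 12 + 0 = 55
```

Clamping per dimension keeps one very strong signal from hiding weak fit elsewhere.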

Data Collection Architecture

Multi-Source Data Pipeline

from dataclasses import dataclass, field
from typing import Dict, List, Optional, Any
from datetime import datetime, timezone
import asyncio
import json

@dataclass
class LeadData:
    """Container for all collected lead data"""
    email: str
    company_domain: str
    raw_data: Dict[str, Any] = field(default_factory=dict)
    firmographic_data: Dict = field(default_factory=dict)
    technographic_data: Dict = field(default_factory=dict)
    intent_data: Dict = field(default_factory=dict)
    engagement_data: Dict = field(default_factory=dict)
    scores: Dict = field(default_factory=dict)
    total_score: float = 0.0
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated as of Python 3.12)
    collected_at: str = field(
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )

class DataCollector:
    """Collect lead data from multiple sources using proxies"""

    def __init__(self, proxy_pool):
        self.proxy_pool = proxy_pool

    async def collect_all_data(self, email, company_domain):
        """Collect data from all sources for a lead"""
        lead = LeadData(email=email, company_domain=company_domain)

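        # Run the three scraped collectors in parallel. Engagement data is
        # first-party (website analytics, email platform), so populate
        # lead.engagement_data separately before scoring.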
        results = await asyncio.gather(
            self.collect_firmographic(company_domain),
            self.collect_technographic(company_domain),
            self.collect_intent(company_domain, email),
            return_exceptions=True,
        )

        if not isinstance(results[0], Exception):
            lead.firmographic_data = results[0]
        if not isinstance(results[1], Exception):
            lead.technographic_data = results[1]
        if not isinstance(results[2], Exception):
            lead.intent_data = results[2]

        return lead

    async def collect_firmographic(self, domain):
        """Collect firmographic data from company website and LinkedIn"""
        proxy = self.proxy_pool.get_next()
        data = {}

        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"https://{domain}",
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=15),
                    headers={"User-Agent": "Mozilla/5.0"}
                ) as response:
                    html = await response.text()

                    import re

                    # Extract company description
                    desc_match = re.search(
                        r'<meta\s+name="description"\s+content="([^"]+)"',
                        html, re.IGNORECASE
                    )
                    if desc_match:
                        data["description"] = desc_match.group(1)

                    # Detect company size signals
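                    emp_match = re.search(
                        r'(\d{1,3}(?:,\d{3})+|\d{1,5})\+?\s*(?:employees|team members|people)',
                        html, re.IGNORECASE
                    )
                    if emp_match:
                        # Strip thousands separators so "1,200" parses as 1200, not 200
                        data["estimated_employees"] = int(emp_match.group(1).replace(",", ""))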

                    # Founded year
                    founded_match = re.search(
                        r'(?:founded|established|since)\s*(?:in\s*)?(\d{4})',
                        html, re.IGNORECASE
                    )
                    if founded_match:
                        data["founded_year"] = int(founded_match.group(1))

                    # Location
                    loc_match = re.search(
                        r'(?:headquartered|based|located)\s+in\s+([^.<]+)',
                        html, re.IGNORECASE
                    )
                    if loc_match:
                        data["headquarters"] = loc_match.group(1).strip()

        except Exception as e:
            data["error"] = str(e)

        return data

    async def collect_technographic(self, domain):
        """Detect technology stack from company website"""
        proxy = self.proxy_pool.get_next()
        data = {"technologies": []}

        tech_signatures = {
            "Salesforce": ["force.com", "salesforce.com", "pardot"],
            "HubSpot": ["hs-analytics", "hubspot.com", "hstc="],
            "Marketo": ["marketo.com", "mktoweb"],
            "Intercom": ["intercom.io", "widget.intercom"],
            "Drift": ["drift.com", "driftt"],
            # A bare "analytics.js" also matches Google's legacy analytics.js tag
            "Segment": ["cdn.segment.com"],
            "Google Analytics": ["google-analytics.com", "gtag"],
            "Stripe": ["js.stripe.com"],
            "Shopify": ["cdn.shopify.com"],
            "WordPress": ["wp-content", "wp-includes"],
            # A bare "react" also matches ordinary words such as "reaction"
            "React": ["react-dom", "data-reactroot", "_reactRootContainer"],
            "AWS": ["amazonaws.com"],
            "Cloudflare": ["cloudflare"],
            "Zendesk": ["zendesk.com", "zdassets"],
            "Slack": ["slack.com"],
            "Jira": ["atlassian.com", "jira"],
        }

        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    f"https://{domain}",
                    proxy=proxy,
                    timeout=aiohttp.ClientTimeout(total=15),
                    headers={"User-Agent": "Mozilla/5.0"}
                ) as response:
                    html = (await response.text()).lower()

                    for tech, signatures in tech_signatures.items():
                        if any(sig.lower() in html for sig in signatures):
                            data["technologies"].append(tech)

                    # Check headers. response.headers is a case-insensitive
                    # mapping; copying it into a plain dict loses that, so a
                    # lowercase "server" lookup would miss the "Server" key.
                    server_header = response.headers.get("Server", "").lower()
                    if "cloudflare" in server_header:
                        if "Cloudflare" not in data["technologies"]:
                            data["technologies"].append("Cloudflare")

        except Exception as e:
            data["error"] = str(e)

        return data

    async def collect_intent(self, domain, email):
        """Collect intent signals from various sources"""
        proxy = self.proxy_pool.get_next()
        data = {"signals": []}

        # Check for recent job postings (hiring = growing)
        try:
            import aiohttp
            async with aiohttp.ClientSession() as session:
                # Check careers page
                for careers_path in ["/careers", "/jobs", "/join-us", "/hiring"]:
                    try:
                        async with session.get(
                            f"https://{domain}{careers_path}",
                            proxy=proxy,
                            timeout=aiohttp.ClientTimeout(total=10),
                            headers={"User-Agent": "Mozilla/5.0"},
                            allow_redirects=True,
                        ) as response:
                            if response.status == 200:
                                html = await response.text()
                                import re
                                job_count = len(re.findall(
                                    r'(?:job|position|role|opening)',
                                    html, re.IGNORECASE
                                ))
                                if job_count > 5:
                                    data["signals"].append({
                                        "type": "hiring_actively",
                                        "strength": min(job_count / 10, 1.0),
                                        "details": f"~{job_count} job-related mentions on careers page"
                                    })
                                break
                    except Exception:
                        continue
        except Exception:
            pass

        return data
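
The collectors above call `self.proxy_pool.get_next()` but the article never defines the pool. A minimal sketch, assuming a simple round-robin rotation over a fixed list of proxy URLs, might look like the following. `RoundRobinProxyPool` and the example endpoints are hypothetical; your provider's gateway format may differ.

```python
import itertools


class RoundRobinProxyPool:
    """Hypothetical proxy pool that hands out proxy URLs in rotation."""

    def __init__(self, proxy_urls):
        urls = list(proxy_urls)
        if not urls:
            raise ValueError("proxy_urls must not be empty")
        # itertools.cycle repeats the list endlessly in order.
        self._cycle = itertools.cycle(urls)

    def get_next(self):
        """Return the next proxy URL, wrapping around at the end."""
        return next(self._cycle)


# Placeholder endpoints (assumption); replace with real gateway URLs.
pool = RoundRobinProxyPool([
    "http://user:pass@proxy-a.example.com:8000",
    "http://user:pass@proxy-b.example.com:8000",
])
```

An instance like `pool` can be passed as `DataCollector(pool)`, since the string returned by `get_next()` has the form that aiohttp's `proxy` argument accepts. No lock is needed here because `get_next()` never awaits, so asyncio tasks cannot interleave inside it.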

The Scoring Engine

Firmographic Scoring

class FirmographicScorer:
    """Score leads based on firmographic fit"""

    def __init__(self, icp_config):
        self.icp = icp_config

    def score(self, firmographic_data):
        """Score firmographic fit (0-25 points)"""
        score = 0
        details = []

        # Company size scoring
        employees = firmographic_data.get("estimated_employees", 0)
        ideal_min = self.icp.get("min_employees", 10)
        ideal_max = self.icp.get("max_employees", 1000)

        if ideal_min <= employees <= ideal_max:
            score += 10
            details.append(f"Company size ({employees}) within ICP range")
        elif employees > 0:
            # Partial credit for close matches
            if employees < ideal_min:
                ratio = employees / ideal_min
            else:
                ratio = ideal_max / employees
            score += int(10 * ratio)
            details.append(f"Company size ({employees}) partially matches ICP")

        # Industry match
        description = firmographic_data.get("description", "").lower()
        target_industries = self.icp.get("target_industries", [])
        for industry in target_industries:
            if industry.lower() in description:
                score += 8
                details.append(f"Industry match: {industry}")
                break

        # Geography match
        headquarters = firmographic_data.get("headquarters", "").lower()
        target_geos = self.icp.get("target_geographies", [])
        for geo in target_geos:
            if geo.lower() in headquarters:
                score += 7
                details.append(f"Geography match: {geo}")
                break

        return {"score": min(score, 25), "max": 25, "details": details}
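
To make the partial-credit rule for company size concrete, here is a standalone sketch of the same arithmetic. `size_points` is an illustrative helper, not part of `FirmographicScorer`, and its defaults simply reuse the fallback ICP range of 10 to 1000 employees from the class above.

```python
def size_points(employees, ideal_min=10, ideal_max=1000, max_points=10):
    """Full points inside the ICP range, proportional credit outside it,
    and nothing when the company size is unknown (0)."""
    if ideal_min <= employees <= ideal_max:
        return max_points
    if employees <= 0:
        return 0
    if employees < ideal_min:
        ratio = employees / ideal_min   # too small: fraction of the minimum
    else:
        ratio = ideal_max / employees   # too large: maximum over actual size
    return int(max_points * ratio)      # int() truncates toward zero


for n in (0, 5, 250, 2000, 20000):
    print(n, size_points(n))
# 0 0
# 5 5
# 250 10
# 2000 5
# 20000 0
```

A company twice the maximum size still earns half the size points, while one twenty times too large earns none.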

Technographic Scoring

Understanding technology stack compatibility is fundamental to B2B sales. The proxy infrastructure for tech stack detection is explained in our proxy glossary.

class TechnographicScorer:
    """Score leads based on technology stack fit"""

    def __init__(self, tech_config):
        self.config = tech_config

    def score(self, technographic_data):
        """Score technographic fit (0-25 points)"""
        score = 0
        details = []
        technologies = technographic_data.get("technologies", [])

        # Positive signals (technologies that indicate good fit)
        positive_tech = self.config.get("positive_technologies", [])
        for tech in positive_tech:
            if tech in technologies:
                score += 5
                details.append(f"Uses {tech} (positive signal)")

        # Negative signals (technologies that indicate poor fit)
        negative_tech = self.config.get("negative_technologies", [])
        for tech in negative_tech:
            if tech in technologies:
                score -= 5
                details.append(f"Uses {tech} (negative signal)")

        # Competitor technology (replacement opportunity)
        competitor_tech = self.config.get("competitor_technologies", [])
        for tech in competitor_tech:
            if tech in technologies:
                score += 8
                details.append(f"Uses competitor {tech} (replacement opportunity)")

        # Complementary technology (integration opportunity)
        complement_tech = self.config.get("complementary_technologies", [])
        for tech in complement_tech:
            if tech in technologies:
                score += 4
                details.append(f"Uses {tech} (integration opportunity)")

        return {"score": max(0, min(score, 25)), "max": 25, "details": details}
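
The `tech_config` dictionary is never shown in full, so the following is a hedged example of what one might contain, together with a compact restatement of the scoring arithmetic. The technology choices are invented for illustration, and `technographic_points` is a hypothetical helper that matches `TechnographicScorer` only when the config lists contain no duplicate entries.

```python
# Hypothetical config for a product that competes with Intercom and
# integrates with HubSpot. The technology names are illustrative.
tech_config = {
    "positive_technologies": ["Segment", "Stripe"],
    "negative_technologies": ["WordPress"],
    "competitor_technologies": ["Intercom"],
    "complementary_technologies": ["HubSpot"],
}

# Points per matched technology, copied from TechnographicScorer.
WEIGHTS = {
    "positive_technologies": 5,
    "negative_technologies": -5,
    "competitor_technologies": 8,
    "complementary_technologies": 4,
}


def technographic_points(technologies, config):
    """Sum the weights of all matched technologies, clamped to 0-25."""
    detected = set(technologies)
    raw = sum(
        weight * len(detected & set(config.get(key, [])))
        for key, weight in WEIGHTS.items()
    )
    return max(0, min(raw, 25))


print(technographic_points(["Segment", "Intercom", "HubSpot", "WordPress"], tech_config))
# 5 + 8 + 4 - 5 = 12
print(technographic_points(["WordPress"], tech_config))
# -5 is clamped up to 0
```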

Intent Scoring

class IntentScorer:
    """Score leads based on buying intent signals"""

    def score(self, intent_data):
        """Score intent signals (0-30 points)"""
        score = 0
        details = []
        signals = intent_data.get("signals", [])

        signal_weights = {
            "hiring_actively": 8,
            "funding_recent": 10,
            "competitor_review": 12,
            "content_engagement": 7,
            "forum_discussion": 9,
            "pricing_page_visit": 15,
            "demo_request": 25,
        }

        for signal in signals:
            signal_type = signal.get("type")
            weight = signal_weights.get(signal_type, 5)
            strength = signal.get("strength", 1.0)
            adjusted_weight = int(weight * strength)

            score += adjusted_weight
            details.append(f"Intent signal: {signal_type} (strength: {strength:.1f})")

        return {"score": min(score, 30), "max": 30, "details": details}
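
A short worked example may help show how weight, strength, and the 30-point cap interact. The sketch below restates the rule from `IntentScorer` as a standalone function. `intent_points` and the sample signals are illustrative, not collected data.

```python
# Weights copied from IntentScorer; unknown signal types default to 5.
SIGNAL_WEIGHTS = {
    "hiring_actively": 8,
    "funding_recent": 10,
    "competitor_review": 12,
    "content_engagement": 7,
    "forum_discussion": 9,
    "pricing_page_visit": 15,
    "demo_request": 25,
}


def intent_points(signals, cap=30, default_weight=5):
    """Add weight * strength per signal (truncated to an int), then cap."""
    total = 0
    for signal in signals:
        weight = SIGNAL_WEIGHTS.get(signal.get("type"), default_weight)
        total += int(weight * signal.get("strength", 1.0))
    return min(total, cap)


signals = [
    {"type": "hiring_actively", "strength": 0.7},  # int(8 * 0.7) = 5
    {"type": "pricing_page_visit"},                # strength defaults to 1.0: 15
    {"type": "demo_request", "strength": 1.0},     # 25
]
print(intent_points(signals[:2]))  # 5 + 15 = 20
print(intent_points(signals))      # 45, capped at 30
```

Note that `int()` truncates rather than rounds, so a hiring signal at strength 0.7 contributes 5 points, not 6, and a single demo request nearly fills the intent budget on its own.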

Engagement Scoring

class EngagementScorer:
    """Score leads based on engagement with your brand"""

    def score(self, engagement_data):
        """Score engagement (0-20 points)"""
        score = 0
        details = []

        # Website visits
        visits = engagement_data.get("website_visits", 0)
        if visits >= 5:
            score += 8
            details.append(f"High website engagement ({visits} visits)")
        elif visits >= 2:
            score += 4
            details.append(f"Some website engagement ({visits} visits)")

        # Email engagement
        opens = engagement_data.get("email_opens", 0)
        clicks = engagement_data.get("email_clicks", 0)

        if clicks >= 2:
            score += 6
            details.append(f"Strong email engagement ({clicks} clicks)")
        elif opens >= 3:
            score += 3
            details.append(f"Moderate email engagement ({opens} opens)")

        # Content downloads
        downloads = engagement_data.get("content_downloads", 0)
        if downloads >= 1:
            score += 6
            details.append(f"Downloaded {downloads} content piece(s)")

        return {"score": min(score, 20), "max": 20, "details": details}

Composite Scoring System

Combine all four dimensions into a final score:

class LeadScoringEngine:
    """Composite lead scoring system"""

    def __init__(self, icp_config, tech_config):
        self.firmographic = FirmographicScorer(icp_config)
        self.technographic = TechnographicScorer(tech_config)
        self.intent = IntentScorer()
        self.engagement = EngagementScorer()

    def score_lead(self, lead_data: LeadData) -> LeadData:
        """Calculate composite lead score"""
        # Score each dimension
        firm_result = self.firmographic.score(lead_data.firmographic_data)
        tech_result = self.technographic.score(lead_data.technographic_data)
        intent_result = self.intent.score(lead_data.intent_data)
        engage_result = self.engagement.score(lead_data.engagement_data)

        lead_data.scores = {
            "firmographic": firm_result,
            "technographic": tech_result,
            "intent": intent_result,
            "engagement": engage_result,
        }

        # Calculate total (out of 100)
        lead_data.total_score = (
            firm_result["score"] +
            tech_result["score"] +
            intent_result["score"] +
            engage_result["score"]
        )

        return lead_data

    def classify_lead(self, lead_data: LeadData):
        """Classify lead by score into action categories"""
        score = lead_data.total_score

        if score >= 80:
            return {
                "category": "hot",
                "action": "immediate_sales_contact",
                "priority": 1,
                "description": "High-priority lead - contact within 24 hours",
            }
        elif score >= 60:
            return {
                "category": "warm",
                "action": "sales_outreach",
                "priority": 2,
                "description": "Qualified lead - add to outreach sequence",
            }
        elif score >= 40:
            return {
                "category": "nurture",
                "action": "marketing_nurture",
                "priority": 3,
                "description": "Potential fit - add to nurture campaign",
            }
        else:
            return {
                "category": "cold",
                "action": "low_priority",
                "priority": 4,
                "description": "Low fit - monitor for changes",
            }
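
If you expect to tune the thresholds during calibration, one option is to keep them in a list rather than in an `if`/`elif` chain. The sketch below is an alternative formulation using the standard library's `bisect` module, not the article's own method. `THRESHOLDS`, `CATEGORIES`, and `categorize` are hypothetical names, and the cut-offs are the same 40/60/80 used in `classify_lead`.

```python
from bisect import bisect_right

# Lower bound of each tier above "cold", in ascending order.
THRESHOLDS = [40, 60, 80]
CATEGORIES = ["cold", "nurture", "warm", "hot"]


def categorize(total_score):
    """Map a 0-100 score to a category name.

    bisect_right counts how many thresholds are <= total_score,
    which is exactly the index of the matching category.
    """
    return CATEGORIES[bisect_right(THRESHOLDS, total_score)]


for score in (39, 40, 60, 79.9, 80):
    print(score, categorize(score))
# 39 cold
# 40 nurture
# 60 warm
# 79.9 warm
# 80 hot
```

Changing a tier boundary then means editing one number in `THRESHOLDS`.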

Batch Scoring Pipeline

Process large lead lists through the scoring system. For teams using proxies for web scraping at scale, this pipeline integrates directly with existing data collection infrastructure.

class BatchScoringPipeline:
    """Score large batches of leads"""

    def __init__(self, collector, scorer, proxy_pool):
        self.collector = collector
        self.scorer = scorer
        self.proxy_pool = proxy_pool

    async def score_batch(self, leads, batch_size=20):
        """Score a batch of leads with data collection"""
        scored_leads = []

        for i in range(0, len(leads), batch_size):
            batch = leads[i:i + batch_size]

            # Collect data for batch
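            # Fall back to the email's domain when no company domain is given;
            # without it the collectors would request "https://None".
            # (Free-mail domains such as gmail.com will not yield company data.)
            tasks = [
                self.collector.collect_all_data(
                    lead["email"],
                    lead.get("domain") or lead["email"].split("@")[-1],
                )
                for lead in batch
            ]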
            lead_data_list = await asyncio.gather(*tasks, return_exceptions=True)

            # Score each lead
            for lead_data in lead_data_list:
                if isinstance(lead_data, LeadData):
                    scored = self.scorer.score_lead(lead_data)
                    classification = self.scorer.classify_lead(scored)
                    scored_leads.append({
                        "lead": scored,
                        "classification": classification,
                    })

            print(f"Scored {min(i + batch_size, len(leads))}/{len(leads)} leads")
            await asyncio.sleep(2)

        # Sort by score descending
        scored_leads.sort(key=lambda x: x["lead"].total_score, reverse=True)

        return scored_leads

    def generate_report(self, scored_leads):
        """Generate scoring distribution report"""
        categories = {"hot": 0, "warm": 0, "nurture": 0, "cold": 0}
        total_score = 0

        for item in scored_leads:
            cat = item["classification"]["category"]
            categories[cat] += 1
            total_score += item["lead"].total_score

        avg_score = total_score / len(scored_leads) if scored_leads else 0

        return {
            "total_leads": len(scored_leads),
            "average_score": round(avg_score, 1),
            "distribution": categories,
            "top_10": [
                {
                    "email": item["lead"].email,
                    "domain": item["lead"].company_domain,
                    "score": item["lead"].total_score,
                    "category": item["classification"]["category"],
                }
                for item in scored_leads[:10]
            ],
        }
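
`score_batch` launches a whole batch at once and then pauses for two seconds. A different way to throttle collection is a concurrency limit with `asyncio.Semaphore`, sketched below as an alternative rather than as the pipeline's actual behavior. `gather_bounded` is a hypothetical helper, and `fake_collect` stands in for `collect_all_data` so the example runs without network access.

```python
import asyncio


async def gather_bounded(coro_factories, limit=20):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:  # waits here while `limit` tasks are running
            return await factory()

    # Results come back in input order; exceptions are returned, not raised.
    return await asyncio.gather(
        *(run(f) for f in coro_factories), return_exceptions=True
    )


async def _demo():
    in_flight = 0
    peak = 0

    async def fake_collect(i):
        nonlocal in_flight, peak
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)  # simulated network delay
        in_flight -= 1
        return i

    factories = [lambda i=i: fake_collect(i) for i in range(10)]
    results = await gather_bounded(factories, limit=3)
    return results, peak


results, peak = asyncio.run(_demo())
print(results, peak)  # peak never exceeds the limit of 3
```

With this approach a slow lead delays only its own slot instead of holding up the rest of its batch.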

Score Calibration and Optimization

A/B Testing Scoring Models

class ScoringModelTest:
    """A/B test different scoring configurations"""

    def __init__(self):
        self.models = {}
        self.results = {}

    def add_model(self, name, config):
        """Register a scoring model for testing"""
        self.models[name] = config

    def evaluate_model(self, model_name, historical_leads, conversion_data):
        """Evaluate model accuracy against historical conversion data"""
        model = self.models[model_name]
        scorer = LeadScoringEngine(
            model.get("icp_config", {}),
            model.get("tech_config", {}),
        )

        predictions = []
        for lead in historical_leads:
            scored = scorer.score_lead(lead)
            classification = scorer.classify_lead(scored)
            actual_converted = conversion_data.get(lead.email, False)

            predictions.append({
                "email": lead.email,
                "predicted_score": scored.total_score,
                "predicted_category": classification["category"],
                "actually_converted": actual_converted,
            })

        # Calculate accuracy metrics
        hot_leads = [p for p in predictions if p["predicted_category"] == "hot"]
        hot_conversion_rate = (
            sum(1 for p in hot_leads if p["actually_converted"]) / len(hot_leads)
            if hot_leads else 0
        )

        cold_leads = [p for p in predictions if p["predicted_category"] == "cold"]
        cold_miss_rate = (
            sum(1 for p in cold_leads if p["actually_converted"]) / len(cold_leads)
            if cold_leads else 0
        )

        self.results[model_name] = {
            "hot_conversion_rate": hot_conversion_rate,
            "cold_miss_rate": cold_miss_rate,
            "total_evaluated": len(predictions),
        }

        return self.results[model_name]
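
The hot conversion rate is easier to interpret next to the overall conversion rate. One common way to express that comparison is lift: a category's conversion rate divided by the rate across all leads. The sketch below adds this metric as a suggestion; it is not part of `ScoringModelTest`. `conversion_lift` is a hypothetical helper that expects the same prediction dictionaries `evaluate_model` builds, and the sample data is made up.

```python
def conversion_lift(predictions, category="hot"):
    """Conversion rate of one category divided by the overall rate.

    Returns 0.0 when there is nothing to compare against.
    """
    if not predictions:
        return 0.0
    overall = sum(p["actually_converted"] for p in predictions) / len(predictions)
    bucket = [p for p in predictions if p["predicted_category"] == category]
    if not bucket or overall == 0:
        return 0.0
    bucket_rate = sum(p["actually_converted"] for p in bucket) / len(bucket)
    return bucket_rate / overall


# Made-up sample: 4 hot leads (2 converted) and 4 cold leads (0 converted).
sample = (
    [{"predicted_category": "hot", "actually_converted": True}] * 2
    + [{"predicted_category": "hot", "actually_converted": False}] * 2
    + [{"predicted_category": "cold", "actually_converted": False}] * 4
)
print(conversion_lift(sample, "hot"))   # 0.5 / 0.25 = 2.0
print(conversion_lift(sample, "cold"))  # 0.0
```

A lift near 1.0 for the hot category would suggest the model is doing little better than picking leads at random.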

CRM Integration

Push scored leads to your CRM with scoring context:

def push_scored_leads_to_crm(scored_leads, crm_api_key):
    """Push scored leads to HubSpot with scoring data"""
    import requests

    for item in scored_leads:
        lead = item["lead"]
        classification = item["classification"]

        properties = {
            "email": lead.email,
            "website": lead.company_domain,
            "lead_score_total": lead.total_score,
            "lead_score_firmographic": lead.scores.get("firmographic", {}).get("score", 0),
            "lead_score_technographic": lead.scores.get("technographic", {}).get("score", 0),
            "lead_score_intent": lead.scores.get("intent", {}).get("score", 0),
            "lead_score_engagement": lead.scores.get("engagement", {}).get("score", 0),
            "lead_category": classification["category"],
            "lead_priority": classification["priority"],
            "hs_lead_status": "NEW" if classification["category"] == "hot" else "OPEN",
        }

        # Add tech stack as custom property
        tech_stack = lead.technographic_data.get("technologies", [])
        if tech_stack:
            properties["detected_technologies"] = "; ".join(tech_stack)

        response = requests.post(
            "https://api.hubapi.com/crm/v3/objects/contacts",
            headers={
                "Authorization": f"Bearer {crm_api_key}",
                "Content-Type": "application/json",
            },
            json={"properties": properties},
            timeout=10,  # requests has no default timeout; avoid hanging on a stalled connection
        )

        if response.status_code == 409:
            # Contact exists - update score
            # Find and update existing contact
            pass

Conclusion

Multi-source lead scoring turns raw prospect data into a prioritized list your sales team can act on. By combining firmographic fit, technology stack analysis, intent signals, and engagement data, much of it collected through proxy-powered scraping, you get scoring models whose predictions can be checked against real conversions. The key is continuous calibration: track which scored leads actually convert, adjust your weights accordingly, and A/B test model variations. Start with simple scoring rules, validate them against your conversion data, and add complexity only as you accumulate enough data to measure the impact of each scoring dimension. Teams that keep this loop running are better placed to improve both sales efficiency and conversion rates.


last updated: April 3, 2026
