Creating a Multi-Threaded Link Crawler

A link crawler discovers all pages on a website by following hyperlinks from a starting URL. Unlike a scraper that extracts data, a crawler maps the site structure: finding pages, detecting broken links, building sitemaps, and measuring how deep content sits in the hierarchy. Concurrent fetching (implemented below with an asyncio worker pool rather than OS threads) makes this fast enough for sites with thousands of pages.

Features

  • Multi-threaded crawling with configurable concurrency
  • URL deduplication to avoid infinite loops
  • Depth limiting to control crawl scope
  • robots.txt compliance
  • Subdomain handling (stay on domain or follow subdomains)
  • Broken link detection
  • Sitemap XML export
  • Proxy rotation for large crawls

Implementation

import asyncio
import httpx
import time
import hashlib
import xml.etree.ElementTree as ET
from urllib.parse import urljoin, urlparse, urldefrag, parse_qsl, urlencode
from dataclasses import dataclass, field
from typing import Set, List, Dict, Optional
from collections import deque
from selectolax.parser import HTMLParser
import logging

logger = logging.getLogger(__name__)

@dataclass
class CrawlResult:
    url: str
    status_code: int = 0
    content_type: str = ""
    title: str = ""
    depth: int = 0
    outgoing_links: List[str] = field(default_factory=list)
    incoming_links: int = 0
    load_time_ms: int = 0
    word_count: int = 0
    error: str = ""
    redirect_url: str = ""
    is_internal: bool = True

@dataclass
class CrawlStats:
    pages_crawled: int = 0
    pages_found: int = 0
    broken_links: int = 0
    redirects: int = 0
    external_links: int = 0
    total_time_seconds: float = 0
    avg_response_ms: float = 0
    max_depth_reached: int = 0


class LinkCrawler:
    def __init__(
        self,
        start_url: str,
        max_depth: int = 5,
        max_pages: int = 1000,
        concurrency: int = 10,
        timeout: int = 30,
        proxies: Optional[List[str]] = None,
        follow_subdomains: bool = False,
        respect_robots: bool = True,
    ):
        self.start_url = start_url
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.concurrency = concurrency
        self.timeout = timeout
        self.proxies = proxies or []
        self.proxy_index = 0
        self.follow_subdomains = follow_subdomains
        self.respect_robots = respect_robots

        parsed = urlparse(start_url)
        self.base_domain = parsed.netloc
        self.scheme = parsed.scheme

        self.visited: Set[str] = set()
        self.queued: Set[str] = set()  # URLs already placed on the queue
        self.results: Dict[str, CrawlResult] = {}
        self.queue: Optional[asyncio.Queue] = None
        self.stats = CrawlStats()

        self.disallowed_paths: Set[str] = set()
        self.crawl_delay: float = 0

    def _normalize_url(self, url: str) -> str:
        """Normalize URL for deduplication: strip fragments and trailing
        slashes, and sort query parameters so reordered queries match."""
        url, _ = urldefrag(url)  # Remove fragments
        parsed = urlparse(url)
        if parsed.query:
            query = urlencode(sorted(parse_qsl(parsed.query)))
            url = parsed._replace(query=query).geturl()
        url = url.rstrip('/')
        if url.startswith('//'):
            url = f"{self.scheme}:{url}"
        return url

    def _is_internal(self, url: str) -> bool:
        parsed = urlparse(url)
        if not parsed.netloc:
            return True
        if parsed.netloc == self.base_domain:
            return True
        if self.follow_subdomains:
            base_parts = self.base_domain.split('.')
            url_parts = parsed.netloc.split('.')
            if len(base_parts) >= 2 and len(url_parts) >= 2:
                return base_parts[-2:] == url_parts[-2:]
        return False

    def _should_crawl(self, url: str) -> bool:
        parsed = urlparse(url)

        # Skip non-HTTP
        if parsed.scheme not in ('http', 'https', ''):
            return False

        # Skip common non-page extensions
        skip_ext = {
            '.pdf', '.jpg', '.jpeg', '.png', '.gif', '.svg',
            '.css', '.js', '.ico', '.woff', '.woff2', '.ttf',
            '.mp4', '.mp3', '.zip', '.tar', '.gz', '.exe',
        }
        path = parsed.path.lower()
        if any(path.endswith(ext) for ext in skip_ext):
            return False

        # Check robots.txt
        if self.respect_robots:
            for disallowed in self.disallowed_paths:
                if parsed.path.startswith(disallowed):
                    return False

        return True

    async def _load_robots(self):
        """Fetch and parse robots.txt. Simplified: every Disallow rule is
        applied, regardless of which User-agent group declares it."""
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{self.scheme}://{self.base_domain}/robots.txt"
                )
                if resp.status_code == 200:
                    for line in resp.text.splitlines():
                        line = line.strip()
                        if line.lower().startswith('disallow:'):
                            path = line.split(':', 1)[1].strip()
                            if path:
                                self.disallowed_paths.add(path)
                        elif line.lower().startswith('crawl-delay:'):
                            try:
                                self.crawl_delay = float(
                                    line.split(':', 1)[1].strip()
                                )
                            except ValueError:
                                pass
                    logger.info(
                        f"Loaded robots.txt: {len(self.disallowed_paths)} "
                        f"disallowed paths, crawl-delay: {self.crawl_delay}s"
                    )
        except Exception as e:
            logger.debug(f"Failed to load robots.txt: {e}")

    def _get_proxy(self) -> Optional[str]:
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    def _extract_links(self, html: str, base_url: str) -> List[str]:
        tree = HTMLParser(html)
        links = []

        for a in tree.css('a[href]'):
            href = a.attributes.get('href', '').strip()
            if not href or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')):
                continue

            absolute = urljoin(base_url, href)
            normalized = self._normalize_url(absolute)
            links.append(normalized)

        return links

    async def _crawl_url(self, url: str, depth: int):
        if url in self.visited:
            return
        if len(self.visited) >= self.max_pages:
            return
        if depth > self.max_depth:
            return

        self.visited.add(url)
        result = CrawlResult(url=url, depth=depth)

        proxy = self._get_proxy()

        try:
            start = time.monotonic()
            # A new client per request lets each fetch use a different
            # proxy; httpx binds the proxy to the client, not the request.
            async with httpx.AsyncClient(
                proxy=proxy,
                timeout=self.timeout,
                follow_redirects=True,
            ) as client:
                resp = await client.get(url, headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; LinkCrawler/1.0)',
                })

            result.load_time_ms = int((time.monotonic() - start) * 1000)
            result.status_code = resp.status_code
            result.content_type = resp.headers.get('content-type', '')

            if str(resp.url) != url:
                result.redirect_url = str(resp.url)
                self.stats.redirects += 1

            if resp.status_code >= 400:
                self.stats.broken_links += 1

            if resp.status_code == 200 and 'text/html' in result.content_type:
                tree = HTMLParser(resp.text)

                # Extract title
                title = tree.css_first('title')
                result.title = title.text(strip=True) if title else ""

                # Word count
                body = tree.css_first('body')
                if body:
                    text = body.text(separator=' ', strip=True)
                    result.word_count = len(text.split())

                # Extract and queue links
                links = self._extract_links(resp.text, url)
                result.outgoing_links = links

                for link in links:
                    if self._is_internal(link):
                        # Guard with a queued set so the same URL is not
                        # enqueued (and counted) twice by concurrent workers.
                        if (link not in self.visited and
                            link not in self.queued and
                            self._should_crawl(link) and
                            len(self.visited) < self.max_pages):
                            self.queued.add(link)
                            await self.queue.put((link, depth + 1))
                            self.stats.pages_found += 1
                    else:
                        self.stats.external_links += 1
                        result_ext = CrawlResult(
                            url=link, is_internal=False, depth=depth + 1
                        )
                        self.results[link] = result_ext

        except Exception as e:
            result.error = str(e)[:200]
            result.status_code = 0

        self.results[url] = result
        self.stats.pages_crawled += 1

        if depth > self.stats.max_depth_reached:
            self.stats.max_depth_reached = depth

        if self.stats.pages_crawled % 10 == 0:
            print(
                f"  Crawled: {self.stats.pages_crawled} | "
                f"Queue: ~{self.queue.qsize()} | "
                f"Depth: {depth}"
            )

        # Respect crawl delay
        if self.crawl_delay > 0:
            await asyncio.sleep(self.crawl_delay)

    async def crawl(self) -> Dict[str, CrawlResult]:
        print(f"Starting crawl: {self.start_url}")
        print(f"Max depth: {self.max_depth}, Max pages: {self.max_pages}")

        if self.respect_robots:
            await self._load_robots()

        self.queue = asyncio.Queue()
        await self.queue.put((self.start_url, 0))

        start_time = time.monotonic()

        async def worker():
            # Workers exit after 10 idle seconds on an empty queue; this
            # avoids sentinel values at the cost of a short shutdown lag.
            while True:
                try:
                    url, depth = await asyncio.wait_for(
                        self.queue.get(), timeout=10
                    )
                    await self._crawl_url(url, depth)
                    self.queue.task_done()
                except asyncio.TimeoutError:
                    break
                except Exception as e:
                    logger.error(f"Worker error: {e}")

        workers = [
            asyncio.create_task(worker())
            for _ in range(self.concurrency)
        ]

        await asyncio.gather(*workers)

        self.stats.total_time_seconds = round(
            time.monotonic() - start_time, 1
        )

        # Average only over pages that actually returned a response;
        # errored pages have load_time_ms == 0 and would skew the mean.
        timed = [
            r.load_time_ms for r in self.results.values()
            if r.load_time_ms > 0
        ]
        if timed:
            self.stats.avg_response_ms = round(sum(timed) / len(timed))

        # Count incoming links
        for result in self.results.values():
            for link in result.outgoing_links:
                if link in self.results:
                    self.results[link].incoming_links += 1

        return self.results

    def export_sitemap(self, filepath: str = "sitemap.xml"):
        urlset = ET.Element(
            "urlset",
            xmlns="http://www.sitemaps.org/schemas/sitemap/0.9",
        )

        for url, result in sorted(self.results.items()):
            if result.status_code == 200 and result.is_internal:
                url_elem = ET.SubElement(urlset, "url")
                ET.SubElement(url_elem, "loc").text = url

                # Priority based on depth
                priority = max(0.1, 1.0 - (result.depth * 0.2))
                ET.SubElement(url_elem, "priority").text = f"{priority:.1f}"

        tree = ET.ElementTree(urlset)
        ET.indent(tree, space="  ")  # Requires Python 3.9+
        tree.write(filepath, xml_declaration=True, encoding="UTF-8")
        print(f"Sitemap exported to {filepath} ({len(urlset)} URLs)")

    def export_broken_links(self, filepath: str = "broken_links.csv"):
        import csv
        broken = [
            r for r in self.results.values()
            if r.status_code >= 400 or (r.status_code == 0 and r.error)
        ]

        with open(filepath, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['url', 'status_code', 'error', 'found_at_depth'])
            for r in broken:
                writer.writerow([r.url, r.status_code, r.error, r.depth])

        print(f"Broken links exported: {len(broken)} links to {filepath}")

    def print_summary(self):
        s = self.stats
        print(f"\n{'='*60}")
        print("CRAWL SUMMARY")
        print(f"{'='*60}")
        print(f"Pages crawled: {s.pages_crawled}")
        print(f"Unique pages found: {s.pages_found}")
        print(f"Broken links: {s.broken_links}")
        print(f"Redirects: {s.redirects}")
        print(f"External links: {s.external_links}")
        print(f"Max depth: {s.max_depth_reached}")
        print(f"Avg response time: {s.avg_response_ms}ms")
        print(f"Total time: {s.total_time_seconds}s")

        # Top pages by incoming links
        internal = [
            r for r in self.results.values()
            if r.is_internal and r.status_code == 200
        ]
        top = sorted(internal, key=lambda x: x.incoming_links, reverse=True)

        print(f"\n--- Most Linked Pages ---")
        for page in top[:10]:
            print(f"  {page.incoming_links:3d} links → {page.url[:60]}")


# Usage
async def main():
    crawler = LinkCrawler(
        start_url="https://example.com",
        max_depth=3,
        max_pages=500,
        concurrency=10,
        proxies=[
            "http://user:pass@proxy1.example.com:8080",
        ],
    )

    results = await crawler.crawl()

    crawler.print_summary()
    crawler.export_sitemap("sitemap.xml")
    crawler.export_broken_links("broken_links.csv")

if __name__ == "__main__":
    asyncio.run(main())

FAQ

How do I handle infinite crawl loops?

URL normalization and deduplication prevent loops. The crawler normalizes URLs by stripping fragments and trailing slashes and sorting query parameters, so equivalent spellings map to a single key. Each normalized URL is checked against the visited set before crawling, and the max_pages limit provides a hard stop.
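
A minimal standalone sketch of that normalization (mirroring _normalize_url above) shows equivalent spellings collapsing to one visited-set key:

from urllib.parse import urldefrag, urlparse, parse_qsl, urlencode

def normalize(url: str) -> str:
    url, _ = urldefrag(url)  # strip #fragment
    parsed = urlparse(url)
    query = urlencode(sorted(parse_qsl(parsed.query)))  # order-insensitive
    return parsed._replace(query=query).geturl().rstrip('/')

# Both spellings collapse to the same key:
assert normalize("https://example.com/a?y=2&x=1#top") == \
       normalize("https://example.com/a?x=1&y=2")
assert normalize("https://example.com/b/") == "https://example.com/b"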

Should I crawl JavaScript-rendered pages?

For sites that load content via JavaScript, the HTML-only crawler will miss dynamically loaded links. Integrate Playwright for JS rendering, but be aware this reduces crawl speed significantly. Most sites have enough server-rendered links for sitemap generation.
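
If you do need rendered HTML, here is a hedged sketch of a Playwright-based fetcher that could replace the httpx GET for JS-heavy pages (requires pip install playwright and a one-time playwright install; the function name and defaults are illustrative):

from playwright.async_api import async_playwright

async def fetch_rendered(url: str, timeout_ms: int = 30000) -> str:
    """Return post-JavaScript HTML for one URL. Much slower than httpx:
    each call pays for browser startup, navigation, and script execution."""
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        try:
            page = await browser.new_page()
            await page.goto(url, timeout=timeout_ms, wait_until="networkidle")
            return await page.content()
        finally:
            await browser.close()

Launching a browser per URL keeps the sketch simple; a real crawl would launch once and share pages across workers.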

How fast can I crawl without getting blocked?

With 10 concurrent connections and no additional delay, most sites tolerate 5-10 requests per second. Always check robots.txt for crawl-delay directives. For large sites, use proxies to distribute the load across multiple IPs.
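
To hold a global ceiling regardless of worker count, a shared limiter can space out requests. This is a sketch; the 0.2-second interval (about 5 requests per second) is an assumed starting point, not a guaranteed-safe rate:

import asyncio
import time

class RateLimiter:
    """Allow at most one request per `interval` seconds across all workers."""
    def __init__(self, interval: float = 0.2):  # 0.2s => ~5 requests/second
        self.interval = interval
        self._lock = asyncio.Lock()
        self._next_slot = 0.0

    async def wait(self):
        async with self._lock:
            now = time.monotonic()
            delay = self._next_slot - now
            self._next_slot = max(now, self._next_slot) + self.interval
        if delay > 0:
            await asyncio.sleep(delay)

# Each worker would call `await limiter.wait()` just before its GET.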

How do I handle authentication-protected pages?

Pass session cookies or authentication headers in the httpx client. Log in once before starting the crawl, capture the session cookie, and include it in all subsequent requests. Some sites require a fresh authentication token periodically.
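
A sketch of that flow with httpx, where the login URL and form field names are placeholders for whatever the target site actually uses:

import httpx

async def build_authed_client() -> httpx.AsyncClient:
    client = httpx.AsyncClient(follow_redirects=True)
    # Hypothetical login endpoint and form fields; adjust for the real site.
    await client.post("https://example.com/login", data={
        "username": "user",
        "password": "secret",
    })
    # Session cookies set by the login response are stored on the client
    # and sent automatically with every later request it makes.
    return client

Note that one shared, authenticated client conflicts with the per-request clients used above for proxy rotation; pick one strategy per crawl.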

What is the difference between a crawler and a scraper?

A crawler discovers pages by following links — its output is a list of URLs and their metadata. A scraper extracts specific data from pages. In practice, you often combine them: crawl first to find all product pages, then scrape each page for product details.
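
A sketch of that two-phase pattern, reusing LinkCrawler from above; the /product/ path filter and the CSS selectors are assumptions about the target site:

import asyncio
import httpx
from selectolax.parser import HTMLParser

async def crawl_then_scrape():
    # Phase 1: discovery — crawl the site and collect URLs.
    crawler = LinkCrawler("https://example.com", max_depth=3)
    results = await crawler.crawl()

    product_urls = [
        url for url, r in results.items()
        if r.is_internal and r.status_code == 200 and "/product/" in url
    ]

    # Phase 2: extraction from each discovered product page.
    async with httpx.AsyncClient(timeout=30) as client:
        for url in product_urls:
            resp = await client.get(url)
            tree = HTMLParser(resp.text)
            name = tree.css_first("h1")       # assumed selector
            price = tree.css_first(".price")  # assumed selector
            print(url,
                  name.text(strip=True) if name else "?",
                  price.text(strip=True) if price else "?")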

