Creating a Multi-Threaded Link Crawler
A link crawler discovers all pages on a website by following hyperlinks from a starting URL. Unlike a scraper that extracts data, a crawler maps the site structure — finding pages, detecting broken links, building sitemaps, and identifying crawl patterns. Concurrent fetching (implemented here with asyncio coroutines) makes this fast enough for large sites with thousands of pages.
Features
- Concurrent (asyncio) crawling with configurable concurrency
- URL deduplication to avoid infinite loops
- Depth limiting to control crawl scope
- robots.txt compliance
- Subdomain handling (stay on domain or follow subdomains)
- Broken link detection
- Sitemap XML export
- Proxy rotation for large crawls
Implementation
import asyncio
import httpx
import time
import hashlib
import xml.etree.ElementTree as ET
from urllib.parse import urljoin, urlparse, urldefrag
from dataclasses import dataclass, field
from typing import Set, List, Dict, Optional
from collections import deque
from selectolax.parser import HTMLParser
import logging
logger = logging.getLogger(__name__)
@dataclass
class CrawlResult:
    """Per-URL outcome of a crawl: HTTP status, page metadata, and link-graph info."""
    url: str
    status_code: int = 0  # 0 means the request itself failed (see `error`)
    content_type: str = ""
    title: str = ""  # <title> text, only for 200 text/html responses
    depth: int = 0  # link distance from the start URL
    outgoing_links: List[str] = field(default_factory=list)  # normalized URLs found on this page
    incoming_links: int = 0  # filled in after the crawl finishes, not during
    load_time_ms: int = 0  # wall-clock fetch time; 0 when the fetch failed
    word_count: int = 0  # whitespace-split word count of <body> text
    error: str = ""  # truncated exception text when the fetch raised
    redirect_url: str = ""  # final URL when the response was redirected
    is_internal: bool = True  # False for external links (recorded, never fetched)
@dataclass
class CrawlStats:
    """Aggregate counters accumulated over one crawl run."""
    pages_crawled: int = 0  # pages actually fetched (including failures)
    pages_found: int = 0  # internal links enqueued (may double-count; see _crawl_url)
    broken_links: int = 0  # responses with status >= 400
    redirects: int = 0  # responses whose final URL differed from the requested one
    external_links: int = 0  # off-site links seen (recorded, not fetched)
    total_time_seconds: float = 0
    avg_response_ms: float = 0
    max_depth_reached: int = 0
class LinkCrawler:
    """Concurrent link crawler built on asyncio and httpx.

    Starting from ``start_url`` it fetches pages, extracts ``<a href>``
    links with selectolax, and follows internal ones up to ``max_depth``
    levels and ``max_pages`` pages.  Per-URL outcomes are accumulated in
    ``self.results`` (normalized URL -> CrawlResult) and aggregate
    counters in ``self.stats``.
    """

    def __init__(
        self,
        start_url: str,
        max_depth: int = 5,
        max_pages: int = 1000,
        concurrency: int = 10,
        timeout: int = 30,
        proxies: List[str] = None,
        follow_subdomains: bool = False,
        respect_robots: bool = True,
    ):
        self.start_url = start_url
        self.max_depth = max_depth
        self.max_pages = max_pages
        self.concurrency = concurrency  # number of concurrent worker tasks
        self.timeout = timeout  # per-request timeout, seconds
        self.proxies = proxies or []  # proxy URLs, used round-robin
        self.proxy_index = 0
        self.follow_subdomains = follow_subdomains
        self.respect_robots = respect_robots
        parsed = urlparse(start_url)
        self.base_domain = parsed.netloc
        self.scheme = parsed.scheme
        # Normalized URLs already claimed for fetching; doubles as the dedupe set.
        self.visited: Set[str] = set()
        self.results: Dict[str, CrawlResult] = {}
        # Created inside crawl() so the queue binds to the running event loop.
        self.queue: asyncio.Queue = None
        self.stats = CrawlStats()
        # Path prefixes collected from robots.txt Disallow lines.
        self.disallowed_paths: Set[str] = set()
        self.crawl_delay: float = 0  # seconds, from robots.txt Crawl-delay

    def _normalize_url(self, url: str) -> str:
        """Normalize URL for deduplication: strip the fragment and any
        trailing slash, and give protocol-relative (``//host/...``) URLs
        this crawl's scheme."""
        url, _ = urldefrag(url)  # Remove fragments
        url = url.rstrip('/')
        if url.startswith('//'):
            url = f"{self.scheme}:{url}"
        return url

    def _is_internal(self, url: str) -> bool:
        """Return True when ``url`` belongs to the crawled site.

        Relative URLs (no netloc) count as internal.  Otherwise the host
        must equal the start URL's host, or — when ``follow_subdomains``
        is set — share its last two dot-separated labels.
        NOTE(review): the two-label comparison is naive for multi-part
        TLDs such as ``.co.uk``.
        """
        parsed = urlparse(url)
        if not parsed.netloc:
            return True
        if parsed.netloc == self.base_domain:
            return True
        if self.follow_subdomains:
            base_parts = self.base_domain.split('.')
            url_parts = parsed.netloc.split('.')
            if len(base_parts) >= 2 and len(url_parts) >= 2:
                return base_parts[-2:] == url_parts[-2:]
        return False

    def _should_crawl(self, url: str) -> bool:
        """Filter out non-HTTP schemes, asset/download file extensions,
        and robots.txt-disallowed paths."""
        parsed = urlparse(url)
        # Skip non-HTTP
        if parsed.scheme not in ('http', 'https', ''):
            return False
        # Skip common non-page extensions
        skip_ext = {
            '.pdf', '.jpg', '.jpeg', '.png', '.gif', '.svg',
            '.css', '.js', '.ico', '.woff', '.woff2', '.ttf',
            '.mp4', '.mp3', '.zip', '.tar', '.gz', '.exe',
        }
        path = parsed.path.lower()
        if any(path.endswith(ext) for ext in skip_ext):
            return False
        # Check robots.txt
        if self.respect_robots:
            for disallowed in self.disallowed_paths:
                if parsed.path.startswith(disallowed):
                    return False
        return True

    async def _load_robots(self) -> None:
        """Best-effort fetch and parse of robots.txt.

        Collects Disallow path prefixes and a Crawl-delay value.
        NOTE(review): the line scan ignores User-agent sections, so
        Disallow rules for *every* agent are applied to this crawler.
        Any failure is swallowed and the crawl proceeds unrestricted.
        """
        try:
            async with httpx.AsyncClient(timeout=10) as client:
                resp = await client.get(
                    f"{self.scheme}://{self.base_domain}/robots.txt"
                )
                if resp.status_code == 200:
                    for line in resp.text.splitlines():
                        line = line.strip()
                        if line.lower().startswith('disallow:'):
                            path = line.split(':', 1)[1].strip()
                            if path:
                                self.disallowed_paths.add(path)
                        elif line.lower().startswith('crawl-delay:'):
                            try:
                                self.crawl_delay = float(
                                    line.split(':', 1)[1].strip()
                                )
                            except ValueError:
                                # Malformed delay value: keep the default of 0.
                                pass
                    logger.info(
                        f"Loaded robots.txt: {len(self.disallowed_paths)} "
                        f"disallowed paths, crawl-delay: {self.crawl_delay}s"
                    )
        except Exception:
            # Deliberate best-effort: missing/unreachable robots.txt is not fatal.
            pass

    def _get_proxy(self) -> Optional[str]:
        """Return the next proxy URL round-robin, or None when unconfigured."""
        if not self.proxies:
            return None
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    def _extract_links(self, html: str, base_url: str) -> List[str]:
        """Return normalized absolute URLs for every ``<a href>`` in ``html``,
        skipping fragment-only, javascript:, mailto: and tel: links."""
        tree = HTMLParser(html)
        links = []
        for a in tree.css('a[href]'):
            href = a.attributes.get('href', '').strip()
            if not href or href.startswith(('#', 'javascript:', 'mailto:', 'tel:')):
                continue
            absolute = urljoin(base_url, href)
            normalized = self._normalize_url(absolute)
            links.append(normalized)
        return links

    async def _crawl_url(self, url: str, depth: int) -> None:
        """Fetch one URL, record its CrawlResult, and enqueue its internal links."""
        if url in self.visited:
            return
        if len(self.visited) >= self.max_pages:
            return
        if depth > self.max_depth:
            return
        # Claim the URL before the first await so no other worker re-fetches it.
        self.visited.add(url)
        result = CrawlResult(url=url, depth=depth)
        proxy = self._get_proxy()
        try:
            start = time.monotonic()
            # A fresh client per request keeps per-request proxy rotation simple.
            async with httpx.AsyncClient(
                proxy=proxy,
                timeout=self.timeout,
                follow_redirects=True,
            ) as client:
                resp = await client.get(url, headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; LinkCrawler/1.0)',
                })
                result.load_time_ms = int((time.monotonic() - start) * 1000)
                result.status_code = resp.status_code
                result.content_type = resp.headers.get('content-type', '')
                # follow_redirects=True means resp.url is the final URL after redirects.
                if str(resp.url) != url:
                    result.redirect_url = str(resp.url)
                    self.stats.redirects += 1
                if resp.status_code >= 400:
                    self.stats.broken_links += 1
                if resp.status_code == 200 and 'text/html' in result.content_type:
                    tree = HTMLParser(resp.text)
                    # Extract title
                    title = tree.css_first('title')
                    result.title = title.text(strip=True) if title else ""
                    # Word count
                    body = tree.css_first('body')
                    if body:
                        text = body.text(separator=' ', strip=True)
                        result.word_count = len(text.split())
                    # Extract and queue links
                    links = self._extract_links(resp.text, url)
                    result.outgoing_links = links
                    for link in links:
                        if self._is_internal(link):
                            if (link not in self.visited and
                                    self._should_crawl(link) and
                                    len(self.visited) < self.max_pages):
                                await self.queue.put((link, depth + 1))
                                # NOTE(review): a page linked from several
                                # not-yet-crawled pages can be enqueued and
                                # counted here more than once.
                                self.stats.pages_found += 1
                        else:
                            self.stats.external_links += 1
                            # External links are recorded but never fetched.
                            result_ext = CrawlResult(
                                url=link, is_internal=False, depth=depth + 1
                            )
                            self.results[link] = result_ext
        except Exception as e:
            # Network/parse failure: keep the URL with status 0 plus the error text.
            result.error = str(e)[:200]
            result.status_code = 0
        self.results[url] = result
        self.stats.pages_crawled += 1
        if depth > self.stats.max_depth_reached:
            self.stats.max_depth_reached = depth
        # Lightweight progress output every 10 pages.
        if self.stats.pages_crawled % 10 == 0:
            print(
                f" Crawled: {self.stats.pages_crawled} | "
                f"Queue: ~{self.queue.qsize()} | "
                f"Depth: {depth}"
            )
        # Respect crawl delay
        if self.crawl_delay > 0:
            await asyncio.sleep(self.crawl_delay)

    async def crawl(self) -> Dict[str, CrawlResult]:
        """Run the crawl to completion and return {normalized URL: CrawlResult}."""
        print(f"Starting crawl: {self.start_url}")
        print(f"Max depth: {self.max_depth}, Max pages: {self.max_pages}")
        if self.respect_robots:
            await self._load_robots()
        self.queue = asyncio.Queue()
        await self.queue.put((self.start_url, 0))
        start_time = time.monotonic()

        async def worker():
            # Each worker pulls URLs until the queue stays empty for 10s,
            # which is treated as "crawl finished".
            while True:
                try:
                    url, depth = await asyncio.wait_for(
                        self.queue.get(), timeout=10
                    )
                    await self._crawl_url(url, depth)
                    self.queue.task_done()
                except asyncio.TimeoutError:
                    break
                except Exception as e:
                    # Keep the worker alive on unexpected errors.
                    logger.error(f"Worker error: {e}")

        workers = [
            asyncio.create_task(worker())
            for _ in range(self.concurrency)
        ]
        await asyncio.gather(*workers)
        self.stats.total_time_seconds = round(
            time.monotonic() - start_time, 1
        )
        if self.stats.pages_crawled > 0:
            # Failed fetches have load_time_ms == 0 and are excluded from the
            # numerator, but pages_crawled (the denominator) includes them.
            total_ms = sum(
                r.load_time_ms for r in self.results.values()
                if r.load_time_ms > 0
            )
            self.stats.avg_response_ms = round(
                total_ms / self.stats.pages_crawled
            )
        # Count incoming links
        for result in self.results.values():
            for link in result.outgoing_links:
                if link in self.results:
                    self.results[link].incoming_links += 1
        return self.results

    def export_sitemap(self, filepath: str = "sitemap.xml") -> None:
        """Write a sitemaps.org-format XML sitemap of all internal pages
        that returned HTTP 200."""
        urlset = ET.Element(
            "urlset",
            xmlns="http://www.sitemaps.org/schemas/sitemap/0.9",
        )
        for url, result in sorted(self.results.items()):
            if result.status_code == 200 and result.is_internal:
                url_elem = ET.SubElement(urlset, "url")
                ET.SubElement(url_elem, "loc").text = url
                # Priority based on depth: 1.0 at the root, minus 0.2 per
                # level, floored at 0.1.
                priority = max(0.1, 1.0 - (result.depth * 0.2))
                ET.SubElement(url_elem, "priority").text = f"{priority:.1f}"
        tree = ET.ElementTree(urlset)
        ET.indent(tree, space=" ")
        tree.write(filepath, xml_declaration=True, encoding="UTF-8")
        print(f"Sitemap exported to {filepath} ({len(urlset)} URLs)")

    def export_broken_links(self, filepath: str = "broken_links.csv") -> None:
        """Write URLs that returned status >= 400, or failed outright, to CSV."""
        import csv
        broken = [
            r for r in self.results.values()
            if r.status_code >= 400 or (r.status_code == 0 and r.error)
        ]
        with open(filepath, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['url', 'status_code', 'error', 'found_at_depth'])
            for r in broken:
                writer.writerow([r.url, r.status_code, r.error, r.depth])
        print(f"Broken links exported: {len(broken)} links to {filepath}")

    def print_summary(self) -> None:
        """Print aggregate crawl statistics and the ten most-linked pages."""
        s = self.stats
        print(f"\n{'='*60}")
        print("CRAWL SUMMARY")
        print(f"{'='*60}")
        print(f"Pages crawled: {s.pages_crawled}")
        print(f"Unique pages found: {s.pages_found}")
        print(f"Broken links: {s.broken_links}")
        print(f"Redirects: {s.redirects}")
        print(f"External links: {s.external_links}")
        print(f"Max depth: {s.max_depth_reached}")
        print(f"Avg response time: {s.avg_response_ms}ms")
        print(f"Total time: {s.total_time_seconds}s")
        # Top pages by incoming links
        internal = [
            r for r in self.results.values()
            if r.is_internal and r.status_code == 200
        ]
        top = sorted(internal, key=lambda x: x.incoming_links, reverse=True)
        print(f"\n--- Most Linked Pages ---")
        for page in top[:10]:
            print(f" {page.incoming_links:3d} links → {page.url[:60]}")
# Usage
async def main():
    """Example run: crawl example.com, then emit a console summary,
    a sitemap, and a broken-link report."""
    proxy_pool = [
        "http://user:pass@proxy1.example.com:8080",
    ]
    crawler = LinkCrawler(
        start_url="https://example.com",
        max_depth=3,
        max_pages=500,
        concurrency=10,
        proxies=proxy_pool,
    )
    await crawler.crawl()
    crawler.print_summary()
    crawler.export_sitemap("sitemap.xml")
    crawler.export_broken_links("broken_links.csv")
asyncio.run(main())
Internal Links
- Building a Proxy Rotation Library in Python — proxy rotation for crawling
- Building a Rate-Limited Scraper with Asyncio — respectful crawling
- Web Scraping ETL Pipeline with Airflow — schedule crawls
- Building a Distributed Scraping System with Redis — distributed crawling
- Web Scraping with Python Guide — scraping fundamentals
FAQ
How do I handle infinite crawl loops?
URL normalization and deduplication prevent loops. The crawler normalizes URLs by removing fragments and trailing slashes (query parameter ordering is not normalized, so the same page with reordered parameters counts as distinct). Each normalized URL is checked against the visited set before crawling. The max_pages limit provides a hard stop.
Should I crawl JavaScript-rendered pages?
For sites that load content via JavaScript, the HTML-only crawler will miss dynamically loaded links. Integrate Playwright for JS rendering, but be aware this reduces crawl speed significantly. Most sites have enough server-rendered links for sitemap generation.
How fast can I crawl without getting blocked?
With 10 concurrent connections and no additional delay, most sites tolerate 5-10 requests per second. Always check robots.txt for crawl-delay directives. For large sites, use proxies to distribute the load across multiple IPs.
How do I handle authentication-protected pages?
Pass session cookies or authentication headers in the httpx client. Log in once before starting the crawl, capture the session cookie, and include it in all subsequent requests. Some sites require a fresh authentication token periodically.
What is the difference between a crawler and a scraper?
A crawler discovers pages by following links — its output is a list of URLs and their metadata. A scraper extracts specific data from pages. In practice, you often combine them: crawl first to find all product pages, then scrape each page for product details.
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a Proxy Rotator in Python: Complete Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Bandwidth Optimization for Proxies: Reduce Costs & Increase Speed
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)