Building a Web Crawler in Python: Step-by-Step

A web crawler systematically discovers and visits pages across a website or the entire web. Unlike a scraper that extracts data from known URLs, a crawler follows links to find new pages. Search engines like Google run the world’s largest crawlers, but you can build a focused crawler in Python for site mapping, content auditing, link checking, and large-scale data collection.

This tutorial builds a crawler from scratch, starting with a simple version and incrementally adding URL management, politeness rules, concurrency, and data storage.

Crawler vs Scraper

Feature     | Scraper                | Crawler
Purpose     | Extract specific data  | Discover and visit pages
URL source  | Known list of URLs     | Follows links from seed URLs
Scope       | Targeted pages         | Entire site or domain
Complexity  | Lower                  | Higher (URL frontier, dedup)

A typical project uses both: the crawler discovers pages, and the scraper extracts data from each page.

Basic Crawler

The simplest crawler visits a page, extracts links, and follows them:

import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import time

def simple_crawler(seed_url, max_pages=50):
    visited = set()
    to_visit = [seed_url]
    domain = urlparse(seed_url).netloc

    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (compatible; PythonCrawler/1.0)"
    })

    while to_visit and len(visited) < max_pages:
        url = to_visit.pop(0)

        if url in visited:
            continue

        try:
            response = session.get(url, timeout=30)
            response.raise_for_status()
        except requests.RequestException as e:
            print(f"Error: {url} — {e}")
            continue

        visited.add(url)
        print(f"[{len(visited)}/{max_pages}] {url}")

        # Parse and extract links
        soup = BeautifulSoup(response.text, "lxml")
        for link in soup.find_all("a", href=True):
            absolute_url = urljoin(url, link["href"])
            parsed = urlparse(absolute_url)

            # Stay on the same domain; drop query strings and fragments
            if parsed.netloc == domain:
                clean_url = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
                if clean_url not in visited:
                    to_visit.append(clean_url)

        time.sleep(1)  # Polite delay

    return visited

# Run
pages = simple_crawler("https://books.toscrape.com/", max_pages=20)
print(f"\nCrawled {len(pages)} pages")

URL Management

A production crawler needs proper URL normalization and deduplication:

from urllib.parse import urljoin, urlparse, urlunparse, parse_qs, urlencode
from collections import deque

class URLFrontier:
    """Manages URLs to crawl with deduplication and normalization."""

    def __init__(self):
        self.queue = deque()
        self.seen = set()

    def normalize(self, url):
        """Normalize URL to prevent duplicate visits."""
        parsed = urlparse(url)

        # Remove fragments
        # Lowercase scheme and host
        # Remove trailing slash from path
        # Sort query parameters
        path = parsed.path.rstrip("/") or "/"
        query = urlencode(sorted(parse_qs(parsed.query).items()), doseq=True)

        normalized = urlunparse((
            parsed.scheme.lower(),
            parsed.netloc.lower(),
            path,
            "",  # params
            query,
            "",  # fragment
        ))
        return normalized

    def add(self, url):
        """Add URL if not seen before. Returns True if added."""
        normalized = self.normalize(url)
        if normalized not in self.seen:
            self.seen.add(normalized)
            self.queue.append(normalized)
            return True
        return False

    def get(self):
        """Get next URL to crawl."""
        if self.queue:
            return self.queue.popleft()
        return None

    def __len__(self):
        return len(self.queue)

    @property
    def total_seen(self):
        return len(self.seen)


# Usage
frontier = URLFrontier()
frontier.add("https://example.com/page")
frontier.add("https://example.com/page/")      # Duplicate — won't be added
frontier.add("https://example.com/page#section") # Duplicate — fragment removed
frontier.add("https://Example.com/Page")         # Duplicate — case normalized
print(f"Queue size: {len(frontier)}, Total seen: {frontier.total_seen}")

Respecting robots.txt

Always check robots.txt before crawling:

from urllib.robotparser import RobotFileParser
from urllib.parse import urlparse

class RobotsChecker:
    def __init__(self):
        self.parsers = {}

    def can_fetch(self, url, user_agent="*"):
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"

        if domain not in self.parsers:
            rp = RobotFileParser()
            rp.set_url(f"{domain}/robots.txt")
            try:
                rp.read()
            except Exception:
                # If robots.txt can't be fetched, treat the site as allowed
                rp = None
            self.parsers[domain] = rp

        rp = self.parsers[domain]
        return rp.can_fetch(user_agent, url) if rp else True

    def crawl_delay(self, url, user_agent="*"):
        parsed = urlparse(url)
        domain = f"{parsed.scheme}://{parsed.netloc}"
        rp = self.parsers.get(domain)
        if rp:
            delay = rp.crawl_delay(user_agent)
            return delay if delay else 1.0
        return 1.0


# Usage
robots = RobotsChecker()
url = "https://books.toscrape.com/catalogue/page-1.html"

if robots.can_fetch(url):
    delay = robots.crawl_delay(url)
    print(f"OK to crawl, delay: {delay}s")
else:
    print("Blocked by robots.txt")

Rate Limiting and Politeness

import time
import random
from collections import defaultdict
from urllib.parse import urlparse

class PoliteCrawler:
    """Crawler that respects per-domain rate limits."""

    def __init__(self, default_delay=1.5, max_delay=5.0):
        self.default_delay = default_delay
        self.max_delay = max_delay
        self.last_request_time = defaultdict(float)

    def wait_for_domain(self, url):
        domain = urlparse(url).netloc
        elapsed = time.time() - self.last_request_time[domain]
        delay = random.uniform(self.default_delay, self.max_delay)

        if elapsed < delay:
            sleep_time = delay - elapsed
            time.sleep(sleep_time)

        self.last_request_time[domain] = time.time()

    def fetch(self, session, url):
        self.wait_for_domain(url)
        response = session.get(url, timeout=30)
        response.raise_for_status()
        return response
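
A quick usage sketch (the delay values and target URL are illustrative):

import requests

crawler = PoliteCrawler(default_delay=1.0, max_delay=3.0)
session = requests.Session()
response = crawler.fetch(session, "https://books.toscrape.com/")
print(response.status_code)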

Concurrent Crawling

Using asyncio and aiohttp

import asyncio
import aiohttp
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from collections import deque

class AsyncCrawler:
    def __init__(self, seed_url, max_pages=100, concurrency=5):
        self.seed_url = seed_url
        self.max_pages = max_pages
        self.concurrency = concurrency
        self.domain = urlparse(seed_url).netloc
        self.visited = set()
        self.queue = asyncio.Queue()
        self.results = []

    def should_crawl(self, url):
        parsed = urlparse(url)
        if parsed.netloc != self.domain:
            return False
        if any(url.endswith(ext) for ext in ['.pdf', '.jpg', '.png', '.css', '.js']):
            return False
        return url not in self.visited

    async def fetch(self, session, url):
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                if response.status == 200:
                    html = await response.text()
                    return html
        except Exception as e:
            print(f"Error fetching {url}: {e}")
        return None

    def extract_links(self, html, base_url):
        soup = BeautifulSoup(html, "lxml")
        links = set()
        for a_tag in soup.find_all("a", href=True):
            absolute = urljoin(base_url, a_tag["href"])
            parsed = urlparse(absolute)
            clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
            if self.should_crawl(clean):
                links.add(clean)
        return links

    async def worker(self, session):
        while True:
            url = await self.queue.get()
            try:
                if url in self.visited or len(self.visited) >= self.max_pages:
                    continue

                self.visited.add(url)
                html = await self.fetch(session, url)

                if html:
                    # Extract data
                    soup = BeautifulSoup(html, "lxml")
                    title = soup.title.string if soup.title else ""
                    self.results.append({"url": url, "title": title})

                    # Discover new links
                    links = self.extract_links(html, url)
                    for link in links:
                        if link not in self.visited:
                            await self.queue.put(link)

                    print(f"[{len(self.visited)}/{self.max_pages}] {url}")

                await asyncio.sleep(0.5)  # Rate limit

            finally:
                self.queue.task_done()

    async def crawl(self):
        await self.queue.put(self.seed_url)

        headers = {"User-Agent": "Mozilla/5.0 (compatible; AsyncCrawler/1.0)"}
        async with aiohttp.ClientSession(headers=headers) as session:
            workers = [asyncio.create_task(self.worker(session))
                       for _ in range(self.concurrency)]

            # Wait until queue is empty or max pages reached
            while len(self.visited) < self.max_pages and (
                    not self.queue.empty() or any(not w.done() for w in workers)):
                await asyncio.sleep(1)

                if self.queue.empty() and len(self.visited) > 0:
                    break

            for w in workers:
                w.cancel()
            # Give cancelled workers a chance to exit cleanly
            await asyncio.gather(*workers, return_exceptions=True)

        return self.results


# Run
async def main():
    crawler = AsyncCrawler("https://books.toscrape.com/", max_pages=50, concurrency=3)
    results = await crawler.crawl()
    print(f"\nCrawled {len(results)} pages")

asyncio.run(main())

Using ThreadPoolExecutor

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
import threading

class ThreadedCrawler:
    def __init__(self, seed_url, max_pages=100, max_workers=5):
        self.seed_url = seed_url
        self.max_pages = max_pages
        self.max_workers = max_workers
        self.domain = urlparse(seed_url).netloc
        self.visited = set()
        self.to_visit = [seed_url]
        self.lock = threading.Lock()
        self.results = []
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "Mozilla/5.0 (compatible; ThreadedCrawler/1.0)"
        })

    def fetch_and_parse(self, url):
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, "lxml")

            title = soup.title.string if soup.title else ""
            links = set()

            for a_tag in soup.find_all("a", href=True):
                absolute = urljoin(url, a_tag["href"])
                parsed = urlparse(absolute)
                if parsed.netloc == self.domain:
                    clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
                    links.add(clean)

            return {"url": url, "title": title, "links": links}

        except Exception as e:
            return {"url": url, "error": str(e), "links": set()}

    def crawl(self):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            while self.to_visit and len(self.visited) < self.max_pages:
                # Get batch of URLs
                batch = []
                with self.lock:
                    while self.to_visit and len(batch) < self.max_workers:
                        url = self.to_visit.pop(0)
                        if url not in self.visited:
                            self.visited.add(url)
                            batch.append(url)

                if not batch:
                    break

                # Submit batch
                futures = {executor.submit(self.fetch_and_parse, url): url
                           for url in batch}

                for future in as_completed(futures):
                    result = future.result()
                    if "error" not in result:
                        self.results.append(result)
                        print(f"[{len(self.visited)}] {result['url']}")

                        with self.lock:
                            for link in result["links"]:
                                if link not in self.visited:
                                    self.to_visit.append(link)

        return self.results

# Run
crawler = ThreadedCrawler("https://books.toscrape.com/", max_pages=50)
results = crawler.crawl()
print(f"Crawled {len(results)} pages")

Extracting and Storing Data

import json
import csv
import sqlite3

def save_results(results, format="json"):
    if format == "json":
        with open("crawl_results.json", "w") as f:
            json.dump(results, f, indent=2, default=str)

    elif format == "csv":
        with open("crawl_results.csv", "w", newline="") as f:
            # extrasaction="ignore" skips extra keys (e.g. "links") some crawlers include
            writer = csv.DictWriter(f, fieldnames=["url", "title"], extrasaction="ignore")
            writer.writeheader()
            writer.writerows(results)

    elif format == "sqlite":
        conn = sqlite3.connect("crawl_results.db")
        cursor = conn.cursor()
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS pages (
                url TEXT PRIMARY KEY,
                title TEXT,
                crawled_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        for r in results:
            cursor.execute(
                "INSERT OR REPLACE INTO pages (url, title) VALUES (?, ?)",
                (r["url"], r.get("title", ""))
            )
        conn.commit()
        conn.close()

    print(f"Saved {len(results)} results as {format}")

Handling Edge Cases

from urllib.parse import urlparse, urljoin

def is_valid_url(url):
    """Filter out non-HTTP URLs and unwanted file types."""
    try:
        parsed = urlparse(url)

        # Must be HTTP/HTTPS
        if parsed.scheme not in ("http", "https"):
            return False

        # Skip common non-page extensions
        skip_extensions = {
            ".pdf", ".jpg", ".jpeg", ".png", ".gif", ".svg",
            ".css", ".js", ".ico", ".xml", ".json",
            ".zip", ".tar", ".gz", ".mp4", ".mp3",
        }
        path_lower = parsed.path.lower()
        if any(path_lower.endswith(ext) for ext in skip_extensions):
            return False

        # Skip mailto, tel, javascript
        if url.startswith(("mailto:", "tel:", "javascript:")):
            return False

        return True
    except Exception:
        return False


def handle_redirects(session, url, max_redirects=5):
    """Follow redirects manually to track the chain."""
    chain = [url]
    for _ in range(max_redirects):
        response = session.get(url, allow_redirects=False, timeout=30)
        if response.status_code in (301, 302, 303, 307, 308):
            location = response.headers.get("Location", "")
            if not location:
                break
            url = urljoin(url, location)  # Location header may be relative
            chain.append(url)
        else:
            break
    return chain, response
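
A quick usage sketch (assuming a requests session, as in the earlier examples):

import requests

session = requests.Session()
chain, final_response = handle_redirects(session, "http://books.toscrape.com/")
print(" -> ".join(chain), final_response.status_code)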

Production-Ready Crawler

import asyncio
import aiohttp
import logging
import json
import time
from bs4 import BeautifulSoup
from urllib.parse import urljoin, urlparse
from urllib.robotparser import RobotFileParser
from dataclasses import dataclass, asdict
from typing import Optional

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(message)s")
logger = logging.getLogger(__name__)

@dataclass
class CrawlResult:
    url: str
    status_code: int
    title: Optional[str]
    word_count: int
    links_found: int
    crawl_time: float

class ProductionCrawler:
    def __init__(self, seed_urls, max_pages=1000, concurrency=10,
                 delay=1.0, output_file="crawl_data.json"):
        self.seed_urls = seed_urls
        self.max_pages = max_pages
        self.concurrency = concurrency
        self.delay = delay
        self.output_file = output_file
        self.allowed_domains = {urlparse(u).netloc for u in seed_urls}
        self.visited = set()
        self.queue = asyncio.Queue()
        self.results = []
        self.robots = {}
        self.errors = 0

    async def check_robots(self, session, url):
        parsed = urlparse(url)
        domain = parsed.netloc
        if domain not in self.robots:
            # Use the page's own scheme rather than assuming https
            robots_url = f"{parsed.scheme}://{domain}/robots.txt"
            try:
                async with session.get(robots_url) as resp:
                    if resp.status == 200:
                        text = await resp.text()
                        rp = RobotFileParser()
                        rp.parse(text.splitlines())
                        self.robots[domain] = rp
                        return rp.can_fetch("*", url)
            except Exception:
                pass
            self.robots[domain] = None
        rp = self.robots.get(domain)
        return rp.can_fetch("*", url) if rp else True

    async def worker(self, session, worker_id):
        while len(self.visited) < self.max_pages:
            try:
                url = await asyncio.wait_for(self.queue.get(), timeout=10)
            except asyncio.TimeoutError:
                break

            if url in self.visited:
                self.queue.task_done()
                continue

            if not await self.check_robots(session, url):
                self.queue.task_done()
                continue

            self.visited.add(url)
            start = time.time()

            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as resp:
                    if resp.status != 200:
                        continue  # the finally block calls task_done()

                    html = await resp.text()
                    elapsed = time.time() - start

                    soup = BeautifulSoup(html, "lxml")
                    title = soup.title.string.strip() if soup.title and soup.title.string else None
                    text = soup.get_text()
                    word_count = len(text.split())

                    # Extract links
                    links = set()
                    for a in soup.find_all("a", href=True):
                        abs_url = urljoin(url, a["href"])
                        parsed = urlparse(abs_url)
                        if parsed.netloc in self.allowed_domains:
                            clean = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
                            if clean not in self.visited:
                                links.add(clean)
                                await self.queue.put(clean)

                    result = CrawlResult(
                        url=url,
                        status_code=resp.status,
                        title=title,
                        word_count=word_count,
                        links_found=len(links),
                        crawl_time=round(elapsed, 3),
                    )
                    self.results.append(result)

                    logger.info(f"[{len(self.visited)}/{self.max_pages}] {url} "
                                f"({word_count} words, {len(links)} links, {elapsed:.2f}s)")

            except Exception as e:
                self.errors += 1
                logger.warning(f"Error on {url}: {e}")

            finally:
                self.queue.task_done()
                await asyncio.sleep(self.delay)

    async def run(self):
        for url in self.seed_urls:
            await self.queue.put(url)

        headers = {"User-Agent": "Mozilla/5.0 (compatible; ProductionCrawler/1.0)"}
        connector = aiohttp.TCPConnector(limit=self.concurrency)

        async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
            workers = [asyncio.create_task(self.worker(session, i))
                       for i in range(self.concurrency)]
            await asyncio.gather(*workers, return_exceptions=True)

        # Save results
        with open(self.output_file, "w") as f:
            json.dump([asdict(r) for r in self.results], f, indent=2)

        logger.info(f"Crawl complete: {len(self.results)} pages, {self.errors} errors")
        return self.results


# Run
async def main():
    crawler = ProductionCrawler(
        seed_urls=["https://books.toscrape.com/"],
        max_pages=200,
        concurrency=5,
        delay=1.0,
    )
    results = await crawler.run()

asyncio.run(main())

When to Use Scrapy Instead

For serious crawling projects, Scrapy is usually a better choice than building from scratch:

  • Built-in URL deduplication — Automatic request fingerprinting
  • Middleware system — Pluggable proxy rotation, user-agent rotation, retry logic
  • Data pipelines — Process and store items through configurable pipelines
  • Crawl management — Pause/resume, depth limits, URL filtering
  • JavaScript support — Via scrapy-playwright integration
  • Monitoring — Stats collection, logging, Scrapyd deployment

Build your own crawler when you need complete control or when learning how crawlers work.
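
For comparison, here is a rough sketch of a Scrapy CrawlSpider that covers the same ground as the crawlers above; the spider name and settings are illustrative, not canonical:

import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule

class BooksSpider(CrawlSpider):
    """Scrapy handles the URL frontier, deduplication, and scheduling for you."""
    name = "books"
    allowed_domains = ["books.toscrape.com"]
    start_urls = ["https://books.toscrape.com/"]
    custom_settings = {"DOWNLOAD_DELAY": 1.0, "ROBOTSTXT_OBEY": True}

    # Follow every in-domain link and hand each response to parse_page
    rules = (Rule(LinkExtractor(), callback="parse_page", follow=True),)

    def parse_page(self, response):
        yield {"url": response.url, "title": response.css("title::text").get()}

Run it with scrapy runspider books_spider.py -o pages.json to get the same URL-and-title output the earlier crawlers produce.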

FAQ

What is the difference between a web crawler and a web scraper?

A crawler discovers and visits web pages by following links from seed URLs — it maps the structure of a site. A scraper extracts specific data from known pages. Most real-world projects combine both: the crawler finds pages, and the scraper extracts data from them.

How many pages per second can a Python crawler process?

With async programming (aiohttp), a Python crawler can process 50-200 pages per second on static sites, limited mainly by network latency and politeness delays. Browser-based crawlers (Selenium, Playwright) process 1-10 pages per second.

How do I avoid getting blocked while crawling?

Respect robots.txt, add polite delays (1-3 seconds), rotate User-Agent headers, use rotating proxies, and limit your crawl rate per domain. See our proxy glossary for proxy type details.
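
A minimal sketch of two of those measures with requests (the User-Agent strings and delay range are just examples):

import random
import time
import requests

USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36",
]

def polite_get(session, url):
    # Randomize both the delay and the User-Agent on every request
    time.sleep(random.uniform(1.0, 3.0))
    headers = {"User-Agent": random.choice(USER_AGENTS)}
    return session.get(url, headers=headers, timeout=30)

session = requests.Session()
response = polite_get(session, "https://books.toscrape.com/")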

Should I build my own crawler or use Scrapy?

For learning, build your own. For production, use Scrapy. Scrapy handles URL management, deduplication, retries, rate limiting, and data pipelines — all things you’d need to build yourself otherwise.


Learn more about scraping frameworks: Scrapy tutorial, Python web scraping libraries, web scraping proxies.
