Build a News Crawler in Python: Step-by-Step Tutorial

a news crawler collects articles from multiple sources, extracts their content, and stores everything in a searchable database. it is one of the most practical scraping projects you can build because the output is immediately useful for research, monitoring, and content analysis.

this tutorial walks you through building a production-quality news crawler from scratch. by the end, you will have a system that monitors RSS feeds, follows links to full articles, extracts clean text, and stores everything in a SQLite database with full-text search.

What You Will Build

the finished crawler has these capabilities:

  • monitors hundreds of RSS feeds across multiple news categories
  • follows links to full article pages and extracts clean text
  • deduplicates articles to avoid storing the same story twice
  • stores everything in a SQLite database with full-text search
  • runs on a schedule with proxy rotation for reliability
  • exports data to JSON or CSV for analysis

Architecture

┌────────────────────┐     ┌────────────────────┐
│    RSS Monitor     │────>│   Article Queue    │
│    (feedparser)    │     │ (dedup + priority) │
└────────────────────┘     └─────────┬──────────┘
                                     │
                           ┌─────────▼──────────┐
                           │   Content Fetcher  │
                           │ (requests+proxies) │
                           └─────────┬──────────┘
                                     │
                           ┌─────────▼──────────┐
                           │   Text Extractor   │
                           │   (readability)    │
                           └─────────┬──────────┘
                                     │
                           ┌─────────▼──────────┐
                           │   SQLite Storage   │
                           │   (FTS5 search)    │
                           └────────────────────┘

Prerequisites

install the required packages:

pip install feedparser requests beautifulsoup4 readability-lxml \
    lxml fake-useragent schedule

readability-lxml is the key library here. it is a Python port of the Arc90 Readability algorithm (the same approach behind Firefox's Reader View), which strips away navigation, ads, and sidebars to extract just the article content.

Step 1: Define Your Feed Sources

start by organizing your RSS feed sources. group them by category so you can prioritize and filter later:

# feeds.py

FEEDS = {
    "tech": [
        {"name": "TechCrunch", "url": "https://techcrunch.com/feed/"},
        {"name": "Ars Technica", "url": "https://feeds.arstechnica.com/arstechnica/index"},
        {"name": "The Verge", "url": "https://www.theverge.com/rss/index.xml"},
        {"name": "Wired", "url": "https://www.wired.com/feed/rss"},
        {"name": "Hacker News", "url": "https://hnrss.org/frontpage"},
    ],
    "security": [
        {"name": "Krebs on Security", "url": "https://krebsonsecurity.com/feed/"},
        {"name": "The Hacker News", "url": "https://feeds.feedburner.com/TheHackersNews"},
        {"name": "Dark Reading", "url": "https://www.darkreading.com/rss.xml"},
        {"name": "BleepingComputer", "url": "https://www.bleepingcomputer.com/feed/"},
    ],
    "data": [
        {"name": "KDnuggets", "url": "https://www.kdnuggets.com/feed"},
        {"name": "Towards Data Science", "url": "https://towardsdatascience.com/feed"},
        {"name": "Data Science Central", "url": "https://www.datasciencecentral.com/feed"},
    ],
    "business": [
        {"name": "Reuters", "url": "https://feeds.reuters.com/reuters/businessNews"},
        {"name": "Bloomberg", "url": "https://feeds.bloomberg.com/technology/news.rss"},
        {"name": "CNBC", "url": "https://search.cnbc.com/rs/search/combinedcms/view.xml?partnerId=wrss01&id=10001147"},
    ],
}


def get_all_feeds():
    """flatten all feeds into a single list with categories."""
    all_feeds = []
    for category, feeds in FEEDS.items():
        for feed in feeds:
            all_feeds.append({
                **feed,
                "category": category,
            })
    return all_feeds
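
a quick sanity check that the flattening works as expected (a throwaway snippet, not part of the crawler):

from feeds import FEEDS, get_all_feeds

feeds = get_all_feeds()
print(f"{len(feeds)} feeds across {len(FEEDS)} categories")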

Step 2: Build the RSS Monitor

the RSS monitor checks feeds for new articles and puts them into a queue:

# rss_monitor.py

import feedparser
import hashlib
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime


class RSSMonitor:
    """monitor RSS feeds for new articles."""

    def __init__(self, seen_urls=None):
        self.seen_urls = seen_urls or set()

    def check_feed(self, feed_info):
        """check a single feed and return new articles."""
        try:
            feed = feedparser.parse(feed_info["url"])

            if feed.bozo and not feed.entries:
                print(f"error parsing {feed_info['name']}: {feed.bozo_exception}")
                return []

            new_articles = []

            for entry in feed.entries:
                url = entry.get("link", "")
                if not url or url in self.seen_urls:
                    continue

                self.seen_urls.add(url)

                article = {
                    "url": url,
                    "title": entry.get("title", "").strip(),
                    "summary": self._clean_summary(
                        entry.get("summary", "")
                    ),
                    "published": self._parse_date(entry),
                    "source_name": feed_info["name"],
                    "source_url": feed_info["url"],
                    "category": feed_info.get("category", "general"),
                    "url_hash": hashlib.md5(url.encode()).hexdigest(),
                }

                new_articles.append(article)

            return new_articles

        except Exception as e:
            print(f"failed to check {feed_info['name']}: {e}")
            return []

    def check_all_feeds(self, feeds):
        """check all feeds and return new articles."""
        all_new = []

        for feed_info in feeds:
            new = self.check_feed(feed_info)
            if new:
                print(f"  {feed_info['name']}: {len(new)} new articles")
                all_new.extend(new)

        return all_new

    def _parse_date(self, entry):
        """extract and normalize the published date."""
        for field in ["published", "updated", "created"]:
            raw = entry.get(field)
            if raw:
                try:
                    dt = parsedate_to_datetime(raw)
                    return dt.isoformat()
                except (TypeError, ValueError):
                    pass

        return datetime.now(timezone.utc).isoformat()

    def _clean_summary(self, summary):
        """strip HTML from feed summary."""
        from bs4 import BeautifulSoup
        if not summary:
            return ""
        soup = BeautifulSoup(summary, "html.parser")
        return soup.get_text(strip=True)[:500]
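
before wiring the monitor into anything, you can smoke-test it on its own. a minimal sketch, assuming feeds.py from step 1 is importable:

# throwaway smoke test for the monitor
from feeds import get_all_feeds
from rss_monitor import RSSMonitor

monitor = RSSMonitor()
articles = monitor.check_all_feeds(get_all_feeds())
print(f"{len(articles)} new articles")
if articles:
    print(articles[0]["title"], "|", articles[0]["source_name"])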

Step 3: Build the Content Fetcher

the content fetcher retrieves the full article page. this is where proxy rotation matters most, since you are making HTTP requests to many different news sites:

# fetcher.py

import requests
from fake_useragent import UserAgent
import time
import random


class ContentFetcher:
    """fetch full article pages with proxy support."""

    def __init__(self, proxy_url=None, proxy_list=None):
        self.session = requests.Session()
        self.ua = UserAgent()
        self.proxy_url = proxy_url
        self.proxy_list = proxy_list or []
        self.proxy_index = 0

        if proxy_url:
            self.session.proxies = {
                "http": proxy_url,
                "https": proxy_url,
            }

    def fetch(self, url, max_retries=3):
        """fetch a URL with retries and proxy rotation."""

        for attempt in range(max_retries):
            try:
                headers = {
                    "User-Agent": self.ua.random,
                    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                    "Accept-Language": "en-US,en;q=0.9",
                    "Accept-Encoding": "gzip, deflate, br",
                    "DNT": "1",
                    "Connection": "keep-alive",
                }

                # rotate proxy if using a list
                proxies = self._get_proxy()

                response = self.session.get(
                    url,
                    headers=headers,
                    proxies=proxies,
                    timeout=20,
                    allow_redirects=True,
                )

                if response.status_code == 200:
                    return response.text

                if response.status_code == 429:
                    # rate limited, wait longer
                    wait = 10 * (attempt + 1)
                    print(f"rate limited on {url}, waiting {wait}s...")
                    time.sleep(wait)
                    continue

                if response.status_code in (403, 451):
                    # blocked, try different proxy
                    print(f"blocked ({response.status_code}) on {url}")
                    self._rotate_proxy()
                    continue

                print(f"got {response.status_code} for {url}")
                return None

            except requests.exceptions.Timeout:
                print(f"timeout on {url} (attempt {attempt + 1})")
                time.sleep(2)
            except requests.exceptions.ConnectionError:
                print(f"connection error on {url} (attempt {attempt + 1})")
                self._rotate_proxy()
                time.sleep(3)
            except Exception as e:
                print(f"error fetching {url}: {e}")
                return None

        return None

    def _get_proxy(self):
        """get current proxy configuration."""
        if self.proxy_list:
            proxy = self.proxy_list[self.proxy_index % len(self.proxy_list)]
            return {"http": proxy, "https": proxy}
        return {}

    def _rotate_proxy(self):
        """switch to the next proxy."""
        if self.proxy_list:
            self.proxy_index = (self.proxy_index + 1) % len(self.proxy_list)
            print(f"rotated to proxy {self.proxy_index}")


def create_fetcher_with_proxies(proxy_config):
    """create a fetcher with proxy configuration."""

    if proxy_config.get("type") == "rotating_gateway":
        return ContentFetcher(
            proxy_url=proxy_config["gateway_url"]
        )

    elif proxy_config.get("type") == "proxy_list":
        return ContentFetcher(
            proxy_list=proxy_config["proxies"]
        )

    else:
        return ContentFetcher()
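
the two proxy_config shapes that create_fetcher_with_proxies expects look like this; the hostnames and credentials are placeholders:

# rotating gateway: one URL, the provider rotates IPs behind it
gateway_config = {
    "type": "rotating_gateway",
    "gateway_url": "http://user:pass@gate.provider.com:7777",
}

# static list: ContentFetcher rotates through these itself
list_config = {
    "type": "proxy_list",
    "proxies": [
        "http://user:pass@proxy1.provider.com:8080",
        "http://user:pass@proxy2.provider.com:8080",
    ],
}

fetcher = create_fetcher_with_proxies(gateway_config)
html = fetcher.fetch("https://example.com/article")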

Step 4: Build the Text Extractor

the text extractor takes raw HTML and produces clean article text. the Readability algorithm does the heavy lifting:

# extractor.py

from readability import Document
from bs4 import BeautifulSoup
import re


class TextExtractor:
    """extract clean article text from HTML."""

    def extract(self, html, url=""):
        """extract article content from HTML."""
        try:
            doc = Document(html, url=url)

            # get the clean article HTML
            article_html = doc.summary()
            title = doc.title()

            # convert to plain text
            soup = BeautifulSoup(article_html, "html.parser")

            # remove remaining cruft
            for tag in soup.find_all(["script", "style", "iframe", "form"]):
                tag.decompose()

            text = soup.get_text(separator="\n", strip=True)
            text = self._clean_text(text)

            # extract metadata
            full_soup = BeautifulSoup(html, "html.parser")
            metadata = self._extract_metadata(full_soup)

            return {
                "title": title,
                "text": text,
                "word_count": len(text.split()),
                "author": metadata.get("author", ""),
                "description": metadata.get("description", ""),
                "og_image": metadata.get("og_image", ""),
                "language": metadata.get("language", "en"),
            }

        except Exception as e:
            print(f"extraction error: {e}")
            return None

    def _clean_text(self, text):
        """clean up extracted text."""
        # normalize whitespace
        text = re.sub(r"\n{3,}", "\n\n", text)
        text = re.sub(r" {2,}", " ", text)

        # remove common cruft
        cruft_patterns = [
            r"Advertisement\s*",
            r"Subscribe to our newsletter.*",
            r"Share this article.*",
            r"Related articles?:?\s*",
            r"Read more:?\s*$",
            r"Sign up for.*newsletter",
        ]

        for pattern in cruft_patterns:
            text = re.sub(pattern, "", text, flags=re.IGNORECASE)

        return text.strip()

    def _extract_metadata(self, soup):
        """extract metadata from the page head."""
        metadata = {}

        # author
        author_meta = soup.find("meta", attrs={"name": "author"})
        if author_meta:
            metadata["author"] = author_meta.get("content", "")

        # description
        desc_meta = soup.find("meta", attrs={"name": "description"})
        if desc_meta:
            metadata["description"] = desc_meta.get("content", "")

        # og:image
        og_image = soup.find("meta", attrs={"property": "og:image"})
        if og_image:
            metadata["og_image"] = og_image.get("content", "")

        # language
        html_tag = soup.find("html")
        if html_tag:
            metadata["language"] = html_tag.get("lang", "en")[:2]

        return metadata
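
to test extraction in isolation, feed it HTML from the fetcher built in step 3. a throwaway sketch; the URL is a placeholder:

from extractor import TextExtractor
from fetcher import ContentFetcher

url = "https://example.com/some-article"  # replace with a real article URL
html = ContentFetcher().fetch(url)

if html:
    result = TextExtractor().extract(html, url=url)
    if result:
        print(result["title"])
        print(f"{result['word_count']} words, author: {result['author'] or 'unknown'}")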

Step 5: Build the Database Layer

SQLite with FTS5 (full-text search) gives you a powerful, zero-dependency storage layer:

# storage.py

import sqlite3
import json
from datetime import datetime, timezone


class NewsDatabase:
    """SQLite database with full-text search for news articles."""

    def __init__(self, db_path="news.db"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._create_tables()

    def _create_tables(self):
        """create tables and full-text search index."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS articles (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT UNIQUE NOT NULL,
                url_hash TEXT NOT NULL,
                title TEXT,
                text TEXT,
                summary TEXT,
                author TEXT,
                source_name TEXT,
                category TEXT,
                published TEXT,
                scraped_at TEXT,
                word_count INTEGER,
                og_image TEXT,
                language TEXT DEFAULT 'en',
                metadata TEXT
            );

            CREATE INDEX IF NOT EXISTS idx_articles_source
                ON articles(source_name);
            CREATE INDEX IF NOT EXISTS idx_articles_category
                ON articles(category);
            CREATE INDEX IF NOT EXISTS idx_articles_published
                ON articles(published);
            CREATE INDEX IF NOT EXISTS idx_articles_url_hash
                ON articles(url_hash);

            CREATE VIRTUAL TABLE IF NOT EXISTS articles_fts
                USING fts5(
                    title, text, summary, source_name, category,
                    content=articles,
                    content_rowid=id
                );

            CREATE TRIGGER IF NOT EXISTS articles_ai
                AFTER INSERT ON articles
            BEGIN
                INSERT INTO articles_fts(
                    rowid, title, text, summary, source_name, category
                )
                VALUES (
                    new.id, new.title, new.text, new.summary,
                    new.source_name, new.category
                );
            END;
        """)
        self.conn.commit()

    def insert_article(self, article):
        """insert an article, returning True only if it was stored."""
        # INSERT OR IGNORE silently skips rows that hit the UNIQUE url
        # constraint, so it never raises IntegrityError; check rowcount
        # to detect duplicates instead
        cursor = self.conn.execute(
            """INSERT OR IGNORE INTO articles
               (url, url_hash, title, text, summary, author,
                source_name, category, published, scraped_at,
                word_count, og_image, language, metadata)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                article["url"],
                article.get("url_hash", ""),
                article.get("title", ""),
                article.get("text", ""),
                article.get("summary", ""),
                article.get("author", ""),
                article.get("source_name", ""),
                article.get("category", ""),
                article.get("published", ""),
                datetime.now(timezone.utc).isoformat(),
                article.get("word_count", 0),
                article.get("og_image", ""),
                article.get("language", "en"),
                json.dumps(article.get("metadata", {})),
            ),
        )
        self.conn.commit()
        return cursor.rowcount > 0  # 0 rows changed means duplicate

    def search(self, query, limit=20):
        """full-text search across articles."""
        cursor = self.conn.execute(
            """SELECT a.*, rank
               FROM articles_fts fts
               JOIN articles a ON a.id = fts.rowid
               WHERE articles_fts MATCH ?
               ORDER BY rank
               LIMIT ?""",
            (query, limit),
        )
        return [dict(row) for row in cursor.fetchall()]

    def get_stats(self):
        """get database statistics."""
        stats = {}
        cursor = self.conn.execute("SELECT COUNT(*) FROM articles")
        stats["total_articles"] = cursor.fetchone()[0]

        cursor = self.conn.execute(
            "SELECT category, COUNT(*) FROM articles GROUP BY category"
        )
        stats["by_category"] = dict(cursor.fetchall())

        cursor = self.conn.execute(
            "SELECT source_name, COUNT(*) FROM articles "
            "GROUP BY source_name ORDER BY COUNT(*) DESC LIMIT 10"
        )
        stats["top_sources"] = dict(cursor.fetchall())

        return stats

    def export_json(self, output_path, category=None, limit=None):
        """export articles to JSON."""
        query = "SELECT * FROM articles"
        params = []

        if category:
            query += " WHERE category = ?"
            params.append(category)

        query += " ORDER BY published DESC"

        if limit:
            query += " LIMIT ?"
            params.append(limit)

        cursor = self.conn.execute(query, params)
        articles = [dict(row) for row in cursor.fetchall()]

        with open(output_path, "w") as f:
            json.dump(articles, f, indent=2, default=str)

        print(f"exported {len(articles)} articles to {output_path}")

    def close(self):
        self.conn.close()
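
export_json is handy for pulling slices of the data into other tools. a quick example (the file name is arbitrary):

from storage import NewsDatabase

db = NewsDatabase("news.db")
db.export_json("tech_articles.json", category="tech", limit=500)
print(db.get_stats())
db.close()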

Step 6: Wire Everything Together

the main crawler script coordinates all the components:

# crawler.py

import time
from datetime import datetime
from feeds import get_all_feeds
from rss_monitor import RSSMonitor
from fetcher import ContentFetcher
from extractor import TextExtractor
from storage import NewsDatabase


class NewsCrawler:
    """main news crawler that ties everything together."""

    def __init__(self, db_path="news.db", proxy_url=None):
        self.db = NewsDatabase(db_path)
        self.monitor = RSSMonitor()
        self.fetcher = ContentFetcher(proxy_url=proxy_url)
        self.extractor = TextExtractor()
        self.feeds = get_all_feeds()

        # load seen URLs from database
        self._load_seen_urls()

    def _load_seen_urls(self):
        """load previously seen URLs to avoid re-processing."""
        cursor = self.db.conn.execute("SELECT url FROM articles")
        self.monitor.seen_urls = {row[0] for row in cursor.fetchall()}
        print(f"loaded {len(self.monitor.seen_urls)} seen URLs")

    def run(self, fetch_full=True):
        """run one crawl cycle."""
        print(f"\n[{datetime.now():%Y-%m-%d %H:%M}] starting crawl cycle...")

        # check all feeds for new articles
        new_articles = self.monitor.check_all_feeds(self.feeds)
        print(f"found {len(new_articles)} new articles from RSS feeds")

        if not new_articles:
            return 0

        saved = 0

        for i, article in enumerate(new_articles):
            print(f"\n[{i+1}/{len(new_articles)}] {article['title'][:80]}")

            if fetch_full:
                # fetch the full article page
                html = self.fetcher.fetch(article["url"])

                if html:
                    # extract clean content
                    extracted = self.extractor.extract(
                        html, url=article["url"]
                    )

                    if extracted and extracted["word_count"] > 50:
                        article.update({
                            "text": extracted["text"],
                            "word_count": extracted["word_count"],
                            "author": extracted["author"],
                            "og_image": extracted["og_image"],
                            "language": extracted["language"],
                        })

                        # use extracted title if better
                        if len(extracted["title"]) > len(article.get("title", "")):
                            article["title"] = extracted["title"]
                    else:
                        print(f"  extraction produced insufficient content")
                else:
                    print(f"  could not fetch full article")

                # rate limiting
                time.sleep(1.5)

            # save to database
            if self.db.insert_article(article):
                saved += 1
                print(f"  saved ({article.get('word_count', 0)} words)")
            else:
                print(f"  skipped (duplicate)")

        print(f"\ncrawl complete: {saved}/{len(new_articles)} new articles saved")
        return saved

    def run_scheduled(self, interval_minutes=30):
        """run the crawler on a schedule."""
        import schedule

        schedule.every(interval_minutes).minutes.do(self.run)

        print(f"crawler scheduled every {interval_minutes} minutes")
        print(f"monitoring {len(self.feeds)} feeds")
        print(f"press ctrl+c to stop\n")

        # run once immediately
        self.run()

        while True:
            schedule.run_pending()
            time.sleep(1)


def main():
    # configure proxy
    proxy_url = "http://user:pass@residential.provider.com:8080"

    crawler = NewsCrawler(
        db_path="news.db",
        proxy_url=proxy_url,
    )

    # run once
    crawler.run()

    # or run on schedule
    # crawler.run_scheduled(interval_minutes=30)

    # print stats
    stats = crawler.db.get_stats()
    print(f"\ndatabase stats:")
    print(f"  total articles: {stats['total_articles']}")
    print(f"  by category: {stats['by_category']}")
    print(f"  top sources: {stats['top_sources']}")

    # search example
    results = crawler.db.search("web scraping proxy", limit=5)
    print(f"\nsearch results for 'web scraping proxy':")
    for r in results:
        print(f"  {r['title']} ({r['source_name']})")


if __name__ == "__main__":
    main()

Step 7: Add Content Deduplication

news stories often appear on multiple sites, sometimes lightly edited, so exact URL matching is not enough. the deduplicator below pairs an exact content hash with k-shingle Jaccard similarity to catch near-duplicates:

# dedup.py

import hashlib
import re
from collections import defaultdict


class ContentDeduplicator:
    """detect duplicate and near-duplicate articles."""

    def __init__(self, threshold=0.8):
        self.threshold = threshold
        self.content_hashes = set()
        self.shingle_index = defaultdict(set)

    def is_duplicate(self, text, url_hash):
        """check if content is a duplicate."""
        if not text or len(text) < 100:
            return False

        # exact duplicate check via hash
        content_hash = hashlib.md5(
            self._normalize(text).encode()
        ).hexdigest()

        if content_hash in self.content_hashes:
            return True

        self.content_hashes.add(content_hash)

        # near-duplicate check via shingles
        shingles = self._get_shingles(text, k=5)

        for existing_shingles in self.shingle_index.values():
            similarity = self._jaccard(shingles, existing_shingles)
            if similarity > self.threshold:
                return True

        self.shingle_index[url_hash] = shingles
        return False

    def _normalize(self, text):
        """normalize text for comparison."""
        text = text.lower()
        text = re.sub(r"[^a-z0-9\s]", "", text)
        text = re.sub(r"\s+", " ", text)
        return text.strip()

    def _get_shingles(self, text, k=5):
        """create k-shingles from text."""
        words = self._normalize(text).split()
        return set(
            " ".join(words[i:i+k])
            for i in range(len(words) - k + 1)
        )

    def _jaccard(self, set_a, set_b):
        """compute Jaccard similarity."""
        if not set_a or not set_b:
            return 0.0
        intersection = len(set_a & set_b)
        union = len(set_a | set_b)
        return intersection / union if union else 0.0
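
a quick demonstration of how the two checks behave (a throwaway snippet; the strings are synthetic):

from dedup import ContentDeduplicator

dedup = ContentDeduplicator(threshold=0.8)

story = " ".join(f"sentence {i} of the original article body" for i in range(40))
syndicated = story + " republished with a short attribution line appended"

print(dedup.is_duplicate(story, "hash-a"))       # False: first time seen
print(dedup.is_duplicate(story, "hash-b"))       # True: exact content hash match
print(dedup.is_duplicate(syndicated, "hash-c"))  # True: shingle overlap above 0.8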

Performance Tips

Feed Checking

  • check feeds in parallel using concurrent.futures.ThreadPoolExecutor (see the sketch after this list)
  • cache feed ETags and Last-Modified headers to skip unchanged feeds; feedparser supports conditional GETs directly
  • prioritize high-volume feeds for more frequent checking
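
the first two tips take only a few lines each. a minimal sketch, assuming feeds.py and rss_monitor.py from steps 1 and 2; the worker count is an arbitrary starting point:

# parallel feed checking plus conditional GETs -- a sketch, not wired
# into the crawler class
from concurrent.futures import ThreadPoolExecutor

import feedparser

from feeds import get_all_feeds
from rss_monitor import RSSMonitor

monitor = RSSMonitor()  # seen_urls updates are safe enough under the GIL

with ThreadPoolExecutor(max_workers=8) as pool:
    batches = pool.map(monitor.check_feed, get_all_feeds())

new_articles = [article for batch in batches for article in batch]
print(f"{len(new_articles)} new articles")

# conditional GET: hand back the etag/modified values from the previous
# poll and feedparser sends If-None-Match / If-Modified-Since for you
feed = feedparser.parse("https://hnrss.org/frontpage")
again = feedparser.parse(
    "https://hnrss.org/frontpage",
    etag=getattr(feed, "etag", None),
    modified=getattr(feed, "modified", None),
)
if getattr(again, "status", None) == 304:
    print("feed unchanged since last poll, skipping")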

Fetching

  • use connection pooling via requests.Session
  • set reasonable timeouts (15-20 seconds)
  • skip URLs that match known paywall or login patterns
  • respect robots.txt directives (a helper is sketched below)
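
the robots.txt check costs almost nothing with the standard library. a minimal sketch; a production crawler would also honor Crawl-delay and expire cached entries:

from urllib.parse import urlparse
from urllib.robotparser import RobotFileParser

_robots_cache = {}  # origin -> RobotFileParser (or None if unreachable)


def allowed_by_robots(url, user_agent="*"):
    """return True if robots.txt permits fetching this URL."""
    origin = "{0.scheme}://{0.netloc}".format(urlparse(url))
    if origin not in _robots_cache:
        parser = RobotFileParser(origin + "/robots.txt")
        try:
            parser.read()
        except OSError:
            parser = None  # robots.txt unreachable: fail open
        _robots_cache[origin] = parser
    parser = _robots_cache[origin]
    return parser is None or parser.can_fetch(user_agent, url)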

Storage

  • run PRAGMA journal_mode=WAL for better concurrent write performance (snippet below)
  • vacuum the database periodically to reclaim space
  • partition old articles into separate databases if the main one grows too large
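
enabling WAL is one statement at connection time, and SQLite persists the journal mode in the database file itself. one place to put it is NewsDatabase.__init__, right after connecting:

import sqlite3

conn = sqlite3.connect("news.db")
conn.execute("PRAGMA journal_mode=WAL")  # persists across connections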

Proxy Usage

for a news crawler hitting dozens of different sites, a rotating residential proxy gateway works best. each request goes through a different IP, so no single site sees enough traffic from one address to start rate-limiting you:

# optimal proxy setup for news crawling
proxy_config = {
    "type": "rotating_gateway",
    "gateway_url": "http://user:pass@gate.provider.com:7777",
    "country": "US",
}

datacenter proxies work fine for most news sites, since few of them run aggressive anti-bot measures. save residential proxies for sites behind Cloudflare or similar protections.

Conclusion

this news crawler gives you a complete pipeline from RSS monitoring to full-text searchable storage. the modular design makes it easy to add new feed sources, swap in different extractors, or change the storage backend.

the code in this tutorial is roughly 500 lines across seven files. it can handle hundreds of feeds and collect thousands of articles per day while staying well within polite crawling limits. with proxy rotation enabled, it runs reliably even against sites that rate-limit aggressive crawlers.

from here you can extend it with email alerts for keyword matches, a web dashboard for browsing articles, integration with Slack or Telegram for notifications, or an AI layer that summarizes and categorizes articles automatically.
