Build a RAG Chatbot with Web Scraping: the Complete Python Tutorial

Retrieval-Augmented Generation (RAG) has become the standard approach for building chatbots that answer questions from specific data rather than general knowledge. But before you can build a RAG system, you need data, and for many use cases that data lives on websites.

This tutorial walks you through the entire pipeline: scraping websites to build a knowledge base, processing the content into embeddings, storing them in a vector database, and building a chatbot that retrieves relevant context before generating answers. By the end, you will have a working RAG chatbot powered by scraped web data.

What We Are Building

The complete system has four components:

┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│   Web Scraper    │ --> │  Text Processor  │ --> │  Vector Store    │ --> │   RAG Chatbot    │
│  (collect data)  │     │ (chunk & embed)  │     │  (store & search)│     │ (retrieve & gen) │
└──────────────────┘     └──────────────────┘     └──────────────────┘     └──────────────────┘
  • Web Scraper: crawls target websites and extracts clean text content
  • Text Processor: chunks the text and creates embeddings
  • Vector Store: stores embeddings for efficient similarity search
  • RAG Chatbot: takes user questions, retrieves relevant chunks, and generates answers

Prerequisites and Setup

# create project
mkdir rag-scraper-chatbot && cd rag-scraper-chatbot
python -m venv venv
source venv/bin/activate

# install dependencies
pip install httpx selectolax markdownify
pip install openai chromadb tiktoken
pip install python-dotenv schedule
pip install rich  # for nice terminal output

Create a .env file:

OPENAI_API_KEY=your_openai_key
PROXY_URL=http://user:pass@gate.proxyservice.com:7777
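
A quick sanity check that the credentials load and the proxy works before building anything (a minimal sketch; httpbin.org is used here only as an echo service, and the exact PROXY_URL format depends on your provider):

# check_setup.py
import os
import httpx
from dotenv import load_dotenv

load_dotenv()

# confirm the API key is present without printing it
assert os.getenv("OPENAI_API_KEY"), "OPENAI_API_KEY missing from .env"

# one request through the proxy to verify connectivity and see the exit IP
with httpx.Client(proxy=os.getenv("PROXY_URL"), timeout=15.0) as client:
    resp = client.get("https://httpbin.org/ip")
    print("exit IP:", resp.json()["origin"])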

Step 1: Build the Web Scraper

The scraper needs to crawl a website, extract clean text content, and handle pagination and site navigation.

# scraper.py
import httpx
from selectolax.parser import HTMLParser
from markdownify import markdownify
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass, field
from typing import Optional
import time
import re
import json

@dataclass
class ScrapedPage:
    url: str
    title: str
    content: str  # clean markdown text
    word_count: int
    scraped_at: str

@dataclass
class SiteScraper:
    base_url: str
    proxy_url: Optional[str] = None
    max_pages: int = 50
    delay_seconds: float = 2.0
    visited: set = field(default_factory=set)
    pages: list = field(default_factory=list)

    def _get_client(self) -> httpx.Client:
        """create an HTTP client with proxy support"""
        kwargs = {
            "timeout": 30.0,
            "follow_redirects": True,
            "headers": {
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                              "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
                "Accept": "text/html,application/xhtml+xml",
                "Accept-Language": "en-US,en;q=0.9",
            },
        }
        if self.proxy_url:
            kwargs["proxy"] = self.proxy_url
        return httpx.Client(**kwargs)

    def crawl(self) -> list[ScrapedPage]:
        """crawl the website starting from base_url"""
        from datetime import datetime, timezone

        urls_to_visit = [self.base_url]
        client = self._get_client()

        while urls_to_visit and len(self.visited) < self.max_pages:
            url = urls_to_visit.pop(0)

            if url in self.visited:
                continue

            # mark as visited up front so a failing URL is not re-queued
            # every time another page links to it
            self.visited.add(url)

            print(f"scraping [{len(self.visited)}/{self.max_pages}]: {url}")

            try:
                response = client.get(url)
                if response.status_code != 200:
                    print(f"  skipping: HTTP {response.status_code}")
                    continue

                # parse the page
                tree = HTMLParser(response.text)

                # extract title
                title_el = tree.css_first("title")
                title = title_el.text().strip() if title_el else url

                # extract and clean content
                content = self._extract_content(tree)

                if not content or len(content.split()) < 50:
                    print(f"  skipping: too little content")
                    continue

                page = ScrapedPage(
                    url=url,
                    title=title,
                    content=content,
                    word_count=len(content.split()),
                    scraped_at=datetime.now(timezone.utc).isoformat(),
                )
                self.pages.append(page)
                print(f"  extracted {page.word_count} words")

                # find links to other pages on the same domain
                new_urls = self._extract_links(tree, url)
                for new_url in new_urls:
                    if new_url not in self.visited and new_url not in urls_to_visit:
                        urls_to_visit.append(new_url)

                time.sleep(self.delay_seconds)

            except Exception as e:
                print(f"  error: {e}")
                continue

        client.close()
        print(f"\ncrawl complete: {len(self.pages)} pages scraped")
        return self.pages

    def _extract_content(self, tree: HTMLParser) -> str:
        """extract clean text content from HTML"""
        # remove unwanted elements
        for selector in ["script", "style", "nav", "header", "footer",
                        ".sidebar", ".menu", ".advertisement", ".cookie-notice",
                        "[role='navigation']", "[role='banner']"]:
            for el in tree.css(selector):
                el.decompose()

        # try to find main content area
        main_content = (
            tree.css_first("main") or
            tree.css_first("article") or
            tree.css_first("[role='main']") or
            tree.css_first(".content") or
            tree.css_first("#content") or
            tree.body
        )

        if not main_content:
            return ""

        # convert to markdown for clean text
        html = main_content.html
        markdown = markdownify(html, heading_style="ATX", strip=["img", "video"])

        # clean up the markdown
        markdown = re.sub(r'\n{3,}', '\n\n', markdown)  # collapse multiple newlines
        markdown = re.sub(r'[ \t]+', ' ', markdown)  # collapse whitespace
        markdown = markdown.strip()

        return markdown

    def _extract_links(self, tree: HTMLParser, current_url: str) -> list[str]:
        """extract internal links from the page"""
        base_domain = urlparse(self.base_url).netloc
        links = []

        for link in tree.css("a[href]"):
            href = link.attributes.get("href", "")
            if not href or href.startswith("#") or href.startswith("mailto:"):
                continue

            # resolve relative URLs
            absolute_url = urljoin(current_url, href)

            # only follow links on the same domain
            if urlparse(absolute_url).netloc == base_domain:
                # remove fragments and query strings for deduplication
                clean_url = absolute_url.split("#")[0].split("?")[0]
                if clean_url not in links:
                    links.append(clean_url)

        return links

    def save_to_json(self, filename: str):
        """save scraped pages to JSON"""
        from dataclasses import asdict
        with open(filename, "w", encoding="utf-8") as f:
            json.dump([asdict(p) for p in self.pages], f, indent=2, ensure_ascii=False)
        print(f"saved {len(self.pages)} pages to {filename}")


# usage
if __name__ == "__main__":
    import os
    from dotenv import load_dotenv
    load_dotenv()

    scraper = SiteScraper(
        base_url="https://docs.example.com",
        proxy_url=os.getenv("PROXY_URL"),
        max_pages=30,
        delay_seconds=2.0,
    )

    pages = scraper.crawl()
    scraper.save_to_json("scraped_data.json")

Step 2: Process Text into Chunks

RAG systems work best when documents are split into smaller, focused chunks. The chunking strategy significantly affects retrieval quality.

# processor.py
from dataclasses import dataclass
import tiktoken
import re

@dataclass
class TextChunk:
    text: str
    source_url: str
    source_title: str
    chunk_index: int
    token_count: int
    metadata: dict

class TextProcessor:
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50,
                 model: str = "text-embedding-3-small"):
        self.chunk_size = chunk_size  # in tokens
        self.chunk_overlap = chunk_overlap
        self.encoder = tiktoken.encoding_for_model(model)

    def process_pages(self, pages: list[dict]) -> list[TextChunk]:
        """process scraped pages into chunks"""
        all_chunks = []

        for page in pages:
            chunks = self._chunk_page(page)
            all_chunks.extend(chunks)

        print(f"created {len(all_chunks)} chunks from {len(pages)} pages")
        return all_chunks

    def _chunk_page(self, page: dict) -> list[TextChunk]:
        """split a single page into chunks"""
        content = page["content"]
        url = page["url"]
        title = page["title"]

        # first, split by headings to maintain semantic coherence
        sections = self._split_by_headings(content)

        chunks = []
        chunk_index = 0

        for section_title, section_text in sections:
            # tokenize the section
            tokens = self.encoder.encode(section_text)

            if len(tokens) <= self.chunk_size:
                # section fits in one chunk
                chunks.append(TextChunk(
                    text=section_text,
                    source_url=url,
                    source_title=title,
                    chunk_index=chunk_index,
                    token_count=len(tokens),
                    metadata={
                        "section": section_title,
                        "page_title": title,
                    }
                ))
                chunk_index += 1
            else:
                # section needs to be split further
                sub_chunks = self._split_tokens(
                    section_text, tokens, section_title, url, title, chunk_index
                )
                chunks.extend(sub_chunks)
                chunk_index += len(sub_chunks)

        return chunks

    def _split_by_headings(self, content: str) -> list[tuple[str, str]]:
        """split content into sections based on markdown headings"""
        sections = []
        current_heading = "Introduction"
        current_text = []

        for line in content.split("\n"):
            heading_match = re.match(r'^#{1,3}\s+(.+)$', line)
            if heading_match:
                # save previous section
                if current_text:
                    text = "\n".join(current_text).strip()
                    if text:
                        sections.append((current_heading, text))

                current_heading = heading_match.group(1).strip()
                current_text = []
            else:
                current_text.append(line)

        # save last section
        if current_text:
            text = "\n".join(current_text).strip()
            if text:
                sections.append((current_heading, text))

        return sections

    def _split_tokens(self, text: str, tokens: list, section_title: str,
                      url: str, title: str, start_index: int) -> list[TextChunk]:
        """split text into overlapping token-based chunks"""
        chunks = []
        start = 0

        while start < len(tokens):
            end = start + self.chunk_size

            # decode the token range back to text
            chunk_tokens = tokens[start:end]
            chunk_text = self.encoder.decode(chunk_tokens)

            # try to break at a sentence boundary
            if end < len(tokens):
                last_period = chunk_text.rfind(". ")
                if last_period > len(chunk_text) * 0.5:
                    chunk_text = chunk_text[:last_period + 1]
                    # recalculate actual token count
                    chunk_tokens = self.encoder.encode(chunk_text)

            chunks.append(TextChunk(
                text=chunk_text.strip(),
                source_url=url,
                source_title=title,
                chunk_index=start_index + len(chunks),
                token_count=len(chunk_tokens),
                metadata={
                    "section": section_title,
                    "page_title": title,
                }
            ))

            # advance past the tokens actually kept, minus the overlap, so
            # sentence-boundary trimming never silently drops text
            start += max(1, len(chunk_tokens) - self.chunk_overlap)

        return chunks

    def count_tokens(self, text: str) -> int:
        """count tokens in a text string"""
        return len(self.encoder.encode(text))

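Before embedding anything, it helps to spot-check the chunking output. A short sketch, assuming scraped_data.json from Step 1:

# chunk_preview.py
import json
from processor import TextProcessor

with open("scraped_data.json", "r", encoding="utf-8") as f:
    pages = json.load(f)

processor = TextProcessor(chunk_size=500, chunk_overlap=50)
chunks = processor.process_pages(pages)

# basic size distribution plus one sample chunk
sizes = [c.token_count for c in chunks]
print(f"chunks: {len(chunks)}, avg tokens: {sum(sizes) / len(sizes):.0f}, max: {max(sizes)}")
print("sample chunk:\n", chunks[0].text[:300])
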
Step 3: Create Embeddings and Store in Vector Database

With the chunks ready, the next step is to embed each one and store it in ChromaDB so the most similar chunks can be found at query time.

# vector_store.py
import chromadb
from chromadb.config import Settings
from openai import OpenAI
import os
from typing import Optional

class VectorStore:
    def __init__(self, collection_name: str = "scraped_knowledge",
                 persist_directory: str = "./chroma_db"):
        self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.embedding_model = "text-embedding-3-small"

        # initialize ChromaDB with persistence
        self.client = chromadb.PersistentClient(path=persist_directory)
        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={"hnsw:space": "cosine"}
        )

    def add_chunks(self, chunks: list, batch_size: int = 100):
        """embed and store text chunks"""
        total = len(chunks)
        print(f"embedding and storing {total} chunks...")

        for i in range(0, total, batch_size):
            batch = chunks[i:i + batch_size]
            texts = [c.text for c in batch]

            # create embeddings
            response = self.openai.embeddings.create(
                model=self.embedding_model,
                input=texts,
            )

            embeddings = [item.embedding for item in response.data]

            # stable IDs (URL + per-page chunk index) survive re-runs better
            # than a bare counter, which collides across separate add calls
            ids = [f"{c.source_url}#chunk-{c.chunk_index}" for c in batch]
            metadatas = [{
                "source_url": c.source_url,
                "source_title": c.source_title,
                "section": c.metadata.get("section", ""),
                "chunk_index": c.chunk_index,
                "token_count": c.token_count,
            } for c in batch]

            # add to ChromaDB
            self.collection.add(
                ids=ids,
                embeddings=embeddings,
                documents=texts,
                metadatas=metadatas,
            )

            print(f"  stored chunks {i + 1} to {min(i + batch_size, total)}")

        print(f"done. collection now has {self.collection.count()} chunks")

    def search(self, query: str, n_results: int = 5,
               filter_url: Optional[str] = None) -> list[dict]:
        """search for relevant chunks"""
        # create query embedding
        response = self.openai.embeddings.create(
            model=self.embedding_model,
            input=[query],
        )
        query_embedding = response.data[0].embedding

        # build filter
        where_filter = None
        if filter_url:
            where_filter = {"source_url": {"$eq": filter_url}}

        # search
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=n_results,
            where=where_filter,
            include=["documents", "metadatas", "distances"],
        )

        # format results
        formatted = []
        for i in range(len(results["documents"][0])):
            formatted.append({
                "text": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "similarity": 1 - results["distances"][0][i],  # convert distance to similarity
            })

        return formatted

    def get_stats(self) -> dict:
        """get collection statistics"""
        return {
            "total_chunks": self.collection.count(),
            "collection_name": self.collection.name,
        }

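Before adding the chat layer, you can exercise retrieval on its own. A minimal sketch, assuming the collection was already populated (the query is just an example):

# search_test.py
from dotenv import load_dotenv
from vector_store import VectorStore

load_dotenv()

store = VectorStore(collection_name="my_knowledge_base")
for hit in store.search("how do I authenticate API requests?", n_results=3):
    print(f"{hit['similarity']:.3f}  {hit['metadata']['source_title']} ({hit['metadata']['section']})")
    print(f"  {hit['text'][:150]}...\n")
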
Step 4: Build the RAG Chatbot

The chatbot ties everything together: it retrieves the most relevant chunks for each question and passes them to the model as context for the answer.

# chatbot.py
from openai import OpenAI
import os
from typing import Optional

class RAGChatbot:
    def __init__(self, vector_store, model: str = "gpt-4o-mini"):
        self.vector_store = vector_store
        self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        self.model = model
        self.conversation_history = []
        self.system_prompt = """you are a helpful assistant that answers questions based on the provided context.

rules:
- only answer based on the provided context. if the context does not contain enough information to answer, say so.
- cite your sources by mentioning the page title and URL when relevant.
- be concise but thorough.
- if the user asks about something not in the context, explain that you can only answer based on the available knowledge base."""

    def ask(self, question: str, n_context: int = 5) -> dict:
        """ask a question and get a RAG-powered answer"""
        # step 1: retrieve relevant context
        search_results = self.vector_store.search(question, n_results=n_context)

        # step 2: build context string
        context = self._build_context(search_results)

        # step 3: generate answer
        messages = [
            {"role": "system", "content": self.system_prompt},
        ]

        # add conversation history for multi-turn support
        for msg in self.conversation_history[-6:]:  # keep last 3 exchanges
            messages.append(msg)

        # add the current question with context
        user_message = f"""Context from knowledge base:
---
{context}
---

Question: {question}"""

        messages.append({"role": "user", "content": user_message})

        response = self.openai.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=0.3,
            max_tokens=1024,
        )

        answer = response.choices[0].message.content

        # update conversation history
        self.conversation_history.append({"role": "user", "content": question})
        self.conversation_history.append({"role": "assistant", "content": answer})

        return {
            "answer": answer,
            "sources": [{
                "title": r["metadata"]["source_title"],
                "url": r["metadata"]["source_url"],
                "section": r["metadata"]["section"],
                "similarity": round(r["similarity"], 3),
            } for r in search_results],
            "model": self.model,
            "context_chunks_used": len(search_results),
        }

    def _build_context(self, search_results: list[dict]) -> str:
        """build a context string from search results"""
        context_parts = []

        for i, result in enumerate(search_results, 1):
            source = f"[Source: {result['metadata']['source_title']}]"
            section = result['metadata'].get('section', '')
            if section:
                source += f" [Section: {section}]"

            context_parts.append(f"{source}\n{result['text']}")

        return "\n\n---\n\n".join(context_parts)

    def reset_conversation(self):
        """clear conversation history"""
        self.conversation_history = []
        print("conversation history cleared")

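For a quick non-interactive test before wiring up the full loop in Step 5, you can ask a single question (a sketch, assuming the knowledge base from Step 3 exists; the question is just an example):

# ask_once.py
from dotenv import load_dotenv
from vector_store import VectorStore
from chatbot import RAGChatbot

load_dotenv()

store = VectorStore(collection_name="my_knowledge_base")
bot = RAGChatbot(store)

result = bot.ask("What are the rate limits?")
print(result["answer"])
for s in result["sources"]:
    print(f"- {s['title']} [similarity: {s['similarity']}]")
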
Step 5: Put It All Together

# main.py
import json
import os
from dotenv import load_dotenv
from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore
from chatbot import RAGChatbot

load_dotenv()

def build_knowledge_base():
    """scrape a website and build the knowledge base"""
    # step 1: scrape
    print("=" * 60)
    print("STEP 1: SCRAPING WEBSITE")
    print("=" * 60)

    scraper = SiteScraper(
        base_url="https://docs.example.com",
        proxy_url=os.getenv("PROXY_URL"),
        max_pages=30,
        delay_seconds=2.0,
    )
    pages = scraper.crawl()
    scraper.save_to_json("scraped_data.json")

    # step 2: process into chunks
    print("\n" + "=" * 60)
    print("STEP 2: PROCESSING INTO CHUNKS")
    print("=" * 60)

    processor = TextProcessor(chunk_size=500, chunk_overlap=50)
    page_dicts = [{"url": p.url, "title": p.title, "content": p.content} for p in pages]
    chunks = processor.process_pages(page_dicts)

    # step 3: embed and store
    print("\n" + "=" * 60)
    print("STEP 3: CREATING EMBEDDINGS AND STORING")
    print("=" * 60)

    store = VectorStore(collection_name="my_knowledge_base")
    store.add_chunks(chunks)

    stats = store.get_stats()
    print(f"\nknowledge base built: {stats['total_chunks']} chunks stored")

    return store


def run_chatbot(store: VectorStore | None = None):
    """run the interactive chatbot"""
    if store is None:
        store = VectorStore(collection_name="my_knowledge_base")

    chatbot = RAGChatbot(store)

    print("\n" + "=" * 60)
    print("RAG CHATBOT READY")
    print("type 'quit' to exit, 'reset' to clear history")
    print("=" * 60)

    while True:
        question = input("\nyou: ").strip()

        if not question:
            continue
        if question.lower() == "quit":
            break
        if question.lower() == "reset":
            chatbot.reset_conversation()
            continue

        result = chatbot.ask(question)

        print(f"\nassistant: {result['answer']}")
        print(f"\nsources ({len(result['sources'])}):")
        for source in result['sources']:
            print(f"  - {source['title']} ({source['section']}) "
                  f"[similarity: {source['similarity']}]")
            print(f"    {source['url']}")


def load_existing_data():
    """load previously scraped data and build knowledge base"""
    with open("scraped_data.json", "r") as f:
        pages = json.load(f)

    processor = TextProcessor(chunk_size=500, chunk_overlap=50)
    chunks = processor.process_pages(pages)

    store = VectorStore(collection_name="my_knowledge_base")
    store.add_chunks(chunks)

    return store


if __name__ == "__main__":
    import sys

    if len(sys.argv) > 1 and sys.argv[1] == "chat":
        # just run the chatbot with existing knowledge base
        run_chatbot()
    elif len(sys.argv) > 1 and sys.argv[1] == "load":
        # load existing scraped data
        store = load_existing_data()
        run_chatbot(store)
    else:
        # full pipeline: scrape, process, chat
        store = build_knowledge_base()
        run_chatbot(store)

Improving Retrieval Quality

Combine vector similarity with keyword search for better results:

# add to VectorStore class
def hybrid_search(self, query: str, n_results: int = 5) -> list[dict]:
    """combine semantic search with keyword matching"""
    # over-fetch semantic results so reranking has candidates to reorder
    semantic_results = self.search(query, n_results=n_results * 2)

    keywords = query.lower().split()

    # score each candidate: 70% semantic similarity, 30% keyword overlap
    for result in semantic_results:
        text_lower = result["text"].lower()
        keyword_hits = sum(1 for kw in keywords if kw in text_lower)
        keyword_ratio = keyword_hits / len(keywords) if keywords else 0.0
        result["combined_score"] = 0.7 * result["similarity"] + 0.3 * keyword_ratio

    # sort by combined score and return top n
    semantic_results.sort(key=lambda x: x["combined_score"], reverse=True)
    return semantic_results[:n_results]

Query Expansion

Improve retrieval by expanding the user's query with related terms. Like hybrid_search, this method is added to VectorStore, which already holds an OpenAI client:

def expand_query(self, original_query: str) -> str:
    """use LLM to generate an expanded search query"""
    response = self.openai.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{
            "role": "user",
            "content": f"""Generate 3 alternative phrasings of this search query to improve retrieval.
Return only the expanded query as a single string.

Original: {original_query}"""
        }],
        temperature=0.7,
        max_tokens=200,
    )

    expanded = response.choices[0].message.content
    return f"{original_query} {expanded}"

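To use it, retrieve with the expanded string instead of the raw question. A minimal sketch, also added to VectorStore:

# add to VectorStore class
def search_expanded(self, query: str, n_results: int = 5) -> list[dict]:
    """expand the query first, then run the normal semantic search"""
    expanded = self.expand_query(query)  # defined above
    return self.search(expanded, n_results=n_results)
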
Scaling the System

Keeping the Knowledge Base Fresh

Set up periodic re-scraping to keep your chatbot current:

# refresh.py
import os
import time

import schedule
from dotenv import load_dotenv

from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore

load_dotenv()

def refresh_knowledge_base():
    """re-scrape and update the knowledge base"""
    scraper = SiteScraper(
        base_url="https://docs.example.com",
        proxy_url=os.getenv("PROXY_URL"),
        max_pages=30,
    )
    pages = scraper.crawl()

    processor = TextProcessor()
    chunks = processor.process_pages(
        [{"url": p.url, "title": p.title, "content": p.content} for p in pages]
    )

    # recreate the collection with fresh data
    store = VectorStore(collection_name="my_knowledge_base")
    store.client.delete_collection("my_knowledge_base")
    store.collection = store.client.create_collection(
        name="my_knowledge_base",
        metadata={"hnsw:space": "cosine"}
    )
    store.add_chunks(chunks)
    print("knowledge base refreshed")

# run refresh daily at 3 AM
schedule.every().day.at("03:00").do(refresh_knowledge_base)

while True:
    schedule.run_pending()
    time.sleep(60)

Multi-Source Knowledge Base

Scrape multiple websites into the same knowledge base:

import os
from dotenv import load_dotenv
from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore

load_dotenv()

sources = [
    {"url": "https://docs.example.com", "name": "Documentation"},
    {"url": "https://blog.example.com", "name": "Blog"},
    {"url": "https://community.example.com/faq", "name": "FAQ"},
]

all_pages = []
for source in sources:
    scraper = SiteScraper(
        base_url=source["url"],
        proxy_url=os.getenv("PROXY_URL"),
        max_pages=20,
    )
    pages = scraper.crawl()
    all_pages.extend(pages)

processor = TextProcessor()
page_dicts = [{"url": p.url, "title": p.title, "content": p.content} for p in all_pages]
chunks = processor.process_pages(page_dicts)

store = VectorStore()
store.add_chunks(chunks)

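The name field is not used above, but carrying it into the chunk metadata lets answers say which site they came from. A sketch of the tagging step, run just before add_chunks; "source_name" is an assumed extra key that VectorStore.add_chunks would also need to copy into the metadata it stores:

# map each chunk back to its source via the URL's domain, then tag it
from urllib.parse import urlparse

name_by_domain = {urlparse(s["url"]).netloc: s["name"] for s in sources}
for c in chunks:
    c.metadata["source_name"] = name_by_domain.get(
        urlparse(c.source_url).netloc, "Unknown"
    )
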
Cost Estimation

Here is roughly what this system costs to run:

Component                              Cost per 1,000 pages   Notes
Proxy (scraping)                       1 to 5 USD             depends on provider and page count
Embeddings (text-embedding-3-small)    ~0.02 USD              0.02 USD per 1M tokens
Vector storage (ChromaDB)              free                   self-hosted, open source
Chat completions (GPT-4o-mini)         ~0.01 USD per query    depends on context size

The entire pipeline for scraping 100 pages and serving hundreds of queries costs well under 10 USD.
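
The embedding math is easy to verify with back-of-envelope arithmetic (a sketch; the per-page token count is an assumption that varies a lot by site, and prices change, so check current rates):

# cost_estimate.py
pages = 100
tokens_per_page = 1_000          # rough assumption
embed_price = 0.02 / 1_000_000   # USD per token for text-embedding-3-small

embed_cost = pages * tokens_per_page * embed_price
print(f"embedding cost for {pages} pages: ${embed_cost:.4f}")  # ~$0.002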

Conclusion

Building a RAG chatbot powered by web scraping combines two powerful capabilities: automated data collection and intelligent question answering. The pipeline described here is production-ready but also extensible: you can swap ChromaDB for Pinecone or Weaviate, replace OpenAI with a local model, or add more sophisticated chunking strategies.

The key insight is that the quality of your RAG chatbot depends far more on the quality of your scraping and chunking pipeline than on the LLM you choose. Clean, well-structured text chunks with good metadata make retrieval more accurate, which in turn makes the generated answers more useful.

Start with a small knowledge base (20 to 30 pages), test your chatbot thoroughly, and scale from there. The proxy investment helps keep scraping reliable even on sites with moderate anti-bot protection, and the vector database handles the heavy lifting of finding relevant context at query time.
