Build a RAG Chatbot with Web Scraping: The Complete Python Tutorial
Retrieval Augmented Generation (RAG) has become the standard approach for building chatbots that answer questions based on specific data rather than general knowledge. But before you can build a RAG system, you need data, and for many use cases that data lives on websites.
This tutorial walks you through the entire pipeline: scraping websites to build a knowledge base, processing the content into embeddings, storing them in a vector database, and building a chatbot that retrieves relevant context before generating answers. By the end, you will have a working RAG chatbot powered by scraped web data.
What We Are Building
The complete system has four components:
┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐     ┌──────────────────┐
│   Web Scraper    │ --> │  Text Processor  │ --> │   Vector Store   │ --> │   RAG Chatbot    │
│  (collect data)  │     │  (chunk & embed) │     │ (store & search) │     │ (retrieve & gen) │
└──────────────────┘     └──────────────────┘     └──────────────────┘     └──────────────────┘
- Web Scraper: crawls target websites and extracts clean text content
- Text Processor: chunks the text and creates embeddings
- Vector Store: stores embeddings for efficient similarity search
- RAG Chatbot: takes user questions, retrieves relevant chunks, and generates answers
Prerequisites and Setup
# create project
mkdir rag-scraper-chatbot && cd rag-scraper-chatbot
python -m venv venv
source venv/bin/activate
# install dependencies
pip install httpx selectolax markdownify
pip install openai chromadb tiktoken python-dotenv
pip install langchain langchain-openai langchain-chroma  # optional: not used directly in this tutorial
pip install rich # for nice terminal output
Create a .env file:
OPENAI_API_KEY=your_openai_key
PROXY_URL=http://user:pass@gate.proxyservice.com:7777
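The scripts below pick these values up through python-dotenv: calling load_dotenv() at startup loads the file into the environment, so os.getenv("OPENAI_API_KEY") and os.getenv("PROXY_URL") work without exporting anything manually.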
Step 1: Build the Web Scraper
The scraper needs to crawl a website, extract clean text content, and follow internal links so it can discover the rest of the site.
# scraper.py
import httpx
from selectolax.parser import HTMLParser
from markdownify import markdownify
from urllib.parse import urljoin, urlparse
from dataclasses import dataclass, field
from typing import Optional
import time
import re
import json
@dataclass
class ScrapedPage:
url: str
title: str
content: str # clean markdown text
word_count: int
scraped_at: str
@dataclass
class SiteScraper:
base_url: str
proxy_url: Optional[str] = None
max_pages: int = 50
delay_seconds: float = 2.0
visited: set = field(default_factory=set)
pages: list = field(default_factory=list)
def _get_client(self) -> httpx.Client:
"""create an HTTP client with proxy support"""
kwargs = {
"timeout": 30.0,
"follow_redirects": True,
"headers": {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml",
"Accept-Language": "en-US,en;q=0.9",
},
}
if self.proxy_url:
kwargs["proxy"] = self.proxy_url
return httpx.Client(**kwargs)
def crawl(self) -> list[ScrapedPage]:
"""crawl the website starting from base_url"""
from datetime import datetime
urls_to_visit = [self.base_url]
client = self._get_client()
while urls_to_visit and len(self.visited) < self.max_pages:
url = urls_to_visit.pop(0)
if url in self.visited:
continue
print(f"scraping [{len(self.visited) + 1}/{self.max_pages}]: {url}")
try:
response = client.get(url)
if response.status_code != 200:
print(f" skipping: HTTP {response.status_code}")
continue
self.visited.add(url)
# parse the page
tree = HTMLParser(response.text)
# extract title
title_el = tree.css_first("title")
title = title_el.text().strip() if title_el else url
# extract and clean content
content = self._extract_content(tree)
if not content or len(content.split()) < 50:
print(f" skipping: too little content")
continue
page = ScrapedPage(
url=url,
title=title,
content=content,
word_count=len(content.split()),
scraped_at=datetime.utcnow().isoformat(),
)
self.pages.append(page)
print(f" extracted {page.word_count} words")
# find links to other pages on the same domain
new_urls = self._extract_links(tree, url)
for new_url in new_urls:
if new_url not in self.visited and new_url not in urls_to_visit:
urls_to_visit.append(new_url)
time.sleep(self.delay_seconds)
except Exception as e:
print(f" error: {e}")
continue
client.close()
print(f"\ncrawl complete: {len(self.pages)} pages scraped")
return self.pages
def _extract_content(self, tree: HTMLParser) -> str:
"""extract clean text content from HTML"""
# remove unwanted elements
for selector in ["script", "style", "nav", "header", "footer",
".sidebar", ".menu", ".advertisement", ".cookie-notice",
"[role='navigation']", "[role='banner']"]:
for el in tree.css(selector):
el.decompose()
# try to find main content area
main_content = (
tree.css_first("main") or
tree.css_first("article") or
tree.css_first("[role='main']") or
tree.css_first(".content") or
tree.css_first("#content") or
tree.body
)
if not main_content:
return ""
# convert to markdown for clean text
html = main_content.html
markdown = markdownify(html, heading_style="ATX", strip=["img", "video"])
# clean up the markdown
markdown = re.sub(r'\n{3,}', '\n\n', markdown) # collapse multiple newlines
markdown = re.sub(r'[ \t]+', ' ', markdown) # collapse whitespace
markdown = markdown.strip()
return markdown
def _extract_links(self, tree: HTMLParser, current_url: str) -> list[str]:
"""extract internal links from the page"""
base_domain = urlparse(self.base_url).netloc
links = []
for link in tree.css("a[href]"):
href = link.attributes.get("href", "")
if not href or href.startswith("#") or href.startswith("mailto:"):
continue
# resolve relative URLs
absolute_url = urljoin(current_url, href)
# only follow links on the same domain
if urlparse(absolute_url).netloc == base_domain:
# remove fragments and query strings for deduplication
clean_url = absolute_url.split("#")[0].split("?")[0]
if clean_url not in links:
links.append(clean_url)
return links
def save_to_json(self, filename: str):
"""save scraped pages to JSON"""
from dataclasses import asdict
with open(filename, "w", encoding="utf-8") as f:
json.dump([asdict(p) for p in self.pages], f, indent=2, ensure_ascii=False)
print(f"saved {len(self.pages)} pages to {filename}")
# usage
if __name__ == "__main__":
import os
from dotenv import load_dotenv
load_dotenv()
scraper = SiteScraper(
base_url="https://docs.example.com",
proxy_url=os.getenv("PROXY_URL"),
max_pages=30,
delay_seconds=2.0,
)
pages = scraper.crawl()
scraper.save_to_json("scraped_data.json")
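Each record in scraped_data.json mirrors the ScrapedPage dataclass. An illustrative entry (the values here are placeholders) looks like this:
[
  {
    "url": "https://docs.example.com/getting-started",
    "title": "Getting Started | Example Docs",
    "content": "# Getting Started\n\nInstall the CLI...",
    "word_count": 742,
    "scraped_at": "2024-01-15T09:30:00"
  }
]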
Step 2: Process Text into Chunks
RAG systems work best when documents are split into smaller, focused chunks. The chunking strategy significantly affects retrieval quality: chunks that are too large dilute relevance with off-topic text, while chunks that are too small lose the context needed to answer a question.
# processor.py
from dataclasses import dataclass
import tiktoken
import re
@dataclass
class TextChunk:
text: str
source_url: str
source_title: str
chunk_index: int
token_count: int
metadata: dict
class TextProcessor:
def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50,
model: str = "text-embedding-3-small"):
self.chunk_size = chunk_size # in tokens
self.chunk_overlap = chunk_overlap
self.encoder = tiktoken.encoding_for_model(model)
def process_pages(self, pages: list[dict]) -> list[TextChunk]:
"""process scraped pages into chunks"""
all_chunks = []
for page in pages:
chunks = self._chunk_page(page)
all_chunks.extend(chunks)
print(f"created {len(all_chunks)} chunks from {len(pages)} pages")
return all_chunks
def _chunk_page(self, page: dict) -> list[TextChunk]:
"""split a single page into chunks"""
content = page["content"]
url = page["url"]
title = page["title"]
# first, split by headings to maintain semantic coherence
sections = self._split_by_headings(content)
chunks = []
chunk_index = 0
for section_title, section_text in sections:
# tokenize the section
tokens = self.encoder.encode(section_text)
if len(tokens) <= self.chunk_size:
# section fits in one chunk
chunks.append(TextChunk(
text=section_text,
source_url=url,
source_title=title,
chunk_index=chunk_index,
token_count=len(tokens),
metadata={
"section": section_title,
"page_title": title,
}
))
chunk_index += 1
else:
# section needs to be split further
sub_chunks = self._split_tokens(
section_text, tokens, section_title, url, title, chunk_index
)
chunks.extend(sub_chunks)
chunk_index += len(sub_chunks)
return chunks
def _split_by_headings(self, content: str) -> list[tuple[str, str]]:
"""split content into sections based on markdown headings"""
sections = []
current_heading = "Introduction"
current_text = []
for line in content.split("\n"):
heading_match = re.match(r'^#{1,3}\s+(.+)$', line)
if heading_match:
# save previous section
if current_text:
text = "\n".join(current_text).strip()
if text:
sections.append((current_heading, text))
current_heading = heading_match.group(1).strip()
current_text = []
else:
current_text.append(line)
# save last section
if current_text:
text = "\n".join(current_text).strip()
if text:
sections.append((current_heading, text))
return sections
def _split_tokens(self, text: str, tokens: list, section_title: str,
url: str, title: str, start_index: int) -> list[TextChunk]:
"""split text into overlapping token-based chunks"""
chunks = []
start = 0
while start < len(tokens):
end = start + self.chunk_size
# decode the token range back to text
chunk_tokens = tokens[start:end]
chunk_text = self.encoder.decode(chunk_tokens)
# try to break at a sentence boundary
if end < len(tokens):
last_period = chunk_text.rfind(". ")
if last_period > len(chunk_text) * 0.5:
chunk_text = chunk_text[:last_period + 1]
# recalculate actual token count
chunk_tokens = self.encoder.encode(chunk_text)
chunks.append(TextChunk(
text=chunk_text.strip(),
source_url=url,
source_title=title,
chunk_index=start_index + len(chunks),
token_count=len(chunk_tokens),
metadata={
"section": section_title,
"page_title": title,
}
))
start = start + self.chunk_size - self.chunk_overlap
return chunks
def count_tokens(self, text: str) -> int:
"""count tokens in a text string"""
return len(self.encoder.encode(text))
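Before moving on, it is worth sanity-checking the chunker against the JSON file from Step 1. A minimal sketch (it assumes scraped_data.json exists in the working directory):
# quick check (sketch): run the processor over the scraped data
import json
from processor import TextProcessor

with open("scraped_data.json", "r", encoding="utf-8") as f:
    pages = json.load(f)

processor = TextProcessor(chunk_size=500, chunk_overlap=50)
chunks = processor.process_pages(pages)
print(f"average tokens per chunk: {sum(c.token_count for c in chunks) / len(chunks):.0f}")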
Step 3: Create Embeddings and Store in Vector Database
# vector_store.py
import chromadb
from chromadb.config import Settings
from openai import OpenAI
import os
from typing import Optional
class VectorStore:
def __init__(self, collection_name: str = "scraped_knowledge",
persist_directory: str = "./chroma_db"):
self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.embedding_model = "text-embedding-3-small"
# initialize ChromaDB with persistence
self.client = chromadb.PersistentClient(path=persist_directory)
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={"hnsw:space": "cosine"}
)
def add_chunks(self, chunks: list, batch_size: int = 100):
"""embed and store text chunks"""
total = len(chunks)
print(f"embedding and storing {total} chunks...")
for i in range(0, total, batch_size):
batch = chunks[i:i + batch_size]
texts = [c.text for c in batch]
# create embeddings
response = self.openai.embeddings.create(
model=self.embedding_model,
input=texts,
)
embeddings = [item.embedding for item in response.data]
# prepare metadata
ids = [f"chunk_{i + j}" for j in range(len(batch))]
metadatas = [{
"source_url": c.source_url,
"source_title": c.source_title,
"section": c.metadata.get("section", ""),
"chunk_index": c.chunk_index,
"token_count": c.token_count,
} for c in batch]
# add to ChromaDB
self.collection.add(
ids=ids,
embeddings=embeddings,
documents=texts,
metadatas=metadatas,
)
print(f" stored chunks {i + 1} to {min(i + batch_size, total)}")
print(f"done. collection now has {self.collection.count()} chunks")
def search(self, query: str, n_results: int = 5,
filter_url: Optional[str] = None) -> list[dict]:
"""search for relevant chunks"""
# create query embedding
response = self.openai.embeddings.create(
model=self.embedding_model,
input=[query],
)
query_embedding = response.data[0].embedding
# build filter
where_filter = None
if filter_url:
where_filter = {"source_url": {"$eq": filter_url}}
# search
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=n_results,
where=where_filter,
include=["documents", "metadatas", "distances"],
)
# format results
formatted = []
for i in range(len(results["documents"][0])):
formatted.append({
"text": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"similarity": 1 - results["distances"][0][i], # convert distance to similarity
})
return formatted
def get_stats(self) -> dict:
"""get collection statistics"""
return {
"total_chunks": self.collection.count(),
"collection_name": self.collection.name,
}
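With the store in place, a minimal smoke test is to embed the chunks from Step 2 and run a query against them. A sketch (it assumes OPENAI_API_KEY is loaded via the .env file and that chunks is the list produced by the TextProcessor; the query string is just an example):
# quick check (sketch): store the chunks and run a test query
from dotenv import load_dotenv
from vector_store import VectorStore

load_dotenv()
store = VectorStore(collection_name="my_knowledge_base")
store.add_chunks(chunks)  # the TextChunk list produced in Step 2

for hit in store.search("how do I get an API key?", n_results=3):
    print(f"{hit['similarity']:.3f}  {hit['metadata']['source_title']} ({hit['metadata']['section']})")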
Step 4: Build the RAG Chatbot
# chatbot.py
from openai import OpenAI
import os
from typing import Optional
class RAGChatbot:
def __init__(self, vector_store, model: str = "gpt-4o-mini"):
self.vector_store = vector_store
self.openai = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
self.model = model
self.conversation_history = []
self.system_prompt = """you are a helpful assistant that answers questions based on the provided context.
rules:
- only answer based on the provided context. if the context does not contain enough information to answer, say so.
- cite your sources by mentioning the page title and URL when relevant.
- be concise but thorough.
- if the user asks about something not in the context, explain that you can only answer based on the available knowledge base."""
def ask(self, question: str, n_context: int = 5) -> dict:
"""ask a question and get a RAG-powered answer"""
# step 1: retrieve relevant context
search_results = self.vector_store.search(question, n_results=n_context)
# step 2: build context string
context = self._build_context(search_results)
# step 3: generate answer
messages = [
{"role": "system", "content": self.system_prompt},
]
# add conversation history for multi-turn support
for msg in self.conversation_history[-6:]: # keep last 3 exchanges
messages.append(msg)
# add the current question with context
user_message = f"""Context from knowledge base:
---
{context}
---
Question: {question}"""
messages.append({"role": "user", "content": user_message})
response = self.openai.chat.completions.create(
model=self.model,
messages=messages,
temperature=0.3,
max_tokens=1024,
)
answer = response.choices[0].message.content
# update conversation history
self.conversation_history.append({"role": "user", "content": question})
self.conversation_history.append({"role": "assistant", "content": answer})
return {
"answer": answer,
"sources": [{
"title": r["metadata"]["source_title"],
"url": r["metadata"]["source_url"],
"section": r["metadata"]["section"],
"similarity": round(r["similarity"], 3),
} for r in search_results],
"model": self.model,
"context_chunks_used": len(search_results),
}
def _build_context(self, search_results: list[dict]) -> str:
"""build a context string from search results"""
context_parts = []
for i, result in enumerate(search_results, 1):
source = f"[Source: {result['metadata']['source_title']}]"
section = result['metadata'].get('section', '')
if section:
source += f" [Section: {section}]"
context_parts.append(f"{source}\n{result['text']}")
return "\n\n---\n\n".join(context_parts)
def reset_conversation(self):
"""clear conversation history"""
self.conversation_history = []
print("conversation history cleared")
Step 5: Put It All Together
# main.py
import json
import os
from dotenv import load_dotenv
from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore
from chatbot import RAGChatbot
load_dotenv()
def build_knowledge_base():
"""scrape a website and build the knowledge base"""
# step 1: scrape
print("=" * 60)
print("STEP 1: SCRAPING WEBSITE")
print("=" * 60)
scraper = SiteScraper(
base_url="https://docs.example.com",
proxy_url=os.getenv("PROXY_URL"),
max_pages=30,
delay_seconds=2.0,
)
pages = scraper.crawl()
scraper.save_to_json("scraped_data.json")
# step 2: process into chunks
print("\n" + "=" * 60)
print("STEP 2: PROCESSING INTO CHUNKS")
print("=" * 60)
processor = TextProcessor(chunk_size=500, chunk_overlap=50)
page_dicts = [{"url": p.url, "title": p.title, "content": p.content} for p in pages]
chunks = processor.process_pages(page_dicts)
# step 3: embed and store
print("\n" + "=" * 60)
print("STEP 3: CREATING EMBEDDINGS AND STORING")
print("=" * 60)
store = VectorStore(collection_name="my_knowledge_base")
store.add_chunks(chunks)
stats = store.get_stats()
print(f"\nknowledge base built: {stats['total_chunks']} chunks stored")
return store
def run_chatbot(store: VectorStore = None):
"""run the interactive chatbot"""
if store is None:
store = VectorStore(collection_name="my_knowledge_base")
chatbot = RAGChatbot(store)
print("\n" + "=" * 60)
print("RAG CHATBOT READY")
print("type 'quit' to exit, 'reset' to clear history")
print("=" * 60)
while True:
question = input("\nyou: ").strip()
if not question:
continue
if question.lower() == "quit":
break
if question.lower() == "reset":
chatbot.reset_conversation()
continue
result = chatbot.ask(question)
print(f"\nassistant: {result['answer']}")
print(f"\nsources ({len(result['sources'])}):")
for source in result['sources']:
print(f" - {source['title']} ({source['section']}) "
f"[similarity: {source['similarity']}]")
print(f" {source['url']}")
def load_existing_data():
"""load previously scraped data and build knowledge base"""
with open("scraped_data.json", "r") as f:
pages = json.load(f)
processor = TextProcessor(chunk_size=500, chunk_overlap=50)
chunks = processor.process_pages(pages)
store = VectorStore(collection_name="my_knowledge_base")
store.add_chunks(chunks)
return store
if __name__ == "__main__":
import sys
if len(sys.argv) > 1 and sys.argv[1] == "chat":
# just run the chatbot with existing knowledge base
run_chatbot()
elif len(sys.argv) > 1 and sys.argv[1] == "load":
# load existing scraped data
store = load_existing_data()
run_chatbot(store)
else:
# full pipeline: scrape, process, chat
store = build_knowledge_base()
run_chatbot(store)
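With everything wired together, `python main.py` runs the full pipeline (scrape, chunk, embed, then chat), `python main.py load` rebuilds the vector store from an existing scraped_data.json, and `python main.py chat` skips straight to the chatbot using the persisted ChromaDB collection.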
Improving Retrieval Quality
Hybrid Search
Combine vector similarity with keyword search for better results:
# add to VectorStore class
def hybrid_search(self, query: str, n_results: int = 5) -> list[dict]:
"""combine semantic search with keyword matching"""
# semantic search
semantic_results = self.search(query, n_results=n_results * 2)
# keyword search (simple term matching)
keywords = query.lower().split()
keyword_scores = {}
for result in semantic_results:
text_lower = result["text"].lower()
keyword_hits = sum(1 for kw in keywords if kw in text_lower)
keyword_ratio = keyword_hits / len(keywords)
keyword_scores[result["text"][:50]] = keyword_ratio
# combine scores: 70% semantic, 30% keyword
for result in semantic_results:
key = result["text"][:50]
kw_score = keyword_scores.get(key, 0)
result["combined_score"] = (
0.7 * result["similarity"] + 0.3 * kw_score
)
# sort by combined score and return top n
semantic_results.sort(key=lambda x: x["combined_score"], reverse=True)
return semantic_results[:n_results]
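Note that this version re-ranks the candidates returned by the semantic search rather than querying a separate keyword index, so it is cheap to add but cannot surface pages the embedding search missed entirely. The 70/30 weighting (and the first-50-characters key used to match results) are starting points worth tuning against your own queries.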
Query Expansion
Improve retrieval by expanding the user’s query with related terms:
def expand_query(self, original_query: str) -> str:
"""use LLM to generate an expanded search query"""
response = self.openai.chat.completions.create(
model="gpt-4o-mini",
messages=[{
"role": "user",
"content": f"""Generate 3 alternative phrasings of this search query to improve retrieval.
Return them as a single line of text, separated by spaces, with no numbering or commentary.
Original: {original_query}"""
}],
temperature=0.7,
max_tokens=200,
)
expanded = response.choices[0].message.content
return f"{original_query} {expanded}"
Scaling the System
Keeping the Knowledge Base Fresh
Set up periodic re-scraping to keep your chatbot current:
# refresh.py
# requires: pip install schedule
import os
import time

import schedule
from dotenv import load_dotenv

from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore

load_dotenv()
def refresh_knowledge_base():
"""re-scrape and update the knowledge base"""
scraper = SiteScraper(
base_url="https://docs.example.com",
proxy_url=os.getenv("PROXY_URL"),
max_pages=30,
)
pages = scraper.crawl()
processor = TextProcessor()
chunks = processor.process_pages(
[{"url": p.url, "title": p.title, "content": p.content} for p in pages]
)
# recreate the collection with fresh data
store = VectorStore(collection_name="my_knowledge_base")
store.client.delete_collection("my_knowledge_base")
store.collection = store.client.create_collection(
name="my_knowledge_base",
metadata={"hnsw:space": "cosine"}
)
store.add_chunks(chunks)
print("knowledge base refreshed")
# run refresh daily at 3 AM
schedule.every().day.at("03:00").do(refresh_knowledge_base)
while True:
schedule.run_pending()
time.sleep(60)
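If you would rather not keep a long-running Python process alive for this, the same refresh_knowledge_base() function can be triggered from cron, a systemd timer, or any external job scheduler instead.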
Multi-Source Knowledge Base
Scrape multiple websites into the same knowledge base:
import os
from dotenv import load_dotenv
from scraper import SiteScraper
from processor import TextProcessor
from vector_store import VectorStore

load_dotenv()

sources = [
{"url": "https://docs.example.com", "name": "Documentation"},
{"url": "https://blog.example.com", "name": "Blog"},
{"url": "https://community.example.com/faq", "name": "FAQ"},
]
all_pages = []
for source in sources:
scraper = SiteScraper(
base_url=source["url"],
proxy_url=os.getenv("PROXY_URL"),
max_pages=20,
)
pages = scraper.crawl()
all_pages.extend(pages)
processor = TextProcessor()
page_dicts = [{"url": p.url, "title": p.title, "content": p.content} for p in all_pages]
chunks = processor.process_pages(page_dicts)
store = VectorStore()
store.add_chunks(chunks)
Cost Estimation
Here is roughly what this system costs to run:
| Component | Cost per 1,000 pages | Notes |
|---|---|---|
| Proxy (scraping) | 1 to 5 USD | depends on proxy provider and page count |
| Embeddings (text-embedding-3-small) | ~0.02 USD | very cheap at 0.02 USD per 1M tokens |
| Vector storage (ChromaDB) | free | self-hosted, open source |
| Chat completions (GPT-4o-mini) | ~0.01 USD per query | depends on context size; priced per query rather than per page |
The entire pipeline for scraping 100 pages and serving hundreds of queries costs well under 10 USD.
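As a rough worked example (assuming an average page of about 1,000 words, or roughly 1,300 tokens): embedding 100 pages is around 130K tokens, well under 0.01 USD at 0.02 USD per 1M tokens; 500 chat queries at roughly 0.01 USD each add about 5 USD; and proxy traffic for 100 pages typically lands under 1 USD, which keeps the total comfortably below that 10 USD figure.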
Conclusion
Building a RAG chatbot powered by web scraping combines two powerful capabilities: automated data collection and intelligent question answering. The pipeline described here is production-ready but also extensible: you can swap ChromaDB for Pinecone or Weaviate, replace OpenAI with a local model, or add more sophisticated chunking strategies.
The key insight is that the quality of your RAG chatbot depends far more on the quality of your scraping and chunking pipeline than on the LLM you choose. Clean, well-structured text chunks with good metadata make retrieval more accurate, which in turn makes the generated answers more useful.
Start with a small knowledge base (20 to 30 pages), test your chatbot thoroughly, and scale from there. The proxy investment ensures reliable scraping even for sites with moderate anti-bot protection, and the vector database handles the heavy lifting of finding relevant context at query time.