Building a Data Warehouse from Scraped Data
Design and build a data warehouse for scraped data: schema design, ETL pipelines, incremental loading, and analytics queries, implemented with SQLAlchemy so you can start on SQLite and move to PostgreSQL or DuckDB as the data grows. This hands-on tutorial walks you through the complete implementation in Python, from architecture design to a working tool you can deploy today.
Architecture Overview
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Data Source  │ ──→ │  Processor   │ ──→ │   Storage    │
│  (Scraper)   │     │  (Pipeline)  │     │  (Database)  │
└──────────────┘     └──────────────┘     └──────────────┘
       │                    │                    │
       └────────────────────┼────────────────────┘
                            │
                   ┌────────┴────────┐
                   │   Dashboard /   │
                   │   API Output    │
                   └─────────────────┘
Prerequisites
# Create project
mkdir building-data-warehouse-scraped-data && cd building-data-warehouse-scraped-data
python -m venv venv && source venv/bin/activate
# Install dependencies
pip install httpx beautifulsoup4 lxml
pip install fastapi uvicorn # For API/dashboard
pip install sqlalchemy # For database
pip install schedule  # For scheduling
Core Implementation
import asyncio
import httpx
import json
import time
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List, Optional
from bs4 import BeautifulSoup
@dataclass
class ScrapedItem:
source: str
title: str
url: str
data: dict
scraped_at: str = ""
def __post_init__(self):
self.scraped_at = self.scraped_at or datetime.utcnow().isoformat()
class BuildingDataWarehouseScrapedData:
"""Main application class."""
def __init__(self, proxy_url: Optional[str] = None):
self.proxy = proxy_url
self.results: List[ScrapedItem] = []
self.client_kwargs = {
'timeout': 30,
'headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 (KHTML, like Gecko) '
'Chrome/120.0.0.0 Safari/537.36',
},
}
if proxy_url:
self.client_kwargs['proxy'] = proxy_url
async def scrape_source(self, url: str) -> List[ScrapedItem]:
"""Scrape data from a single source."""
async with httpx.AsyncClient(**self.client_kwargs) as client:
response = await client.get(url)
if response.status_code != 200:
print(f"Error {response.status_code} for {url}")
return []
soup = BeautifulSoup(response.text, 'lxml')
items = self._parse_page(soup, url)
self.results.extend(items)
return items
def _parse_page(self, soup: BeautifulSoup, source_url: str) -> List[ScrapedItem]:
"""Parse HTML and extract items. Customize for your target."""
items = []
for element in soup.select('article, .item, .listing, .result'):
title_el = element.select_one('h2, h3, .title, .name')
link_el = element.select_one('a[href]')
if title_el:
items.append(ScrapedItem(
source=source_url,
title=title_el.get_text(strip=True),
url=link_el['href'] if link_el else '',
data={},
))
return items
async def run(self, sources: List[str], concurrency: int = 5):
"""Run the scraper across multiple sources."""
semaphore = asyncio.Semaphore(concurrency)
async def scrape_with_limit(url):
async with semaphore:
return await self.scrape_source(url)
tasks = [scrape_with_limit(url) for url in sources]
await asyncio.gather(*tasks, return_exceptions=True)
print(f"Scraped {len(self.results)} items from {len(sources)} sources")
return self.results
def export_json(self, filename: str = "output.json"):
"""Export results to JSON."""
with open(filename, 'w') as f:
json.dump([asdict(r) for r in self.results], f, indent=2)
print(f"Exported {len(self.results)} items to {filename}")
def export_csv(self, filename: str = "output.csv"):
"""Export results to CSV."""
import csv
if not self.results:
return
with open(filename, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=asdict(self.results[0]).keys())
writer.writeheader()
for item in self.results:
writer.writerow(asdict(item))
print(f"Exported {len(self.results)} items to {filename}")Database Storage
Database Storage
import os
from sqlalchemy import create_engine, Column, String, JSON, DateTime
from sqlalchemy.orm import declarative_base, Session
from datetime import datetime
Base = declarative_base()
class StoredItem(Base):
__tablename__ = 'items'
id = Column(String, primary_key=True)
source = Column(String, index=True)
title = Column(String)
url = Column(String, unique=True)
data = Column(JSON)
scraped_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
class Storage:
    def __init__(self, db_url=None):
        # Use DATABASE_URL from the environment when set (see docker-compose.yml below),
        # otherwise fall back to a local SQLite file.
        self.engine = create_engine(db_url or os.environ.get('DATABASE_URL', 'sqlite:///scraped_data.db'))
Base.metadata.create_all(self.engine)
def save_items(self, items: List[ScrapedItem]):
with Session(self.engine) as session:
saved = 0
for item in items:
existing = session.query(StoredItem).filter_by(url=item.url).first()
if not existing:
db_item = StoredItem(
id=item.url,
source=item.source,
title=item.title,
url=item.url,
data=item.data,
)
session.add(db_item)
saved += 1
else:
existing.data = item.data
existing.updated_at = datetime.utcnow()
session.commit()
print(f"Saved {saved} new items, updated {len(items) - saved} existing")API Endpoint
API Endpoint
import os
from fastapi import FastAPI, Query
from fastapi.responses import JSONResponse
app = FastAPI(title="Building a Data Warehouse from Scraped Data")
storage = Storage()
@app.get("/api/items")
async def get_items(
source: Optional[str] = None,
limit: int = Query(default=50, le=500),
offset: int = 0,
):
with Session(storage.engine) as session:
query = session.query(StoredItem)
if source:
query = query.filter(StoredItem.source == source)
total = query.count()
items = query.offset(offset).limit(limit).all()
return {
"total": total,
"items": [
{
"title": item.title,
"url": item.url,
"source": item.source,
"data": item.data,
"scraped_at": item.scraped_at.isoformat(),
}
for item in items
],
}
@app.post("/api/scrape")
async def trigger_scrape(sources: List[str]):
    scraper = BuildingDataWarehouseScrapedData(
        # Read the proxy from the environment instead of hard-coding credentials
        proxy_url=os.environ.get("PROXY_URL"),
    )
results = await scraper.run(sources)
storage.save_items(results)
return {"scraped": len(results)}
# Run: uvicorn main:app --reload
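For warehouse-style summaries over HTTP, an aggregation endpoint can sit alongside the routes above; a sketch, assuming an /api/stats path of our own choosing:
from sqlalchemy import func

@app.get("/api/stats")
async def get_stats():
    """Aggregate view: number of stored items per source."""
    with Session(storage.engine) as session:
        rows = (
            session.query(StoredItem.source, func.count(StoredItem.id))
            .group_by(StoredItem.source)
            .all()
        )
    return {"sources": [{"source": source, "items": count} for source, count in rows]}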
Scheduling
import asyncio
import os
import schedule
import time
from main import BuildingDataWarehouseScrapedData, Storage  # assuming the classes above live in main.py
def scheduled_scrape():
sources = [
'https://example.com/listings',
'https://example.com/products',
]
    scraper = BuildingDataWarehouseScrapedData(
        proxy_url=os.environ.get("PROXY_URL")  # set via docker-compose or your shell
    )
results = asyncio.run(scraper.run(sources))
storage = Storage()
storage.save_items(results)
# Run every hour
schedule.every(1).hours.do(scheduled_scrape)
while True:
schedule.run_pending()
    time.sleep(60)
Deployment with Docker
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
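The Dockerfile copies a requirements.txt; one that mirrors the pip installs from the prerequisites would look like this (versions left unpinned here, pin them for reproducible builds):
# requirements.txt
httpx
beautifulsoup4
lxml
fastapi
uvicorn
sqlalchemy
schedule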
# docker-compose.yml
version: '3.8'
services:
app:
build: .
ports:
- "8000:8000"
environment:
- PROXY_URL=http://user:pass@proxy.example.com:8080
- DATABASE_URL=sqlite:///data/scraped.db
volumes:
- ./data:/app/data
scheduler:
build: .
command: python scheduler.py
environment:
- PROXY_URL=http://user:pass@proxy.example.com:8080
- DATABASE_URL=sqlite:///data/scraped.db
volumes:
      - ./data:/app/data
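Build and start the API and scheduler containers together:
docker compose up --build -d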
Internal Links
- Web Scraping Architecture — design patterns for production tools
- Proxy Connection Pooling — optimize proxy usage
- Data Deduplication — clean your scraped data
- Monitoring Web Scrapers — add observability
- Building a Proxy Checker — verify proxy health
FAQ
What proxy type works best for this tool?
For scraping public websites and directories, datacenter proxies offer the best cost-to-performance ratio. If targets have anti-bot protection, use residential proxies. Start with a small proxy pool and scale based on your needs.
How do I handle websites that change their HTML structure?
Build robust selectors using multiple fallback patterns. Monitor data quality metrics — when extraction rates drop, it signals a structure change. Use CSS selectors with data attributes when available, as they are more stable than class names.
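As a concrete example, the _parse_page method above can be made more resilient with a small helper that tries candidate selectors in order (the selector list is illustrative):
def first_match(element, selectors):
    """Return the first element matched by any of the candidate selectors, or None."""
    for selector in selectors:
        found = element.select_one(selector)
        if found is not None:
            return found
    return None

# Inside _parse_page, prefer stable data attributes, then semantic tags, then class names:
#   title_el = first_match(element, ['[data-testid="title"]', 'h2', 'h3', '.title', '.name'])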
Can I run this tool on a schedule?
Yes. Use Python’s schedule library for simple scheduling, or cron jobs on Linux, or deploy to a cloud service with built-in scheduling (AWS Lambda + EventBridge, Google Cloud Scheduler). Docker Compose makes it easy to run both the API and scheduler.
How do I avoid getting blocked?
Rotate user agents, use residential proxies for protected sites, respect robots.txt, add delays between requests (1-3 seconds), and avoid scraping during peak hours. Start slow and increase rate gradually while monitoring for blocks.
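For this tool, a small helper can add a randomized pause before every request made in scrape_source (the delay bounds and the polite_get name are illustrative):
import asyncio
import random

async def polite_get(client, url, min_delay=1.0, max_delay=3.0):
    """Sleep a random 1-3 seconds, then issue the request."""
    await asyncio.sleep(random.uniform(min_delay, max_delay))
    return await client.get(url)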
What database should I use for storing scraped data?
SQLite works for small projects (< 1M records). PostgreSQL handles larger datasets with better concurrent access. For time-series data (price tracking), consider TimescaleDB. For document-like data, MongoDB works well.
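Because the Storage class goes through SQLAlchemy, switching engines is mostly a connection-string change; for example, pointing at PostgreSQL (credentials and database name are placeholders, and a driver such as psycopg2-binary must be installed). A third-party duckdb-engine dialect exists if you want DuckDB for analytics work.
# pip install psycopg2-binary
storage = Storage('postgresql+psycopg2://user:password@db-host:5432/scraped_warehouse')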
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)