Data Pipeline ETL for Scraped Data: Clean and Transform

Raw scraped data is messy — duplicate entries, inconsistent formats, missing fields, and encoding issues. An ETL (Extract, Transform, Load) pipeline cleans and structures this data before it reaches your database or analytics tools.

Pipeline Architecture

Raw Scraped Data (JSON/HTML)
    |
    v
EXTRACT: Parse & normalize raw data
    |
    v
TRANSFORM: Clean, validate, deduplicate
    |
    v
LOAD: Store in database/warehouse
    |
    v
VERIFY: Data quality checks

Python ETL Pipeline

import json
import hashlib
import re
from datetime import datetime
from urllib.parse import urlparse
from typing import List, Dict, Optional
from dataclasses import dataclass
import psycopg2
from psycopg2.extras import execute_values

@dataclass
class CleanedRecord:
    url: str
    domain: str
    title: str
    price: Optional[float]
    currency: str
    description: str
    scraped_at: datetime
    content_hash: str
    raw_data: dict

class ScrapingETL:
    def __init__(self, db_url: str):
        self.conn = psycopg2.connect(db_url)
        self.stats = {"extracted": 0, "transformed": 0, "loaded": 0, "duplicates": 0, "errors": 0}

    def extract(self, raw_file: str) -> List[dict]:
        with open(raw_file) as f:
            records = json.load(f)
        self.stats["extracted"] = len(records)
        return records

    def transform(self, records: List[dict]) -> List[CleanedRecord]:
        cleaned = []
        seen_hashes = set()

        for record in records:
            try:
                # Normalize and clean
                title = self._clean_text(record.get("title", ""))
                price = self._extract_price(record.get("price", ""))
                currency = self._extract_currency(record.get("price", ""))
                description = self._clean_text(record.get("description", ""))

                # Deduplication
                content_hash = hashlib.md5(f"{title}{price}{record.get('url', '')}".encode()).hexdigest()
                if content_hash in seen_hashes:
                    self.stats["duplicates"] += 1
                    continue
                seen_hashes.add(content_hash)

                # Validation
                if not title or not record.get("url"):
                    self.stats["errors"] += 1
                    continue

                cleaned.append(CleanedRecord(
                    url=record["url"],
                    domain=urlparse(record["url"]).netloc,
                    title=title,
                    price=price,
                    currency=currency,
                    description=description[:1000],
                    scraped_at=datetime.fromisoformat(record.get("scraped_at", datetime.now().isoformat())),
                    content_hash=content_hash,
                    raw_data=record,
                ))
                self.stats["transformed"] += 1
            except Exception:
                # Skip malformed records rather than failing the whole batch
                self.stats["errors"] += 1

        return cleaned

    def load(self, records: List[CleanedRecord]):
        if not records:
            return
        cursor = self.conn.cursor()
        values = [
            (r.url, r.domain, r.title, r.price, r.currency, r.description, r.scraped_at, r.content_hash, json.dumps(r.raw_data))
            for r in records
        ]
        # Single page so cursor.rowcount reflects every insert, not just the last batch
        execute_values(cursor, """
            INSERT INTO scraped_data (url, domain, title, price, currency, description, scraped_at, content_hash, raw_data)
            VALUES %s
            ON CONFLICT (content_hash) DO NOTHING
        """, values, page_size=len(values))
        self.conn.commit()
        self.stats["loaded"] = cursor.rowcount

    def _clean_text(self, text: str) -> str:
        if not text:
            return ""
        text = re.sub(r'\s+', ' ', text).strip()  # collapse runs of whitespace
        text = text.encode('utf-8', errors='ignore').decode('utf-8')  # drop characters that cannot round-trip through UTF-8
        return text

    def _extract_price(self, text: str) -> Optional[float]:
        if not text:
            return None
        match = re.search(r'\d+\.?\d*', str(text).replace(',', ''))
        return float(match.group()) if match else None

    def _extract_currency(self, text: str) -> str:
        currencies = {"$": "USD", "€": "EUR", "EUR": "EUR", "£": "GBP", "GBP": "GBP"}
        for symbol, code in currencies.items():
            if symbol in str(text):
                return code
        return "USD"  # default when no recognizable symbol or code is present

    def run(self, input_file: str):
        print(f"Starting ETL pipeline for {input_file}")
        records = self.extract(input_file)
        cleaned = self.transform(records)
        self.load(cleaned)
        print(f"ETL complete: {json.dumps(self.stats, indent=2)}")

# Usage
etl = ScrapingETL("postgresql://user:pass@localhost/scraping")
etl.run("scraped_products.json")
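
The architecture diagram ends with a VERIFY stage that ScrapingETL stops short of. A minimal sketch of such a check, written as a method you could add to the class above (the null-rate threshold is purely illustrative):

    def verify(self) -> dict:
        """Run basic quality checks against the loaded table (illustrative sketch)."""
        cursor = self.conn.cursor()
        cursor.execute("SELECT COUNT(*), COUNT(*) FILTER (WHERE price IS NULL) FROM scraped_data")
        total, null_prices = cursor.fetchone()
        null_rate = null_prices / total if total else 0
        if null_rate > 0.2:  # arbitrary threshold; tune to your dataset
            print(f"WARNING: {null_rate:.0%} of rows have no price")
        return {"rows": total, "null_price_rate": round(null_rate, 3)}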

FAQ

How do I handle deduplication across multiple scraping runs?

Use content hashes (hash of key fields) with database unique constraints. The ON CONFLICT DO NOTHING clause in PostgreSQL prevents duplicate inserts. For updates, use ON CONFLICT DO UPDATE to merge new data.
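
For example, an upsert variant of the load step might look like this (a sketch; the column list matches the INSERT in load()):

# Sketch: refresh mutable fields when the same content_hash reappears
execute_values(cursor, """
    INSERT INTO scraped_data (url, domain, title, price, currency, description, scraped_at, content_hash, raw_data)
    VALUES %s
    ON CONFLICT (content_hash) DO UPDATE
    SET price = EXCLUDED.price,
        description = EXCLUDED.description,
        scraped_at = EXCLUDED.scraped_at
""", values)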

What database should I use for scraped data?

PostgreSQL with JSONB columns is excellent for semi-structured scraped data. For very large datasets (10M+ rows), consider TimescaleDB for time-series data or ClickHouse for analytics. See our scraping results database guide for schema design.
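
The load() method above assumes a table roughly like the following (a sketch; adjust column types, constraints, and indexes to your data):

import psycopg2

SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS scraped_data (
    id           BIGSERIAL PRIMARY KEY,
    url          TEXT NOT NULL,
    domain       TEXT,
    title        TEXT NOT NULL,
    price        NUMERIC,
    currency     TEXT,
    description  TEXT,
    scraped_at   TIMESTAMPTZ,
    content_hash TEXT UNIQUE,  -- backs ON CONFLICT (content_hash) in load()
    raw_data     JSONB
);
CREATE INDEX IF NOT EXISTS idx_scraped_data_domain ON scraped_data (domain);
"""

with psycopg2.connect("postgresql://user:pass@localhost/scraping") as conn:
    with conn.cursor() as cur:
        cur.execute(SCHEMA_SQL)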

How do I validate scraped data quality?

Add validation rules in the transform step: required fields, value ranges (price > 0), format checks (valid URLs), and cross-field consistency. Track validation failure rates — a sudden increase often indicates the target site has changed structure.
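
A cross-field check might look like this (a sketch; the specific rules are illustrative):

from datetime import datetime

def check_consistency(record: dict) -> list:
    """Cross-field rules that single-field validators cannot express (illustrative)."""
    problems = []
    # A captured price usually implies a captured currency
    if record.get("price") is not None and not record.get("currency"):
        problems.append("price without currency")
    # The scrape timestamp should not be in the future (assumes naive ISO-8601 strings)
    scraped_at = record.get("scraped_at")
    if scraped_at and datetime.fromisoformat(scraped_at) > datetime.now():
        problems.append("scraped_at is in the future")
    return problems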

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
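
Usage: wrap the flaky call in a function so it can be re-invoked on each attempt (the URL below is just a placeholder):

import requests

def fetch():
    # Placeholder URL; raise_for_status() turns HTTP errors into retryable exceptions
    resp = requests.get("https://example.com", timeout=10)
    resp.raise_for_status()
    return resp.text

html = retry_with_backoff(fetch, max_retries=5)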

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.utcnow().isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
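
Each log line then comes out as a single JSON object that log aggregators can parse directly, and tracebacks land in a dedicated field:

logger.info("Scrape batch finished")
# {"timestamp": "...", "level": "INFO", "message": "Scrape batch finished", ...}

try:
    raise ValueError("example failure")
except ValueError:
    logger.exception("Parsing failed")  # adds the traceback under the "exception" key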

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    # Defaults are read from environment variables once, at import time
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if k in cls.__dataclass_fields__})
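
Construction is then explicit at the call site; config.yaml below is a hypothetical file whose keys mirror the field names:

# From environment variables (falls back to the defaults above)
config = ScraperConfig()

# Or from a YAML file, e.g. a config.yaml containing:
#   concurrent_workers: 20
#   rate_limit_per_second: 2.0
config = ScraperConfig.from_yaml("config.yaml")
print(config.concurrent_workers, config.database_url)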

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            else:
                wait_time = (1 - self.tokens) / self.rate
                await asyncio.sleep(wait_time)
                # Count the token that accrued during the sleep as consumed by this request
                self.tokens = 0
                self.last_refill = time.time()
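
Usage inside an async scraper might look like the following sketch; fetch_url stands in for whatever request coroutine you already have:

async def scrape_all(urls):
    limiter = RateLimiter(rate=5, burst=5)  # roughly 5 requests per second

    async def fetch_one(url):
        await limiter.acquire()      # waits until a token is available
        return await fetch_url(url)  # placeholder for your actual request coroutine

    return await asyncio.gather(*(fetch_one(u) for u in urls))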

Data Validation

Validate scraped data before storage:

from typing import List, Optional, Tuple
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> Tuple[bool, List[str]]:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
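
One way to wire this into the ETL is a pre-filter in front of the transform step, which also gives you the validation failure-rate signal mentioned in the FAQ (a sketch; the threshold is illustrative):

validator = DataValidator()

def prefilter(records: list) -> list:
    """Drop invalid records before transform() and report the failure rate (sketch)."""
    valid, failures = [], 0
    for record in records:
        ok, _errors = validator.validate_record(record)
        if ok:
            valid.append(record)
        else:
            failures += 1
    rate = failures / len(records) if records else 0
    if rate > 0.1:  # illustrative threshold; a spike often means the site changed structure
        print(f"WARNING: {rate:.0%} of records failed validation")
    return valid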

Deployment

Running as a Service

# Using systemd
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]

Testing

Write tests for your scraping tools:

import pytest
import requests

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        proxy = {
            "http": "http://user:pass@rotating-proxy:8080",
            "https": "http://user:pass@rotating-proxy:8080",
        }
        ips = set()
        for _ in range(5):
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

