Build a Scraping Pipeline with Apache Airflow

Apache Airflow transforms ad-hoc scraping scripts into reliable, scheduled data pipelines. Instead of running scrapers manually or with simple cron jobs, Airflow provides dependency management, retry logic, monitoring, and alerting — everything needed for production-grade web scraping operations.

Why Airflow for Scraping?

| Feature | Cron Job | Airflow |
| --- | --- | --- |
| Scheduling | Basic | Advanced (cron + catchup) |
| Dependencies | None | Full DAG support |
| Retries | Manual | Automatic with backoff |
| Monitoring | Log files | Web UI dashboard |
| Alerting | DIY | Built-in email/Slack |
| Parallelism | Manual | Configurable |
| History | None | Full execution history |

Sample DAG: E-Commerce Price Monitoring

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests

default_args = {
    "owner": "scraping-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(hours=1),
}

dag = DAG(
    "ecommerce_price_monitor",
    default_args=default_args,
    description="Scrape product prices daily",
    schedule="0 6 * * *",  # 6 AM daily
    start_date=datetime(2024, 1, 1),  # any fixed past date; catchup=False skips backfill
    catchup=False,
    tags=["scraping", "ecommerce"],
)

def scrape_prices(**context):
    """Scrape product prices through a proxy."""
    # In production, pull credentials from an Airflow Connection instead of
    # hardcoding them (see Best Practices below).
    proxy = {
        "http": "http://user:pass@proxy.example.com:8080",
        "https": "http://user:pass@proxy.example.com:8080",
    }
    products = load_product_urls()  # your helper: returns the list of product URLs
    results = []

    for url in products:
        try:
            response = requests.get(url, proxies=proxy, timeout=15)
            price = extract_price(response.text)  # your helper: parse the price from HTML
            results.append({"url": url, "price": price, "timestamp": context["ts"]})
        except Exception as e:
            results.append({"url": url, "error": str(e)})

    # Push to XCom for downstream tasks (fine for small payloads; see Best Practices)
    context["ti"].xcom_push(key="prices", value=results)
    return len(results)

def validate_data(**context):
    """Validate scraped data quality."""
    prices = context["ti"].xcom_pull(key="prices", task_ids="scrape")
    if not prices:
        raise ValueError("No prices received from the scrape task")
    valid = [p for p in prices if p.get("price") is not None and p["price"] > 0]
    invalid = len(prices) - len(valid)

    if invalid / len(prices) > 0.2:
        raise ValueError(f"Too many invalid prices: {invalid}/{len(prices)}")

    context["ti"].xcom_push(key="valid_prices", value=valid)

def store_results(**context):
    """Store validated prices in database."""
    prices = context["ti"].xcom_pull(key="valid_prices", task_ids="validate")
    save_to_database(prices)  # your helper: write rows to your database

def send_alerts(**context):
    """Send price change alerts."""
    prices = context["ti"].xcom_pull(key="valid_prices", task_ids="validate")
    changes = detect_price_changes(prices)  # your helper: compare against stored prices
    if changes:
        send_notification(changes)  # your helper: email/Slack notification

scrape_task = PythonOperator(
    task_id="scrape",
    python_callable=scrape_prices,
    dag=dag,
)

validate_task = PythonOperator(
    task_id="validate",
    python_callable=validate_data,
    dag=dag,
)

store_task = PythonOperator(
    task_id="store",
    python_callable=store_results,
    dag=dag,
)

alert_task = PythonOperator(
    task_id="alert",
    python_callable=send_alerts,
    dag=dag,
)

scrape_task >> validate_task >> [store_task, alert_task]

Docker Compose for Airflow + Scraping

# Run the one-time metadata DB setup ("airflow db migrate" and "airflow users create ...")
# before the first start, or add an init service as in the official Airflow compose file.
version: '3.8'
services:
  airflow-webserver:
    image: apache/airflow:2.8.1
    command: webserver
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    ports:
      - "8080:8080"
    volumes:
      - ./dags:/opt/airflow/dags
      - ./plugins:/opt/airflow/plugins
    depends_on:
      - postgres

  airflow-scheduler:
    image: apache/airflow:2.8.1
    command: scheduler
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    volumes:
      - ./dags:/opt/airflow/dags
    depends_on:
      - postgres

  postgres:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

Best Practices

  1. Separate scraping from processing — Keep scraping tasks independent from data transformation tasks
  2. Use XCom sparingly — For large datasets, write to files or a database and pass only a reference through XCom
  3. Set appropriate timeouts — Scraping tasks need generous timeouts to allow for proxy retries
  4. Monitor task duration — Set up alerts for tasks that run longer than expected
  5. Use Airflow Connections — Store proxy credentials in Airflow Connections rather than hardcoding them (see the sketch after this list)
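
A minimal sketch of practice 5, assuming a Connection with ID proxy_pool has been created in the Airflow UI (Admin > Connections); the connection ID and field mapping are illustrative:

from airflow.hooks.base import BaseHook

def build_proxy_config(conn_id: str = "proxy_pool") -> dict:
    """Build a requests-style proxy dict from an Airflow Connection."""
    conn = BaseHook.get_connection(conn_id)  # host/port/login/password from the metastore
    proxy_url = f"http://{conn.login}:{conn.password}@{conn.host}:{conn.port}"
    return {"http": proxy_url, "https": proxy_url}

The scrape task can then call build_proxy_config() instead of hardcoding credentials.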

FAQ

Is Airflow overkill for simple scraping?

For a single scraper running daily, cron is simpler. Airflow shines when you have multiple interdependent scrapers, need retry logic, or want a monitoring dashboard. If you have 5+ scraping jobs, Airflow pays for itself in operational simplicity.

Can Airflow handle concurrent scraping?

Yes. Use the CeleryExecutor or KubernetesExecutor for parallel task execution across multiple workers. Each worker can run scraping tasks independently with its own proxy configuration.
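
If your proxy plan limits concurrent connections, an Airflow pool can also cap how many scraping tasks run at once, whichever executor you use. A sketch, assuming a pool named scraping_pool exists under Admin > Pools, applied to the sample task from the DAG above:

scrape_task = PythonOperator(
    task_id="scrape",
    python_callable=scrape_prices,
    pool="scraping_pool",  # at most <pool slots> of these tasks run concurrently
    pool_slots=1,          # slots this task occupies while running
    dag=dag,
)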

How do I handle proxy rotation in Airflow?

Create a custom Airflow hook or use environment variables to configure proxy pools. The scraping tasks can use a proxy rotator internally while Airflow manages the scheduling and orchestration.
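
One approach is to keep the proxy list in an Airflow Variable and rotate inside the task; the Variable name proxy_endpoints and the round-robin strategy below are illustrative assumptions:

import itertools
from airflow.models import Variable

def get_proxy_cycle():
    """Round-robin iterator over proxy endpoints stored as a JSON Airflow Variable."""
    # e.g. ["http://user:pass@proxy1:8080", "http://user:pass@proxy2:8080"]
    endpoints = Variable.get("proxy_endpoints", deserialize_json=True)
    return itertools.cycle(endpoints)

proxies = get_proxy_cycle()
proxy_url = next(proxies)
proxy = {"http": proxy_url, "https": proxy_url}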

Implementation Best Practices

Error Handling and Retry Logic

Production scraping tools must handle failures gracefully. Implement exponential backoff with jitter:

import random
import time

def retry_with_backoff(func, max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.1f}s")
            time.sleep(delay)
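
A usage sketch; fetch_page is a hypothetical helper that raises on HTTP errors so failed responses get retried as well:

import requests

def fetch_page():
    response = requests.get("https://example.com/products", timeout=15)
    response.raise_for_status()  # turn 4xx/5xx into exceptions so they trigger a retry
    return response.text

html = retry_with_backoff(fetch_page, max_retries=3, base_delay=1)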

Logging Configuration

Set up structured logging for debugging and monitoring:

import logging
import json
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "message": record.getMessage(),
            "module": record.module,
            "function": record.funcName,
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        return json.dumps(log_entry)

# Setup
handler = logging.StreamHandler()
handler.setFormatter(JSONFormatter())
logger = logging.getLogger("scraper")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
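
With this setup, every log call emits a single JSON line, and exceptions logged via logger.exception carry the traceback in the exception field:

logger.info("Scrape run finished")

try:
    raise ValueError("price element not found")
except ValueError:
    logger.exception("Parse failure")  # exc_info is attached automatically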

Configuration Management

Use environment variables and config files for flexibility:

import os
from dataclasses import dataclass

@dataclass
class ScraperConfig:
    proxy_url: str = os.getenv("PROXY_URL", "")
    concurrent_workers: int = int(os.getenv("CONCURRENT_WORKERS", "10"))
    request_timeout: int = int(os.getenv("REQUEST_TIMEOUT", "15"))
    max_retries: int = int(os.getenv("MAX_RETRIES", "3"))
    rate_limit_per_second: float = float(os.getenv("RATE_LIMIT", "5"))
    output_format: str = os.getenv("OUTPUT_FORMAT", "json")
    database_url: str = os.getenv("DATABASE_URL", "sqlite:///results.db")
    log_level: str = os.getenv("LOG_LEVEL", "INFO")

    @classmethod
    def from_yaml(cls, filepath: str):
        import yaml
        with open(filepath) as f:
            config = yaml.safe_load(f)
        return cls(**{k: v for k, v in config.items() if hasattr(cls, k)})
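
Either construction path yields the same dataclass; the config.yaml contents shown in the comment are an illustrative assumption:

# From environment variables (falls back to the defaults above)
config = ScraperConfig()

# Or from a YAML file, e.g. config.yaml containing:
#   concurrent_workers: 20
#   rate_limit_per_second: 2.5
config = ScraperConfig.from_yaml("config.yaml")

print(config.concurrent_workers, config.rate_limit_per_second)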

Rate Limiting

Implement token bucket rate limiting to respect target sites:

import asyncio
import time

class RateLimiter:
    def __init__(self, rate: float, burst: int = 1):
        self.rate = rate  # requests per second
        self.burst = burst
        self.tokens = burst
        self.last_refill = time.time()
        self._lock = asyncio.Lock()

    async def acquire(self):
        async with self._lock:
            now = time.time()
            elapsed = now - self.last_refill
            self.tokens = min(self.burst, self.tokens + elapsed * self.rate)
            self.last_refill = now

            if self.tokens >= 1:
                self.tokens -= 1
                return
            # Not enough tokens: wait for one to accrue, consume it, and reset
            # the refill clock so the wait isn't counted twice on the next call.
            wait_time = (1 - self.tokens) / self.rate
            await asyncio.sleep(wait_time)
            self.tokens = 0
            self.last_refill = time.time()
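
A minimal usage sketch of the limiter above; the request itself is left as a placeholder print so the example runs on its own:

limiter = RateLimiter(rate=5, burst=5)  # roughly 5 requests per second

async def fetch(url: str):
    await limiter.acquire()  # wait for a token before each request
    # issue the real request here (aiohttp, or requests via asyncio.to_thread)
    print(f"fetching {url}")

async def main():
    urls = [f"https://example.com/item/{i}" for i in range(20)]
    await asyncio.gather(*(fetch(u) for u in urls))

asyncio.run(main())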

Data Validation

Validate scraped data before storage:

from typing import Optional
import re

class DataValidator:
    @staticmethod
    def validate_url(url: str) -> bool:
        pattern = re.compile(
            r'^https?://'
            r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|'
            r'localhost|'
            r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})'
            r'(?::\d+)?'
            r'(?:/?|[/?]\S+)$', re.IGNORECASE)
        return bool(pattern.match(url))

    @staticmethod
    def validate_price(price: Optional[float]) -> bool:
        if price is None:
            return True
        return 0 < price < 1_000_000

    @staticmethod
    def validate_text(text: str, min_length: int = 1, max_length: int = 10000) -> bool:
        return min_length <= len(text.strip()) <= max_length

    def validate_record(self, record: dict) -> tuple:
        errors = []
        if "url" in record and not self.validate_url(record["url"]):
            errors.append("invalid URL")
        if "price" in record and not self.validate_price(record.get("price")):
            errors.append("invalid price")
        if "title" in record and not self.validate_text(record.get("title", ""), 1, 500):
            errors.append("invalid title length")
        return len(errors) == 0, errors
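
Applied to a scraped batch, the validator can split clean records from rejects before storage:

validator = DataValidator()
records = [
    {"url": "https://example.com/p/1", "title": "Widget", "price": 19.99},
    {"url": "not-a-url", "title": "", "price": -5},
]

clean, rejected = [], []
for record in records:
    ok, errors = validator.validate_record(record)
    (clean if ok else rejected).append((record, errors))

print(f"{len(clean)} valid, {len(rejected)} rejected")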

Deployment

Running as a Service

# Using systemd
sudo tee /etc/systemd/system/scraper.service > /dev/null << EOF
[Unit]
Description=Web Scraping Service
After=network.target

[Service]
Type=simple
User=scraper
WorkingDirectory=/opt/scraper
ExecStart=/opt/scraper/venv/bin/python main.py
Restart=always
RestartSec=10
Environment=PROXY_URL=http://user:pass@proxy:8080
Environment=LOG_LEVEL=INFO

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable scraper
sudo systemctl start scraper

Docker Deployment

FROM python:3.12-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

# Assumes main.py exposes a /health endpoint on port 8000
HEALTHCHECK --interval=30s --timeout=10s CMD python -c "import requests; requests.get('http://localhost:8000/health', timeout=5).raise_for_status()"

CMD ["python", "main.py"]

Testing

Write tests for your scraping tools:

import pytest
import requests

# DataValidator comes from the validation section above; the proxy URLs
# below are placeholders for your own proxy credentials.

class TestProxyIntegration:
    def test_proxy_connectivity(self):
        proxy = {"http": "http://user:pass@proxy:8080", "https": "http://user:pass@proxy:8080"}
        response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
        assert response.status_code == 200
        assert "origin" in response.json()

    def test_proxy_rotation(self):
        ips = set()
        for _ in range(5):
            proxy = {"http": "http://user:pass@rotating-proxy:8080"}
            response = requests.get("https://httpbin.org/ip", proxies=proxy, timeout=10)
            ips.add(response.json()["origin"])
        assert len(ips) > 1, "Proxy should rotate IPs"

    def test_data_validation(self):
        validator = DataValidator()
        valid, errors = validator.validate_record({
            "url": "https://example.com",
            "title": "Test Product",
            "price": 29.99,
        })
        assert valid
        assert len(errors) == 0

For proxy infrastructure guidance, see our proxy pool management guide and web scraping proxy overview.

