Web Scraping ETL Pipeline with Apache Airflow

Apache Airflow turns ad-hoc scraping scripts into reliable, scheduled, monitored pipelines. Instead of running scrapers manually or with cron, Airflow orchestrates the entire ETL process: extract data from websites through proxies, transform it into clean formats, and load it into databases or data warehouses.

Why Airflow for Scraping

  • Scheduling — run scrapers hourly, daily, or on custom schedules
  • Dependency management — scrape pages in order (listings before details)
  • Retry logic — automatic retries with configurable backoff
  • Monitoring — web UI shows task status, logs, and execution history
  • Alerting — email or Slack notifications on failure
  • Parallelism — run multiple scraping tasks concurrently

Architecture

Airflow Scheduler → DAG → Task 1: Scrape listings
                         → Task 2: Scrape details (depends on Task 1)
                         → Task 3: Transform data
                         → Task 4: Load to database
                         → Task 5: Generate report

Setup

pip install apache-airflow apache-airflow-providers-postgres httpx selectolax

Initialize Airflow:

export AIRFLOW_HOME=~/airflow
airflow db init
airflow users create --role Admin --username admin --email admin@example.com \
    --firstname Admin --lastname User --password admin
airflow webserver -p 8080 &
airflow scheduler &

DAG Definition

# dags/scraping_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from airflow.models import Variable
from datetime import datetime, timedelta
import json

default_args = {
    'owner': 'scraping-team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email': ['alerts@example.com'],
    'email_on_failure': True,
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'retry_exponential_backoff': True,
    'max_retry_delay': timedelta(minutes=30),
}

dag = DAG(
    'product_scraping_pipeline',
    default_args=default_args,
    description='Scrape product listings, extract details, load to database',
    schedule_interval='0 6 * * *',  # Daily at 6 AM
    catchup=False,
    max_active_runs=1,
    tags=['scraping', 'etl'],
)

Extract: Scraping Tasks

# dags/tasks/extract.py
import asyncio
import random

import httpx
from airflow.models import Variable
from selectolax.parser import HTMLParser

PROXIES = [
    "http://user:pass@proxy1.example.com:8080",
    "http://user:pass@proxy2.example.com:8080",
    "http://user:pass@proxy3.example.com:8080",
]

async def scrape_listing_page(url: str, proxy: str) -> list:
    async with httpx.AsyncClient(
        proxy=proxy,
        timeout=30,
        follow_redirects=True,
    ) as client:
        response = await client.get(url, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                         'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36'
        })
        response.raise_for_status()  # surface blocks/server errors so Airflow can retry

    tree = HTMLParser(response.text)
    products = []

    for item in tree.css('.product-card'):
        title = item.css_first('.title')
        link = item.css_first('a')
        price = item.css_first('.price')

        if title and link:
            products.append({
                'title': title.text(strip=True),
                'url': link.attributes.get('href', ''),
                'price': price.text(strip=True) if price else '',
            })

    return products


def scrape_listings(**context):
    """Airflow task: scrape product listing pages."""
    base_url = Variable.get('scrape_target_url')
    max_pages = int(Variable.get('scrape_max_pages', 10))

    all_products = []

    async def run():
        for page in range(1, max_pages + 1):
            url = f"{base_url}?page={page}"
            proxy = random.choice(PROXIES)

            try:
                products = await scrape_listing_page(url, proxy)
                all_products.extend(products)
                print(f"Page {page}: {len(products)} products")
                await asyncio.sleep(2)  # Rate limiting
            except Exception as e:
                print(f"Page {page} failed: {e}")

    asyncio.run(run())

    # Push results to XCom for downstream tasks
    context['ti'].xcom_push(key='listings', value=all_products)
    print(f"Total products scraped: {len(all_products)}")
    return len(all_products)


async def scrape_product_detail(url: str, proxy: str) -> dict:
    async with httpx.AsyncClient(
        proxy=proxy,
        timeout=30,
    ) as client:
        response = await client.get(url, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                         'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36'
        })
        response.raise_for_status()  # surface blocks/server errors so Airflow can retry

    tree = HTMLParser(response.text)

    description = tree.css_first('.description')
    rating = tree.css_first('.rating')

    return {
        'url': url,
        'description': description.text(strip=True) if description else '',
        'specs': {
            row.css_first('.spec-name').text(strip=True):
            row.css_first('.spec-value').text(strip=True)
            for row in tree.css('.spec-row')
            if row.css_first('.spec-name') and row.css_first('.spec-value')
        },
        'images': [
            img.attributes.get('src', '')
            for img in tree.css('.product-image img')
        ],
        'rating': rating.text(strip=True) if rating else '',
    }


def scrape_details(**context):
    """Airflow task: scrape product detail pages."""
    listings = context['ti'].xcom_pull(key='listings', task_ids='scrape_listings')

    details = []

    async def run():
        semaphore = asyncio.Semaphore(5)

        async def fetch(product):
            async with semaphore:
                proxy = random.choice(PROXIES)
                try:
                    detail = await scrape_product_detail(product['url'], proxy)
                    detail['title'] = product['title']
                    detail['list_price'] = product['price']
                    details.append(detail)
                except Exception as e:
                    print(f"Failed {product['url']}: {e}")
                await asyncio.sleep(1)

        tasks = [fetch(p) for p in listings]
        await asyncio.gather(*tasks)

    asyncio.run(run())

    context['ti'].xcom_push(key='details', value=details)
    print(f"Scraped {len(details)} product details")

Transform: Data Cleaning

# dags/tasks/transform.py
import re
from datetime import datetime, timezone

def transform_data(**context):
    """Clean and normalize scraped data."""
    details = context['ti'].xcom_pull(key='details', task_ids='scrape_details')

    transformed = []
    for item in details:
        cleaned = {
            'title': item['title'].strip(),
            'url': item['url'],
            'description': item.get('description', ''),
            'price_raw': item.get('list_price', ''),
            'price_numeric': extract_price(item.get('list_price', '')),
            'rating': extract_rating(item.get('rating', '')),
            'image_count': len(item.get('images', [])),
            'primary_image': (item.get('images') or [None])[0],
            'specs': item.get('specs', {}),
            'scraped_at': datetime.now(timezone.utc).isoformat(),
        }

        # Validation
        if cleaned['title'] and cleaned['price_numeric'] is not None:
            transformed.append(cleaned)
        else:
            print(f"Skipped invalid item: {item.get('url', 'unknown')}")

    context['ti'].xcom_push(key='transformed', value=transformed)
    print(f"Transformed {len(transformed)} items (dropped {len(details) - len(transformed)})")


def extract_price(price_str: str):
    match = re.search(r'[\d,]+\.?\d*', price_str.replace(',', ''))
    return float(match.group()) if match else None


def extract_rating(rating_str: str):
    match = re.search(r'[\d.]+', rating_str)
    return float(match.group()) if match else None

Load: Database Insertion

# dags/tasks/load.py
import json
import psycopg2
from psycopg2.extras import execute_values

def load_to_database(**context):
    """Load transformed data into PostgreSQL."""
    data = context['ti'].xcom_pull(key='transformed', task_ids='transform_data')

    conn = psycopg2.connect(
        host='localhost',
        database='scraping_db',
        user='scraper',
        password='password',
    )

    try:
        cursor = conn.cursor()

        values = [
            (
                item['title'],
                item['url'],
                item['description'],
                item['price_numeric'],
                item['rating'],
                item['primary_image'],
                json.dumps(item['specs']),
                item['scraped_at'],
            )
            for item in data
        ]

        execute_values(
            cursor,
            """
            INSERT INTO products
            (title, url, description, price, rating, image_url, specs, scraped_at)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                price = EXCLUDED.price,
                rating = EXCLUDED.rating,
                scraped_at = EXCLUDED.scraped_at
            """,
            values,
        )

        conn.commit()
        print(f"Loaded {len(data)} products to database")

    finally:
        conn.close()
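
The snippet above hardcodes database credentials in the task. Since the Postgres provider is already installed, an arguably cleaner variant stores credentials as an Airflow connection and fetches them through PostgresHook. A sketch, assuming a connection with the (hypothetical) id scraping_db has been created in the Airflow UI or via airflow connections add:

# dags/tasks/load.py — variant using an Airflow connection instead of hardcoded credentials
import json

from airflow.providers.postgres.hooks.postgres import PostgresHook
from psycopg2.extras import execute_values

def load_to_database(**context):
    data = context['ti'].xcom_pull(key='transformed', task_ids='transform_data')

    # 'scraping_db' is a hypothetical connection id; create it first, then reference it here.
    hook = PostgresHook(postgres_conn_id='scraping_db')
    conn = hook.get_conn()

    try:
        cursor = conn.cursor()
        values = [
            (
                item['title'], item['url'], item['description'],
                item['price_numeric'], item['rating'], item['primary_image'],
                json.dumps(item['specs']), item['scraped_at'],
            )
            for item in data
        ]
        execute_values(
            cursor,
            """
            INSERT INTO products
            (title, url, description, price, rating, image_url, specs, scraped_at)
            VALUES %s
            ON CONFLICT (url) DO UPDATE SET
                price = EXCLUDED.price,
                rating = EXCLUDED.rating,
                scraped_at = EXCLUDED.scraped_at
            """,
            values,
        )
        conn.commit()
        print(f"Loaded {len(data)} products to database")
    finally:
        conn.close()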

Wire Up the DAG

# Back in dags/scraping_pipeline.py

from tasks.extract import scrape_listings, scrape_details
from tasks.transform import transform_data
from tasks.load import load_to_database

task_scrape_listings = PythonOperator(
    task_id='scrape_listings',
    python_callable=scrape_listings,
    dag=dag,
)

task_scrape_details = PythonOperator(
    task_id='scrape_details',
    python_callable=scrape_details,
    dag=dag,
)

task_transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag,
)

task_load = PythonOperator(
    task_id='load_to_database',
    python_callable=load_to_database,
    dag=dag,
)

# Define dependencies
task_scrape_listings >> task_scrape_details >> task_transform >> task_load

Monitoring and Alerts

Configure Slack alerts for pipeline failures. Install the Slack provider (pip install apache-airflow-providers-slack), create a Slack connection, and add an on_failure_callback to the existing DAG definition:

from airflow.providers.slack.notifications.slack import SlackNotifier

dag = DAG(
    'product_scraping_pipeline',
    default_args=default_args,
    # ...same arguments as before...
    on_failure_callback=SlackNotifier(
        slack_conn_id='slack_webhook',
        text='Scraping pipeline failed: {{ task_instance.task_id }}',
    ),
)

FAQ

Is Airflow overkill for simple scraping jobs?

For a single scraper running once a day, cron is simpler. Airflow becomes valuable when you have multiple scrapers with dependencies, need retry logic, want monitoring through a web UI, or need to coordinate scraping with downstream data processing.

How do I handle large datasets in XCom?

XCom stores data in Airflow’s metadata database and has size limits. For large datasets, write results to files (S3, GCS, local disk) and pass the file path through XCom instead of the data itself. Use Airflow’s TaskFlow API for cleaner data passing.
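
A minimal sketch of the file-based pattern, assuming the tasks share a local data directory (DATA_DIR and run_scrape are illustrative, not part of the pipeline above; on CeleryExecutor or Kubernetes, write to S3/GCS instead of local disk):

# Pass file paths through XCom instead of full payloads.
import json
import os

DATA_DIR = '/tmp/scraping_pipeline'  # hypothetical shared location

def scrape_listings(**context):
    products = run_scrape()  # hypothetical: however the extract step builds its list of dicts
    os.makedirs(DATA_DIR, exist_ok=True)
    path = os.path.join(DATA_DIR, f"listings_{context['ds']}.json")
    with open(path, 'w') as f:
        json.dump(products, f)
    return path  # XCom now carries a short path string, not the dataset

def scrape_details(**context):
    path = context['ti'].xcom_pull(task_ids='scrape_listings')
    with open(path) as f:
        products = json.load(f)
    # ...continue with detail scraping as before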

What happens if a scraping task fails mid-run?

Airflow retries the entire task. If a task scrapes 100 pages and fails on page 50, the retry starts from page 1. To avoid re-scraping, implement checkpointing — save progress to a file or database, and resume from the last successful page on retry.
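
One way to implement checkpointing, sketched below: record the last completed page in a small state file and skip past it on retry (the path and the scrape_page helper are illustrative, not part of the pipeline above):

# Checkpointing sketch: on retry, resume from the last page that succeeded.
import json
import os

CHECKPOINT = '/tmp/scraping_pipeline/listings_checkpoint.json'  # illustrative path

def load_checkpoint() -> int:
    if os.path.exists(CHECKPOINT):
        with open(CHECKPOINT) as f:
            return json.load(f)['last_page']
    return 0

def save_checkpoint(page: int) -> None:
    os.makedirs(os.path.dirname(CHECKPOINT), exist_ok=True)
    with open(CHECKPOINT, 'w') as f:
        json.dump({'last_page': page}, f)

def scrape_listings(**context):
    start = load_checkpoint() + 1   # skip pages a failed attempt already finished
    for page in range(start, 101):
        scrape_page(page)           # hypothetical per-page scrape call
        save_checkpoint(page)
    if os.path.exists(CHECKPOINT):
        os.remove(CHECKPOINT)       # clear state so the next scheduled run starts fresh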

How many concurrent scraping tasks can Airflow run?

This depends on your Airflow executor. The LocalExecutor runs tasks in parallel processes on one machine (default 32 parallel tasks). The CeleryExecutor distributes tasks across multiple worker machines for higher concurrency.
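
The relevant knobs live in airflow.cfg or, equivalently, in AIRFLOW__ environment variables; a sketch with illustrative values:

# Equivalent to the [core] settings in airflow.cfg; values are illustrative
export AIRFLOW__CORE__EXECUTOR=LocalExecutor
export AIRFLOW__CORE__PARALLELISM=32                # max running task instances across the whole instance
export AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG=16   # max concurrent tasks within a single DAG run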

Can I trigger scraping pipelines manually?

Yes. The Airflow web UI has a “Trigger DAG” button. You can also trigger via the CLI (airflow dags trigger) or the REST API. Pass runtime parameters to customize the scraping target or scope for ad-hoc runs.
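
For example, from the CLI (note that the pipeline above reads its settings from Airflow Variables; a --conf payload is only used if the DAG reads dag_run.conf):

# Ad-hoc run with runtime parameters
airflow dags trigger product_scraping_pipeline --conf '{"max_pages": 3}'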

