Web Scraping ETL Pipeline with Apache Airflow
Apache Airflow turns ad-hoc scraping scripts into reliable, scheduled, monitored pipelines. Instead of running scrapers manually or with cron, Airflow orchestrates the entire ETL process: extract data from websites through proxies, transform it into clean formats, and load it into databases or data warehouses.
Why Airflow for Scraping
- Scheduling — run scrapers hourly, daily, or on custom schedules
- Dependency management — scrape pages in order (listings before details)
- Retry logic — automatic retries with configurable backoff
- Monitoring — web UI shows task status, logs, and execution history
- Alerting — email or Slack notifications on failure
- Parallelism — run multiple scraping tasks concurrently
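The retry behaviour listed above can be sketched in plain Python. This is an illustration of capped exponential backoff, not Airflow's exact internal formula (Airflow also applies jitter); the values mirror the `retry_delay` and `max_retry_delay` settings used later in this article:

```python
from datetime import timedelta

def retry_delays(retries: int,
                 base: timedelta = timedelta(minutes=5),
                 cap: timedelta = timedelta(minutes=30)) -> list[timedelta]:
    """Capped exponential backoff: each retry doubles the delay up to a cap."""
    return [min(base * (2 ** attempt), cap) for attempt in range(retries)]

# With a 5-minute base and 30-minute cap, four retries wait 5, 10, 20, 30 min.
print([d.total_seconds() / 60 for d in retry_delays(4)])  # [5.0, 10.0, 20.0, 30.0]
```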
Architecture
Airflow Scheduler → DAG → Task 1: Scrape listings
→ Task 2: Scrape details (depends on Task 1)
→ Task 3: Transform data
→ Task 4: Load to database
                         → Task 5: Generate report
Setup
pip install apache-airflow apache-airflow-providers-postgres httpx selectolax
Initialize Airflow:
export AIRFLOW_HOME=~/airflow
airflow db init
airflow users create --role Admin --username admin --email admin@example.com \
--firstname Admin --lastname User --password admin
airflow webserver -p 8080 &
airflow scheduler &
DAG Definition
# dags/scraping_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'scraping-team',
'depends_on_past': False,
'start_date': datetime(2026, 1, 1),
'email': ['alerts@example.com'],
'email_on_failure': True,
'retries': 3,
'retry_delay': timedelta(minutes=5),
'retry_exponential_backoff': True,
'max_retry_delay': timedelta(minutes=30),
}
dag = DAG(
'product_scraping_pipeline',
default_args=default_args,
description='Scrape product listings, extract details, load to database',
schedule_interval='0 6 * * *', # Daily at 6 AM
catchup=False,
max_active_runs=1,
tags=['scraping', 'etl'],
)
Extract: Scraping Tasks
# dags/tasks/extract.py
import asyncio
import random

import httpx
from airflow.models import Variable
from selectolax.parser import HTMLParser
PROXIES = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
"http://user:pass@proxy3.example.com:8080",
]
async def scrape_listing_page(url: str, proxy: str) -> list:
async with httpx.AsyncClient(
proxy=proxy,
timeout=30,
follow_redirects=True,
) as client:
response = await client.get(url, headers={
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36'
})
tree = HTMLParser(response.text)
products = []
for item in tree.css('.product-card'):
title = item.css_first('.title')
link = item.css_first('a')
price = item.css_first('.price')
if title and link:
products.append({
'title': title.text(strip=True),
'url': link.attributes.get('href', ''),
'price': price.text(strip=True) if price else '',
})
return products
def scrape_listings(**context):
"""Airflow task: scrape product listing pages."""
base_url = Variable.get('scrape_target_url')
max_pages = int(Variable.get('scrape_max_pages', 10))
all_products = []
async def run():
for page in range(1, max_pages + 1):
url = f"{base_url}?page={page}"
proxy = random.choice(PROXIES)
try:
products = await scrape_listing_page(url, proxy)
all_products.extend(products)
print(f"Page {page}: {len(products)} products")
await asyncio.sleep(2) # Rate limiting
except Exception as e:
print(f"Page {page} failed: {e}")
asyncio.run(run())
# Push results to XCom for downstream tasks
context['ti'].xcom_push(key='listings', value=all_products)
print(f"Total products scraped: {len(all_products)}")
return len(all_products)
async def scrape_product_detail(url: str, proxy: str) -> dict:
    async with httpx.AsyncClient(
        proxy=proxy,
        timeout=30,
    ) as client:
        response = await client.get(url, headers={
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                          'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36'
        })
        tree = HTMLParser(response.text)
        description = tree.css_first('.description')
        rating = tree.css_first('.rating')
        return {
            'url': url,
            'description': description.text(strip=True) if description else '',
            'specs': {
                row.css_first('.spec-name').text(strip=True):
                    row.css_first('.spec-value').text(strip=True)
                for row in tree.css('.spec-row')
                if row.css_first('.spec-name') and row.css_first('.spec-value')
            },
            'images': [
                img.attributes.get('src', '')
                for img in tree.css('.product-image img')
            ],
            'rating': rating.text(strip=True) if rating else '',
        }
def scrape_details(**context):
"""Airflow task: scrape product detail pages."""
    listings = context['ti'].xcom_pull(key='listings', task_ids='scrape_listings') or []
details = []
async def run():
semaphore = asyncio.Semaphore(5)
async def fetch(product):
async with semaphore:
proxy = random.choice(PROXIES)
try:
detail = await scrape_product_detail(product['url'], proxy)
detail['title'] = product['title']
detail['list_price'] = product['price']
details.append(detail)
except Exception as e:
print(f"Failed {product['url']}: {e}")
await asyncio.sleep(1)
tasks = [fetch(p) for p in listings]
await asyncio.gather(*tasks)
asyncio.run(run())
context['ti'].xcom_push(key='details', value=details)
    print(f"Scraped {len(details)} product details")
Transform: Data Cleaning
# dags/tasks/transform.py
import re
from datetime import datetime
def transform_data(**context):
"""Clean and normalize scraped data."""
details = context['ti'].xcom_pull(key='details', task_ids='scrape_details')
transformed = []
for item in details:
cleaned = {
'title': item['title'].strip(),
'url': item['url'],
'description': item.get('description', ''),
'price_raw': item.get('list_price', ''),
'price_numeric': extract_price(item.get('list_price', '')),
'rating': extract_rating(item.get('rating', '')),
'image_count': len(item.get('images', [])),
            'primary_image': (item.get('images') or [None])[0],
'specs': item.get('specs', {}),
'scraped_at': datetime.utcnow().isoformat(),
}
# Validation
if cleaned['title'] and cleaned['price_numeric'] is not None:
transformed.append(cleaned)
else:
print(f"Skipped invalid item: {item.get('url', 'unknown')}")
context['ti'].xcom_push(key='transformed', value=transformed)
print(f"Transformed {len(transformed)} items (dropped {len(details) - len(transformed)})")
def extract_price(price_str: str):
    """Parse a numeric price from strings like '$1,299.99'."""
    match = re.search(r'\d+\.?\d*', price_str.replace(',', ''))
    return float(match.group()) if match else None
def extract_rating(rating_str: str):
    """Parse a numeric rating from strings like '4.5 out of 5'."""
    match = re.search(r'[\d.]+', rating_str)
    return float(match.group()) if match else None
Load: Database Insertion
# dags/tasks/load.py
import json
import psycopg2
from psycopg2.extras import execute_values
def load_to_database(**context):
"""Load transformed data into PostgreSQL."""
data = context['ti'].xcom_pull(key='transformed', task_ids='transform_data')
    # In production, pull credentials from an Airflow connection (e.g. via
    # PostgresHook) rather than hardcoding them. The products table needs a
    # UNIQUE constraint on url for the ON CONFLICT clause below to work.
    conn = psycopg2.connect(
        host='localhost',
        database='scraping_db',
        user='scraper',
        password='password',
    )
try:
cursor = conn.cursor()
values = [
(
item['title'],
item['url'],
item['description'],
item['price_numeric'],
item['rating'],
item['primary_image'],
json.dumps(item['specs']),
item['scraped_at'],
)
for item in data
]
execute_values(
cursor,
"""
INSERT INTO products
(title, url, description, price, rating, image_url, specs, scraped_at)
VALUES %s
ON CONFLICT (url) DO UPDATE SET
price = EXCLUDED.price,
rating = EXCLUDED.rating,
scraped_at = EXCLUDED.scraped_at
""",
values,
)
conn.commit()
print(f"Loaded {len(data)} products to database")
finally:
        conn.close()
Wire Up the DAG
# Back in dags/scraping_pipeline.py
from tasks.extract import scrape_listings, scrape_details
from tasks.transform import transform_data
from tasks.load import load_to_database
task_scrape_listings = PythonOperator(
task_id='scrape_listings',
python_callable=scrape_listings,
dag=dag,
)
task_scrape_details = PythonOperator(
task_id='scrape_details',
python_callable=scrape_details,
dag=dag,
)
task_transform = PythonOperator(
task_id='transform_data',
python_callable=transform_data,
dag=dag,
)
task_load = PythonOperator(
task_id='load_to_database',
python_callable=load_to_database,
dag=dag,
)
# Define dependencies
task_scrape_listings >> task_scrape_details >> task_transform >> task_load
Monitoring and Alerts
Configure Slack alerts for pipeline failures:
from airflow.providers.slack.notifications.slack import SlackNotifier
dag = DAG(
    'product_scraping_pipeline',
    default_args=default_args,
    # ...same schedule and settings as before, plus a failure callback:
    on_failure_callback=SlackNotifier(
        slack_conn_id='slack_webhook',
        text='Scraping pipeline failed: {{ task_instance.task_id }}',
    ),
)
Internal Links
- Building a Distributed Scraping System with Redis — distributed scraping
- Creating a Web Scraping Dashboard with Grafana — monitor scraping metrics
- Building a Rate-Limited Scraper with Asyncio — rate limiting in scrapers
- Creating a Scraping API with FastAPI — API-first scraping
- Web Scraping Python Guide — scraping fundamentals
FAQ
Is Airflow overkill for simple scraping jobs?
For a single scraper running once a day, cron is simpler. Airflow becomes valuable when you have multiple scrapers with dependencies, need retry logic, want monitoring through a web UI, or need to coordinate scraping with downstream data processing.
How do I handle large datasets in XCom?
XCom stores data in Airflow’s metadata database and has size limits. For large datasets, write results to files (S3, GCS, local disk) and pass the file path through XCom instead of the data itself. Use Airflow’s TaskFlow API for cleaner data passing.
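A minimal sketch of that pattern, with plain files standing in for S3/GCS (file and task names are illustrative):

```python
import json
import tempfile
from pathlib import Path

def extract_to_file(records: list, out_dir: str) -> str:
    """Write scraped records to disk; only the small path string goes to XCom."""
    path = Path(out_dir) / "listings.json"
    path.write_text(json.dumps(records))
    return str(path)

def transform_from_file(path: str) -> list:
    """The downstream task pulls the path from XCom and reads the file."""
    return json.loads(Path(path).read_text())

with tempfile.TemporaryDirectory() as d:
    path = extract_to_file([{"title": "Widget", "price": 9.99}], d)
    print(transform_from_file(path))  # [{'title': 'Widget', 'price': 9.99}]
```

In a real DAG, the extract task would return the path (which Airflow pushes to XCom automatically), and the transform task would receive it as an argument.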
What happens if a scraping task fails mid-run?
Airflow retries the entire task. If a task scrapes 100 pages and fails on page 50, the retry starts from page 1. To avoid re-scraping, implement checkpointing — save progress to a file or database, and resume from the last successful page on retry.
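A minimal checkpointing sketch (the checkpoint file location and helper names are illustrative; `scrape_page` stands in for your actual fetch function):

```python
import json
from pathlib import Path

def scrape_with_checkpoint(pages: range, checkpoint: Path, scrape_page) -> list:
    """Skip pages already recorded in the checkpoint file, so a retried
    task resumes where the failed attempt stopped."""
    done = json.loads(checkpoint.read_text()) if checkpoint.exists() else 0
    results = []
    for page in pages:
        if page <= done:
            continue  # scraped on a previous (failed) attempt
        results.extend(scrape_page(page))
        checkpoint.write_text(json.dumps(page))  # record progress per page
    return results
```

Delete the checkpoint file at the end of a fully successful run so the next scheduled run starts fresh.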
How many concurrent scraping tasks can Airflow run?
This depends on your Airflow executor. The LocalExecutor runs tasks in parallel processes on one machine (default 32 parallel tasks). The CeleryExecutor distributes tasks across multiple worker machines for higher concurrency.
Can I trigger scraping pipelines manually?
Yes. The Airflow web UI has a “Trigger DAG” button. You can also trigger via the CLI (airflow dags trigger) or the REST API. Pass runtime parameters to customize the scraping target or scope for ad-hoc runs.
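For the REST route, a sketch of building the request against Airflow 2.x's stable API (`POST /api/v1/dags/{dag_id}/dagRuns`); the host and conf values are examples:

```python
def build_trigger_request(dag_id: str, conf: dict,
                          host: str = "http://localhost:8080"):
    """Build the URL and JSON payload for triggering a DAG run via the
    stable REST API; send it with e.g. httpx.post(url, json=payload, auth=...)."""
    url = f"{host}/api/v1/dags/{dag_id}/dagRuns"
    payload = {"conf": conf}  # runtime parameters, readable via dag_run.conf
    return url, payload

url, payload = build_trigger_request(
    "product_scraping_pipeline",
    {"scrape_max_pages": 5},
)
print(url)  # http://localhost:8080/api/v1/dags/product_scraping_pipeline/dagRuns
```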
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)