Building scraping pipelines with Prefect 3 in 2026
Building scraping pipelines with Prefect 3 is the right answer when your scraping has grown beyond a single Scrapy project but does not yet need the full complexity of Apache Airflow. Prefect 3 (released in 2024) is the third major version of Prefect’s orchestration platform, and it brought meaningful changes from Prefect 2: simpler async support, faster task execution, work pools that decouple scheduling from execution, and a redesigned dashboard. For scraping specifically, Prefect 3 fits the pattern of “fetch URLs, parse them, store results, retry failures, schedule it all” with much less ceremony than Airflow.
This guide covers Prefect 3 for scraping pipelines in 2026: flows and tasks, work pools and workers, retries and concurrency control, scheduling, and a complete production example that fetches URLs, runs them through Playwright when needed, parses results, and stores to Postgres. By the end you will have a working pipeline pattern you can adapt to your specific targets.
Why Prefect for scraping
Prefect’s strengths for scraper pipelines:
- Python-native: flows and tasks are decorated Python functions, no DAG syntax to learn
- Async-friendly: tasks can be async def and run concurrently
- Built-in retries: configurable per task with exponential backoff
- Work pools and workers: decouple “what to run” from “where to run it”
- Cron, interval, and event-based scheduling: flexible
- Observability: dashboard, logs, run history all built in
- Cloud or self-hosted: free OSS or hosted Prefect Cloud
- Mature: 6+ years in production, rich ecosystem
For Prefect’s official documentation, see docs.prefect.io.
Where Prefect does not lead
- For very high frequency tasks (1000+ tasks per second), Prefect adds latency
- For data-engineering-heavy workloads with many integrations, Dagster has more native connectors
- For purely scheduled cron jobs without orchestration, simpler tools (cron itself, GitHub Actions) suffice
For scraping, Prefect’s mid-frequency, retry-rich, observable model fits well.
For Dagster comparison, see building scraping pipelines with Dagster.
Installing Prefect 3
pip install "prefect>=3.0"
prefect --version  # 3.x
Start the local Prefect server:
prefect server start
# Dashboard at http://localhost:4200
For production, use Prefect Cloud (managed) or self-host the server with PostgreSQL.
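Self-hosting against PostgreSQL amounts to pointing the server at a Postgres DSN before starting it. A sketch, with a placeholder connection string; `PREFECT_API_DATABASE_CONNECTION_URL` is the setting the server reads:

```shell
# Point the API server at Postgres instead of the default SQLite file
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@localhost:5432/prefect"
prefect server start
```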
A first scraping flow
# scraper_flow.py
from prefect import flow, task
import httpx
@task(retries=3, retry_delay_seconds=10)
async def fetch_url(url: str) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url, headers={
"User-Agent": "Mozilla/5.0 ...",
})
resp.raise_for_status()
return {"url": url, "status": resp.status_code, "html": resp.text}
@task
def parse_titles(html: str) -> list[str]:
from selectolax.parser import HTMLParser
tree = HTMLParser(html)
return [n.text(strip=True) for n in tree.css("h2.title")]
@flow(log_prints=True)
async def scrape_titles(urls: list[str]):
for url in urls:
result = await fetch_url(url)
titles = parse_titles(result["html"])
print(f"{url}: {len(titles)} titles")
return "Done"
if __name__ == "__main__":
import asyncio
asyncio.run(scrape_titles([
"https://example.com/page1",
"https://example.com/page2",
]))
Run it:
python scraper_flow.py
The flow appears in the Prefect dashboard with full task history, retries, and logs.
Concurrency: parallel task execution
For parallel scraping, use Prefect’s task mapping or asyncio.gather:
from prefect import flow, task
import httpx
import asyncio
@task(retries=3, retry_delay_seconds=10)
async def fetch_url(url: str) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
return {"url": url, "status": resp.status_code, "html": resp.text}
@flow(log_prints=True)
async def scrape_parallel(urls: list[str], concurrency: int = 10):
semaphore = asyncio.Semaphore(concurrency)
async def fetch_with_semaphore(url):
async with semaphore:
return await fetch_url(url)
results = await asyncio.gather(
*[fetch_with_semaphore(url) for url in urls],
return_exceptions=True,
)
return results
Or using Prefect’s .map() for static parallelism:
@flow
def scrape_parallel_static(urls: list[str]):
results = fetch_url.map(urls)
return results
fetch_url.map(urls) submits one task per URL via the flow’s task runner. In Prefect 3 the default is the thread-based ThreadPoolTaskRunner (ConcurrentTaskRunner was the Prefect 2 default); for process- or cluster-level parallelism, swap in a runner such as DaskTaskRunner or RayTaskRunner from their integration packages.
Work pools and workers
Prefect 3 splits the runtime model into:
- Flows / deployments: the work to run
- Work pools: configuration for runtime environments (Docker, Kubernetes, process)
- Workers: processes that pick up flow runs from work pools
This decouples scheduling from execution. You can have one Prefect server scheduling flow runs, with workers in different environments (laptop, Kubernetes cluster, EC2) picking up runs from the same work pool.
Create a work pool:
prefect work-pool create my-scraper-pool --type process
prefect worker start --pool my-scraper-pool
Deploy a flow to the pool. Note that flow.serve() runs flows in the current process and does not use a work pool; to hand runs to pool workers, register a deployment with flow.from_source(...).deploy():

from prefect import flow

if __name__ == "__main__":
    flow.from_source(
        source="https://github.com/your-org/scrapers.git",  # where workers pull code from
        entrypoint="scraper_flow.py:scrape_titles",
    ).deploy(
        name="scraper-deployment",
        work_pool_name="my-scraper-pool",
        cron="0 * * * *",  # hourly
    )
Now the flow runs hourly on whatever worker picks it up.
Retries and error handling
Per-task retry config is the most common pattern:
import asyncio

import httpx

@task(
    retries=5,
    retry_delay_seconds=[10, 30, 60, 120, 300],  # escalating backoff schedule
)
async def fetch_url(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url)
        if resp.status_code >= 500:
            raise httpx.HTTPStatusError(
                f"Server error: {resp.status_code}",
                request=resp.request, response=resp,
            )
        if resp.status_code == 429:
            # Wait out the rate limit before the retry machinery kicks in
            await asyncio.sleep(60)
            raise httpx.HTTPStatusError("Rate limited", request=resp.request, response=resp)
        return {"url": url, "status": resp.status_code, "html": resp.text}
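Prefect also ships a helper for this (from prefect.tasks import exponential_backoff, used as retry_delay_seconds=exponential_backoff(backoff_factor=10)). If you want a cap or jitter, the schedule is easy to compute in plain Python; a sketch, with a helper name of our own choosing:

```python
import random

def backoff_delays(base: float, retries: int, cap: float = 300.0,
                   jitter: float = 0.1) -> list[float]:
    """Exponential backoff schedule with an upper cap and optional +/- jitter."""
    delays = []
    for attempt in range(retries):
        delay = min(cap, base * (2 ** attempt))
        # Jitter spreads retries out so many failed tasks don't retry in lockstep
        delays.append(delay * (1 + random.uniform(-jitter, jitter)))
    return delays

# With jitter disabled the schedule is deterministic:
# backoff_delays(10, 5, jitter=0.0) -> [10.0, 20.0, 40.0, 80.0, 160.0]
```

Pass the result straight to retry_delay_seconds on the task decorator.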
For fine-grained retry policies (only retry on certain exceptions, not others):
@task(
    retries=5,
    retry_delay_seconds=30,
    retry_condition_fn=lambda task, task_run, state: (
        state.is_failed() and "5" in str(state.message or "")
    ),
)
async def fetch_url(url: str) -> dict:
    # ...
    pass
This retries only on 5xx errors (whose status codes land in the failure message), not on 4xx, which usually means permanent failure. In production, parse the status code out of the message or exception rather than substring-matching.
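The retryable/permanent distinction is easier to keep correct as a named predicate over the status code than as a match on the failure message. A minimal sketch (the helper name is ours):

```python
def is_retryable_status(status: int) -> bool:
    """Transient failures worth retrying: server errors and rate limits."""
    # 5xx: the server may recover on its own; 429: back off and try again later.
    # Other 4xx (403, 404, 410, ...) usually mean the request will never succeed.
    return status >= 500 or status == 429
```

Raise a distinct exception type when this returns True, and have the retry condition check for that type.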
Persistent state: caching task results
For tasks that should not re-run on the same input (deduplication), use Prefect’s caching:
from prefect.tasks import task_input_hash
from datetime import timedelta
@task(
cache_key_fn=task_input_hash,
cache_expiration=timedelta(hours=24),
)
async def fetch_url(url: str) -> dict:
# ...
pass
Now if you call fetch_url("https://example.com") twice within 24 hours, the second call returns the cached result without running. Useful when re-running flows during development.
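Under the hood, task_input_hash derives the cache key from a hash of the serialized task inputs. The equivalent idea in plain Python, for intuition (the function name is ours, not Prefect’s):

```python
import hashlib
import json

def input_cache_key(task_name: str, kwargs: dict) -> str:
    """Stable key: same task + same inputs -> same key, so a repeat call can be skipped."""
    # sort_keys makes the serialization order-independent
    payload = json.dumps({"task": task_name, "kwargs": kwargs}, sort_keys=True)
    return hashlib.sha256(payload.encode()).hexdigest()
```

Two calls with identical inputs produce identical keys, which is exactly what lets the second run short-circuit to the cached result.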
Browser scraping with Playwright in Prefect
For pages requiring JavaScript:
from prefect import flow, task
from playwright.async_api import async_playwright
@task
async def fetch_with_browser(url: str) -> dict:
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
page = await browser.new_page()
await page.goto(url, wait_until="networkidle")
html = await page.content()
await browser.close()
return {"url": url, "html": html}
@flow
async def scrape_dynamic_pages(urls: list[str]):
results = []
for url in urls:
results.append(await fetch_with_browser(url))
return results
Each task call spawns a browser, which is expensive. For high-volume browser scraping, batch URLs per browser:
@task
async def fetch_batch_with_browser(urls: list[str]) -> list[dict]:
results = []
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
for url in urls:
page = await browser.new_page()
try:
await page.goto(url, timeout=30000)
html = await page.content()
results.append({"url": url, "html": html})
except Exception as e:
results.append({"url": url, "error": str(e)})
finally:
await page.close()
await browser.close()
return results
This amortizes browser startup over many URLs.
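The batching itself is just list slicing; a small helper (name is ours) splits a URL list into browser-sized chunks to feed a batch task like the one above:

```python
def chunk(items: list, size: int) -> list[list]:
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]

# chunk(["a", "b", "c", "d", "e"], 2) -> [["a", "b"], ["c", "d"], ["e"]]
```

Map fetch_batch_with_browser over chunk(urls, 25) instead of over urls, and each browser launch is amortized over 25 pages.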
Scheduling
Prefect supports cron, interval, and event-based scheduling:
# Cron
scrape_flow.serve(
    name="hourly-scraper",
    cron="0 * * * *",
)

# Interval
from datetime import timedelta

scrape_flow.serve(
    name="every-15min",
    interval=timedelta(minutes=15),
)

# Event-based (triggered by webhooks, file events, etc.)
from prefect.events import DeploymentEventTrigger

scrape_flow.serve(
    name="on-event",
    triggers=[
        DeploymentEventTrigger(
            expect={"my.custom.event"},
            parameters={"urls": "{{ event.payload.urls }}"},
        )
    ],
)
Event-based scheduling is useful for “scrape this URL when a webhook fires” patterns.
A complete production example
End-to-end scraping pipeline:
# pipeline.py
from prefect import flow, task, get_run_logger
from prefect.tasks import task_input_hash
from datetime import timedelta
import asyncio
import httpx
from selectolax.parser import HTMLParser
import asyncpg
import json
DB_DSN = "postgresql://user:pass@localhost/scraper"
@task(retries=3, retry_delay_seconds=[10, 30, 60])
async def fetch_url(url: str) -> dict:
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...",
})
resp.raise_for_status()
return {"url": url, "status": resp.status_code, "html": resp.text}
@task
def parse_products(html: str) -> list[dict]:
tree = HTMLParser(html)
products = []
for el in tree.css("article.product"):
products.append({
"title": el.css_first("h2").text(strip=True) if el.css_first("h2") else None,
"price": el.css_first(".price").text(strip=True) if el.css_first(".price") else None,
"url": el.css_first("a").attributes.get("href") if el.css_first("a") else None,
})
return products
@task(retries=3)
async def store_products(products: list[dict]):
logger = get_run_logger()
if not products:
return
conn = await asyncpg.connect(DB_DSN)
try:
await conn.executemany(
"""INSERT INTO products (title, price, url)
VALUES ($1, $2, $3)
ON CONFLICT (url) DO UPDATE SET
title = EXCLUDED.title,
price = EXCLUDED.price""",
[(p["title"], p["price"], p["url"]) for p in products],
)
logger.info(f"Stored {len(products)} products")
finally:
await conn.close()
@flow(log_prints=True)
async def scrape_pipeline(urls: list[str], concurrency: int = 5):
semaphore = asyncio.Semaphore(concurrency)
async def process_one(url: str):
async with semaphore:
try:
result = await fetch_url(url)
products = parse_products(result["html"])
await store_products(products)
return {"url": url, "ok": True, "count": len(products)}
except Exception as e:
return {"url": url, "ok": False, "error": str(e)}
results = await asyncio.gather(*[process_one(url) for url in urls])
success = [r for r in results if r["ok"]]
failed = [r for r in results if not r["ok"]]
print(f"Scraped {len(success)} successfully, {len(failed)} failed")
return {"success": len(success), "failed": len(failed)}
if __name__ == "__main__":
URLS = [f"https://example.com/products?page={i}" for i in range(1, 21)]
asyncio.run(scrape_pipeline(URLS))
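One caveat with the store_products upsert above: duplicate URLs within a single batch cause redundant writes (and, if you ever switch from executemany to a single multi-row INSERT, Postgres rejects the statement with "ON CONFLICT DO UPDATE command cannot affect row a second time"). Deduping the batch before the write sidesteps both; a sketch with a helper name of our own:

```python
def dedupe_by_url(products: list[dict]) -> list[dict]:
    """Keep one record per URL; the last occurrence in the batch wins."""
    by_url: dict[str, dict] = {}
    for p in products:
        key = p.get("url")
        if key:  # drop records with no URL, since URL is the conflict key
            by_url[key] = p
    return list(by_url.values())
```

Call dedupe_by_url(products) at the top of store_products before opening the connection.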
Deploy with cron (serve() schedules and executes runs in the current process):
if __name__ == "__main__":
    URLS = [f"https://example.com/products?page={i}" for i in range(1, 21)]
    scrape_pipeline.serve(
        name="hourly-products",
        cron="0 * * * *",
        parameters={"urls": URLS, "concurrency": 5},
    )
Comparison: Prefect vs other orchestrators
| feature | Prefect 3 | Dagster | Airflow 2.x |
|---|---|---|---|
| Python-native | yes | yes | DAG syntax |
| Async support | excellent | good | poor |
| Setup complexity | low | medium | high |
| Observability | great dashboard | great dashboard | OK dashboard |
| Hosted option | Prefect Cloud | Dagster Cloud | many vendors |
| Maturity | very good | very good | excellent |
| Best for | mid-complexity workflows | data engineering | enterprise complexity |
For scraping pipelines specifically, Prefect’s async support and simpler API give it an edge. Dagster’s data-engineering features (asset-based, type system, integrations) are overkill for pure scraping but useful when you mix scraping with downstream ETL.
Deployment patterns
Production Prefect 3 deployments:
- Prefect Cloud + workers on EC2/GCE: hosted control plane, your compute
- Self-hosted Prefect server + Kubernetes workers: full self-host
- Prefect Cloud + serverless workers (ECS, Lambda): pay-per-use compute
- All-in-one VM: server + worker on same machine for small teams
For containerized workers:
FROM python:3.12-slim
RUN pip install prefect playwright httpx selectolax asyncpg
# --with-deps installs the system libraries Chromium needs on a slim base image
RUN playwright install --with-deps chromium
COPY . /app
WORKDIR /app
CMD ["prefect", "worker", "start", "--pool", "my-scraper-pool"]
Run multiple worker containers behind the same pool for horizontal scaling.
Operational checklist
For production Prefect 3 scraping in 2026:
- Prefect 3.x with Python 3.11+
- Prefect Cloud or self-hosted server with PostgreSQL
- Work pools for environment isolation
- Multiple workers for horizontal scale
- Configure retries with exponential backoff
- Use task caching for development iteration
- Async tasks for network-bound work
- Browser scraping in batches per task
- External storage (Postgres, S3) for results
- Monitor flow run success rate, task retry counts
- Alert on flows that have not completed within SLA
For broader pipeline patterns, see building scraping pipelines with Dagster and distributed scraping with Apache Kafka.
Common pitfalls
- Sync code in async flows: blocks the event loop. Use async libraries throughout or wrap sync code in asyncio.to_thread.
- Too many tasks per flow: 1000+ tasks creates dashboard noise and overhead. Batch URLs into chunks per task.
- Browser launch per task: expensive. Batch URLs through one browser instance.
- Long-running tasks: a task blocked on a slow page or hung connection can stall a flow indefinitely. For tasks that may run more than a few minutes, set an explicit limit with @task(timeout_seconds=...) so Prefect fails and retries them instead of waiting forever.
- Not using work pools: running flows directly works locally but does not scale. Move to work pools early.
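The asyncio.to_thread escape hatch from the first bullet, as a runnable sketch (the parse function is a stand-in for any blocking library call):

```python
import asyncio
import time

def parse_blocking(html: str) -> int:
    """Stand-in for a sync, blocking parser (e.g. lxml on a very large page)."""
    time.sleep(0.01)  # simulate blocking work
    return html.count("<h2")

async def main() -> int:
    # Runs the blocking call in a worker thread; the event loop stays free
    # to make progress on other coroutines in the meantime
    return await asyncio.to_thread(parse_blocking, "<h2>a</h2><h2>b</h2>")

result = asyncio.run(main())
```

Inside an async Prefect flow, the same await asyncio.to_thread(...) call keeps a sync parser from starving your concurrent fetch tasks.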
FAQ
Q: do I need Prefect Cloud or can I self-host?
You can self-host the server. Prefect Cloud adds managed control plane, alerting, and UI hosting, but the OSS server has the full feature set. For most teams under 10 users, self-hosting is fine.
Q: how does Prefect 3 differ from Prefect 2?
Faster task execution, simpler async, work pools (replacing the work queues and agents of v2), and a better dashboard. Migration from v2 to v3 is straightforward for typical flows.
Q: can I use Prefect with Scrapy?
Yes. Wrap Scrapy spider invocations as Prefect tasks. The orchestration value is in scheduling, retries, and downstream processing. See our Scrapy + Playwright integration writeup for Scrapy-specific patterns.
Q: is Prefect overkill for a single scraper?
Yes. For a single cron-scheduled scraper, just use cron. Prefect pays off when you have multiple scrapers, retries, dependencies between flows, and want observability across all of them.
Q: how does cost compare?
Self-hosted: just compute cost, $50-200/month for a small team. Prefect Cloud: usage-based, starts at $0 (free tier), grows with task runs. For most teams under 100k task runs/month, Prefect Cloud is cheaper than DIY observability tooling.
Common pitfalls in production Prefect 3 scraping
The first failure mode is Postgres backend overload from high-frequency task runs. Prefect 3 stores every task-run state transition in its backend Postgres database. A scraping flow that processes 10,000 URLs as 10,000 individual tasks generates roughly 60,000 state-transition rows per flow run (PENDING, RUNNING, COMPLETED, plus retries). Daily flows produce 1.8M rows per month, and the backend’s task_run and task_run_state tables balloon to tens of GB within a quarter. Queries slow down, the UI stalls, and missed worker heartbeats cause spurious flow failures. The fix is twofold: set PREFECT_API_DATABASE_TIMEOUT=30 so slow queries fail fast rather than hang, and run a periodic cleanup job that deletes flow run records older than 30 days:
from datetime import datetime, timedelta, timezone

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterEndTime

@flow
async def cleanup_old_runs():
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    async with get_client() as client:
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                end_time=FlowRunFilterEndTime(before_=cutoff)
            ),
            limit=1000,
        )
        for fr in flow_runs:
            await client.delete_flow_run(fr.id)
Schedule this nightly. Without it, expect to manually TRUNCATE the task_run table every few months once it crosses 50GB.
The second pitfall is async semaphore leakage across task retries. Prefect’s @task(retries=3) decorator retries a failed task by re-running it as a new attempt within the same flow context. If your task uses an asyncio.Semaphore to bound concurrency, the semaphore is acquired in attempt 1, the task fails, attempt 2 runs but the semaphore was never released because the failure path skipped the release. After three failed attempts on five tasks, your concurrency limit is permanently reduced by 15 slots. The fix is to use try/finally around every semaphore acquire, or better, use async with semaphore: which guarantees release even on exception:
async def process_one(url: str):
async with semaphore: # guaranteed release
result = await fetch_url(url)
return parse(result)
The third pitfall is misinterpreting the work pool concurrency limit. The concurrency_limit on a work pool caps the number of flow runs executing simultaneously, not the number of tasks within those flows. A work pool with concurrency_limit=5 and a flow that runs 100 tasks in parallel will happily run 5 flows × 100 tasks = 500 simultaneous tasks. If your worker box has 8 CPUs and 16GB RAM, this oversubscription causes thrashing. Set both flow-level and task-level concurrency: cap the work pool at concurrency_limit=N AND cap the asyncio.gather fan-out inside each flow with a semaphore, at a value that keeps total simultaneous tasks within hardware capacity.
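The arithmetic is worth making explicit; a tiny helper (ours, not Prefect’s) derives a per-flow semaphore size from the pool limit and the task budget the box can tolerate:

```python
def per_flow_concurrency(pool_limit: int, hardware_task_budget: int) -> int:
    """Semaphore size that divides the hardware task budget across
    concurrently running flows (floored at 1)."""
    return max(1, hardware_task_budget // pool_limit)

# A pool capped at 5 flows on a box that tolerates 50 simultaneous tasks:
# per_flow_concurrency(5, 50) -> 10 tasks per flow, 50 total in the worst case
```

Use the result as the semaphore size inside each flow, e.g. asyncio.Semaphore(per_flow_concurrency(5, 50)).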
Real-world example: 200-site daily scraping pipeline
A scraping team built a Prefect 3 pipeline that scraped 200 ecommerce sites daily, processing roughly 4 million product pages per day. The architecture used three flow types: a discovery flow that enumerated category URLs per site, a scrape flow that fetched and parsed product pages, and a load flow that wrote results to the data warehouse:
from prefect import flow, task
from prefect.tasks import task_input_hash
from datetime import timedelta
import asyncio
import httpx
@task(retries=2, retry_delay_seconds=30)
async def discover_category_urls(site_config: dict) -> list[str]:
async with httpx.AsyncClient(timeout=20) as client:
resp = await client.get(site_config["sitemap_url"])
resp.raise_for_status()
return parse_sitemap_for_categories(resp.text, site_config["url_pattern"])
@task(retries=3, retry_delay_seconds=[10, 30, 60])
async def scrape_product_batch(urls: list[str], site_config: dict) -> list[dict]:
semaphore = asyncio.Semaphore(site_config.get("concurrency", 5))
async def fetch_one(url):
async with semaphore:
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(url, headers=site_config["headers"])
if resp.status_code == 200:
return parse_product(resp.text, site_config)
except Exception:
return None
results = await asyncio.gather(*[fetch_one(u) for u in urls])
return [r for r in results if r]
@flow(name="site-scrape", log_prints=True)
async def scrape_one_site(site_config: dict):
category_urls = await discover_category_urls(site_config)
# Batch into chunks of 50 URLs per task
batches = [category_urls[i:i+50] for i in range(0, len(category_urls), 50)]
batch_results = await asyncio.gather(
*[scrape_product_batch(b, site_config) for b in batches]
)
all_products = [p for batch in batch_results for p in batch]
print(f"site={site_config['name']} products={len(all_products)}")
await load_to_warehouse(all_products, site_config["name"])
@flow(name="all-sites-daily")
async def scrape_all_sites():
site_configs = await load_site_configs()
# Run sites in parallel but cap at 20 simultaneous sites
semaphore = asyncio.Semaphore(20)
async def run_site(cfg):
async with semaphore:
return await scrape_one_site(cfg)
await asyncio.gather(*[run_site(cfg) for cfg in site_configs])
Operational metrics over six months in production:
- Average daily flow duration: 4.2 hours
- p99 task duration: 18 seconds
- Task retry rate: 3.1 percent (mostly network blips)
- Daily backend Postgres growth: 2.8 GB
- Worker count: 4 (8 vCPU, 16GB each)
- Backend Postgres: db.r6g.xlarge ($420/month)
- Total Prefect-side infrastructure cost: $1,680/month
The team initially ran the discovery and scrape flows together but split them into separate flows after observing that discovery failures (sitemap parsing errors) blocked entire site processing. With separate flows, a discovery failure leaves yesterday’s category URLs as the working set for scraping, gracefully degrading rather than blocking. The lesson: orchestrator design choices about flow boundaries directly affect failure isolation.
Comparison: Prefect 3 task patterns by use case
A reference table for choosing the right Prefect 3 pattern per scraping scenario:
| use case | task pattern | concurrency | retry strategy |
|---|---|---|---|
| sitemap discovery | single task per site | 20 sites in parallel | 2 retries, 30s delay |
| product scraping | batch task (50 URLs each) | 5 batches per site | 3 retries, exponential backoff |
| browser scraping | batch task (10 URLs per browser) | 3 browsers per site | 2 retries, 60s delay |
| API enrichment | single task per record | 50 records in parallel | 5 retries, jittered exponential |
| warehouse load | single task per batch (1000 rows) | 1 sequential per site | 5 retries, 5min delay |
| email notification | terminal task | 1 | no retry |
| screenshot capture | batch task (5 URLs per browser) | 2 per site | 1 retry |
| schema validation | sync task with to_thread | inline | no retry |
For browser-heavy work, prefer fewer batches with more URLs per batch to amortize browser launch cost. For HTTP-heavy work, prefer smaller batches with more parallelism to keep latency low.
Wrapping up
Prefect 3 hits the right balance for scraping pipelines: enough orchestration for retries, scheduling, and observability without the heavy ceremony of Airflow. The async support is a real differentiator for I/O-bound scraping. Pair this with our building scraping pipelines with Dagster and Scrapy + Playwright integration writeups for the full pipeline picture, and browse the dev-tools-projects category on DRT for related infrastructure deep-dives.