Building scraping pipelines with Prefect 3 in 2026

Building scraping pipelines with Prefect 3 is the right answer when your scraping has grown beyond a single Scrapy project but does not yet need the full complexity of Apache Airflow. Prefect 3 (released 2024) is the third major version of Prefect’s orchestration platform, and it brought meaningful changes from Prefect 2: simpler async support, faster task execution, work pools that decouple scheduling from execution, and a redesigned dashboard. For scraping specifically, Prefect 3 fits the pattern of “fetch URLs, parse them, store results, retry failures, schedule it all” with much less ceremony than Airflow.

This guide covers Prefect 3 for scraping pipelines in 2026: flows and tasks, work pools and workers, retries and concurrency control, scheduling, and a complete production example that fetches URLs, runs them through Playwright when needed, parses results, and stores to Postgres. By the end you will have a working pipeline pattern you can adapt to your specific targets.

Why Prefect for scraping

Prefect’s strengths for scraper pipelines:

  • Python-native: flows and tasks are decorated Python functions, no DAG syntax to learn
  • Async-friendly: tasks can be async def and run concurrently
  • Built-in retries: configurable per task with exponential backoff
  • Work pools and workers: decouple “what to run” from “where to run it”
  • Cron, interval, and event-based scheduling: flexible
  • Observability: dashboard, logs, run history all built in
  • Cloud or self-hosted: free OSS or hosted Prefect Cloud
  • Mature: 6+ years in production, rich ecosystem

For Prefect’s official documentation, see docs.prefect.io.

Where Prefect does not lead

  • For very high frequency tasks (1000+ tasks per second), Prefect adds latency
  • For data-engineering-heavy workloads with many integrations, Dagster has more native connectors
  • For purely scheduled cron jobs without orchestration, simpler tools (cron itself, GitHub Actions) suffice

For scraping, Prefect’s mid-frequency, retry-rich, observable model fits well.

For Dagster comparison, see building scraping pipelines with Dagster.

Installing Prefect 3

pip install "prefect>=3.0"  # quotes keep the shell from treating >= as redirection
prefect --version  # 3.x

Start the local Prefect server:

prefect server start
# Dashboard at http://localhost:4200

For production, use Prefect Cloud (managed) or self-host the server with PostgreSQL.
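
If you self-host, switch the server’s default SQLite backend to PostgreSQL before putting real load on it. A minimal sketch, assuming a reachable Postgres instance (the connection string is a placeholder):

# point the self-hosted API server at PostgreSQL instead of the bundled SQLite
prefect config set PREFECT_API_DATABASE_CONNECTION_URL="postgresql+asyncpg://user:pass@db-host:5432/prefect"
prefect server start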

A first scraping flow

# scraper_flow.py
from prefect import flow, task
import httpx

@task(retries=3, retry_delay_seconds=10)
async def fetch_url(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers={
            "User-Agent": "Mozilla/5.0 ...",
        })
        resp.raise_for_status()
        return {"url": url, "status": resp.status_code, "html": resp.text}

@task
def parse_titles(html: str) -> list[str]:
    from selectolax.parser import HTMLParser
    tree = HTMLParser(html)
    return [n.text(strip=True) for n in tree.css("h2.title")]

@flow(log_prints=True)
async def scrape_titles(urls: list[str]):
    for url in urls:
        result = await fetch_url(url)
        titles = parse_titles(result["html"])
        print(f"{url}: {len(titles)} titles")
    return "Done"

if __name__ == "__main__":
    import asyncio
    asyncio.run(scrape_titles([
        "https://example.com/page1",
        "https://example.com/page2",
    ]))

Run it:

python scraper_flow.py

The flow appears in the Prefect dashboard with full task history, retries, and logs.

Concurrency: parallel task execution

For parallel scraping, use Prefect’s task mapping or asyncio.gather:

from prefect import flow, task
import httpx
import asyncio

@task(retries=3, retry_delay_seconds=10)
async def fetch_url(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url)
        return {"url": url, "status": resp.status_code, "html": resp.text}

@flow(log_prints=True)
async def scrape_parallel(urls: list[str], concurrency: int = 10):
    semaphore = asyncio.Semaphore(concurrency)

    async def fetch_with_semaphore(url):
        async with semaphore:
            return await fetch_url(url)

    results = await asyncio.gather(
        *[fetch_with_semaphore(url) for url in urls],
        return_exceptions=True,
    )
    return results

Or using Prefect’s .map() for static parallelism:

@flow
def scrape_parallel_static(urls: list[str]):
    futures = fetch_url.map(urls)          # one PrefectFuture per URL
    return [f.result() for f in futures]   # block until every mapped run finishes

fetch_url.map(urls) submits one task run per URL to the flow’s task runner and returns futures; calling .result() blocks until each run finishes. In Prefect 3 the default task runner is the thread-based ThreadPoolTaskRunner; Dask and Ray task runners are available as integrations when one machine is not enough.
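
You can also pin the task runner and its parallelism explicitly on the flow decorator; a minimal sketch (the max_workers value is illustrative):

from prefect import flow
from prefect.task_runners import ThreadPoolTaskRunner

@flow(task_runner=ThreadPoolTaskRunner(max_workers=10))
def scrape_parallel_static(urls: list[str]):
    futures = fetch_url.map(urls)          # runs in the flow's thread pool
    return [f.result() for f in futures]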

Work pools and workers

Prefect 3 splits the runtime model into:

  • Flows / deployments: the work to run
  • Work pools: configuration for runtime environments (Docker, Kubernetes, process)
  • Workers: processes that pick up flow runs from work pools

This decouples scheduling from execution. You can have one Prefect server scheduling flow runs, with workers in different environments (laptop, Kubernetes cluster, EC2) picking up runs from the same work pool.

Create a work pool:

prefect work-pool create my-scraper-pool --type process
prefect worker start --pool my-scraper-pool

Deploy a flow to the pool:

from prefect import flow

@flow
async def scrape_titles(urls: list[str]):
    # ...
    pass

if __name__ == "__main__":
    # .deploy() registers the flow with a work pool; any worker polling
    # that pool can pick up the scheduled runs
    flow.from_source(
        source=".",  # local path or git URL the worker can access
        entrypoint="scraper_flow.py:scrape_titles",
    ).deploy(
        name="scraper-deployment",
        work_pool_name="my-scraper-pool",
        cron="0 * * * *",  # hourly
    )

Now the flow runs hourly on whatever worker polls my-scraper-pool. (For quick local setups, flow.serve() skips work pools entirely and executes runs in the serving process itself; .deploy() is the work-pool route.)

Retries and error handling

Per-task retry config is the most common pattern:

@task(
    retries=5,
    retry_delay_seconds=[10, 30, 60, 120, 300],  # exponential backoff
)
async def fetch_url(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url)
        if resp.status_code >= 500:
            raise httpx.HTTPStatusError(
                f"Server error: {resp.status_code}",
                request=resp.request, response=resp,
            )
        if resp.status_code == 429:
            # Wait longer for rate limits
            await asyncio.sleep(60)
            raise httpx.HTTPStatusError("Rate limited", request=resp.request, response=resp)
        return {"url": url, "status": resp.status_code, "html": resp.text}

For fine-grained retry policies (only retry on certain exceptions, not others):

@task(
    retries=5,
    retry_delay_seconds=30,
    # retry_condition_fn receives the task, the task run, and the proposed final state
    retry_condition_fn=lambda task, task_run, state: (
        state.is_failed() and "Server error" in str(state.message)
    ),
)
async def fetch_url(url: str) -> dict:
    # ...
    pass

This retries only when the failure message indicates a 5xx server error (as raised by fetch_url above), not on 4xx responses, which usually mean a permanent failure.

Persistent state: caching task results

For tasks that should not re-run on the same input (deduplication), use Prefect’s caching:

from prefect.tasks import task_input_hash
from datetime import timedelta

@task(
    cache_key_fn=task_input_hash,
    cache_expiration=timedelta(hours=24),
)
async def fetch_url(url: str) -> dict:
    # ...
    pass

Now if you call fetch_url("https://example.com") twice within 24 hours, the second call returns the cached result without running. Useful when re-running flows during development.
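
task_input_hash keys the cache purely on the task’s inputs. If you want cached pages to roll over daily regardless of input, supply your own cache_key_fn; a minimal sketch (the URL-plus-date key scheme is just one convention):

from datetime import date, timedelta
from prefect import task

def url_plus_day(context, parameters) -> str:
    # cache_key_fn receives the task run context and the call's parameters;
    # embedding today's date in the key allows at most one fetch per URL per day
    return f"{parameters['url']}:{date.today().isoformat()}"

@task(cache_key_fn=url_plus_day, cache_expiration=timedelta(days=2))
async def fetch_url(url: str) -> dict:
    ...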

Browser scraping with Playwright in Prefect

For pages requiring JavaScript:

from prefect import flow, task
from playwright.async_api import async_playwright

@task
async def fetch_with_browser(url: str) -> dict:
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        page = await browser.new_page()
        await page.goto(url, wait_until="networkidle")
        html = await page.content()
        await browser.close()
        return {"url": url, "html": html}

@flow
async def scrape_dynamic_pages(urls: list[str]):
    results = []
    for url in urls:
        results.append(await fetch_with_browser(url))
    return results

Each task call spawns a browser, which is expensive. For high-volume browser scraping, batch URLs per browser:

@task
async def fetch_batch_with_browser(urls: list[str]) -> list[dict]:
    results = []
    async with async_playwright() as p:
        browser = await p.chromium.launch(headless=True)
        for url in urls:
            page = await browser.new_page()
            try:
                await page.goto(url, timeout=30000)
                html = await page.content()
                results.append({"url": url, "html": html})
            except Exception as e:
                results.append({"url": url, "error": str(e)})
            finally:
                await page.close()
        await browser.close()
    return results

This amortizes browser startup over many URLs.
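
A flow can then split its URL list into fixed-size chunks and run one batched task per chunk. A sketch building on fetch_batch_with_browser above (the chunk size of 25 is arbitrary; chunks run sequentially here so only one browser is open at a time):

from prefect import flow

@flow
async def scrape_dynamic_batched(urls: list[str], chunk_size: int = 25):
    chunks = [urls[i:i + chunk_size] for i in range(0, len(urls), chunk_size)]
    results: list[dict] = []
    for chunk in chunks:
        # one browser launch (and one Prefect task run) per chunk
        results.extend(await fetch_batch_with_browser(chunk))
    return results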

Scheduling

Prefect supports cron, interval, and event-based scheduling:

# Cron
scrape_flow.serve(
    name="hourly-scraper",
    cron="0 * * * *",
)

# Interval
from datetime import timedelta

scrape_flow.serve(
    name="every-15min",
    interval=timedelta(minutes=15),
)
# (.serve() runs these schedules in the current process; pass the same
# cron/interval arguments to .deploy() when routing through a work pool)

# Event-based (triggered by webhooks, file events, etc.)
from prefect.events import DeploymentEventTrigger

scrape_flow.serve(
    name="on-event",
    triggers=[
        DeploymentEventTrigger(
            expect={"my.custom.event"},
            parameters={"urls": "{{ event.payload.urls }}"},
        )
    ],
)

Event-based scheduling is useful for “scrape this URL when a webhook fires” patterns.
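
The trigger above fires when an event named my.custom.event reaches the Prefect API. One way to produce such an event from your own code is emit_event; a sketch (the event name, resource id, and payload are placeholders and must match what the trigger expects):

from prefect.events import emit_event

emit_event(
    event="my.custom.event",
    resource={"prefect.resource.id": "scraper.url-source"},  # required resource identifier
    payload={"urls": ["https://example.com/page1"]},
)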

A complete production example

End-to-end scraping pipeline:

# pipeline.py
from prefect import flow, task, get_run_logger
import asyncio
import httpx
from selectolax.parser import HTMLParser
import asyncpg

DB_DSN = "postgresql://user:pass@localhost/scraper"

@task(retries=3, retry_delay_seconds=[10, 30, 60])
async def fetch_url(url: str) -> dict:
    async with httpx.AsyncClient(timeout=30) as client:
        resp = await client.get(url, headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ...",
        })
        resp.raise_for_status()
        return {"url": url, "status": resp.status_code, "html": resp.text}

@task
def parse_products(html: str) -> list[dict]:
    tree = HTMLParser(html)
    products = []
    for el in tree.css("article.product"):
        products.append({
            "title": el.css_first("h2").text(strip=True) if el.css_first("h2") else None,
            "price": el.css_first(".price").text(strip=True) if el.css_first(".price") else None,
            "url": el.css_first("a").attributes.get("href") if el.css_first("a") else None,
        })
    return products

@task(retries=3)
async def store_products(products: list[dict]):
    logger = get_run_logger()
    if not products:
        return
    conn = await asyncpg.connect(DB_DSN)
    try:
        await conn.executemany(
            """INSERT INTO products (title, price, url)
               VALUES ($1, $2, $3)
               ON CONFLICT (url) DO UPDATE SET
                 title = EXCLUDED.title,
                 price = EXCLUDED.price""",
            [(p["title"], p["price"], p["url"]) for p in products],
        )
        logger.info(f"Stored {len(products)} products")
    finally:
        await conn.close()

@flow(log_prints=True)
async def scrape_pipeline(urls: list[str], concurrency: int = 5):
    semaphore = asyncio.Semaphore(concurrency)

    async def process_one(url: str):
        async with semaphore:
            try:
                result = await fetch_url(url)
                products = parse_products(result["html"])
                await store_products(products)
                return {"url": url, "ok": True, "count": len(products)}
            except Exception as e:
                return {"url": url, "ok": False, "error": str(e)}

    results = await asyncio.gather(*[process_one(url) for url in urls])

    success = [r for r in results if r["ok"]]
    failed = [r for r in results if not r["ok"]]

    print(f"Scraped {len(success)} successfully, {len(failed)} failed")
    return {"success": len(success), "failed": len(failed)}

if __name__ == "__main__":
    URLS = [f"https://example.com/products?page={i}" for i in range(1, 21)]
    asyncio.run(scrape_pipeline(URLS))
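
The ON CONFLICT (url) upsert in store_products assumes a products table with a unique constraint on url. A minimal schema-creation helper, as a sketch (column types are assumptions; adjust to your data):

import asyncio
import asyncpg

DB_DSN = "postgresql://user:pass@localhost/scraper"

async def ensure_schema():
    conn = await asyncpg.connect(DB_DSN)
    try:
        # url must be UNIQUE for the ON CONFLICT upsert to work
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id    BIGSERIAL PRIMARY KEY,
                title TEXT,
                price TEXT,
                url   TEXT UNIQUE NOT NULL
            )
        """)
    finally:
        await conn.close()

if __name__ == "__main__":
    asyncio.run(ensure_schema())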

Deploy with cron:

if __name__ == "__main__":
    URLS = [f"https://example.com/products?page={i}" for i in range(1, 21)]
    flow.from_source(
        source=".",  # or a git URL the workers can pull
        entrypoint="pipeline.py:scrape_pipeline",
    ).deploy(
        name="hourly-products",
        cron="0 * * * *",
        parameters={"urls": URLS, "concurrency": 5},
        work_pool_name="my-pool",
    )

Comparison: Prefect vs other orchestrators

feature | Prefect 3 | Dagster | Airflow 2.x
Python-native | yes | yes | DAG syntax
Async support | excellent | good | poor
Setup complexity | low | medium | high
Observability | great dashboard | great dashboard | OK dashboard
Hosted option | Prefect Cloud | Dagster Cloud | many vendors
Maturity | very good | very good | excellent
Best for | mid-complexity workflows | data engineering | enterprise complexity

For scraping pipelines specifically, Prefect’s async support and simpler API give it an edge. Dagster’s data-engineering features (asset-based, type system, integrations) are overkill for pure scraping but useful when you mix scraping with downstream ETL.

Deployment patterns

Production Prefect 3 deployments:

  • Prefect Cloud + workers on EC2/GCE: hosted control plane, your compute
  • Self-hosted Prefect server + Kubernetes workers: full self-host
  • Prefect Cloud + serverless workers (ECS, Lambda): pay-per-use compute
  • All-in-one VM: server + worker on same machine for small teams

For containerized workers:

FROM python:3.12-slim

RUN pip install prefect playwright httpx selectolax asyncpg
RUN playwright install --with-deps chromium  # --with-deps pulls Chromium's system libraries on a slim image

COPY . /app
WORKDIR /app

CMD ["prefect", "worker", "start", "--pool", "my-scraper-pool"]

Run multiple worker containers behind the same pool for horizontal scaling.
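
A sketch of scaling out with plain docker run (image name, container names, and API URL are placeholders; PREFECT_API_URL tells each worker where the Prefect API lives):

docker build -t scraper-worker .
docker run -d --name worker-1 -e PREFECT_API_URL=http://prefect-server:4200/api scraper-worker
docker run -d --name worker-2 -e PREFECT_API_URL=http://prefect-server:4200/api scraper-worker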

Operational checklist

For production Prefect 3 scraping in 2026:

  • Prefect 3.x with Python 3.11+
  • Prefect Cloud or self-hosted server with PostgreSQL
  • Work pools for environment isolation
  • Multiple workers for horizontal scale
  • Configure retries with exponential backoff
  • Use task caching for development iteration
  • Async tasks for network-bound work
  • Browser scraping in batches per task
  • External storage (Postgres, S3) for results
  • Monitor flow run success rate, task retry counts
  • Alert on flows that have not completed within SLA

For broader pipeline patterns, see building scraping pipelines with Dagster and distributed scraping with Apache Kafka.

Common pitfalls

  • Sync code in async flows: blocks the event loop. Use async libraries throughout or wrap sync code in asyncio.to_thread.
  • Too many tasks per flow: 1000+ tasks creates dashboard noise and overhead. Batch URLs into chunks per task.
  • Browser launch per task: expensive. Batch URLs through one browser instance.
  • Long-running tasks: a hung request can stall a flow indefinitely. Set an explicit timeout_seconds on tasks expected to run more than a few minutes so they fail and retry instead of blocking (see the sketch after this list).
  • Not using work pools: running flows directly works locally but does not scale. Move to work pools early.
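
A minimal sketch of an explicit per-task timeout (the 600-second value is illustrative):

from prefect import task

@task(retries=2, timeout_seconds=600)  # fail the attempt if it runs longer than 10 minutes
async def fetch_slow_endpoint(url: str) -> dict:
    ...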

FAQ

Q: do I need Prefect Cloud or can I self-host?
You can self-host the server. Prefect Cloud adds a managed control plane, automations/alerting, and hosted UI, while the open-source server covers the core orchestration features. For most teams under 10 users, self-hosting is fine.

Q: how does Prefect 3 differ from Prefect 2?
Faster task execution, simpler async, work pools and workers (replacing the agent model from v2), and a better dashboard. The migration from v2 to v3 is straightforward for typical flows.

Q: can I use Prefect with Scrapy?
Yes. Wrap Scrapy spider invocations as Prefect tasks. The orchestration value is in scheduling, retries, and downstream processing. See our Scrapy + Playwright integration writeup for Scrapy-specific patterns.

Q: is Prefect overkill for a single scraper?
Yes. For a single cron-scheduled scraper, just use cron. Prefect pays off when you have multiple scrapers, retries, dependencies between flows, and want observability across all of them.

Q: how does cost compare?
Self-hosted: just compute cost, $50-200/month for a small team. Prefect Cloud: usage-based, starts at $0 (free tier), grows with task runs. For most teams under 100k task runs/month, Prefect Cloud is cheaper than DIY observability tooling.

Common pitfalls in production Prefect 3 scraping

The first failure mode is Postgres backend overload from high-frequency task runs. Prefect 3 stores every task run state transition in its backend Postgres database. A scraping flow that processes 10,000 URLs as 10,000 individual tasks generates roughly 60,000 state-transition rows per flow run (PENDING, RUNNING, COMPLETED, plus retries). Daily flows produce 1.8M rows per month, and the backend’s task_run and task_run_state tables balloon to tens of GB within a quarter. Queries slow down, the UI stalls, and missed worker heartbeats cause spurious flow failures. The fix is twofold: set PREFECT_API_DATABASE_TIMEOUT=30 so slow queries fail fast rather than hang, and run a recurring cleanup job that deletes flow run records older than 30 days:

from datetime import datetime, timedelta, timezone

from prefect import flow
from prefect.client.orchestration import get_client
from prefect.client.schemas.filters import FlowRunFilter, FlowRunFilterStartTime

@flow
async def cleanup_old_runs():
    cutoff = datetime.now(timezone.utc) - timedelta(days=30)
    async with get_client() as client:
        # flow runs that started before the cutoff
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                start_time=FlowRunFilterStartTime(before_=cutoff)
            ),
            limit=1000,
        )
        for fr in flow_runs:
            # delete one run at a time via the API
            await client.delete_flow_run(fr.id)

Schedule this nightly. Without it, expect to manually TRUNCATE the task_run table every few months once it crosses 50GB.

The second pitfall is async semaphore leakage across task retries. Prefect’s @task(retries=3) decorator retries a failed task by re-running it as a new attempt within the same flow context. If your code bounds concurrency with an asyncio.Semaphore that it acquires and releases manually, a failure can hand control back to Prefect before the release ever runs, so each failed attempt leaks one slot. After three failed attempts on five tasks, your concurrency limit is permanently reduced by 15 slots. The fix is to wrap every manual acquire in try/finally, or better, use async with semaphore:, which guarantees release even on exception:

async def process_one(url: str):
    async with semaphore:  # guaranteed release
        result = await fetch_url(url)
        return parse(result)

The third pitfall is the work pool concurrency limit being misinterpreted. The concurrency_limit on a work pool caps the number of flow runs executing simultaneously, not the number of tasks within those flows. A work pool with concurrency_limit=5 and a flow that runs 100 tasks in parallel will happily run 5 flows × 100 tasks = 500 simultaneous tasks. If your worker box has 8 CPUs and 16GB RAM, this oversubscription causes thrashing. Set both flow-level and task-level concurrency: cap the work pool at concurrency_limit=N AND cap async-gather inside each flow with a semaphore at a value that keeps total simultaneous tasks within hardware capacity.
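
A sketch of setting both limits (pool name, tag, and numbers are placeholders; the commands reflect recent Prefect 3 CLIs, so confirm with prefect work-pool --help and prefect concurrency-limit --help on your version):

# cap how many flow runs the pool dispatches at once
prefect work-pool set-concurrency-limit my-scraper-pool 5

# cap simultaneous task runs carrying the "fetch" tag across all flows
prefect concurrency-limit create fetch 40

Tasks opt in to the tag-based limit with @task(tags=["fetch"]); combined with an in-flow semaphore, the worst-case number of simultaneous requests stays bounded.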

Real-world example: 200-site daily scraping pipeline

A scraping team built a Prefect 3 pipeline that scraped 200 ecommerce sites daily, processing roughly 4 million product pages per day. The architecture used three flow types: a discovery flow that enumerated category URLs per site, a scrape flow that fetched and parsed product pages, and a load flow that wrote results to the data warehouse:

from prefect import flow, task
import asyncio
import httpx

# parse_sitemap_for_categories, parse_product, load_site_configs and
# load_to_warehouse are project-specific helpers not shown here

@task(retries=2, retry_delay_seconds=30)
async def discover_category_urls(site_config: dict) -> list[str]:
    async with httpx.AsyncClient(timeout=20) as client:
        resp = await client.get(site_config["sitemap_url"])
        resp.raise_for_status()
        return parse_sitemap_for_categories(resp.text, site_config["url_pattern"])

@task(retries=3, retry_delay_seconds=[10, 30, 60])
async def scrape_product_batch(urls: list[str], site_config: dict) -> list[dict]:
    semaphore = asyncio.Semaphore(site_config.get("concurrency", 5))
    async def fetch_one(url):
        async with semaphore:
            try:
                async with httpx.AsyncClient(timeout=15) as client:
                    resp = await client.get(url, headers=site_config["headers"])
                    if resp.status_code == 200:
                        return parse_product(resp.text, site_config)
            except Exception:
                return None
    results = await asyncio.gather(*[fetch_one(u) for u in urls])
    return [r for r in results if r]

@flow(name="site-scrape", log_prints=True)
async def scrape_one_site(site_config: dict):
    category_urls = await discover_category_urls(site_config)
    # Batch into chunks of 50 URLs per task
    batches = [category_urls[i:i+50] for i in range(0, len(category_urls), 50)]
    batch_results = await asyncio.gather(
        *[scrape_product_batch(b, site_config) for b in batches]
    )
    all_products = [p for batch in batch_results for p in batch]
    print(f"site={site_config['name']} products={len(all_products)}")
    await load_to_warehouse(all_products, site_config["name"])

@flow(name="all-sites-daily")
async def scrape_all_sites():
    site_configs = await load_site_configs()
    # Run sites in parallel but cap at 20 simultaneous sites
    semaphore = asyncio.Semaphore(20)
    async def run_site(cfg):
        async with semaphore:
            return await scrape_one_site(cfg)
    await asyncio.gather(*[run_site(cfg) for cfg in site_configs])

Operational metrics over six months in production:

  • Average daily flow duration: 4.2 hours
  • p99 task duration: 18 seconds
  • Task retry rate: 3.1 percent (mostly network blips)
  • Daily backend Postgres growth: 2.8 GB
  • Worker count: 4 (8 vCPU, 16GB each)
  • Backend Postgres: db.r6g.xlarge ($420/month)
  • Total Prefect-side infrastructure cost: $1,680/month

The team initially ran the discovery and scrape flows together but split them into separate flows after observing that discovery failures (sitemap parsing errors) blocked entire site processing. With separate flows, a discovery failure leaves yesterday’s category URLs as the working set for scraping, gracefully degrading rather than blocking. The lesson: orchestrator design choices about flow boundaries directly affect failure isolation.

Comparison: Prefect 3 task patterns by use case

A reference table for choosing the right Prefect 3 pattern per scraping scenario:

use case | task pattern | concurrency | retry strategy
sitemap discovery | single task per site | 20 sites in parallel | 2 retries, 30s delay
product scraping | batch task (50 URLs each) | 5 batches per site | 3 retries, exponential backoff
browser scraping | batch task (10 URLs per browser) | 3 browsers per site | 2 retries, 60s delay
API enrichment | single task per record | 50 records in parallel | 5 retries, jittered exponential
warehouse load | single task per batch (1000 rows) | 1 sequential per site | 5 retries, 5min delay
email notification | terminal task | 1 | no retry
screenshot capture | batch task (5 URLs per browser) | 2 per site | 1 retry
schema validation | sync task with to_thread | inline | no retry

For browser-heavy work, prefer fewer batches with more URLs per batch to amortize browser launch cost. For HTTP-heavy work, prefer smaller batches with more parallelism to keep latency low.
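
For the “jittered exponential” strategy in the table, Prefect ships helpers so you do not have to hand-write delay lists; a sketch (backoff_factor and jitter values are illustrative):

from prefect import task
from prefect.tasks import exponential_backoff

@task(
    retries=5,
    retry_delay_seconds=exponential_backoff(backoff_factor=10),  # 10s, 20s, 40s, ...
    retry_jitter_factor=0.5,  # randomize delays so retries do not synchronize
)
async def enrich_record(record: dict) -> dict:
    ...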

Wrapping up

Prefect 3 hits the right balance for scraping pipelines: enough orchestration for retries, scheduling, and observability without the heavy ceremony of Airflow. The async support is a real differentiator for I/O-bound scraping. Pair this with our building scraping pipelines with Dagster and Scrapy + Playwright integration writeups for the full pipeline picture, and browse the dev-tools-projects category on DRT for related infrastructure deep-dives.
