Building scraping pipelines with Dagster in 2026

Building scraping pipelines with Dagster is the right answer when scraping is one part of a larger data engineering pipeline. Dagster reframes orchestration around data assets rather than tasks: instead of writing “fetch URLs, parse, store” as separate steps, you declare “products dataset” as a software-defined asset that depends on “raw HTML pages” which depends on “URL list,” and Dagster figures out the execution graph. For pure scraping with no downstream analytics, this is overkill. For scraping that feeds into ML models, dashboards, or warehouses, Dagster’s asset-aware approach pays off in lineage, partitioning, and freshness tracking.

This guide covers Dagster 1.7+ in 2026 for scraping pipelines: software-defined assets, jobs, sensors, partitioning by time and host, IO managers, and a complete end-to-end example. Code is Python 3.12. By the end you will know whether Dagster fits your scraping use case and how to deploy a working pipeline.

Why Dagster for scraping

Dagster’s strengths for scraper-heavy pipelines:

  • Software-defined assets: declare data products instead of orchestration steps
  • Lineage tracking: see how data flows from raw HTML to final dashboard
  • Partition support: per-day, per-host, per-customer partitions with backfill
  • IO managers: separate “what to compute” from “where to store it”
  • Sensors: trigger pipelines based on external events (file arrival, API webhook)
  • Strong type system: Pydantic-style configs and asset metadata
  • Asset checks: data quality assertions per asset
  • Mature: 6+ years in production, used by Robinhood, Duolingo, etc.

For Dagster’s official documentation, see docs.dagster.io.

Where Dagster does not lead

  • For simple scheduled scraping with no downstream pipeline, Prefect or cron are simpler
  • For very high-frequency tasks (1000s/sec), the asset model adds overhead
  • For teams without data engineering experience, the asset abstraction takes adjustment

For Prefect comparison, see building scraping pipelines with Prefect 3.

Installing Dagster

pip install dagster dagster-webserver
dagster --version  # 1.7+

# Initialize a project
dagster project scaffold --name my-scraper
cd my-scraper

Start the dev server:

dagster dev
# UI at http://localhost:3000

A first asset: scraping a URL

# my_scraper/assets.py
from dagster import asset, AssetExecutionContext
import httpx

@asset
def homepage_html(context: AssetExecutionContext) -> str:
    resp = httpx.get("https://example.com", headers={
        "User-Agent": "Mozilla/5.0 ...",
    })
    resp.raise_for_status()
    context.log.info(f"Fetched {len(resp.text)} bytes")
    return resp.text

@asset
def homepage_titles(homepage_html: str) -> list[str]:
    from selectolax.parser import HTMLParser
    tree = HTMLParser(homepage_html)
    return [n.text(strip=True) for n in tree.css("h2")]

Two assets: homepage_html (the raw HTML) and homepage_titles (parsed titles, depends on homepage_html). Dagster figures out the dependency from the function signature.

In the Dagster UI, you see a graph with two nodes connected by a line. Click “Materialize all” and Dagster fetches the page and parses it.

Partitioned assets: scrape per day, per host

For real scraping, you usually want partitions: one materialization per day, per host, or per customer. Dagster partitions track which slices have been materialized and which need backfilling.

from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext
from datetime import datetime
import httpx

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions)
def daily_news_html(context: AssetExecutionContext) -> str:
    date_str = context.partition_key  # YYYY-MM-DD
    url = f"https://news.example.com/archive/{date_str}"
    resp = httpx.get(url)
    resp.raise_for_status()
    return resp.text

Now daily_news_html has one materialization per day. Run today’s: Dagster fetches today’s URL. Backfill last 30 days: Dagster fetches each day’s URL.
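Partitions can also be materialized from the CLI, which is handy for one-off runs and scripted backfills. A hedged sketch, assuming the project's Definitions are loadable from the current directory and the partition date exists:

```shell
# Materialize a single partition of the asset above from the CLI
dagster asset materialize --select daily_news_html --partition 2026-01-15
```

For multi-partition ranges, the backfill UI (or the GraphQL API) is the usual route.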

For multi-dimensional partitions (per-day AND per-host):

from dagster import (
    asset, MultiPartitionsDefinition, StaticPartitionsDefinition,
    DailyPartitionsDefinition,
)

hosts = StaticPartitionsDefinition(["example.com", "news.example.com", "shop.example.com"])
date_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

multi_partitions = MultiPartitionsDefinition({"date": date_partitions, "host": hosts})

@asset(partitions_def=multi_partitions)
def host_daily_pages(context: AssetExecutionContext) -> str:
    keys = context.partition_key.keys_by_dimension
    date_str = keys["date"]
    host = keys["host"]
    url = f"https://{host}/archive/{date_str}"
    resp = httpx.get(url)
    resp.raise_for_status()
    return resp.text

This creates a 2D grid of partitions. Backfill specific cells (date X host) without re-running everything.

IO managers: separating compute from storage

In simple cases, the asset function returns a Python value and Dagster pickles it to local disk. For production, use IO managers to write to Postgres, S3, or your warehouse.

from dagster import IOManager, io_manager, OutputContext, InputContext
import boto3
import json

class S3IOManager(IOManager):
    def __init__(self, bucket: str):
        self.bucket = bucket
        self.s3 = boto3.client("s3")

    def _key(self, context):
        # asset_key is a list of strings
        path = "/".join(context.asset_key.path)
        if context.has_partition_key:
            path = f"{path}/{context.partition_key}"
        return f"{path}.json"

    def handle_output(self, context: OutputContext, obj):
        key = self._key(context)
        body = json.dumps(obj).encode() if not isinstance(obj, bytes) else obj
        self.s3.put_object(Bucket=self.bucket, Key=key, Body=body)
        context.log.info(f"Wrote to s3://{self.bucket}/{key}")

    def load_input(self, context: InputContext):
        key = self._key(context)
        resp = self.s3.get_object(Bucket=self.bucket, Key=key)
        return json.loads(resp["Body"].read())

@io_manager(config_schema={"bucket": str})
def s3_io_manager(init_context):
    return S3IOManager(bucket=init_context.resource_config["bucket"])

Wire it in:

from dagster import Definitions

defs = Definitions(
    assets=[homepage_html, homepage_titles],
    resources={"io_manager": s3_io_manager.configured({"bucket": "my-scraper-data"})},
)

Now every asset’s output goes to S3 instead of local disk. Same code, different storage.

Browser scraping with Playwright as an asset

from dagster import asset, AssetExecutionContext
from playwright.sync_api import sync_playwright

@asset(partitions_def=daily_partitions)
def dynamic_page_html(context: AssetExecutionContext) -> str:
    date_str = context.partition_key
    url = f"https://example.com/dynamic?date={date_str}"

    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until="networkidle")
        html = page.content()
        browser.close()
    return html

Same pattern: Playwright runs inside the asset function. Dagster orchestrates.

Jobs: grouping assets

Jobs run sets of assets together. Define a job that materializes a related group:

from dagster import define_asset_job, AssetSelection

scrape_news_job = define_asset_job(
    "scrape_news",
    # daily_news_articles: a downstream parsing asset, assumed defined elsewhere
    selection=AssetSelection.assets("daily_news_html", "daily_news_articles"),
)

Schedule it:

from dagster import ScheduleDefinition

daily_news_schedule = ScheduleDefinition(
    job=scrape_news_job,
    cron_schedule="0 6 * * *",  # 6am daily
)

Sensors: event-driven materialization

Sensors trigger asset materialization based on external events:

from dagster import sensor, RunRequest, SkipReason
from datetime import datetime, timezone
import httpx

@sensor(job=scrape_news_job)
def new_url_sensor(context):
    # Check an API for new URLs
    resp = httpx.get("https://api.example.com/new-urls")
    new_urls = resp.json()

    if not new_urls:
        return SkipReason("No new URLs")

    return RunRequest(
        run_key=str(datetime.now(timezone.utc).timestamp()),
        # Assumes the downstream asset accepts a "urls" config field
        run_config={"ops": {"daily_news_html": {"config": {"urls": new_urls}}}},
    )

Dagster runs the sensor function periodically (default 30 seconds). When it returns a RunRequest, the job triggers.

Asset checks: data quality

Built-in data quality assertions per asset:

from dagster import asset, AssetCheckResult, asset_check

@asset
def products_data(homepage_html: str) -> list[dict]:
    # Parse and return products
    return [{"title": "...", "price": "..."}]

@asset_check(asset=products_data)
def check_min_products(context, products_data):
    count = len(products_data)
    return AssetCheckResult(
        passed=count >= 10,
        metadata={"count": count, "expected_min": 10},
    )

@asset_check(asset=products_data)
def check_no_missing_prices(context, products_data):
    missing = sum(1 for p in products_data if not p.get("price"))
    return AssetCheckResult(
        passed=missing == 0,
        metadata={"missing_count": missing},
    )

After every materialization, Dagster runs the checks and shows pass/fail in the UI. Fail an SLA, fail a build, alert the team.

Comparison: Dagster vs Prefect for scraping

dimension        Dagster                              Prefect 3
paradigm         software-defined assets              flows and tasks
best for         data pipelines with downstream ETL   mid-complexity workflows
async support    sync-first, async via op syntax      excellent async
partitioning     rich (multi-dimensional, backfill)   basic
asset lineage    first-class                          via DAGs
hosted option    Dagster Cloud                        Prefect Cloud
learning curve   medium-high                          low-medium
dashboard        excellent                            excellent

Pick Dagster when scraping feeds analytics, ML, or BI. Pick Prefect when scraping is the end product or when async-heavy.

Complete production example

# my_scraper/definitions.py
from dagster import (
    asset, asset_check, AssetCheckResult, AssetExecutionContext, Definitions,
    DailyPartitionsDefinition, define_asset_job, ScheduleDefinition,
    AssetSelection,
)
import httpx
from selectolax.parser import HTMLParser

daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily_partitions, group_name="ingestion")
def raw_pages(context: AssetExecutionContext) -> dict[str, str]:
    """Fetch all configured pages for the partition date."""
    date_str = context.partition_key
    urls = [
        f"https://example.com/products?date={date_str}&page={i}"
        for i in range(1, 11)
    ]
    pages = {}
    for url in urls:
        try:
            resp = httpx.get(url, timeout=30)
            resp.raise_for_status()
            pages[url] = resp.text
            context.log.info(f"Fetched {url}: {len(resp.text)} bytes")
        except Exception as e:
            context.log.error(f"Failed {url}: {e}")
    return pages

@asset(partitions_def=daily_partitions, group_name="parsing")
def parsed_products(context: AssetExecutionContext, raw_pages: dict) -> list[dict]:
    """Parse products from all raw pages."""
    products = []
    for url, html in raw_pages.items():
        tree = HTMLParser(html)
        for el in tree.css("article.product"):
            title_el = el.css_first("h2")
            price_el = el.css_first(".price")
            link_el = el.css_first("a")
            products.append({
                "title": title_el.text(strip=True) if title_el else None,
                "price": price_el.text(strip=True) if price_el else None,
                "url": link_el.attributes.get("href") if link_el else None,
                "source_url": url,
                "scraped_date": context.partition_key,
            })
    context.log.info(f"Parsed {len(products)} products")
    return products

@asset(partitions_def=daily_partitions, group_name="storage")
def products_in_db(context: AssetExecutionContext, parsed_products: list) -> int:
    """Write parsed products to Postgres."""
    import asyncpg
    import asyncio

    async def write():
        conn = await asyncpg.connect("postgresql://user:pass@localhost/scraper")
        try:
            await conn.executemany(
                """INSERT INTO products
                   (title, price, url, source_url, scraped_date)
                   VALUES ($1, $2, $3, $4, $5)
                   ON CONFLICT (url, scraped_date) DO NOTHING""",
                [(p["title"], p["price"], p["url"], p["source_url"], p["scraped_date"])
                 for p in parsed_products],
            )
            return len(parsed_products)
        finally:
            await conn.close()

    count = asyncio.run(write())
    context.log.info(f"Stored {count} products to DB")
    return count

@asset_check(asset=parsed_products)
def check_minimum_products(context, parsed_products):
    return AssetCheckResult(
        passed=len(parsed_products) >= 50,
        metadata={"count": len(parsed_products), "expected_min": 50},
    )

scrape_job = define_asset_job(
    "daily_scrape",
    selection=AssetSelection.assets("raw_pages", "parsed_products", "products_in_db"),
)

scrape_schedule = ScheduleDefinition(
    job=scrape_job,
    cron_schedule="0 6 * * *",
    execution_timezone="UTC",
)

defs = Definitions(
    assets=[raw_pages, parsed_products, products_in_db],
    asset_checks=[check_minimum_products],
    jobs=[scrape_job],
    schedules=[scrape_schedule],
    resources={
        # Add IO managers, DB connections, etc.
    },
)

This pipeline:

  1. Fetches raw pages daily, partitioned by date
  2. Parses products from raw HTML
  3. Stores to Postgres with idempotent insert
  4. Asserts minimum product count
  5. Runs daily at 6am UTC

In the Dagster UI, you see the asset graph, materialization history, and check pass/fail per partition.

Deployment

For production:

  • Dagster Cloud (managed): easiest, paid per user + execution
  • Self-hosted on Kubernetes: dagster-helm chart, full control
  • Self-hosted on a single VM: docker-compose with PostgreSQL, fine for small teams

For Kubernetes:

# values.yaml for dagster-helm
dagsterDaemon:
  enabled: true
runLauncher:
  type: K8sRunLauncher
postgresql:
  enabled: true

Dagster spawns a Kubernetes pod per run, isolating resources.

Operational checklist

For production Dagster scraping in 2026:

  • Dagster 1.7+ with Python 3.11+
  • Dagster Cloud or self-hosted on Kubernetes
  • Partitions defined for backfill capability
  • IO managers for production storage (S3, Postgres)
  • Asset checks for data quality
  • Sensors for event-driven workflows
  • Schedules for cron-style runs
  • Type hints on all asset functions
  • Resource configs separate from code

For broader pipeline patterns, see building scraping pipelines with Prefect 3 and distributed scraping with Apache Kafka.

Common pitfalls

  • Asset name conflicts: two assets with the same key error out. Use key_prefix to namespace.
  • Partition explosion: multi-dimensional partitions can produce millions of cells. Plan partitioning carefully.
  • Sync code in async contexts: Dagster is sync-first; for async, use asyncio.run inside the asset.
  • Long-running assets: Dagster default timeout is short. Configure execution timeout per asset.
  • IO manager mismatches: changing IO managers mid-flight can cause input/output type mismatches.

FAQ

Q: should I use Dagster or Prefect for pure scraping?
For pure scraping, Prefect’s simpler model is usually a better fit. Dagster pays off when scraping is part of a larger ETL or ML pipeline.

Q: can I migrate from Airflow to Dagster?
Yes, with effort. The asset model is different from DAGs. Most teams migrate one pipeline at a time over months.

Q: does Dagster work with Scrapy?
Yes. Wrap Scrapy invocations as assets. The orchestration value is at the pipeline level, not within individual spiders.

Q: how does cost compare to Prefect?
Self-hosted: similar (just compute). Cloud: Dagster Cloud is more expensive per user, but pricing for execution is comparable.

Q: is Dagster overkill for a 5-spider project?
Probably yes. For 5 unrelated scrapers, cron + Python scripts is fine. For 5 scrapers feeding a unified analytics warehouse, Dagster shines.

Common pitfalls in production Dagster scraping

The first failure mode is the partition backfill blast radius. Dagster’s backfill UI lets you select multiple partitions and re-materialize them. A typo in the partition selector (selecting “all 2026 partitions” when you meant “this week’s partitions”) triggers 365 simultaneous materialization runs, which queues 365 Kubernetes pods, exhausts your cluster’s pod limit, and starves all other pipelines for hours. The fix is to enable backfill concurrency limits in the Dagster instance config, capping simultaneous backfill runs at a sane number:

# dagster.yaml
run_coordinator:
  module: dagster._core.run_coordinator
  class: QueuedRunCoordinator
  config:
    max_concurrent_runs: 25
    tag_concurrency_limits:
      - key: dagster/backfill
        limit: 5

This caps backfills at 5 concurrent runs while leaving 20 slots for scheduled and ad-hoc runs. Without this, a single mis-clicked backfill can take down your entire pipeline cluster.

The second pitfall is asset definition import-time side effects. Dagster imports your asset definitions on every code-server reload to build the asset graph. If your asset definition file contains import-time code that fetches from an external service (loading site configs from a database at module import), every code reload triggers that fetch. With 50 developers reloading code 100 times per day, you generate 5000 daily fetches that the upstream service was never designed for, and you sometimes get throttled by your own configuration store. The fix is to defer all external calls to inside the asset function or use a Dagster Resource that lazy-loads:

from dagster import asset, ConfigurableResource
import httpx

class SiteConfigResource(ConfigurableResource):
    config_url: str
    _cache: dict | None = None

    def get_configs(self) -> dict:
        if self._cache is None:
            self._cache = httpx.get(self.config_url).json()
        return self._cache

@asset
def raw_pages(context, site_configs: SiteConfigResource):
    configs = site_configs.get_configs()  # only fetched when asset runs
    # ...

The third pitfall is the IO manager memory bloat on large asset values. Dagster’s default IO managers (S3PickleIOManager, FilesystemIOManager) serialize the entire asset value to storage between assets. A raw_pages asset returning 10,000 HTML pages of 50KB each is 500MB of data. The pickle serialize-deserialize cycle runs in a single process, peaking at 1GB+ resident memory. Workers OOM-kill, the run fails, and Dagster retries it, blowing through your retry budget. The fix is custom IO managers that stream rather than buffer, or asset partitioning that keeps individual asset values under 100MB:

class StreamingS3IOManager(IOManager):
    # Assumes self.s3_client is a filesystem-style S3 client (e.g. s3fs)
    # whose open() streams bytes rather than buffering the whole object.
    def handle_output(self, context: OutputContext, obj):
        # obj is an iterator, not a list
        s3_key = f"{context.asset_key.path[-1]}/{context.partition_key}.jsonl"
        with self.s3_client.open(s3_key, "wb") as f:
            for item in obj:
                f.write(json.dumps(item).encode() + b"\n")

    def load_input(self, context: InputContext):
        s3_key = f"{context.asset_key.path[-1]}/{context.partition_key}.jsonl"
        # Yield one record at a time instead of loading the whole file
        with self.s3_client.open(s3_key, "rb") as f:
            for line in f:
                yield json.loads(line)

Then design your assets to yield items rather than return lists. Memory usage drops from O(N) to O(1) per asset, regardless of partition size.

Real-world example: Dagster + dbt for ecommerce intelligence

A scraping team built a Dagster pipeline that scraped 30 ecommerce sites daily and fed a dbt-managed warehouse for downstream analytics. The architecture treated scraped data as Dagster assets that materialized into Snowflake, and dbt models as downstream Dagster assets that ran SQL transformations against those tables:

from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets
from pathlib import Path

DBT_PROJECT_DIR = Path(__file__).parent / "ecom_dbt"
daily = DailyPartitionsDefinition(start_date="2026-01-01")

@asset(partitions_def=daily, group_name="raw_scrape")
def raw_amazon_products(context: AssetExecutionContext):
    date_str = context.partition_key
    products = scrape_amazon(date=date_str)
    write_to_snowflake("raw.amazon_products", products)
    context.add_output_metadata({
        "row_count": len(products),
        "preview": products[:5],
    })

@asset(partitions_def=daily, group_name="raw_scrape")
def raw_walmart_products(context: AssetExecutionContext):
    date_str = context.partition_key
    products = scrape_walmart(date=date_str)
    write_to_snowflake("raw.walmart_products", products)
    context.add_output_metadata({"row_count": len(products)})

@dbt_assets(
    manifest=DBT_PROJECT_DIR / "target" / "manifest.json",
    partitions_def=daily,
)
def ecom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    yield from dbt.cli(["build"], context=context).stream()

The dbt project contained models like staging_amazon_products (cleans the raw scrape), mart_competitive_pricing (joins all sites for cross-comparison), and ml_features_pricing (rolling windows for the pricing prediction model).

In the Dagster UI, the asset lineage showed the full chain: scrape job -> raw tables -> staging models -> mart models -> ML feature tables. When the pricing prediction model returned anomalous results, the team traced the lineage backward and discovered the root cause was a 24-hour gap in the Walmart scrape (the site had pushed an HTML structure change that broke the parser). The lineage view turned a 4-hour debugging session into a 15-minute one.

Asset checks at each layer caught quality issues before they propagated:

from dagster import asset_check, AssetCheckResult, AssetCheckSeverity

@asset_check(asset=raw_amazon_products)
def amazon_row_count_check(context, raw_amazon_products):
    rows = query_snowflake(
        f"SELECT COUNT(*) FROM raw.amazon_products WHERE scraped_date = '{context.partition_key}'"
    )[0][0]
    return AssetCheckResult(
        passed=rows >= 5000,
        severity=AssetCheckSeverity.ERROR if rows < 1000 else AssetCheckSeverity.WARN,
        metadata={"row_count": rows, "expected_min": 5000},
    )

A check failure marked the partition as unhealthy in the UI and prevented downstream dbt models from running until the issue was resolved. Over six months in production, asset checks caught 47 distinct data quality regressions before they reached the analytics layer.

Comparison: Dagster patterns by scraping scale

A reference table of Dagster patterns that work well at different scraping volumes:

volume             pattern                             partition strategy                       runtime                     notes
<100 URLs/day      single-asset, no partitions         none                                     local Dagster               simplest
1K-100K URLs/day   per-site assets, daily partitions   DailyPartitionsDefinition                self-hosted                 sweet spot
100K-10M URLs/day  per-site assets, hourly partitions  HourlyPartitionsDefinition + multi-dim   k8s with autoscale          needs IO manager tuning
10M+ URLs/day      partitioned ops within assets       DynamicPartitionsDefinition              Dagster Cloud Hybrid + k8s  requires custom IO managers
Realtime triggers  sensors + asset reconciliation      none, sensor-driven                      self-hosted                 use Prefect instead if pure realtime

For most scraping workloads, the 1K-100K URLs/day band with DailyPartitionsDefinition is the right starting point. Migrate up the table only when partitioning becomes the bottleneck. Migrate down (to a simpler tool) if Dagster’s overhead exceeds your benefit.

Detection: when Dagster is the wrong choice for scraping

Five signals that your scraping workload should NOT live on Dagster:

  1. Pure scraping with no downstream ETL: if all you do is scrape and dump to JSON files, Dagster’s asset model is overhead. Use Prefect or cron + scripts.
  2. Heavy async workload (1000+ concurrent requests): Dagster is sync-first. Heavy async work runs better in Prefect or Celery.
  3. Sub-minute scheduling: Dagster’s minimum schedule interval is one minute. Sub-minute requires sensors or external triggers.
  4. Fluid pipeline shape that changes per run: Dagster’s asset model assumes a stable graph. Per-run dynamic shape is awkward.
  5. Real-time event-driven only: pure event-driven workloads work better in event-streaming systems (Kafka, NATS) than in orchestrators.

If three or more of these apply, Prefect or a simpler tool is the right answer.

Performance tuning: parallel asset execution

Dagster’s default behavior runs assets sequentially within a job. For scraping where assets are independent (different sites), enable multi-process or k8s execution:

from dagster import multiprocess_executor, define_asset_job

scrape_job = define_asset_job(
    "all_sites",
    selection="*",
    executor_def=multiprocess_executor.configured({
        "max_concurrent": 8,
    }),
)

This runs up to 8 site-scraping assets in parallel within one job run. For Kubernetes-based execution, use k8s_job_executor which spawns one pod per asset:

from dagster_k8s import k8s_job_executor

scrape_job = define_asset_job(
    "all_sites",
    executor_def=k8s_job_executor.configured({
        "max_concurrent": 30,
        # Exact schema varies by dagster-k8s version; recent versions
        # nest container settings under step_k8s_config
        "step_k8s_config": {
            "container_config": {
                "resources": {
                    "requests": {"memory": "1Gi", "cpu": "500m"},
                    "limits": {"memory": "4Gi", "cpu": "2000m"},
                },
            },
        },
    }),
)

Per-asset isolation prevents one runaway scrape from OOM-killing the entire job. The trade-off is pod startup latency (5-15 seconds per asset), which is fine for hourly or daily jobs but punitive for sub-minute schedules.

Wrapping up

Dagster excels when scraping is one node in a larger data pipeline. The software-defined asset model gives lineage, partitioning, and freshness tracking that pure orchestrators do not. For teams that already think in terms of data products and downstream consumers, it pays off. Pair this with our building scraping pipelines with Prefect 3 and distributed scraping with Apache Kafka writeups for the full pipeline picture, and browse the dev-tools-projects category on DRT for related infrastructure deep-dives.
