Building scraping pipelines with Dagster in 2026
Building scraping pipelines with Dagster is the right answer when scraping is one part of a larger data engineering pipeline. Dagster reframes orchestration around data assets rather than tasks: instead of writing “fetch URLs, parse, store” as separate steps, you declare “products dataset” as a software-defined asset that depends on “raw HTML pages” which depends on “URL list,” and Dagster figures out the execution graph. For pure scraping with no downstream analytics, this is overkill. For scraping that feeds into ML models, dashboards, or warehouses, Dagster’s asset-aware approach pays off in lineage, partitioning, and freshness tracking.
This guide covers Dagster 1.7+ in 2026 for scraping pipelines: software-defined assets, jobs, sensors, partitioning by time and host, IO managers, and a complete end-to-end example. Code is Python 3.12. By the end you will know whether Dagster fits your scraping use case and how to deploy a working pipeline.
Why Dagster for scraping
Dagster’s strengths for scraper-heavy pipelines:
- Software-defined assets: declare data products instead of orchestration steps
- Lineage tracking: see how data flows from raw HTML to final dashboard
- Partition support: per-day, per-host, per-customer partitions with backfill
- IO managers: separate “what to compute” from “where to store it”
- Sensors: trigger pipelines based on external events (file arrival, API webhook)
- Strong type system: Pydantic-style configs and asset metadata
- Asset checks: data quality assertions per asset
- Mature: 6+ years in production, used by Robinhood, Duolingo, etc.
For Dagster’s official documentation, see docs.dagster.io.
Where Dagster does not lead
- For simple scheduled scraping with no downstream pipeline, Prefect or cron are simpler
- For very high-frequency tasks (1000s/sec), the asset model adds overhead
- For teams without data engineering experience, the asset abstraction takes adjustment
For Prefect comparison, see building scraping pipelines with Prefect 3.
Installing Dagster
pip install dagster dagster-webserver
dagster --version # 1.7+
# Initialize a project
dagster project scaffold --name my-scraper
cd my-scraper
Start the dev server:
dagster dev
# UI at http://localhost:3000
A first asset: scraping a URL
# my_scraper/assets.py
from dagster import asset, AssetExecutionContext
import httpx
@asset
def homepage_html(context: AssetExecutionContext) -> str:
resp = httpx.get("https://example.com", headers={
"User-Agent": "Mozilla/5.0 ...",
})
resp.raise_for_status()
context.log.info(f"Fetched {len(resp.text)} bytes")
return resp.text
@asset
def homepage_titles(homepage_html: str) -> list[str]:
from selectolax.parser import HTMLParser
tree = HTMLParser(homepage_html)
return [n.text(strip=True) for n in tree.css("h2")]
Two assets: homepage_html (the raw HTML) and homepage_titles (parsed titles, depends on homepage_html). Dagster figures out the dependency from the function signature.
In the Dagster UI, you see a graph with two nodes connected by a line. Click “Materialize all” and Dagster fetches the page and parses it.
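If you want to exercise the same two assets outside the UI (for example in a test), Dagster's materialize() helper runs them in-process; a minimal sketch, assuming the scaffolded my_scraper.assets module:
from dagster import materialize
from my_scraper.assets import homepage_html, homepage_titles
result = materialize([homepage_html, homepage_titles])
assert result.success
titles = result.output_for_node("homepage_titles")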
Partitioned assets: scrape per day, per host
For real scraping, you usually want partitions: one materialization per day, per host, or per customer. Dagster partitions track which slices have been materialized and which need backfilling.
from dagster import asset, DailyPartitionsDefinition, AssetExecutionContext
from datetime import datetime
import httpx
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily_partitions)
def daily_news_html(context: AssetExecutionContext) -> str:
date_str = context.partition_key # YYYY-MM-DD
url = f"https://news.example.com/archive/{date_str}"
resp = httpx.get(url)
resp.raise_for_status()
return resp.text
Now daily_news_html has one materialization per day. Run today’s: Dagster fetches today’s URL. Backfill last 30 days: Dagster fetches each day’s URL.
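You can also materialize a single slice programmatically while developing the parser; a small sketch using materialize() with a partition key:
from dagster import materialize
materialize([daily_news_html], partition_key="2026-01-15")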
For multi-dimensional partitions (per-day AND per-host):
from dagster import (
asset, MultiPartitionsDefinition, StaticPartitionsDefinition,
DailyPartitionsDefinition,
)
hosts = StaticPartitionsDefinition(["example.com", "news.example.com", "shop.example.com"])
date_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
multi_partitions = MultiPartitionsDefinition({"date": date_partitions, "host": hosts})
@asset(partitions_def=multi_partitions)
def host_daily_pages(context: AssetExecutionContext) -> str:
keys = context.partition_key.keys_by_dimension
date_str = keys["date"]
host = keys["host"]
url = f"https://{host}/archive/{date_str}"
resp = httpx.get(url)
return resp.text
This creates a 2D grid of partitions. Backfill specific cells (date X host) without re-running everything.
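A single cell can also be materialized programmatically with a MultiPartitionKey; a small sketch:
from dagster import materialize, MultiPartitionKey
materialize(
    [host_daily_pages],
    partition_key=MultiPartitionKey({"date": "2026-01-15", "host": "example.com"}),
)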
IO managers: separating compute from storage
In simple cases, the asset function returns a Python value and Dagster pickles it to local disk. For production, use IO managers to write to Postgres, S3, or your warehouse.
from dagster import IOManager, io_manager, OutputContext, InputContext
import boto3
import json
import io
class S3IOManager(IOManager):
def __init__(self, bucket: str):
self.bucket = bucket
self.s3 = boto3.client("s3")
def _key(self, context):
# asset_key is a list of strings
path = "/".join(context.asset_key.path)
if context.has_partition_key:
path = f"{path}/{context.partition_key}"
return f"{path}.json"
def handle_output(self, context: OutputContext, obj):
key = self._key(context)
body = json.dumps(obj).encode() if not isinstance(obj, bytes) else obj
self.s3.put_object(Bucket=self.bucket, Key=key, Body=body)
context.log.info(f"Wrote to s3://{self.bucket}/{key}")
def load_input(self, context: InputContext):
key = self._key(context)
resp = self.s3.get_object(Bucket=self.bucket, Key=key)
return json.loads(resp["Body"].read())
@io_manager(config_schema={"bucket": str})
def s3_io_manager(init_context):
return S3IOManager(bucket=init_context.resource_config["bucket"])
Wire it in:
from dagster import Definitions
defs = Definitions(
assets=[homepage_html, homepage_titles],
resources={"io_manager": s3_io_manager.configured({"bucket": "my-scraper-data"})},
)
Now every asset’s output goes to S3 instead of local disk. Same code, different storage.
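If only some assets should land in S3 (the bulky raw HTML, say) while small derived values stay on the default storage, bind the IO manager per asset instead of replacing the default; a sketch with illustrative asset names:
from dagster import Definitions, FilesystemIOManager, asset
@asset(io_manager_key="s3_io_manager")
def raw_product_pages() -> dict[str, str]:
    return {}  # large raw HTML payloads go to S3
@asset
def product_page_count(raw_product_pages: dict[str, str]) -> int:
    return len(raw_product_pages)  # small value, default IO manager is fine
defs = Definitions(
    assets=[raw_product_pages, product_page_count],
    resources={
        "io_manager": FilesystemIOManager(),
        "s3_io_manager": s3_io_manager.configured({"bucket": "my-scraper-data"}),
    },
)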
Browser scraping with Playwright as an asset
from dagster import asset, AssetExecutionContext
from playwright.sync_api import sync_playwright
@asset(partitions_def=daily_partitions)
def dynamic_page_html(context: AssetExecutionContext) -> str:
date_str = context.partition_key
url = f"https://example.com/dynamic?date={date_str}"
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url, wait_until="networkidle")
html = page.content()
browser.close()
return html
Same pattern: Playwright runs inside the asset function. Dagster orchestrates.
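Browser fetches fail more often than plain HTTP requests, so it is worth attaching a retry policy to the asset; a sketch using Dagster's RetryPolicy (counts and delays are illustrative), with the Playwright body elided:
from dagster import Backoff, RetryPolicy, asset
@asset(
    partitions_def=daily_partitions,
    retry_policy=RetryPolicy(max_retries=3, delay=30, backoff=Backoff.EXPONENTIAL),
)
def dynamic_page_html_retrying(context) -> str:
    ...  # same Playwright logic as above; Dagster re-runs the step if it raises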
Jobs: grouping assets
Jobs run sets of assets together. Define a job that materializes a related group:
from dagster import define_asset_job, AssetSelection
scrape_news_job = define_asset_job(
"scrape_news",
selection=AssetSelection.assets("daily_news_html", "daily_news_articles"),
)
Schedule it:
from dagster import ScheduleDefinition
daily_news_schedule = ScheduleDefinition(
job=scrape_news_job,
cron_schedule="0 6 * * *", # 6am daily
)
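Because the job selects a daily-partitioned asset, a common alternative is to derive the schedule from the partitions so each tick targets the matching date; a sketch with build_schedule_from_partitioned_job (the 6am offset mirrors the cron above):
from dagster import AssetSelection, build_schedule_from_partitioned_job, define_asset_job
partitioned_news_job = define_asset_job(
    "scrape_news_partitioned",
    selection=AssetSelection.assets("daily_news_html"),
    partitions_def=daily_partitions,
)
daily_news_partition_schedule = build_schedule_from_partitioned_job(
    partitioned_news_job,
    hour_of_day=6,
)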
Sensors: event-driven materialization
Sensors trigger asset materialization based on external events:
from dagster import sensor, RunRequest, SkipReason
from datetime import datetime, timezone
import httpx
@sensor(job=scrape_news_job)
def new_url_sensor(context):
# Check an API for new URLs
resp = httpx.get("https://api.example.com/new-urls")
new_urls = resp.json()
if not new_urls:
return SkipReason("No new URLs")
return RunRequest(
        run_key=str(datetime.now(timezone.utc).timestamp()),
run_config={"ops": {"daily_news_html": {"config": {"urls": new_urls}}}},
)
Dagster runs the sensor function periodically (default 30 seconds). When it returns a RunRequest, the job triggers.
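For polling scenarios, a cursor keeps the sensor from requesting duplicate runs for URLs it has already seen; a sketch using the sensor context's cursor (the API response fields are assumptions):
from dagster import RunRequest, SensorEvaluationContext, SkipReason, sensor
import httpx
@sensor(job=scrape_news_job, minimum_interval_seconds=120)
def new_url_cursor_sensor(context: SensorEvaluationContext):
    last_seen = context.cursor or "0"  # persisted between evaluations
    payload = httpx.get(
        "https://api.example.com/new-urls", params={"since": last_seen}
    ).json()
    if not payload["urls"]:
        return SkipReason("No new URLs since last cursor")
    context.update_cursor(str(payload["latest_id"]))
    return RunRequest(run_key=f"urls-{payload['latest_id']}")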
Asset checks: data quality
Built-in data quality assertions per asset:
from dagster import asset, AssetCheckResult, asset_check
@asset
def products_data(homepage_html: str) -> list[dict]:
# Parse and return products
return [{"title": "...", "price": "..."}]
@asset_check(asset=products_data)
def check_min_products(context, products_data):
count = len(products_data)
return AssetCheckResult(
passed=count >= 10,
metadata={"count": count, "expected_min": 10},
)
@asset_check(asset=products_data)
def check_no_missing_prices(context, products_data):
missing = sum(1 for p in products_data if not p.get("price"))
return AssetCheckResult(
passed=missing == 0,
metadata={"missing_count": missing},
)
After every materialization, Dagster runs the checks and shows pass/fail in the UI. Fail an SLA, fail a build, alert the team.
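When a failed check should also stop downstream assets in the same run, recent Dagster releases let you mark the check as blocking; a small sketch:
from dagster import AssetCheckResult, asset_check
@asset_check(asset=products_data, blocking=True)
def block_on_empty_scrape(context, products_data):
    # downstream assets are not materialized in this run if the check fails
    return AssetCheckResult(passed=len(products_data) > 0)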
Comparison: Dagster vs Prefect for scraping
| dimension | Dagster | Prefect 3 |
|---|---|---|
| paradigm | software-defined assets | flows and tasks |
| best for | data pipelines with downstream ETL | mid-complexity workflows |
| async support | sync-first; async def assets supported | excellent async |
| partitioning | rich (multi-dimensional, backfill) | basic |
| asset lineage | first-class | via DAGs |
| hosted option | Dagster Cloud | Prefect Cloud |
| learning curve | medium-high | low-medium |
| dashboard | excellent | excellent |
Pick Dagster when scraping feeds analytics, ML, or BI. Pick Prefect when scraping is the end product or when async-heavy.
Complete production example
# my_scraper/definitions.py
from dagster import (
    asset, asset_check, AssetCheckResult, AssetExecutionContext, Definitions,
    DailyPartitionsDefinition, define_asset_job, ScheduleDefinition,
    AssetSelection,
)
import httpx
from selectolax.parser import HTMLParser
daily_partitions = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily_partitions, group_name="ingestion")
def raw_pages(context: AssetExecutionContext) -> dict[str, str]:
"""Fetch all configured pages for the partition date."""
date_str = context.partition_key
urls = [
f"https://example.com/products?date={date_str}&page={i}"
for i in range(1, 11)
]
pages = {}
for url in urls:
try:
resp = httpx.get(url, timeout=30)
resp.raise_for_status()
pages[url] = resp.text
context.log.info(f"Fetched {url}: {len(resp.text)} bytes")
except Exception as e:
context.log.error(f"Failed {url}: {e}")
return pages
@asset(partitions_def=daily_partitions, group_name="parsing")
def parsed_products(context: AssetExecutionContext, raw_pages: dict) -> list[dict]:
"""Parse products from all raw pages."""
products = []
for url, html in raw_pages.items():
tree = HTMLParser(html)
for el in tree.css("article.product"):
title_el = el.css_first("h2")
price_el = el.css_first(".price")
link_el = el.css_first("a")
products.append({
"title": title_el.text(strip=True) if title_el else None,
"price": price_el.text(strip=True) if price_el else None,
"url": link_el.attributes.get("href") if link_el else None,
"source_url": url,
"scraped_date": context.partition_key,
})
context.log.info(f"Parsed {len(products)} products")
return products
@asset(partitions_def=daily_partitions, group_name="storage")
def products_in_db(context: AssetExecutionContext, parsed_products: list) -> int:
"""Write parsed products to Postgres."""
import asyncpg
import asyncio
async def write():
conn = await asyncpg.connect("postgresql://user:pass@localhost/scraper")
try:
await conn.executemany(
"""INSERT INTO products
(title, price, url, source_url, scraped_date)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (url, scraped_date) DO NOTHING""",
[(p["title"], p["price"], p["url"], p["source_url"], p["scraped_date"])
for p in parsed_products],
)
return len(parsed_products)
finally:
await conn.close()
count = asyncio.run(write())
context.log.info(f"Stored {count} products to DB")
return count
@asset_check(asset=parsed_products)
def check_minimum_products(context, parsed_products):
return AssetCheckResult(
passed=len(parsed_products) >= 50,
metadata={"count": len(parsed_products), "expected_min": 50},
)
scrape_job = define_asset_job(
"daily_scrape",
selection=AssetSelection.assets("raw_pages", "parsed_products", "products_in_db"),
)
scrape_schedule = ScheduleDefinition(
job=scrape_job,
cron_schedule="0 6 * * *",
execution_timezone="UTC",
)
defs = Definitions(
assets=[raw_pages, parsed_products, products_in_db],
asset_checks=[check_minimum_products],
jobs=[scrape_job],
schedules=[scrape_schedule],
resources={
# Add IO managers, DB connections, etc.
},
)
This pipeline:
- Fetches raw pages daily, partitioned by date
- Parses products from raw HTML
- Stores to Postgres with idempotent insert
- Asserts minimum product count
- Runs daily at 6am UTC
In the Dagster UI, you see the asset graph, materialization history, and check pass/fail per partition.
Deployment
For production:
- Dagster Cloud (managed): easiest, paid per user + execution
- Self-hosted on Kubernetes: dagster-helm chart, full control
- Self-hosted on a single VM: docker-compose with PostgreSQL, fine for small teams
For Kubernetes:
# values.yaml for dagster-helm
dagsterDaemon:
enabled: true
runLauncher:
type: K8sRunLauncher
postgresql:
enabled: true
Dagster spawns a Kubernetes pod per run, isolating resources.
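Beyond the run launcher, individual assets can request their own pod resources through the dagster-k8s/config tag, so a heavy Playwright asset gets more memory than a lightweight parser; a sketch with illustrative values:
from dagster import asset
@asset(
    op_tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {
                    "requests": {"memory": "2Gi", "cpu": "1000m"},
                    "limits": {"memory": "4Gi", "cpu": "2000m"},
                },
            },
        },
    }
)
def heavy_browser_scrape() -> str:
    ...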
Operational checklist
For production Dagster scraping in 2026:
- Dagster 1.7+ with Python 3.11+
- Dagster Cloud or self-hosted on Kubernetes
- Partitions defined for backfill capability
- IO managers for production storage (S3, Postgres)
- Asset checks for data quality
- Sensors for event-driven workflows
- Schedules for cron-style runs
- Type hints on all asset functions
- Resource configs separate from code
For broader pipeline patterns, see building scraping pipelines with Prefect 3 and distributed scraping with Apache Kafka.
Common pitfalls
- Asset name conflicts: two assets with the same key error out. Use key_prefix to namespace per-site assets (see the sketch after this list).
- Partition explosion: multi-dimensional partitions can produce millions of cells. Plan partitioning carefully.
- Sync code in async contexts: Dagster is sync-first; for async libraries, call asyncio.run inside the asset.
- Long-running assets: runs have no timeout by default, so a stuck scrape can hang indefinitely. Set a run timeout (for example via the dagster/max_runtime run tag) for scraping assets.
- IO manager mismatches: changing IO managers mid-flight can cause input/output type mismatches.
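For the name-conflict pitfall, key_prefix is usually all you need; a minimal sketch (the prefix value is illustrative):
from dagster import asset
# The asset key becomes ["shop_example_com", "product_pages"], so another
# site's module can define its own product_pages without a collision
@asset(key_prefix=["shop_example_com"])
def product_pages() -> str:
    ...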
FAQ
Q: should I use Dagster or Prefect for pure scraping?
For pure scraping, Prefect’s simpler model is usually a better fit. Dagster pays off when scraping is part of a larger ETL or ML pipeline.
Q: can I migrate from Airflow to Dagster?
Yes, with effort. The asset model is different from DAGs. Most teams migrate one pipeline at a time over months.
Q: does Dagster work with Scrapy?
Yes. Wrap Scrapy invocations as assets. The orchestration value is at the pipeline level, not within individual spiders.
Q: how does cost compare to Prefect?
Self-hosted: similar (just compute). Cloud: Dagster Cloud is more expensive per user, but pricing for execution is comparable.
Q: is Dagster overkill for a 5-spider project?
Probably yes. For 5 unrelated scrapers, cron + Python scripts is fine. For 5 scrapers feeding a unified analytics warehouse, Dagster shines.
Common pitfalls in production Dagster scraping
The first failure mode is the partition backfill blast radius. Dagster’s backfill UI lets you select multiple partitions and re-materialize them. A typo in the partition selector (selecting “all 2026 partitions” when you meant “this week’s partitions”) triggers 365 simultaneous materialization runs, which queues 365 Kubernetes pods, exhausts your cluster’s pod limit, and starves all other pipelines for hours. The fix is to enable backfill concurrency limits in the Dagster instance config, capping simultaneous backfill runs at a sane number:
# dagster.yaml
run_coordinator:
module: dagster._core.run_coordinator
class: QueuedRunCoordinator
config:
max_concurrent_runs: 25
tag_concurrency_limits:
- key: dagster/backfill
limit: 5
This caps backfills at 5 concurrent runs while leaving 20 slots for scheduled and ad-hoc runs. Without this, a single mis-clicked backfill can take down your entire pipeline cluster.
The second pitfall is asset definition import-time side effects. Dagster imports your asset definitions on every code-server reload to build the asset graph. If your asset definition file contains import-time code that fetches from an external service (loading site configs from a database at module import), every code reload triggers that fetch. With 50 developers reloading code 100 times per day, you generate 5000 daily fetches that the upstream service was never designed for, and you sometimes get throttled by your own configuration store. The fix is to defer all external calls to inside the asset function or use a Dagster Resource that lazy-loads:
from dagster import asset, ConfigurableResource
from pydantic import PrivateAttr
import httpx
class SiteConfigResource(ConfigurableResource):
    config_url: str
    # ConfigurableResource is a Pydantic model, so mutable cache state is declared as a private attribute
    _cache: dict | None = PrivateAttr(default=None)
    def get_configs(self) -> dict:
        if self._cache is None:
            self._cache = httpx.get(self.config_url).json()
        return self._cache
@asset
def raw_pages(context, site_configs: SiteConfigResource):
configs = site_configs.get_configs() # only fetched when asset runs
# ...
The third pitfall is the IO manager memory bloat on large asset values. Dagster’s default IO managers (S3PickleIOManager, FilesystemIOManager) serialize the entire asset value to storage between assets. A raw_pages asset returning 10,000 HTML pages of 50KB each is 500MB of data. The pickle serialize-deserialize cycle runs in a single process, peaking at 1GB+ resident memory. Workers OOM-kill, the run fails, and Dagster retries it, blowing through your retry budget. The fix is custom IO managers that stream rather than buffer, or asset partitioning that keeps individual asset values under 100MB:
from dagster import IOManager, InputContext, OutputContext
import json
class StreamingS3IOManager(IOManager):
    def __init__(self, fs, bucket: str):
        # fs is assumed to be an s3fs.S3FileSystem-style object whose open() takes "bucket/key" paths
        self.fs = fs
        self.bucket = bucket
    def _path(self, context) -> str:
        return f"{self.bucket}/{context.asset_key.path[-1]}/{context.partition_key}.jsonl"
    def handle_output(self, context: OutputContext, obj):
        # obj is an iterator, not a list, so nothing is buffered in memory
        with self.fs.open(self._path(context), "wb") as f:
            for item in obj:
                f.write(json.dumps(item).encode() + b"\n")
    def load_input(self, context: InputContext):
        # stream records back one at a time instead of loading the whole file
        with self.fs.open(self._path(context), "rb") as f:
            for line in f:
                yield json.loads(line)
Then write your assets to return generators (or other iterators) rather than fully built lists. Memory usage drops from O(N) to O(1) per asset, regardless of partition size.
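With a streaming IO manager bound under a key such as streaming_s3_io_manager (the key name and helper functions here are illustrative), the asset body hands back a generator instead of building the list:
from dagster import AssetExecutionContext, asset
import httpx
@asset(partitions_def=daily_partitions, io_manager_key="streaming_s3_io_manager")
def parsed_items(context: AssetExecutionContext):
    def generate():
        for url in load_urls_for(context.partition_key):  # hypothetical helper
            html = httpx.get(url, timeout=30).text
            yield {"url": url, "title": extract_title(html)}  # hypothetical parser
    # return (not yield) the generator so the IO manager receives it intact
    return generate()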
Real-world example: Dagster + dbt for ecommerce intelligence
A scraping team built a Dagster pipeline that scraped 30 ecommerce sites daily and fed a dbt-managed warehouse for downstream analytics. The architecture treated scraped data as Dagster assets that materialized into Snowflake, and dbt models as downstream Dagster assets that ran SQL transformations against those tables:
from dagster import asset, AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets, get_asset_key_for_model
from pathlib import Path
DBT_PROJECT_DIR = Path(__file__).parent / "ecom_dbt"
daily = DailyPartitionsDefinition(start_date="2026-01-01")
@asset(partitions_def=daily, group_name="raw_scrape")
def raw_amazon_products(context: AssetExecutionContext):
date_str = context.partition_key
products = scrape_amazon(date=date_str)
write_to_snowflake("raw.amazon_products", products)
context.add_output_metadata({
"row_count": len(products),
"preview": products[:5],
})
@asset(partitions_def=daily, group_name="raw_scrape")
def raw_walmart_products(context: AssetExecutionContext):
date_str = context.partition_key
products = scrape_walmart(date=date_str)
write_to_snowflake("raw.walmart_products", products)
context.add_output_metadata({"row_count": len(products)})
@dbt_assets(
manifest=DBT_PROJECT_DIR / "target" / "manifest.json",
partitions_def=daily,
)
def ecom_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
yield from dbt.cli(["build"], context=context).stream()
The dbt project contained models like staging_amazon_products (cleans the raw scrape), mart_competitive_pricing (joins all sites for cross-comparison), and ml_features_pricing (rolling windows for the pricing prediction model).
In the Dagster UI, the asset lineage showed the full chain: scrape job -> raw tables -> staging models -> mart models -> ML feature tables. When the pricing prediction model returned anomalous results, the team traced the lineage backward and discovered the root cause was a 24-hour gap in the Walmart scrape (the site had pushed an HTML structure change that broke the parser). The lineage view turned a 4-hour debugging session into a 15-minute one.
Asset checks at each layer caught quality issues before they propagated:
@asset_check(asset=raw_amazon_products)
def amazon_row_count_check(context, raw_amazon_products):
rows = query_snowflake(
f"SELECT COUNT(*) FROM raw.amazon_products WHERE scraped_date = '{context.partition_key}'"
)[0][0]
return AssetCheckResult(
passed=rows >= 5000,
severity=AssetCheckSeverity.ERROR if rows < 1000 else AssetCheckSeverity.WARN,
metadata={"row_count": rows, "expected_min": 5000},
)
A check failure marked the partition as unhealthy in the UI and prevented downstream dbt models from running until the issue was resolved. Over six months in production, asset checks caught 47 distinct data quality regressions before they reached the analytics layer.
Comparison: Dagster patterns by scraping scale
A reference table of Dagster patterns that work well at different scraping volumes:
| volume | pattern | partition strategy | runtime | notes |
|---|---|---|---|---|
| <100 URLs/day | single-asset, no partitions | none | local Dagster | simplest |
| 1K-100K URLs/day | per-site assets, daily partitions | DailyPartitionsDefinition | self-hosted | sweet spot |
| 100K-10M URLs/day | per-site assets, hourly partitions | HourlyPartitionsDefinition + multi-dim | k8s with autoscale | needs IO manager tuning |
| 10M+ URLs/day | partitioned ops within assets | DynamicPartitionsDefinition | Dagster Cloud Hybrid + k8s | requires custom IO managers |
| Realtime triggers | sensors + asset reconciliation | none, sensor-driven | self-hosted | use Prefect instead if pure realtime |
For most scraping workloads, the 1K-100K URLs/day band with DailyPartitionsDefinition is the right starting point. Migrate up the table only when partitioning becomes the bottleneck. Migrate down (to a simpler tool) if Dagster’s overhead exceeds your benefit.
Detection: when Dagster is the wrong choice for scraping
Five signals that your scraping workload should NOT live on Dagster:
- Pure scraping with no downstream ETL: if all you do is scrape and dump to JSON files, Dagster’s asset model is overhead. Use Prefect or cron + scripts.
- Heavy async workload (1000+ concurrent requests): Dagster is sync-first. Heavy async work runs better in Prefect or Celery.
- Sub-minute scheduling: Dagster’s minimum schedule interval is one minute. Sub-minute requires sensors or external triggers.
- Fluid pipeline shape that changes per run: Dagster’s asset model assumes a stable graph. Per-run dynamic shape is awkward.
- Real-time event-driven only: pure event-driven workloads work better in event-streaming systems (Kafka, NATS) than in orchestrators.
If three or more of these apply, Prefect or a simpler tool is the right answer.
Performance tuning: parallel asset execution
Whether assets run in parallel within a job depends on the executor: the in-process executor is strictly sequential, and the default multiprocess executor caps concurrency at the local CPU count. For scraping where assets are independent (different sites), configure multi-process or k8s execution with an explicit concurrency limit:
from dagster import multiprocess_executor, define_asset_job
scrape_job = define_asset_job(
"all_sites",
selection="*",
executor_def=multiprocess_executor.configured({
"max_concurrent": 8,
}),
)
This runs up to 8 site-scraping assets in parallel within one job run. For Kubernetes-based execution, use k8s_job_executor which spawns one pod per asset:
from dagster_k8s import k8s_job_executor
scrape_job = define_asset_job(
    "all_sites",
    executor_def=k8s_job_executor.configured({
        "max_concurrent": 30,
        "step_k8s_config": {
            "container_config": {
                "resources": {
                    "requests": {"memory": "1Gi", "cpu": "500m"},
                    "limits": {"memory": "4Gi", "cpu": "2000m"},
                },
            },
        },
    }),
)
Per-asset isolation prevents one runaway scrape from OOM-killing the entire job. The trade-off is pod startup latency (5-15 seconds per asset), which is fine for hourly or daily jobs but punitive for sub-minute schedules.
Wrapping up
Dagster excels when scraping is one node in a larger data pipeline. The software-defined asset model gives lineage, partitioning, and freshness tracking that pure orchestrators do not. For teams that already think in terms of data products and downstream consumers, it pays off. Pair this with our building scraping pipelines with Prefect 3 and distributed scraping with Apache Kafka writeups for the full pipeline picture, and browse the dev-tools-projects category on DRT for related infrastructure deep-dives.