Azure Functions for Serverless Web Scraping: the Complete Guide

Serverless architectures have become the preferred way to run web scrapers for teams that want to avoid managing servers, scale automatically, and pay only for actual compute usage. Azure Functions is Microsoft’s serverless platform, and it works surprisingly well for web scraping workloads once you understand its constraints and configure it properly.

This guide walks through building production-ready web scrapers on Azure Functions, from basic HTTP scrapers to complex workflows with proxy rotation, scheduling, and data pipeline integration.

Why Azure Functions for Scraping

Azure Functions offers several advantages over running scrapers on dedicated servers or VMs:

  • Zero server management: no patching, no capacity planning, no SSH.
  • Automatic scaling: Azure spins up instances as needed and scales to zero when idle.
  • Cost efficiency: you pay only for execution time, not idle capacity, and the consumption plan includes 1 million free executions per month.
  • Built-in scheduling: timer triggers replace cron jobs and come with better monitoring.
  • Integration with Azure services: direct connections to Blob Storage, Cosmos DB, Queue Storage, and Event Grid.
  • Managed identity: access other Azure resources without managing credentials.

Limitations to Understand

Before choosing Azure Functions for scraping, know these constraints:

| Constraint           | Consumption plan             | Premium plan                 |
|----------------------|------------------------------|------------------------------|
| Execution timeout    | 5 min (default), 10 min max  | 30 min (default), 60 min max |
| Memory               | 1.5 GB                       | 3.5 to 14 GB                 |
| Outbound connections | ~600 active                  | ~600 active                  |
| Cold start           | 1 to 10 seconds              | ~1 second                    |
| Deployment size      | 1 GB                         | 1 GB                         |

For scraping, the biggest limitation is the execution timeout. If a scraping job takes longer than 10 minutes on the consumption plan, you need either the Premium plan or an architectural change that splits the job into smaller functions.
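
If you do move to the Premium plan, the longer window still has to be requested explicitly through functionTimeout in host.json (the same setting shown later in this guide); a minimal sketch raising it to the one-hour figure from the table above:

{
  "version": "2.0",
  "functionTimeout": "01:00:00"
}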

Setting Up the Project

Prerequisites

# install Azure Functions Core Tools
npm install -g azure-functions-core-tools@4

# install Azure CLI
brew install azure-cli  # macOS
# or: curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash  # Linux

# login to Azure
az login

Create the Function App

# create a new Python function app
func init scraper-functions --python
cd scraper-functions

# create a virtual environment
python -m venv .venv
source .venv/bin/activate

# install dependencies
pip install httpx selectolax playwright azure-storage-blob

Project Structure

scraper-functions/
  scrape_prices/
    __init__.py
    function.json
  scrape_listings/
    __init__.py
    function.json
  process_results/
    __init__.py
    function.json
  shared/
    __init__.py
    proxy.py
    parser.py
    storage.py
  requirements.txt
  host.json
  local.settings.json
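
Azure Functions installs Python dependencies from requirements.txt during deployment, so it should list the same packages installed locally; the azure-functions package itself is required by the Python worker:

# requirements.txt
azure-functions
httpx
selectolax
playwright
azure-storage-blob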

Configure host.json

{
  "version": "2.0",
  "logging": {
    "logLevel": {
      "default": "Information",
      "Host.Results": "Error"
    }
  },
  "extensions": {
    "http": {
      "routePrefix": "api",
      "maxOutstandingRequests": 200,
      "maxConcurrentRequests": 100
    }
  },
  "functionTimeout": "00:10:00"
}

Configure local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "PROXY_GATEWAY": "http://gate.proxyservice.com:7777",
    "PROXY_USERNAME": "your_user",
    "PROXY_PASSWORD": "your_pass",
    "STORAGE_CONNECTION_STRING": "your_blob_storage_connection_string"
  }
}

Function 1: Basic HTTP Scraper

Let's start with a timer-triggered function that scrapes a list of URLs on a schedule.

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "timer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 6 * * *"
    }
  ]
}

__init__.py

import azure.functions as func
import httpx
import json
import logging
import os
from datetime import datetime
from selectolax.parser import HTMLParser

# proxy configuration
PROXY_GATEWAY = os.environ.get("PROXY_GATEWAY")
PROXY_USER = os.environ.get("PROXY_USERNAME")
PROXY_PASS = os.environ.get("PROXY_PASSWORD")


def get_proxy_url(country: str = "us") -> str:
    """build a geo-targeted proxy URL"""
    username = f"{PROXY_USER}-country-{country}"
    return f"http://{username}:{PROXY_PASS}@{PROXY_GATEWAY}"


def scrape_page(url: str, proxy_country: str = "us") -> dict:
    """scrape a single URL through a proxy"""
    proxy_url = get_proxy_url(proxy_country)

    with httpx.Client(
        proxy=proxy_url,
        timeout=30.0,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36",
            "Accept-Language": "en-US,en;q=0.9",
        },
        follow_redirects=True,
    ) as client:
        response = client.get(url)

        if response.status_code != 200:
            return {
                "url": url,
                "error": f"HTTP {response.status_code}",
                "scraped_at": datetime.utcnow().isoformat(),
            }

        return {
            "url": url,
            "html": response.text,
            "status_code": response.status_code,
            "scraped_at": datetime.utcnow().isoformat(),
        }


def parse_product_page(html: str) -> list[dict]:
    """extract product data from HTML"""
    tree = HTMLParser(html)
    products = []

    for card in tree.css(".product-card"):
        name_el = card.css_first(".product-name")
        price_el = card.css_first(".product-price")
        rating_el = card.css_first(".rating")

        products.append({
            "name": name_el.text().strip() if name_el else "",
            "price": price_el.text().strip() if price_el else "",
            "rating": rating_el.attributes.get("data-value", "") if rating_el else "",
        })

    return products


def main(timer: func.TimerRequest) -> None:
    """timer-triggered scraping function"""
    if timer.past_due:
        logging.warning("the timer is past due")

    logging.info("starting scheduled scrape job")

    # define targets
    targets = [
        {"url": "https://example.com/products?page=1", "country": "us"},
        {"url": "https://example.com/products?page=2", "country": "us"},
        {"url": "https://example.com/products?page=3", "country": "us"},
    ]

    all_products = []
    errors = []

    for target in targets:
        try:
            result = scrape_page(target["url"], target["country"])

            if "error" in result:
                errors.append(result)
                continue

            products = parse_product_page(result["html"])
            for product in products:
                product["source_url"] = target["url"]
                product["scraped_at"] = result["scraped_at"]
            all_products.extend(products)

            logging.info(f"scraped {len(products)} products from {target['url']}")

        except Exception as e:
            logging.error(f"error scraping {target['url']}: {str(e)}")
            errors.append({
                "url": target["url"],
                "error": str(e),
            })

    # save results to Azure Blob Storage
    save_to_blob(all_products, errors)

    logging.info(
        f"scrape complete: {len(all_products)} products, {len(errors)} errors"
    )


def save_to_blob(products: list[dict], errors: list[dict]):
    """save scraping results to Azure Blob Storage"""
    from azure.storage.blob import BlobServiceClient

    connection_string = os.environ.get("STORAGE_CONNECTION_STRING")
    if not connection_string:
        logging.warning("no storage connection string, skipping blob save")
        return

    blob_service = BlobServiceClient.from_connection_string(connection_string)
    container = blob_service.get_container_client("scraping-results")

    # create container if it does not exist
    try:
        container.create_container()
    except Exception:
        pass  # container already exists

    # save products
    timestamp = datetime.utcnow().strftime("%Y%m%d_%H%M%S")
    blob_name = f"products/{timestamp}.json"
    blob_client = container.get_blob_client(blob_name)
    blob_client.upload_blob(
        json.dumps(products, indent=2),
        overwrite=True
    )

    # save errors if any
    if errors:
        error_blob = container.get_blob_client(f"errors/{timestamp}.json")
        error_blob.upload_blob(
            json.dumps(errors, indent=2),
            overwrite=True
        )
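
Before deploying, you can exercise this locally; the Functions host exposes an admin endpoint for firing non-HTTP triggers by hand (this assumes the timer function lives in the scrape_prices folder from the project structure):

# start the local Functions host
func start

# in a second terminal, trigger the timer function manually
curl -X POST http://localhost:7071/admin/functions/scrape_prices \
  -H "Content-Type: application/json" \
  -d '{"input": ""}'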

Function 2: Queue-Based Distributed Scraper

For large scraping jobs, use a fan-out pattern with Azure Queue Storage: one function generates URLs, another processes them individually.

URL Generator Function

# generate_urls/__init__.py
import azure.functions as func
import json
import logging
from typing import List

def main(timer: func.TimerRequest, msg: func.Out[List[str]]) -> None:
    """generate scraping tasks and add them to a queue"""
    logging.info("generating scraping tasks")

    # generate URLs to scrape
    tasks = []
    for page in range(1, 101):  # 100 pages
        tasks.append({
            "url": f"https://example.com/listings?page={page}",
            "proxy_country": "us",
            "parser": "listings",
        })

    # a queue output binding writes multiple messages through a single set()
    # call with a list; calling set() once per task would overwrite the value
    msg.set([json.dumps(task) for task in tasks])

    logging.info(f"added {len(tasks)} tasks to queue")

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "timer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 0 */6 * * *"
    },
    {
      "name": "msg",
      "type": "queue",
      "direction": "out",
      "queueName": "scrape-tasks",
      "connection": "AzureWebJobsStorage"
    }
  ]
}

Queue Worker Function

# process_scrape/__init__.py
import azure.functions as func
import httpx
import json
import logging
import os
import time
from datetime import datetime

def main(msg: func.QueueMessage) -> None:
    """process a single scraping task from the queue"""
    task = json.loads(msg.get_body().decode("utf-8"))
    url = task["url"]
    proxy_country = task.get("proxy_country", "us")

    logging.info(f"processing scrape task: {url}")

    proxy_url = build_proxy_url(proxy_country)

    # retry logic
    max_retries = 3
    for attempt in range(max_retries):
        try:
            result = fetch_page(url, proxy_url)

            if result["status_code"] == 200:
                # parse and save
                data = parse_response(result["html"], task.get("parser", "default"))
                save_result(url, data)
                logging.info(f"successfully scraped {url}: {len(data)} items")
                return

            elif result["status_code"] == 429:
                # rate limited, wait and retry
                wait_time = 10 * (attempt + 1)
                logging.warning(f"rate limited on {url}, waiting {wait_time}s")
                time.sleep(wait_time)
                continue

            elif result["status_code"] == 403:
                # blocked, try different proxy session
                proxy_url = build_proxy_url(proxy_country, new_session=True)
                continue

            else:
                logging.error(
                    f"unexpected status {result['status_code']} for {url}"
                )
                break

        except Exception as e:
            logging.error(f"attempt {attempt + 1} failed for {url}: {str(e)}")
            if attempt < max_retries - 1:
                time.sleep(5)

    # all retries exhausted
    logging.error(f"failed to scrape {url} after {max_retries} attempts")


def build_proxy_url(country: str, new_session: bool = False) -> str:
    """build proxy URL with optional session rotation"""
    gateway = os.environ["PROXY_GATEWAY"]
    username = os.environ["PROXY_USERNAME"]
    password = os.environ["PROXY_PASSWORD"]

    session_suffix = ""
    if new_session:
        import random
        session_suffix = f"-session-{random.randint(10000, 99999)}"

    return f"http://{username}-country-{country}{session_suffix}:{password}@{gateway}"


def fetch_page(url: str, proxy_url: str) -> dict:
    """execute a single scraping request through the given proxy"""
    with httpx.Client(
        proxy=proxy_url,
        timeout=25.0,
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/120.0.0.0",
            "Accept": "text/html,application/xhtml+xml",
            "Accept-Language": "en-US,en;q=0.9",
        },
        follow_redirects=True,
    ) as client:
        response = client.get(url)
        return {
            "html": response.text,
            "status_code": response.status_code,
            "headers": dict(response.headers),
        }


def parse_response(html: str, parser_type: str) -> list[dict]:
    """parse HTML based on parser type"""
    from selectolax.parser import HTMLParser
    tree = HTMLParser(html)

    if parser_type == "listings":
        return parse_listings(tree)
    elif parser_type == "products":
        return parse_products(tree)
    else:
        return [{"raw_text": tree.body.text() if tree.body else ""}]


def parse_listings(tree) -> list[dict]:
    """parse listing page HTML"""
    listings = []
    for card in tree.css(".listing-card"):
        title = card.css_first(".listing-title")
        price = card.css_first(".listing-price")
        location = card.css_first(".listing-location")

        listings.append({
            "title": title.text().strip() if title else "",
            "price": price.text().strip() if price else "",
            "location": location.text().strip() if location else "",
        })
    return listings


def parse_products(tree) -> list[dict]:
    """parse product page HTML"""
    products = []
    for item in tree.css(".product-item"):
        name = item.css_first(".product-name")
        price = item.css_first(".price")
        products.append({
            "name": name.text().strip() if name else "",
            "price": price.text().strip() if price else "",
        })
    return products


def save_result(url: str, data: list[dict]):
    """save parsed data to blob storage"""
    from azure.storage.blob import BlobServiceClient

    connection_string = os.environ.get("STORAGE_CONNECTION_STRING")
    if not connection_string:
        return

    blob_service = BlobServiceClient.from_connection_string(connection_string)
    container = blob_service.get_container_client("scraping-results")

    # create a unique blob name from the URL
    import hashlib
    url_hash = hashlib.md5(url.encode()).hexdigest()[:12]
    timestamp = datetime.utcnow().strftime("%Y%m%d")
    blob_name = f"data/{timestamp}/{url_hash}.json"

    blob_client = container.get_blob_client(blob_name)
    blob_client.upload_blob(
        json.dumps({"url": url, "data": data, "count": len(data)}, indent=2),
        overwrite=True,
    )

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "msg",
      "type": "queueTrigger",
      "direction": "in",
      "queueName": "scrape-tasks",
      "connection": "AzureWebJobsStorage"
    }
  ]
}
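
The retry limit for poisoned messages (maxDequeueCount) is not a binding property; it is configured in host.json under the queues extension. A sketch, with batchSize and visibilityTimeout shown as illustrative values:

{
  "version": "2.0",
  "extensions": {
    "queues": {
      "batchSize": 16,
      "maxDequeueCount": 3,
      "visibilityTimeout": "00:00:30"
    }
  }
}

batchSize also controls how many queue messages one instance processes in parallel, which is the main concurrency knob for this fan-out pattern.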

Function 3: HTTP-Triggered On-Demand Scraper

Sometimes you need to scrape a page on demand via an API call:

# scrape_on_demand/__init__.py
import azure.functions as func
import httpx
import json
import logging
import os
from selectolax.parser import HTMLParser

def main(req: func.HttpRequest) -> func.HttpResponse:
    """HTTP-triggered scraper for on-demand requests"""
    url = req.params.get("url")
    if not url:
        try:
            body = req.get_json()
            url = body.get("url")
        except ValueError:
            pass

    if not url:
        return func.HttpResponse(
            json.dumps({"error": "provide a 'url' parameter"}),
            status_code=400,
            mimetype="application/json",
        )

    country = req.params.get("country", "us")
    selector = req.params.get("selector", "body")

    logging.info(f"on-demand scrape: {url}")

    try:
        # build proxy URL
        proxy_url = (
            f"http://{os.environ['PROXY_USERNAME']}-country-{country}"
            f":{os.environ['PROXY_PASSWORD']}"
            f"@{os.environ['PROXY_GATEWAY']}"
        )

        with httpx.Client(
            proxy=proxy_url,
            timeout=25.0,
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                              "AppleWebKit/537.36 Chrome/120.0.0.0",
            },
        ) as client:
            response = client.get(url, follow_redirects=True)

        tree = HTMLParser(response.text)
        elements = tree.css(selector)

        results = []
        for el in elements:
            results.append({
                "tag": el.tag,
                "text": el.text().strip(),
                "html": el.html,
            })

        return func.HttpResponse(
            json.dumps({
                "url": url,
                "status_code": response.status_code,
                "results": results,
                "count": len(results),
            }, indent=2),
            mimetype="application/json",
        )

    except Exception as e:
        logging.error(f"scrape error: {str(e)}")
        return func.HttpResponse(
            json.dumps({"error": str(e)}),
            status_code=500,
            mimetype="application/json",
        )
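
The listing above omits the binding configuration; a function.json sketch for this HTTP trigger (the authLevel and allowed methods are assumptions you can adjust):

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "req",
      "type": "httpTrigger",
      "direction": "in",
      "authLevel": "function",
      "methods": ["get", "post"]
    },
    {
      "name": "$return",
      "type": "http",
      "direction": "out"
    }
  ]
}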

Proxy Management for Azure Functions

Handling Outbound IP Restrictions

Function apps on the consumption plan use a shared pool of outbound IPs. This creates two problems for scraping:

  1. Other Azure customers may have already gotten those IPs blacklisted.
  2. You cannot predict which outbound IP a given request will use.

The solution is to route every request through a proxy:

# shared/proxy.py
import os
import random

class AzureProxyManager:
    def __init__(self):
        self.gateway = os.environ.get("PROXY_GATEWAY", "")
        self.username = os.environ.get("PROXY_USERNAME", "")
        self.password = os.environ.get("PROXY_PASSWORD", "")
        self.session_counter = 0

    def get_proxy(self, country: str = "us", sticky: bool = False) -> str:
        """get a proxy URL for the given country"""
        username = f"{self.username}-country-{country}"

        if sticky:
            self.session_counter += 1
            session_id = f"{self.session_counter}-{random.randint(1000, 9999)}"
            username += f"-session-{session_id}"

        return f"http://{username}:{self.password}@{self.gateway}"

    def get_rotating_proxy(self, country: str = "us") -> str:
        """get a proxy that rotates IPs on each request"""
        return self.get_proxy(country, sticky=False)

    def get_sticky_proxy(self, country: str = "us") -> str:
        """get a proxy that maintains the same IP for multiple requests"""
        return self.get_proxy(country, sticky=True)
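
A short usage sketch, assuming the gateway and credentials configured in local.settings.json:

# example: route one request through a German exit IP with a sticky session
import httpx
from shared.proxy import AzureProxyManager

proxies = AzureProxyManager()

with httpx.Client(proxy=proxies.get_sticky_proxy(country="de"), timeout=30.0) as client:
    response = client.get("https://example.com/listings?page=1")
    print(response.status_code)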

Connection Pool Management

Azure Functions caps active outbound connections (roughly 600 per instance, as noted in the limitations table), so reuse HTTP clients instead of creating a new one per request:

# shared/http_client.py
import httpx
from functools import lru_cache

@lru_cache(maxsize=1)
def get_shared_client() -> httpx.Client:
    """return a shared HTTP client to avoid connection exhaustion"""
    return httpx.Client(
        timeout=30.0,
        limits=httpx.Limits(
            max_connections=50,
            max_keepalive_connections=20,
        ),
        headers={
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                          "AppleWebKit/537.36 Chrome/120.0.0.0",
        },
    )
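
A minimal usage sketch (the import path assumes the shared/ package layout shown earlier):

# inside any function: reuse the cached client across invocations on a warm instance
from shared.http_client import get_shared_client

def fetch(url: str) -> str:
    """fetch a page using the shared connection pool"""
    client = get_shared_client()
    response = client.get(url, follow_redirects=True)
    response.raise_for_status()
    return response.text

Note that httpx binds the proxy to the client when the client is constructed, so if you rotate proxies per request you will want one cached client per proxy configuration rather than a single global client.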

Deployment

Deploy to Azure

# create Azure resources
az group create --name scraper-rg --location eastus

# storage account names must be globally unique (3-24 lowercase letters and digits)
az storage account create \
  --name scraperstorage \
  --resource-group scraper-rg \
  --location eastus \
  --sku Standard_LRS

az functionapp create \
  --name my-scraper-functions \
  --resource-group scraper-rg \
  --storage-account scraperstorage \
  --consumption-plan-location eastus \
  --runtime python \
  --runtime-version 3.11 \
  --functions-version 4

# set application settings (proxy credentials)
az functionapp config appsettings set \
  --name my-scraper-functions \
  --resource-group scraper-rg \
  --settings \
    PROXY_GATEWAY="http://gate.proxyservice.com:7777" \
    PROXY_USERNAME="your_user" \
    PROXY_PASSWORD="your_pass"

# deploy the function app
func azure functionapp publish my-scraper-functions

Using Azure Key Vault for Proxy Credentials

For production, store proxy credentials in Key Vault instead of plain app settings:

# create key vault
az keyvault create \
  --name scraper-vault \
  --resource-group scraper-rg \
  --location eastus

# add secrets
az keyvault secret set --vault-name scraper-vault --name ProxyUsername --value "your_user"
az keyvault secret set --vault-name scraper-vault --name ProxyPassword --value "your_pass"

# enable a system-assigned managed identity for the function app
az functionapp identity assign --name my-scraper-functions --resource-group scraper-rg

# grant that identity read access to secrets (assumes the vault uses access
# policies; use the principalId returned by the previous command)
az keyvault set-policy \
  --name scraper-vault \
  --object-id <principal-id> \
  --secret-permissions get list

# reference secrets in app settings
az functionapp config appsettings set \
  --name my-scraper-functions \
  --resource-group scraper-rg \
  --settings \
    PROXY_USERNAME="@Microsoft.KeyVault(VaultName=scraper-vault;SecretName=ProxyUsername)" \
    PROXY_PASSWORD="@Microsoft.KeyVault(VaultName=scraper-vault;SecretName=ProxyPassword)"

Monitoring and Alerting

Application Insights Integration

Azure Functions automatically integrates with Application Insights:

import logging

# structured logging for monitoring
def log_scrape_metrics(url: str, status: str, duration_ms: float, items: int):
    """log scraping metrics for Application Insights"""
    logging.info(
        "scrape_complete",
        extra={
            "custom_dimensions": {
                "url": url,
                "status": status,
                "duration_ms": duration_ms,
                "items_extracted": items,
            }
        }
    )

Setting Up Alerts

# alert when more than 10 requests fail within a 5-minute window
az monitor metrics alert create \
  --name "high-scrape-error-rate" \
  --resource-group scraper-rg \
  --scopes "/subscriptions/{sub-id}/resourceGroups/scraper-rg/providers/Microsoft.Web/sites/my-scraper-functions" \
  --condition "count requests/failed > 10" \
  --window-size 5m \
  --evaluation-frequency 1m

Cost Optimization

Consumption Plan Pricing

For most scraping workloads, the consumption plan is the most cost-effective option:

  • First 1 million executions per month: free
  • Additional executions: 0.20 USD per million
  • Execution time: 0.000016 USD per GB-second

A typical scraper that processes 1,000 pages per day at 5 seconds per page (one page per execution):
– monthly executions: 30,000
– monthly compute: 30,000 × 5 s × 0.5 GB = 75,000 GB-seconds
– estimated cost: about 1.20 USD of compute before free grants, which sits entirely inside the monthly free allowance of 400,000 GB-seconds, so the effective bill is close to zero

Tips for Reducing Costs

  1. Use queue-based processing to keep individual function executions short.
  2. Cache results in Blob Storage to avoid re-scraping unchanged pages.
  3. Implement conditional scraping that checks whether content has changed before doing a full parse and save (see the sketch after this list).
  4. Tighten timeout settings so requests to unresponsive targets fail fast.
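
A minimal sketch of tip 3: hash the fetched page and compare it against the hash stored from the previous run, skipping parsing and storage when nothing changed. The scrape-cache container and helper name are illustrative:

# shared/change_detection.py (sketch)
import hashlib
import os

from azure.storage.blob import BlobServiceClient


def content_changed(url: str, html: str) -> bool:
    """return True when the page differs from the last scraped version"""
    service = BlobServiceClient.from_connection_string(
        os.environ["STORAGE_CONNECTION_STRING"]
    )
    blob = service.get_blob_client("scrape-cache", hashlib.md5(url.encode()).hexdigest())
    digest = hashlib.sha256(html.encode()).hexdigest()

    try:
        previous = blob.get_blob_properties().metadata.get("content_hash")
    except Exception:
        previous = None  # first time this URL has been seen

    if previous == digest:
        return False  # unchanged: skip parsing and re-uploading results

    # remember the new hash for the next run
    blob.upload_blob(b"", overwrite=True, metadata={"content_hash": digest})
    return True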

Conclusion

Azure Functions provides a solid foundation for serverless web scraping that scales from a few pages per day to thousands without managing any infrastructure. The queue-based fan-out pattern handles large scraping jobs elegantly, and the integration with Azure Blob Storage gives you a built-in data lake for results.

The key to success is working within the platform’s constraints: keep individual function executions short, use queues for coordination, and always route through proxies, since Azure’s shared outbound IPs are likely already flagged by anti-bot systems.

Start with a simple timer-triggered function, validate your parsing logic, then graduate to the queue-based architecture when you need to scale up.
