Building a Price Monitoring Bot with Proxies

Price monitoring is one of the most common proxy use cases. E-commerce sites change prices constantly — sometimes hourly — based on demand, inventory, competition, and visitor profile. A price monitoring bot checks target products regularly, records price changes, and sends alerts when prices drop below a threshold.

Proxies are essential because e-commerce sites aggressively block repeated automated requests from the same IP.

Architecture

Scheduler (cron/APScheduler)
    ↓
Price Checker (async, multi-site)
    ↓ (through rotating proxies)
E-commerce sites (Amazon, Walmart, etc.)
    ↓
Price Extractor (site-specific parsers)
    ↓
Storage (SQLite) + Alerts (email/webhook)

Implementation

import asyncio
import httpx
import re
import json
import sqlite3
import time
import smtplib
from email.message import EmailMessage
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Callable
from datetime import datetime
from abc import ABC, abstractmethod
from selectolax.parser import HTMLParser
import logging

logger = logging.getLogger(__name__)

@dataclass
class Product:
    id: str
    name: str
    url: str
    site: str
    target_price: Optional[float] = None
    current_price: Optional[float] = None
    currency: str = "USD"
    in_stock: bool = True
    last_checked: str = ""
    tags: List[str] = field(default_factory=list)

@dataclass
class PriceRecord:
    product_id: str
    price: float
    currency: str
    in_stock: bool
    timestamp: str
    proxy_used: str = ""

@dataclass
class PriceAlert:
    product: Product
    old_price: float
    new_price: float
    change_percent: float
    alert_type: str  # "drop", "increase", "target_reached", "back_in_stock"


class SiteParser(ABC):
    """Base class for site-specific price parsers."""

    @abstractmethod
    def parse_price(self, html: str, url: str) -> dict:
        pass

    @abstractmethod
    def matches(self, url: str) -> bool:
        pass


class AmazonParser(SiteParser):
    def matches(self, url: str) -> bool:
        return 'amazon.' in url

    def parse_price(self, html: str, url: str) -> dict:
        tree = HTMLParser(html)

        price = None
        for selector in [
            '#priceblock_ourprice',
            '#priceblock_dealprice',
            '.a-price .a-offscreen',
            '#price_inside_buybox',
            '#newBuyBoxPrice',
            '.a-price-whole',
        ]:
            elem = tree.css_first(selector)
            if elem:
                price_text = elem.text(strip=True)
                price = self._extract_number(price_text)
                if price:
                    break

        title_elem = tree.css_first('#productTitle')
        title = title_elem.text(strip=True) if title_elem else ""

        in_stock = True
        avail = tree.css_first('#availability')
        if avail:
            avail_text = avail.text(strip=True).lower()
            if 'unavailable' in avail_text or 'out of stock' in avail_text:
                in_stock = False

        return {
            'price': price,
            'title': title,
            'in_stock': in_stock,
            'currency': 'USD',
        }

    def _extract_number(self, text: str) -> Optional[float]:
        # Drop thousands separators, then take the first decimal number
        match = re.search(r'\d+(?:\.\d+)?', text.replace(',', ''))
        return float(match.group()) if match else None


class GenericParser(SiteParser):
    """Fallback parser using JSON-LD structured data."""

    def matches(self, url: str) -> bool:
        return True

    def parse_price(self, html: str, url: str) -> dict:
        tree = HTMLParser(html)

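        # Try JSON-LD first
        for script in tree.css('script[type="application/ld+json"]'):
            try:
                data = json.loads(script.text())
                if isinstance(data, list):
                    data = data[0] if data else {}
                # Skip blocks that are not a single Product object
                if not isinstance(data, dict) or data.get('@type') != 'Product':
                    continue
                offers = data.get('offers') or {}
                if isinstance(offers, list):
                    offers = offers[0] if offers else {}
                price = offers.get('price')
                # availability is normally a schema.org URL such as
                # "https://schema.org/OutOfStock", so compare the suffix
                availability = str(offers.get('availability', '')).lower()
                return {
                    'price': float(price) if price not in (None, '') else None,
                    'title': data.get('name', ''),
                    'in_stock': not availability.endswith('outofstock'),
                    'currency': offers.get('priceCurrency', 'USD'),
                }
            except (json.JSONDecodeError, ValueError, TypeError, AttributeError):
                continue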

        # Fallback: search for price patterns in meta tags
        price = None
        for meta in tree.css('meta[property="product:price:amount"]'):
            try:
                price = float(meta.attributes.get('content', ''))
                break
            except ValueError:
                continue

        title_elem = tree.css_first('title')
        title = title_elem.text(strip=True) if title_elem else ""

        return {
            'price': price,
            'title': title,
            'in_stock': True,
            'currency': 'USD',
        }


class PriceMonitor:
    def __init__(
        self,
        proxies: List[str],
        db_path: str = "price_monitor.db",
    ):
        self.proxies = proxies
        self.proxy_index = 0
        self.db_path = db_path
        self.products: Dict[str, Product] = {}
        self.parsers: List[SiteParser] = [
            AmazonParser(),
            GenericParser(),
        ]
        self.alert_handlers: List[Callable] = []

        self._init_db()

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id TEXT NOT NULL,
                price REAL,
                currency TEXT,
                in_stock INTEGER,
                timestamp TEXT NOT NULL,
                proxy_used TEXT
            )
        """)
        conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_product_time
            ON price_history(product_id, timestamp)
        """)
        conn.commit()
        conn.close()

    def add_product(self, product: Product):
        self.products[product.id] = product

    def add_alert_handler(self, handler: Callable):
        self.alert_handlers.append(handler)

    def _get_proxy(self) -> str:
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    def _get_parser(self, url: str) -> SiteParser:
        for parser in self.parsers:
            if parser.matches(url):
                return parser
        return self.parsers[-1]  # Generic fallback

    async def check_price(self, product: Product) -> Optional[PriceRecord]:
        proxy = self._get_proxy()
        parser = self._get_parser(product.url)

        try:
            async with httpx.AsyncClient(
                proxy=proxy,
                timeout=30,
                follow_redirects=True,
            ) as client:
                response = await client.get(product.url, headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                                 'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml',
                    'Accept-Language': 'en-US,en;q=0.9',
                })

            if response.status_code != 200:
                logger.warning(
                    f"HTTP {response.status_code} for {product.name}"
                )
                return None

            data = parser.parse_price(response.text, product.url)

            if data['price'] is None:
                logger.warning(f"Could not extract price for {product.name}")
                return None

            record = PriceRecord(
                product_id=product.id,
                price=data['price'],
                currency=data.get('currency', product.currency),
                in_stock=data.get('in_stock', True),
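                # Space-separated UTC matches SQLite's datetime('now') format,
                # so the string comparison in get_price_history stays correct
                timestamp=datetime.utcnow().isoformat(sep=' ', timespec='seconds'),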
                proxy_used=proxy.split('@')[-1] if '@' in proxy else proxy,
            )

            # Check for price changes
            old_price = product.current_price
            product.current_price = record.price
            product.in_stock = record.in_stock
            product.last_checked = record.timestamp

            if old_price is not None and old_price != record.price:
                change_pct = (
                    (record.price - old_price) / old_price * 100
                )

                alert_type = "drop" if change_pct < 0 else "increase"
                if (product.target_price and
                    record.price <= product.target_price):
                    alert_type = "target_reached"

                alert = PriceAlert(
                    product=product,
                    old_price=old_price,
                    new_price=record.price,
                    change_percent=round(change_pct, 1),
                    alert_type=alert_type,
                )
                await self._send_alerts(alert)

            # Store in database
            self._save_record(record)

            return record

        except Exception as e:
            logger.error(f"Error checking {product.name}: {e}")
            return None

    async def check_all(self):
        print(f"Checking {len(self.products)} products...")
        start = time.monotonic()

        tasks = [
            self.check_price(product)
            for product in self.products.values()
        ]
        results = await asyncio.gather(*tasks)

        elapsed = time.monotonic() - start
        successful = sum(1 for r in results if r is not None)
        print(
            f"Checked {successful}/{len(self.products)} products "
            f"in {elapsed:.1f}s"
        )

        return results

    def _save_record(self, record: PriceRecord):
        conn = sqlite3.connect(self.db_path)
        conn.execute(
            """INSERT INTO price_history
            (product_id, price, currency, in_stock, timestamp, proxy_used)
            VALUES (?, ?, ?, ?, ?, ?)""",
            (
                record.product_id, record.price, record.currency,
                int(record.in_stock), record.timestamp, record.proxy_used,
            )
        )
        conn.commit()
        conn.close()

    async def _send_alerts(self, alert: PriceAlert):
        direction = "dropped" if alert.change_percent < 0 else "increased"
        message = (
            f"Price {direction} for {alert.product.name}: "
            f"${alert.old_price:.2f} → ${alert.new_price:.2f} "
            f"({alert.change_percent:+.1f}%)"
        )

        if alert.alert_type == "target_reached":
            message += " TARGET PRICE REACHED!"

        print(f"  ALERT: {message}")

        for handler in self.alert_handlers:
            try:
                await handler(alert, message)
            except Exception as e:
                logger.error(f"Alert handler error: {e}")

    def get_price_history(
        self, product_id: str, days: int = 30
    ) -> List[dict]:
        conn = sqlite3.connect(self.db_path)
        cursor = conn.execute(
            """SELECT price, currency, in_stock, timestamp
            FROM price_history
            WHERE product_id = ?
            AND timestamp > datetime('now', ?)
            ORDER BY timestamp""",
            (product_id, f'-{days} days')
        )
        results = [
            {
                'price': row[0],
                'currency': row[1],
                'in_stock': bool(row[2]),
                'timestamp': row[3],
            }
            for row in cursor.fetchall()
        ]
        conn.close()
        return results

    def get_summary(self) -> dict:
        products = []
        for p in self.products.values():
            history = self.get_price_history(p.id, days=7)
            prices = [h['price'] for h in history if h['price']]

            products.append({
                'id': p.id,
                'name': p.name,
                'current_price': p.current_price,
                'target_price': p.target_price,
                'in_stock': p.in_stock,
                'min_7d': min(prices) if prices else None,
                'max_7d': max(prices) if prices else None,
                'checks_7d': len(history),
            })

        return {'products': products}
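
check_all starts every request at once through asyncio.gather. That is fine for a handful of products, but a long product list can overwhelm a small proxy pool. One option is to cap the number of in-flight requests with a semaphore. The sketch below is a minimal standalone helper; gather_limited and its limit parameter are hypothetical names and are not part of the monitor above.

```python
import asyncio
from typing import Awaitable, Iterable, List, TypeVar

T = TypeVar("T")


async def gather_limited(coros: Iterable[Awaitable[T]], limit: int = 5) -> List[T]:
    """Run coroutines concurrently, but never more than `limit` at a time.

    Results come back in the same order as the input, like asyncio.gather.
    """
    semaphore = asyncio.Semaphore(limit)

    async def run(coro: Awaitable[T]) -> T:
        # Each coroutine waits here until one of the `limit` slots is free
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run(c) for c in coros))
```

Inside check_all, the call asyncio.gather(*tasks) could then become gather_limited(tasks, limit=5), with the limit tuned to the size of the proxy pool.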

Alert Handlers

# Webhook alert (Slack, Discord, etc.)
async def webhook_alert(alert: PriceAlert, message: str):
    async with httpx.AsyncClient(timeout=10) as client:
        response = await client.post(
            "https://hooks.slack.com/services/YOUR/WEBHOOK/URL",
            json={"text": message},
        )
        # Raise on 4xx/5xx so _send_alerts logs the failed delivery
        response.raise_for_status()

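# Email alert
async def email_alert(alert: PriceAlert, message: str):
    msg = EmailMessage()
    msg['Subject'] = f"Price Alert: {alert.product.name}"
    msg['From'] = "alerts@example.com"
    msg['To'] = "you@example.com"
    msg.set_content(message)

    def send():
        with smtplib.SMTP('smtp.example.com', 587) as smtp:
            smtp.starttls()
            # Placeholder credentials; load real ones from the environment
            smtp.login("alerts@example.com", "password")
            smtp.send_message(msg)

    # smtplib is blocking, so run it in a worker thread (Python 3.9+)
    # instead of stalling the event loop
    await asyncio.to_thread(send)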
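
Handlers receive a PriceAlert whose alert_type comment lists four values, but check_price only ever assigns "drop", "increase", or "target_reached". A possible way to cover "back_in_stock" as well is to move the decision into one small function. This is a sketch under assumptions: classify_alert is a hypothetical helper, and unlike the monitor above it reports "target_reached" only when the price crosses the target, not on every later change below it.

```python
from typing import Optional


def classify_alert(
    old_price: float,
    new_price: float,
    target_price: Optional[float] = None,
    was_in_stock: bool = True,
    in_stock: bool = True,
) -> Optional[str]:
    """Return the alert type for a price check, or None if nothing changed."""
    # A restock is reported first, even if the price is unchanged
    if in_stock and not was_in_stock:
        return "back_in_stock"
    # Fire only when the price crosses the target from above
    if target_price is not None and new_price <= target_price < old_price:
        return "target_reached"
    if new_price < old_price:
        return "drop"
    if new_price > old_price:
        return "increase"
    return None
```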

Scheduling

# Run with APScheduler
from apscheduler.schedulers.asyncio import AsyncIOScheduler

async def main():
    monitor = PriceMonitor(
        proxies=[
            "http://user:pass@proxy1.example.com:8080",
            "http://user:pass@proxy2.example.com:8080",
        ],
    )

    monitor.add_product(Product(
        id="iphone-16",
        name="iPhone 16 Pro 256GB",
        url="https://www.amazon.com/dp/B0EXAMPLE",
        site="amazon",
        target_price=899.00,
    ))

    monitor.add_product(Product(
        id="macbook-m4",
        name="MacBook Pro M4 14-inch",
        url="https://www.amazon.com/dp/B0EXAMPLE2",
        site="amazon",
        target_price=1499.00,
    ))

    monitor.add_alert_handler(webhook_alert)

    # Initial check
    await monitor.check_all()

    # Schedule periodic checks
    scheduler = AsyncIOScheduler()
    scheduler.add_job(
        monitor.check_all,
        'interval',
        hours=1,
        id='price_check',
    )
    scheduler.start()

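    # Keep running. Under asyncio.run, Ctrl+C cancels this task instead of
    # raising KeyboardInterrupt here, so clean up in a finally block.
    try:
        while True:
            await asyncio.sleep(3600)
    finally:
        scheduler.shutdown()

if __name__ == "__main__":
    asyncio.run(main())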

FAQ

How often should I check prices?

For competitive intelligence, check every 1-4 hours. For personal deal-hunting, once or twice daily is sufficient. More frequent checks consume more proxy bandwidth and increase the risk of detection. Match your check frequency to how quickly prices change on your target sites.
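
One way to match frequency to price volatility is to derive the interval from recent history. The heuristic below is only an illustration: next_check_interval_hours, its 1-hour and 24-hour bounds, and the linear interpolation are assumptions, not something the monitor above implements.

```python
from typing import Sequence


def next_check_interval_hours(
    recent_prices: Sequence[float],
    min_hours: float = 1.0,
    max_hours: float = 24.0,
) -> float:
    """Pick a check interval from how often the price changed recently."""
    if len(recent_prices) < 2:
        # No history yet: check often until a pattern emerges
        return min_hours
    # Count consecutive checks where the price differed from the previous one
    changes = sum(1 for a, b in zip(recent_prices, recent_prices[1:]) if a != b)
    change_rate = changes / (len(recent_prices) - 1)  # 0.0 static, 1.0 always changing
    # Volatile products move toward min_hours, static ones toward max_hours
    return max_hours - change_rate * (max_hours - min_hours)
```

The prices could come from get_price_history, for example [h['price'] for h in monitor.get_price_history(product_id, days=7)].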

Why do I need proxies for price monitoring?

E-commerce sites detect and block automated price checks from the same IP. Without proxies, you will see CAPTCHAs, 403 errors, or altered pricing (some sites show higher prices to suspected bots). Residential proxies provide the most reliable access to accurate pricing data.
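
A block does not always arrive as an error status; a CAPTCHA page can come back with HTTP 200. The sketch below shows one way to recognize a likely block and retry through the next proxy. The status set, the marker strings, and the names looks_blocked and fetch_with_rotation are all assumptions for illustration; the fetch argument is any coroutine that returns a (status_code, body) pair, so it can wrap httpx or a test double.

```python
import asyncio
from typing import Awaitable, Callable, Optional, Sequence, Tuple

# Assumed signals of a block; tune these per target site
BLOCK_STATUS = {403, 429, 503}
CAPTCHA_MARKERS = ("captcha", "robot check", "unusual traffic")

Fetch = Callable[[str, str], Awaitable[Tuple[int, str]]]


def looks_blocked(status_code: int, body: str) -> bool:
    """Heuristic: blocking status code or CAPTCHA wording near the top of the page."""
    if status_code in BLOCK_STATUS:
        return True
    head = body[:5000].lower()
    return any(marker in head for marker in CAPTCHA_MARKERS)


async def fetch_with_rotation(
    url: str,
    proxies: Sequence[str],
    fetch: Fetch,
    max_attempts: int = 3,
    backoff_seconds: float = 0.0,
) -> Optional[str]:
    """Try successive proxies until a response does not look blocked."""
    for attempt in range(max_attempts):
        proxy = proxies[attempt % len(proxies)]
        status, body = await fetch(url, proxy)
        if not looks_blocked(status, body):
            return body
        # Wait a little longer after each blocked attempt
        await asyncio.sleep(backoff_seconds * (attempt + 1))
    return None
```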

Can I monitor prices on sites that use JavaScript rendering?

For sites that load prices via JavaScript (React/Vue SPAs), integrate Playwright or Selenium with your proxy setup. The bot fetches the page in a headless browser, waits for the price element to render, then extracts it. This is slower but handles dynamic sites.
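
A rough sketch of the Playwright route follows. It assumes Playwright is installed (pip install playwright, then playwright install chromium) and that the proxy URL has the same user:pass@host:port shape used earlier in this article. The function names and the selector argument are hypothetical; the returned text would still need to go through a price parser.

```python
from typing import Optional
from urllib.parse import unquote, urlsplit


def playwright_proxy_config(proxy_url: str) -> dict:
    """Split a proxy URL into the server/username/password dict Playwright expects."""
    parts = urlsplit(proxy_url)
    server = f"{parts.scheme}://{parts.hostname}"
    if parts.port:
        server += f":{parts.port}"
    config = {"server": server}
    if parts.username:
        config["username"] = unquote(parts.username)
        config["password"] = unquote(parts.password or "")
    return config


async def fetch_rendered_text(
    url: str, selector: str, proxy_url: str, timeout_ms: int = 15000
) -> Optional[str]:
    """Load a page in headless Chromium and return the text of `selector`."""
    # Imported lazily so the helper above works without Playwright installed
    from playwright.async_api import TimeoutError as PlaywrightTimeout
    from playwright.async_api import async_playwright

    async with async_playwright() as p:
        browser = await p.chromium.launch(
            headless=True, proxy=playwright_proxy_config(proxy_url)
        )
        try:
            page = await browser.new_page()
            await page.goto(url, wait_until="domcontentloaded")
            # Wait for client-side code to render the price element
            await page.wait_for_selector(selector, timeout=timeout_ms)
            return await page.inner_text(selector)
        except PlaywrightTimeout:
            return None
        finally:
            await browser.close()
```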

How do I detect dynamic pricing aimed at me specifically?

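Compare prices fetched through proxies in different locations at roughly the same time. If the same product shows different prices depending on the proxy IP, the site is varying prices by visitor location or profile. Rule out currency, tax, and shipping differences before treating a gap as dynamic pricing. A geo-targeted content checker complements the price monitor for this analysis.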
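
The comparison itself can be quite small. The sketch below takes prices already fetched for one product, keyed by proxy location, and reports the spread. price_spread and the 1% tolerance are assumptions for illustration, and all prices are assumed to be in the same currency.

```python
from typing import Dict, Optional


def price_spread(
    prices_by_location: Dict[str, float], tolerance_pct: float = 1.0
) -> Optional[dict]:
    """Compare one product's price across proxy locations.

    Returns None when there are fewer than two prices to compare.
    """
    if len(prices_by_location) < 2:
        return None
    cheapest = min(prices_by_location, key=prices_by_location.get)
    priciest = max(prices_by_location, key=prices_by_location.get)
    low = prices_by_location[cheapest]
    high = prices_by_location[priciest]
    if low <= 0:
        return None  # a zero or negative price points to a parsing problem
    spread_pct = round((high - low) / low * 100, 2)
    return {
        "cheapest": cheapest,
        "priciest": priciest,
        "spread_pct": spread_pct,
        # Small gaps are treated as noise rather than location-based pricing
        "flagged": spread_pct > tolerance_pct,
    }
```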

What database should I use for price history?

SQLite works for monitoring up to a few thousand products. For larger operations, use PostgreSQL or TimescaleDB (optimized for time-series data). Store one row per price check per product, and create indexes on product_id and timestamp for fast history queries.
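
To illustrate why the composite index matters, the sketch below builds a trimmed-down copy of the price_history table in memory, runs a per-product history query, and asks SQLite for its query plan. The helper names are hypothetical, and the cutoff is passed in as a timestamp string so the comparison stays a plain string comparison.

```python
import sqlite3

# Trimmed-down version of the schema used by the monitor
SCHEMA = """
CREATE TABLE IF NOT EXISTS price_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    product_id TEXT NOT NULL,
    price REAL,
    timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_product_time
    ON price_history(product_id, timestamp);
"""

HISTORY_SQL = """
SELECT MIN(price), MAX(price), COUNT(*)
FROM price_history
WHERE product_id = ? AND timestamp >= ?
"""


def open_db(path: str = ":memory:") -> sqlite3.Connection:
    conn = sqlite3.connect(path)
    conn.executescript(SCHEMA)
    return conn


def price_stats(conn: sqlite3.Connection, product_id: str, since: str) -> dict:
    """Min, max, and number of checks for one product since a cutoff timestamp."""
    low, high, checks = conn.execute(HISTORY_SQL, (product_id, since)).fetchone()
    return {"min": low, "max": high, "checks": checks}


def query_plan(conn: sqlite3.Connection, product_id: str, since: str) -> str:
    """Return SQLite's plan text; the last column of each row is the description."""
    rows = conn.execute(
        "EXPLAIN QUERY PLAN " + HISTORY_SQL, (product_id, since)
    ).fetchall()
    return " ".join(str(row[-1]) for row in rows)
```

If the plan text mentions idx_product_time, the query is served by the index instead of a full table scan.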

