Nanobot AI Agent for Web Scraping: Complete Guide

Nanobot is an AI agent framework for building autonomous agents that can interact with the web, execute code, and process data. Unlike general-purpose LLM wrappers, Nanobot is built specifically for task automation, which makes it well suited to web scraping workflows where an agent needs to navigate sites, extract data, handle errors, and produce clean output.

This guide covers how to set up Nanobot for web scraping, integrate proxy support, build extraction pipelines, and deploy reliable scraping agents that can handle real-world websites.

What Makes Nanobot Different

Most AI agent frameworks focus on chatbot-style interactions. Nanobot takes a different approach:

  • Task-oriented: agents are defined by their goals and tools, not by conversation flow
  • Tool-first architecture: you define tools (functions) that the agent can call, and the agent orchestrates them to complete a task
  • Built-in state management: the agent tracks what it has done, what worked, and what failed
  • Retryable execution: failed steps can be retried automatically with different parameters
  • Structured output: agents produce typed, validated output rather than free-text responses

For web scraping, these characteristics mean the agent can autonomously handle the entire pipeline, from URL discovery to clean data delivery.

Setting Up Nanobot

Installation

pip install nanobot-ai

Basic Configuration

Create a Nanobot agent with web scraping capabilities:

# scraping_agent.py
from nanobot import Agent, Tool
import httpx
from bs4 import BeautifulSoup
import json

# define the scraping tools
class WebTools:
    """tools for web scraping operations."""

    def __init__(self, proxy_url: str = None):
        self.proxy_url = proxy_url
        self.session_headers = {
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/120.0.0.0 Safari/537.36"
            ),
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "en-US,en;q=0.9"
        }

    async def fetch_page(self, url: str) -> str:
        """fetch a web page and return its text content."""
        proxies = {"all://": self.proxy_url} if self.proxy_url else None

        async with httpx.AsyncClient(
            proxies=proxies,
            timeout=30,
            follow_redirects=True
        ) as client:
            response = await client.get(url, headers=self.session_headers)
            response.raise_for_status()

        soup = BeautifulSoup(response.text, "html.parser")

        # remove non-content elements
        for tag in soup.find_all(["script", "style", "nav", "footer", "header", "aside"]):
            tag.decompose()

        return soup.get_text(separator="\n", strip=True)

    async def fetch_html(self, url: str) -> str:
        """fetch raw HTML from a web page."""
        proxies = {"all://": self.proxy_url} if self.proxy_url else None

        async with httpx.AsyncClient(
            proxies=proxies,
            timeout=30,
            follow_redirects=True
        ) as client:
            response = await client.get(url, headers=self.session_headers)
            return response.text

    async def extract_structured(self, url: str, selectors: dict) -> dict:
        """extract specific elements using CSS selectors."""
        html = await self.fetch_html(url)
        soup = BeautifulSoup(html, "html.parser")

        result = {}
        for field, selector in selectors.items():
            element = soup.select_one(selector)
            if element:
                result[field] = element.get_text(strip=True)
            else:
                result[field] = None

        return result

    async def extract_list(self, url: str, container_selector: str,
                           item_selectors: dict) -> list:
        """extract a list of items from a page."""
        html = await self.fetch_html(url)
        soup = BeautifulSoup(html, "html.parser")

        items = []
        containers = soup.select(container_selector)

        for container in containers:
            item = {}
            for field, selector in item_selectors.items():
                element = container.select_one(selector)
                if element:
                    if element.name == "a":
                        item[field] = element.get("href", "")
                        item[f"{field}_text"] = element.get_text(strip=True)
                    else:
                        item[field] = element.get_text(strip=True)
                else:
                    item[field] = None
            items.append(item)

        return items

    async def search_web(self, query: str, num_results: int = 10) -> list:
        """search the web and return results."""
        from urllib.parse import quote, urlparse, parse_qs

        search_url = f"https://html.duckduckgo.com/html/?q={quote(query)}"
        proxies = {"all://": self.proxy_url} if self.proxy_url else None

        async with httpx.AsyncClient(
            proxies=proxies,
            timeout=15
        ) as client:
            response = await client.get(search_url, headers=self.session_headers)

        soup = BeautifulSoup(response.text, "html.parser")
        results = []

        for result in soup.select(".result"):
            title_el = result.select_one(".result__title a")
            snippet_el = result.select_one(".result__snippet")

            if title_el:
                href = title_el.get("href", "")
                # duckduckgo's html endpoint may wrap result links in a redirect
                # (//duckduckgo.com/l/?uddg=<encoded url>); unwrap it when present
                uddg = parse_qs(urlparse(href).query).get("uddg")
                if uddg:
                    href = uddg[0]
                results.append({
                    "title": title_el.get_text(strip=True),
                    "url": href,
                    "snippet": snippet_el.get_text(strip=True) if snippet_el else ""
                })

            if len(results) >= num_results:
                break

        return results

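These methods can then be registered as tools on a Nanobot agent. The exact Agent and Tool constructor signatures are not shown in this guide and may differ between nanobot-ai versions, so treat the wiring below as a hypothetical sketch and adapt it to the library's documentation:

# hypothetical wiring -- the Agent/Tool signatures below are assumptions,
# not the confirmed nanobot-ai API; check the library docs for your version
from nanobot import Agent, Tool

web_tools = WebTools(proxy_url="http://user:pass@proxy.example.com:8080")

scraping_agent = Agent(
    name="scraper",
    goal="extract clean, structured data from the requested pages",
    tools=[
        Tool(name="fetch_page", fn=web_tools.fetch_page,
             description="fetch a URL and return its visible text"),
        Tool(name="extract_structured", fn=web_tools.extract_structured,
             description="extract named fields from a URL using CSS selectors"),
        Tool(name="search_web", fn=web_tools.search_web,
             description="search the web and return result titles and URLs"),
    ],
)
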
Building a Scraping Agent

Product Research Agent

This agent searches for products, visits multiple sites, and compiles a comparison:

class ProductResearchAgent:
    """an agent that researches products across multiple websites."""

    def __init__(self, proxy_url: str = None, llm_api_key: str = None):
        self.tools = WebTools(proxy_url=proxy_url)
        self.llm_api_key = llm_api_key
        self.results = []

    async def research(self, product_query: str, max_sources: int = 5) -> dict:
        """research a product across multiple sources."""

        # step 1: search for the product
        print(f"searching for: {product_query}")
        search_results = await self.tools.search_web(product_query, num_results=max_sources)

        # step 2: visit each result and extract product data
        for result in search_results:
            url = result.get("url", "")
            if not url or not url.startswith("http"):
                continue

            print(f"visiting: {url}")
            try:
                page_content = await self.tools.fetch_page(url)

                # step 3: use LLM to extract structured product data
                product_data = await self._llm_extract(page_content, product_query)
                product_data["source_url"] = url
                product_data["source_title"] = result.get("title", "")
                self.results.append(product_data)

            except Exception as e:
                print(f"failed to scrape {url}: {e}")
                continue

        # step 4: compile the results into a report
        return self._compile_report(product_query)

    async def _llm_extract(self, content: str, product_query: str) -> dict:
        """use an LLM to extract product data from page content."""
        from openai import OpenAI

        client = OpenAI(api_key=self.llm_api_key)

        # truncate content to fit context window
        content = content[:15000]

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {
                    "role": "system",
                    "content": (
                        "extract product information from the web page content. "
                        "return JSON with: product_name, price, currency, "
                        "rating, features (list), pros (list), cons (list). "
                        "use null for fields not found."
                    )
                },
                {
                    "role": "user",
                    "content": f"product query: {product_query}\n\npage content:\n{content}"
                }
            ],
            temperature=0.1,
            response_format={"type": "json_object"}
        )

        return json.loads(response.choices[0].message.content)

    def _compile_report(self, query: str) -> dict:
        """compile all results into a research report."""
        return {
            "query": query,
            "sources_checked": len(self.results),
            "products_found": self.results,
            "price_range": self._get_price_range(),
            "common_features": self._get_common_features()
        }

    def _get_price_range(self) -> dict:
        prices = []
        for r in self.results:
            price = r.get("price")
            if price and isinstance(price, (int, float)):
                prices.append(price)

        if prices:
            return {"min": min(prices), "max": max(prices), "avg": sum(prices) / len(prices)}
        return {"min": None, "max": None, "avg": None}

    def _get_common_features(self) -> list:
        from collections import Counter

        all_features = []
        for r in self.results:
            features = r.get("features", [])
            if features:
                all_features.extend(features)

        counter = Counter(all_features)
        return [f for f, count in counter.most_common(10) if count > 1]

Running the Agent

import asyncio

async def main():
    agent = ProductResearchAgent(
        proxy_url="http://user:pass@residential.proxy.com:8080",
        llm_api_key="your-openai-key"
    )

    report = await agent.research("best residential proxy providers 2026")

    # save the report
    with open("proxy_research.json", "w") as f:
        json.dump(report, f, indent=2)

    print(f"research complete. found {report['sources_checked']} sources.")
    pr = report["price_range"]
    if pr["min"] is not None:
        print(f"price range: ${pr['min']} - ${pr['max']}")

asyncio.run(main())

Proxy Integration Patterns

Rotating Proxy Pool

import random
import time

class ProxyPool:
    """manage a pool of proxies with health tracking."""

    def __init__(self, proxies: list[str]):
        self.proxies = proxies
        self.health = {p: {"failures": 0, "last_used": 0} for p in proxies}

    def get_proxy(self) -> str:
        """get a healthy proxy from the pool."""
        # filter out proxies with too many recent failures
        healthy = [
            p for p in self.proxies
            if self.health[p]["failures"] < 3
        ]

        if not healthy:
            # reset all failure counts
            for p in self.proxies:
                self.health[p]["failures"] = 0
            healthy = self.proxies

        # pick the least recently used proxy
        healthy.sort(key=lambda p: self.health[p]["last_used"])
        proxy = healthy[0]
        self.health[proxy]["last_used"] = time.time()
        return proxy

    def report_success(self, proxy: str):
        self.health[proxy]["failures"] = 0

    def report_failure(self, proxy: str):
        self.health[proxy]["failures"] += 1

# usage with the scraping tools
pool = ProxyPool([
    "http://user:pass@gate.smartproxy.com:7777",
    "http://user:pass@pr.oxylabs.io:7777",
    "http://user:pass@brd.superproxy.io:22225"
])

class ProxyRotatingTools(WebTools):
    def __init__(self, pool: ProxyPool):
        self.pool = pool
        super().__init__()

    async def fetch_page(self, url: str) -> str:
        proxy = self.pool.get_proxy()
        self.proxy_url = proxy
        try:
            result = await super().fetch_page(url)
            self.pool.report_success(proxy)
            return result
        except Exception:
            self.pool.report_failure(proxy)
            raise

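A short usage sketch, assuming the pool defined above: each request pulls the least recently used healthy proxy, and failures feed back into the pool's health tracking.

async def scrape_with_rotation(urls: list[str]) -> dict[str, str]:
    """fetch several URLs, rotating proxies and skipping failed pages."""
    tools = ProxyRotatingTools(pool)
    pages = {}
    for url in urls:
        try:
            # fetch_page picks a proxy, fetches, then records success/failure
            pages[url] = await tools.fetch_page(url)
        except Exception as e:
            print(f"skipping {url}: {e}")
    return pages

# pages = asyncio.run(scrape_with_rotation([
#     "https://example.com/page-1",
#     "https://example.com/page-2"
# ]))
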
Geo-Targeted Scraping

class GeoTargetedTools(WebTools):
    """tools with geographic proxy targeting."""

    GEO_PROXIES = {
        "us": "http://user-country-us:pass@proxy.example.com:8080",
        "uk": "http://user-country-gb:pass@proxy.example.com:8080",
        "de": "http://user-country-de:pass@proxy.example.com:8080",
        "kr": "http://user-country-kr:pass@proxy.example.com:8080",
        "jp": "http://user-country-jp:pass@proxy.example.com:8080",
        "sg": "http://user-country-sg:pass@proxy.example.com:8080"
    }

    async def fetch_page_geo(self, url: str, country: str) -> str:
        """fetch a page using a proxy from a specific country."""
        proxy = self.GEO_PROXIES.get(country)
        if not proxy:
            raise ValueError(f"no proxy available for country: {country}")

        self.proxy_url = proxy
        return await self.fetch_page(url)

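For example, the same page can be fetched through several countries to compare localized pricing or content. The target URL below is a placeholder, and the country-suffix credential format shown in GEO_PROXIES is provider-specific:

async def compare_regions(url: str, countries: list[str]) -> dict[str, str]:
    """fetch one URL through proxies in different countries."""
    tools = GeoTargetedTools()
    snapshots = {}
    for country in countries:
        try:
            snapshots[country] = await tools.fetch_page_geo(url, country)
        except Exception as e:
            snapshots[country] = f"error: {e}"
    return snapshots

# snapshots = asyncio.run(compare_regions("https://example.com/pricing", ["us", "uk", "de"]))
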
Building a Competitive Intelligence Agent

A more advanced use case combines multiple scraping tools into a competitive intelligence workflow:

class CompetitiveIntelAgent:
    """agent for gathering competitive intelligence."""

    def __init__(self, proxy_url: str, llm_api_key: str):
        self.tools = WebTools(proxy_url=proxy_url)
        self.llm_api_key = llm_api_key

    async def analyze_competitor(self, company_url: str) -> dict:
        """comprehensive competitor analysis from their website."""

        report = {
            "url": company_url,
            "analyzed_at": None,
            "product_info": {},
            "pricing": {},
            "technology": {},
            "content_strategy": {}
        }

        from datetime import datetime
        report["analyzed_at"] = datetime.utcnow().isoformat()

        print(f"analyzing: {company_url}")

        # find and analyze key pages
        html = await self.tools.fetch_html(company_url)
        soup = BeautifulSoup(html, "html.parser")

        key_pages = self._find_key_pages(soup, company_url)

        # analyze pricing page
        if key_pages.get("pricing"):
            try:
                pricing_content = await self.tools.fetch_page(key_pages["pricing"])
                report["pricing"] = await self._analyze_pricing(pricing_content)
            except Exception as e:
                report["pricing"] = {"error": str(e)}

        # analyze product/features page
        if key_pages.get("features"):
            try:
                features_content = await self.tools.fetch_page(key_pages["features"])
                report["product_info"] = await self._analyze_features(features_content)
            except Exception as e:
                report["product_info"] = {"error": str(e)}

        # analyze blog for content strategy
        if key_pages.get("blog"):
            try:
                blog_content = await self.tools.fetch_page(key_pages["blog"])
                report["content_strategy"] = await self._analyze_blog(blog_content)
            except Exception as e:
                report["content_strategy"] = {"error": str(e)}

        return report

    def _find_key_pages(self, soup: BeautifulSoup, base_url: str) -> dict:
        """find links to pricing, features, blog, and other key pages."""
        from urllib.parse import urljoin

        key_pages = {}
        keywords = {
            "pricing": ["pricing", "plans", "price"],
            "features": ["features", "product", "solutions"],
            "blog": ["blog", "resources", "articles"],
            "about": ["about", "company", "team"],
            "docs": ["docs", "documentation", "api"]
        }

        for link in soup.find_all("a", href=True):
            href = link.get("href", "").lower()
            text = link.get_text(strip=True).lower()

            for page_type, kws in keywords.items():
                if page_type not in key_pages:
                    for kw in kws:
                        if kw in href or kw in text:
                            full_url = urljoin(base_url, link.get("href"))
                            key_pages[page_type] = full_url
                            break

        return key_pages

    async def _analyze_pricing(self, content: str) -> dict:
        """extract pricing information using LLM."""
        from openai import OpenAI
        client = OpenAI(api_key=self.llm_api_key)

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": (
                    f"extract pricing tiers from this page content. "
                    f"return JSON with tiers (name, price, billing_period, features list). "
                    f"content:\n{content[:10000]}"
                )
            }],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        return json.loads(response.choices[0].message.content)

    async def _analyze_features(self, content: str) -> dict:
        """extract product features using LLM."""
        from openai import OpenAI
        client = OpenAI(api_key=self.llm_api_key)

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": (
                    f"extract product features and capabilities. "
                    f"return JSON with: main_product, key_features (list), "
                    f"target_audience, integrations (list). "
                    f"content:\n{content[:10000]}"
                )
            }],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        return json.loads(response.choices[0].message.content)

    async def _analyze_blog(self, content: str) -> dict:
        """analyze blog for content strategy insights."""
        from openai import OpenAI
        client = OpenAI(api_key=self.llm_api_key)

        response = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "user",
                "content": (
                    f"analyze this blog page content for content strategy. "
                    f"return JSON with: post_frequency_estimate, "
                    f"main_topics (list), content_types (list), "
                    f"target_keywords (list). "
                    f"content:\n{content[:10000]}"
                )
            }],
            response_format={"type": "json_object"},
            temperature=0.1
        )

        return json.loads(response.choices[0].message.content)

# usage
agent = CompetitiveIntelAgent(
    proxy_url="http://user:pass@proxy.example.com:8080",
    llm_api_key="your-openai-key"
)

competitors = [
    "https://brightdata.com",
    "https://oxylabs.io",
    "https://smartproxy.com"
]

async def run_analysis():
    reports = []
    for url in competitors:
        report = await agent.analyze_competitor(url)
        reports.append(report)

    with open("competitive_intel.json", "w") as f:
        json.dump(reports, f, indent=2)

asyncio.run(run_analysis())

Error Handling and Resilience

Robust error handling is essential for production scraping agents:

import asyncio
import httpx
from enum import Enum

class RetryStrategy(Enum):
    EXPONENTIAL = "exponential"
    LINEAR = "linear"
    NONE = "none"

class ResilientScraper:
    """a scraper with built-in retry and error handling."""

    def __init__(self, tools: WebTools, max_retries: int = 3,
                 strategy: RetryStrategy = RetryStrategy.EXPONENTIAL):
        self.tools = tools
        self.max_retries = max_retries
        self.strategy = strategy
        self.error_log = []

    async def fetch_with_retry(self, url: str) -> str:
        """fetch a page with automatic retry on failure."""
        last_error = None

        for attempt in range(self.max_retries):
            try:
                result = await self.tools.fetch_page(url)
                return result
            except httpx.HTTPStatusError as e:
                last_error = e
                if e.response.status_code == 403:
                    self.error_log.append({
                        "url": url, "error": "blocked (403)",
                        "attempt": attempt + 1
                    })
                    # switch proxy on block
                    if hasattr(self.tools, 'pool'):
                        self.tools.pool.report_failure(self.tools.proxy_url)
                elif e.response.status_code == 429:
                    # rate limited, wait longer
                    wait = (attempt + 1) * 10
                    await asyncio.sleep(wait)
                    continue
            except httpx.TimeoutException:
                last_error = TimeoutError(f"timeout fetching {url}")
                self.error_log.append({
                    "url": url, "error": "timeout",
                    "attempt": attempt + 1
                })
            except Exception as e:
                last_error = e
                self.error_log.append({
                    "url": url, "error": str(e),
                    "attempt": attempt + 1
                })

            # calculate wait time based on strategy
            if self.strategy == RetryStrategy.EXPONENTIAL:
                wait = 2 ** attempt
            elif self.strategy == RetryStrategy.LINEAR:
                wait = (attempt + 1) * 2
            else:
                break

            await asyncio.sleep(wait)

        raise last_error or Exception(f"failed to fetch {url} after {self.max_retries} attempts")

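A minimal usage sketch, assuming the WebTools class from earlier; the proxy URL is a placeholder:

async def collect_pages(urls: list[str]) -> dict[str, str]:
    """fetch a batch of URLs with retries, logging whatever still fails."""
    tools = WebTools(proxy_url="http://user:pass@proxy.example.com:8080")
    scraper = ResilientScraper(tools, max_retries=3,
                               strategy=RetryStrategy.EXPONENTIAL)
    pages = {}
    for url in urls:
        try:
            pages[url] = await scraper.fetch_with_retry(url)
        except Exception as e:
            print(f"giving up on {url}: {e}")
    print(f"{len(scraper.error_log)} errors logged across {len(urls)} URLs")
    return pages
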
Monitoring and Logging

Track your agent's performance over time:

import sqlite3
from datetime import datetime

class AgentMetrics:
    """track scraping agent performance metrics."""

    def __init__(self, db_path: str = "agent_metrics.db"):
        self.conn = sqlite3.connect(db_path)
        self._create_tables()

    def _create_tables(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS scrape_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                timestamp TEXT,
                url TEXT,
                success BOOLEAN,
                duration_ms INTEGER,
                proxy_used TEXT,
                error TEXT,
                data_points_extracted INTEGER
            )
        """)
        self.conn.commit()

    def log_scrape(self, url: str, success: bool, duration_ms: int,
                   proxy: str = None, error: str = None, data_points: int = 0):
        self.conn.execute("""
            INSERT INTO scrape_events
            (timestamp, url, success, duration_ms, proxy_used, error, data_points_extracted)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        """, (
            # use a space separator so string comparisons against
            # sqlite's datetime('now', ...) output work as expected
            datetime.utcnow().isoformat(sep=" ", timespec="seconds"),
            url, success, duration_ms, proxy, error, data_points
        ))
        self.conn.commit()

    def get_success_rate(self, hours: int = 24) -> float:
        cursor = self.conn.execute("""
            SELECT
                COUNT(CASE WHEN success THEN 1 END) * 100.0 / COUNT(*)
            FROM scrape_events
            WHERE timestamp >= datetime('now', ?)
        """, (f"-{hours} hours",))
        result = cursor.fetchone()[0]
        return result or 0.0

    def get_avg_duration(self, hours: int = 24) -> float:
        cursor = self.conn.execute("""
            SELECT AVG(duration_ms) FROM scrape_events
            WHERE success AND timestamp >= datetime('now', ?)
        """, (f"-{hours} hours",))
        result = cursor.fetchone()[0]
        return result or 0.0

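To feed the metrics table, wrap each fetch so its outcome, latency, and proxy are recorded. A small sketch, assuming the WebTools class from earlier:

import time

async def tracked_fetch(tools: WebTools, metrics: AgentMetrics, url: str) -> str | None:
    """fetch a page and record the attempt in the metrics database."""
    start = time.monotonic()
    try:
        text = await tools.fetch_page(url)
        metrics.log_scrape(url, success=True,
                           duration_ms=int((time.monotonic() - start) * 1000),
                           proxy=tools.proxy_url, data_points=1)
        return text
    except Exception as e:
        metrics.log_scrape(url, success=False,
                           duration_ms=int((time.monotonic() - start) * 1000),
                           proxy=tools.proxy_url, error=str(e))
        return None

# metrics = AgentMetrics()
# print(f"24h success rate: {metrics.get_success_rate():.1f}%")
# print(f"avg fetch time: {metrics.get_avg_duration():.0f} ms")
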
Conclusion

Nanobot provides a structured way to build AI agents that handle web scraping autonomously. The task-oriented architecture, combined with tool definitions and state management, makes it possible to create scraping workflows that adapt to errors, rotate through proxies, and produce validated output. Start with the basic WebTools class, add proxy rotation and error handling, then build specialized agents for your specific data collection needs. The key to production reliability is combining robust proxy infrastructure, comprehensive error handling, and metrics tracking that lets you spot issues before they become problems.
