Building a Product Research Tool for Dropshipping

Build a product research tool that scrapes trending products from AliExpress, Amazon, and TikTok Shop, analyzes margins, and tracks competition. This hands-on tutorial walks through the complete implementation in Python, from architecture design to a working tool you can deploy today.

Architecture Overview

┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│ Data Source  │ ──→ │  Processor   │ ──→ │   Storage    │
│  (Scraper)   │     │  (Pipeline)  │     │  (Database)  │
└──────────────┘     └──────────────┘     └──────────────┘
       │                     │                     │
       └─────────────────────┼─────────────────────┘
                             │
                    ┌────────┴────────┐
                    │   Dashboard /   │
                    │   API Output    │
                    └─────────────────┘

Prerequisites

# Create project
mkdir building-product-research-tool && cd building-product-research-tool
python -m venv venv && source venv/bin/activate

# Install dependencies
pip install httpx beautifulsoup4 lxml  # asyncio is in the standard library
pip install fastapi uvicorn  # For API/dashboard
pip install sqlalchemy       # For database
pip install schedule         # For scheduling
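
The Dockerfile in the deployment section installs from a requirements.txt. Generate one with pip freeze > requirements.txt, or keep a hand-written, unpinned list matching the installs above:

httpx
beautifulsoup4
lxml
fastapi
uvicorn
sqlalchemy
schedule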

Core Implementation

import asyncio
import httpx
import json
from datetime import datetime
from dataclasses import dataclass, asdict
from typing import List, Optional
from urllib.parse import urljoin

from bs4 import BeautifulSoup

@dataclass
class ScrapedItem:
    source: str
    title: str
    url: str
    data: dict
    scraped_at: str = ""
    
    def __post_init__(self):
        self.scraped_at = self.scraped_at or datetime.utcnow().isoformat()

class ProductResearchTool:
    """Main application class."""
    
    def __init__(self, proxy_url: Optional[str] = None):
        self.proxy = proxy_url
        self.results: List[ScrapedItem] = []
        self.client_kwargs = {
            'timeout': 30,
            'follow_redirects': True,  # httpx does not follow redirects by default
            'headers': {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                             'AppleWebKit/537.36 (KHTML, like Gecko) '
                             'Chrome/120.0.0.0 Safari/537.36',
            },
        }
        if proxy_url:
            self.client_kwargs['proxy'] = proxy_url
    
    async def scrape_source(self, url: str) -> List[ScrapedItem]:
        """Scrape data from a single source."""
        async with httpx.AsyncClient(**self.client_kwargs) as client:
            response = await client.get(url)
            
            if response.status_code != 200:
                print(f"Error {response.status_code} for {url}")
                return []
            
            soup = BeautifulSoup(response.text, 'lxml')
            items = self._parse_page(soup, url)
            self.results.extend(items)
            return items
    
    def _parse_page(self, soup: BeautifulSoup, source_url: str) -> List[ScrapedItem]:
        """Parse HTML and extract items. Customize for your target."""
        items = []
        
        for element in soup.select('article, .item, .listing, .result'):
            title_el = element.select_one('h2, h3, .title, .name')
            link_el = element.select_one('a[href]')
            
            if title_el:
                items.append(ScrapedItem(
                    source=source_url,
                    title=title_el.get_text(strip=True),
                    url=urljoin(source_url, link_el['href']) if link_el else '',  # resolve relative links
                    data={},
                ))
        
        return items
    
    async def run(self, sources: List[str], concurrency: int = 5):
        """Run the scraper across multiple sources."""
        semaphore = asyncio.Semaphore(concurrency)
        
        async def scrape_with_limit(url):
            async with semaphore:
                return await self.scrape_source(url)
        
        tasks = [scrape_with_limit(url) for url in sources]
        outcomes = await asyncio.gather(*tasks, return_exceptions=True)
        for url, outcome in zip(sources, outcomes):
            if isinstance(outcome, Exception):
                print(f"Failed to scrape {url}: {outcome!r}")
        
        print(f"Scraped {len(self.results)} items from {len(sources)} sources")
        return self.results
    
    def export_json(self, filename: str = "output.json"):
        """Export results to JSON."""
        with open(filename, 'w') as f:
            json.dump([asdict(r) for r in self.results], f, indent=2)
        print(f"Exported {len(self.results)} items to {filename}")
    
    def export_csv(self, filename: str = "output.csv"):
        """Export results to CSV."""
        import csv
        if not self.results:
            return
        
        with open(filename, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=asdict(self.results[0]).keys())
            writer.writeheader()
            for item in self.results:
                writer.writerow(asdict(item))
        print(f"Exported {len(self.results)} items to {filename}")

Database Storage

import os
from datetime import datetime
from typing import List

from sqlalchemy import create_engine, Column, String, JSON, DateTime
from sqlalchemy.orm import declarative_base, Session

Base = declarative_base()

class StoredItem(Base):
    __tablename__ = 'items'
    
    id = Column(String, primary_key=True)
    source = Column(String, index=True)
    title = Column(String)
    url = Column(String, unique=True)
    data = Column(JSON)
    scraped_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

class Storage:
    def __init__(self, db_url=None):
        # DATABASE_URL (set in docker-compose.yml below) overrides the SQLite default.
        db_url = db_url or os.environ.get('DATABASE_URL', 'sqlite:///scraped_data.db')
        self.engine = create_engine(db_url)
        Base.metadata.create_all(self.engine)
    
    def save_items(self, items: List[ScrapedItem]):
        with Session(self.engine) as session:
            saved = 0
            for item in items:
                existing = session.query(StoredItem).filter_by(url=item.url).first()
                if not existing:
                    db_item = StoredItem(
                        id=item.url,
                        source=item.source,
                        title=item.title,
                        url=item.url,
                        data=item.data,
                    )
                    session.add(db_item)
                    saved += 1
                else:
                    existing.data = item.data
                    existing.updated_at = datetime.utcnow()
            
            session.commit()
            print(f"Saved {saved} new items, updated {len(items) - saved} existing")

API Endpoint

from fastapi import FastAPI, Query

app = FastAPI(title="Building a Product Research Tool for Dropshipping")

storage = Storage()

@app.get("/api/items")
async def get_items(
    source: Optional[str] = None,
    limit: int = Query(default=50, le=500),
    offset: int = 0,
):
    with Session(storage.engine) as session:
        query = session.query(StoredItem)
        if source:
            query = query.filter(StoredItem.source == source)
        
        total = query.count()
        items = query.offset(offset).limit(limit).all()
        
        return {
            "total": total,
            "items": [
                {
                    "title": item.title,
                    "url": item.url,
                    "source": item.source,
                    "data": item.data,
                    "scraped_at": item.scraped_at.isoformat(),
                }
                for item in items
            ],
        }

@app.post("/api/scrape")
async def trigger_scrape(sources: List[str]):
    scraper = ProductResearchTool(
        proxy_url=os.environ.get("PROXY_URL"),  # set via docker-compose; None means no proxy
    )
    results = await scraper.run(sources)
    storage.save_items(results)
    return {"scraped": len(results)}

# Run: uvicorn main:app --reload
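
With the server up, a quick smoke test from a second Python shell (the localhost URL and the example.com source are placeholders):

import httpx

base = "http://localhost:8000"

# Trigger a scrape, then page through the stored items.
print(httpx.post(f"{base}/api/scrape", json=["https://example.com/listings"], timeout=120.0).json())
print(httpx.get(f"{base}/api/items", params={"limit": 10}).json())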

Scheduling

import asyncio
import os
import time

import schedule

from main import ProductResearchTool, Storage  # assuming the core code lives in main.py

def scheduled_scrape():
    sources = [
        'https://example.com/listings',
        'https://example.com/products',
    ]
    
    scraper = ProductResearchTool(
        proxy_url=os.environ.get("PROXY_URL"),
    )
    results = asyncio.run(scraper.run(sources))
    
    storage = Storage()
    storage.save_items(results)

# Run every hour
schedule.every(1).hours.do(scheduled_scrape)

while True:
    schedule.run_pending()
    time.sleep(60)

Deployment with Docker

# Dockerfile
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# docker-compose.yml
services:
  app:
    build: .
    ports:
      - "8000:8000"
    environment:
      - PROXY_URL=http://user:pass@proxy.example.com:8080
      - DATABASE_URL=sqlite:///data/scraped.db
    volumes:
      - ./data:/app/data
  
  scheduler:
    build: .
    command: python scheduler.py
    environment:
      - PROXY_URL=http://user:pass@proxy.example.com:8080
      - DATABASE_URL=sqlite:///data/scraped.db
    volumes:
      - ./data:/app/data
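
Build and start both services with docker compose up --build -d. The shared ./data volume points both containers at the same SQLite file; if the API and scheduler start contending for writes, swap DATABASE_URL for a PostgreSQL URL.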


FAQ

What proxy type works best for this tool?

For scraping public websites and directories, datacenter proxies offer the best cost-to-performance ratio. If targets have anti-bot protection, use residential proxies. Start with a small proxy pool and scale based on your needs.

How do I handle websites that change their HTML structure?

Build robust selectors using multiple fallback patterns. Monitor data quality metrics — when extraction rates drop, it signals a structure change. Use CSS selectors with data attributes when available, as they are more stable than class names.
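
A sketch of that fallback idea, written against the _parse_page loop above (the selector strings are illustrative):

def select_first(element, selectors):
    """Try selectors from most to least specific; report which one matched."""
    for rank, sel in enumerate(selectors):
        found = element.select_one(sel)
        if found:
            return found, rank
    return None, -1

# Inside the _parse_page loop:
title_el, rank = select_first(element, ['[data-testid="title"]', 'h2.title', 'h2, h3, .title, .name'])
if title_el and rank > 0:
    print(f"Title matched via fallback #{rank}; the page structure may have changed")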

Can I run this tool on a schedule?

Yes. Use Python’s schedule library for simple cases, cron on Linux, or a cloud service with built-in scheduling (AWS Lambda + EventBridge, Google Cloud Scheduler). Docker Compose makes it easy to run the API and the scheduler side by side.

How do I avoid getting blocked?

Rotate user agents, use residential proxies for protected sites, respect robots.txt, add delays between requests (1-3 seconds), and avoid scraping during peak hours. Start slow and increase rate gradually while monitoring for blocks.
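
A light version of that advice, layered onto the httpx client from the core implementation (the agent strings and the 1-3 second jitter are example values):

import asyncio
import random

import httpx

USER_AGENTS = [
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
    'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 '
    '(KHTML, like Gecko) Version/17.0 Safari/605.1.15',
]

async def polite_get(client: httpx.AsyncClient, url: str) -> httpx.Response:
    # Random delay plus a rotated User-Agent on every request.
    await asyncio.sleep(random.uniform(1.0, 3.0))
    return await client.get(url, headers={'User-Agent': random.choice(USER_AGENTS)})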

What database should I use for storing scraped data?

SQLite works for small projects (< 1M records). PostgreSQL handles larger datasets with better concurrent access. For time-series data (price tracking), consider TimescaleDB. For document-like data, MongoDB works well.
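
Because Storage hands the URL straight to SQLAlchemy's create_engine, switching engines is a one-line change (hypothetical credentials; requires a driver such as psycopg2):

storage = Storage(db_url='postgresql+psycopg2://user:pass@localhost:5432/research')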

