Creating a Social Media Data Collector

Social media data collection powers market research, brand monitoring, sentiment analysis, and competitive intelligence. Building a unified collector that handles multiple platforms — each with different APIs, rate limits, and anti-bot measures — requires careful architecture with proxy rotation at its core.

What We Collect

  • Posts/content — text, media URLs, timestamps, engagement counts
  • Profiles — bios, follower counts, posting frequency
  • Comments — replies, threads, conversation context
  • Hashtags/trends — trending topics, hashtag volume over time
  • Engagement metrics — likes, shares, views, comment counts

Architecture

Data Collector Service
├── Platform Adapters
│   ├── Twitter/X Adapter (API + scraping fallback)
│   ├── Instagram Adapter (web scraping)
│   ├── LinkedIn Adapter (web scraping)
│   ├── Reddit Adapter (API)
│   └── TikTok Adapter (web scraping)
├── Proxy Manager (rotation per platform)
├── Rate Limiter (per-platform limits)
├── Storage (PostgreSQL/SQLite)
└── Export (CSV, JSON, API)

Core Framework

import asyncio
import csv
import httpx
import json
import time
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Dict, Optional
from datetime import datetime, timezone
from enum import Enum
import logging

logger = logging.getLogger(__name__)

class Platform(Enum):
    TWITTER = "twitter"
    INSTAGRAM = "instagram"
    REDDIT = "reddit"
    LINKEDIN = "linkedin"
    TIKTOK = "tiktok"

@dataclass
class SocialPost:
    platform: str
    post_id: str
    author: str
    author_id: str = ""
    text: str = ""
    media_urls: List[str] = field(default_factory=list)
    likes: int = 0
    comments: int = 0
    shares: int = 0
    views: int = 0
    timestamp: str = ""
    url: str = ""
    hashtags: List[str] = field(default_factory=list)
    mentions: List[str] = field(default_factory=list)
    raw_data: Dict = field(default_factory=dict)

@dataclass
class SocialProfile:
    platform: str
    username: str
    user_id: str = ""
    display_name: str = ""
    bio: str = ""
    followers: int = 0
    following: int = 0
    post_count: int = 0
    verified: bool = False
    avatar_url: str = ""
    url: str = ""
    collected_at: str = ""

@dataclass
class CollectionTask:
    platform: Platform
    task_type: str  # "posts", "profile", "search", "hashtag"
    target: str     # username, search query, or hashtag
    max_results: int = 100
    since: Optional[str] = None  # ISO timestamp


class PlatformAdapter(ABC):
    """Base class for platform-specific data collection."""

    def __init__(self, proxies: List[str], rate_limit_rps: float = 1.0):
        self.proxies = proxies
        self.proxy_index = 0
        self.rate_limit_rps = rate_limit_rps
        self.last_request_time = 0

    def _get_proxy(self) -> str:
        # Round-robin rotation across the configured proxy pool
        if not self.proxies:
            raise ValueError("No proxies configured")
        proxy = self.proxies[self.proxy_index % len(self.proxies)]
        self.proxy_index += 1
        return proxy

    async def _rate_limit(self):
        now = time.monotonic()
        min_interval = 1.0 / self.rate_limit_rps
        elapsed = now - self.last_request_time
        if elapsed < min_interval:
            await asyncio.sleep(min_interval - elapsed)
        self.last_request_time = time.monotonic()

    async def _fetch(
        self,
        url: str,
        headers: Optional[dict] = None,
        params: Optional[dict] = None,
    ) -> httpx.Response:
        await self._rate_limit()
        proxy = self._get_proxy()

        default_headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                         'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/json',
            'Accept-Language': 'en-US,en;q=0.9',
        }
        if headers:
            default_headers.update(headers)

        # A fresh client per request lets each request use a different proxy
        async with httpx.AsyncClient(
            proxy=proxy,
            timeout=30,
            follow_redirects=True,
        ) as client:
            response = await client.get(
                url, headers=default_headers, params=params,
            )

        return response

    @abstractmethod
    async def get_posts(
        self, username: str, count: int = 20
    ) -> List[SocialPost]:
        pass

    @abstractmethod
    async def get_profile(self, username: str) -> Optional[SocialProfile]:
        pass

    @abstractmethod
    async def search(
        self, query: str, count: int = 50
    ) -> List[SocialPost]:
        pass


class RedditAdapter(PlatformAdapter):
    """Reddit data collection via public JSON API."""

    def __init__(self, proxies: List[str]):
        super().__init__(proxies, rate_limit_rps=0.5)

    async def get_posts(
        self, subreddit: str, count: int = 25, sort: str = "hot"
    ) -> List[SocialPost]:
        posts = []
        after = None

        while len(posts) < count:
            params = {"limit": min(100, count - len(posts)), "raw_json": 1}
            if after:
                params["after"] = after

            try:
                response = await self._fetch(
                    f"https://www.reddit.com/r/{subreddit}/{sort}.json",
                    params=params,
                )

                if response.status_code != 200:
                    logger.warning(f"Reddit returned {response.status_code}")
                    break

                data = response.json()
                children = data.get("data", {}).get("children", [])

                if not children:
                    break

                for child in children:
                    d = child.get("data", {})
                    post = SocialPost(
                        platform="reddit",
                        post_id=d.get("id", ""),
                        author=d.get("author", ""),
                        text=d.get("title", "") + "\n" + d.get("selftext", ""),
                        likes=d.get("ups", 0),
                        comments=d.get("num_comments", 0),
                        shares=0,
                        timestamp=datetime.fromtimestamp(
                            d.get("created_utc", 0), tz=timezone.utc
                        ).isoformat(),
                        url=f"https://reddit.com{d.get('permalink', '')}",
                        hashtags=[],
                        raw_data=d,
                    )

                    if d.get("url") and not d["url"].startswith("https://www.reddit.com"):
                        post.media_urls.append(d["url"])

                    posts.append(post)

                after = data.get("data", {}).get("after")
                if not after:
                    break

            except Exception as e:
                logger.error(f"Reddit fetch error: {e}")
                break

        return posts[:count]

    async def get_profile(self, username: str) -> Optional[SocialProfile]:
        try:
            response = await self._fetch(
                f"https://www.reddit.com/user/{username}/about.json",
                params={"raw_json": 1},
            )

            if response.status_code != 200:
                return None

            data = response.json().get("data", {})
            return SocialProfile(
                platform="reddit",
                username=username,
                user_id=data.get("id", ""),
                display_name=data.get("subreddit", {}).get("title", username),
                bio=data.get("subreddit", {}).get("public_description", ""),
                followers=data.get("subreddit", {}).get("subscribers", 0),
                post_count=data.get("link_karma", 0),
                verified=data.get("verified", False),
                avatar_url=data.get("icon_img", ""),
                url=f"https://reddit.com/user/{username}",
                collected_at=datetime.now(timezone.utc).isoformat(),
            )
        except Exception as e:
            logger.error(f"Reddit profile error: {e}")
            return None

    async def search(
        self, query: str, count: int = 50
    ) -> List[SocialPost]:
        posts = []
        try:
            response = await self._fetch(
                "https://www.reddit.com/search.json",
                params={
                    "q": query,
                    "limit": min(100, count),
                    "sort": "relevance",
                    "raw_json": 1,
                },
            )

            if response.status_code == 200:
                data = response.json()
                for child in data.get("data", {}).get("children", []):
                    d = child.get("data", {})
                    post = SocialPost(
                        platform="reddit",
                        post_id=d.get("id", ""),
                        author=d.get("author", ""),
                        text=d.get("title", ""),
                        likes=d.get("ups", 0),
                        comments=d.get("num_comments", 0),
                        timestamp=datetime.fromtimestamp(
                            d.get("created_utc", 0), tz=timezone.utc
                        ).isoformat(),
                        url=f"https://reddit.com{d.get('permalink', '')}",
                    )
                    posts.append(post)

        except Exception as e:
            logger.error(f"Reddit search error: {e}")

        return posts[:count]

Unified Collector

class SocialMediaCollector:
    def __init__(self, proxies: List[str], db_path: str = "social_data.db"):
        self.proxies = proxies
        self.db_path = db_path
        self.adapters: Dict[Platform, PlatformAdapter] = {}

        self._init_db()

    def register_adapter(self, platform: Platform, adapter: PlatformAdapter):
        self.adapters[platform] = adapter

    def _init_db(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS posts (
                id TEXT PRIMARY KEY,
                platform TEXT,
                post_id TEXT,
                author TEXT,
                text TEXT,
                likes INTEGER,
                comments INTEGER,
                shares INTEGER,
                views INTEGER,
                timestamp TEXT,
                url TEXT,
                hashtags TEXT,
                collected_at TEXT
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS profiles (
                id TEXT PRIMARY KEY,
                platform TEXT,
                username TEXT,
                display_name TEXT,
                bio TEXT,
                followers INTEGER,
                following INTEGER,
                post_count INTEGER,
                verified INTEGER,
                collected_at TEXT
            )
        """)
        conn.commit()
        conn.close()

    async def collect(self, task: CollectionTask) -> List:
        adapter = self.adapters.get(task.platform)
        if not adapter:
            raise ValueError(f"No adapter for {task.platform}")

        results = []

        if task.task_type == "posts":
            posts = await adapter.get_posts(task.target, task.max_results)
            self._save_posts(posts)
            results = posts

        elif task.task_type == "profile":
            profile = await adapter.get_profile(task.target)
            if profile:
                self._save_profile(profile)
                results = [profile]

        elif task.task_type == "search":
            posts = await adapter.search(task.target, task.max_results)
            self._save_posts(posts)
            results = posts

        print(f"Collected {len(results)} items from {task.platform.value}")
        return results

    async def collect_batch(self, tasks: List[CollectionTask]) -> Dict:
        all_results = {}
        for task in tasks:
            try:
                results = await self.collect(task)
                key = f"{task.platform.value}_{task.task_type}_{task.target}"
                all_results[key] = results
            except Exception as e:
                logger.error(f"Task failed: {e}")
                all_results[f"error_{task.target}"] = str(e)

        return all_results

    def _save_posts(self, posts: List[SocialPost]):
        conn = sqlite3.connect(self.db_path)
        for post in posts:
            unique_id = f"{post.platform}_{post.post_id}"
            conn.execute(
                """INSERT OR REPLACE INTO posts
                (id, platform, post_id, author, text, likes, comments,
                 shares, views, timestamp, url, hashtags, collected_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (
                    unique_id, post.platform, post.post_id, post.author,
                    post.text, post.likes, post.comments, post.shares,
                    post.views, post.timestamp, post.url,
                    json.dumps(post.hashtags),
                    datetime.now(timezone.utc).isoformat(),
                )
            )
        conn.commit()
        conn.close()

    def _save_profile(self, profile: SocialProfile):
        conn = sqlite3.connect(self.db_path)
        unique_id = f"{profile.platform}_{profile.username}"
        conn.execute(
            """INSERT OR REPLACE INTO profiles
            (id, platform, username, display_name, bio, followers,
             following, post_count, verified, collected_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (
                unique_id, profile.platform, profile.username,
                profile.display_name, profile.bio, profile.followers,
                profile.following, profile.post_count, int(profile.verified),
                profile.collected_at,
            )
        )
        conn.commit()
        conn.close()

    def export_json(self, output_path: str):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        posts = [dict(row) for row in conn.execute("SELECT * FROM posts")]
        profiles = [dict(row) for row in conn.execute("SELECT * FROM profiles")]
        conn.close()

        with open(output_path, 'w') as f:
            json.dump(
                {"posts": posts, "profiles": profiles},
                f, indent=2,
            )
        print(f"Exported to {output_path}")

    def export_csv(self, output_path: str):
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        posts = [dict(row) for row in conn.execute("SELECT * FROM posts")]
        conn.close()

        if posts:
            with open(output_path, 'w', newline='') as f:
                writer = csv.DictWriter(f, fieldnames=posts[0].keys())
                writer.writeheader()
                writer.writerows(posts)
            print(f"Exported {len(posts)} posts to {output_path}")


# Usage
async def main():
    proxies = [
        "http://user:pass@proxy1.example.com:8080",
        "http://user:pass@proxy2.example.com:8080",
    ]

    collector = SocialMediaCollector(proxies)
    collector.register_adapter(
        Platform.REDDIT,
        RedditAdapter(proxies),
    )

    tasks = [
        CollectionTask(
            platform=Platform.REDDIT,
            task_type="posts",
            target="webdev",
            max_results=50,
        ),
        CollectionTask(
            platform=Platform.REDDIT,
            task_type="search",
            target="web scraping proxy",
            max_results=25,
        ),
        CollectionTask(
            platform=Platform.REDDIT,
            task_type="profile",
            target="spez",
        ),
    ]

    results = await collector.collect_batch(tasks)

    for key, data in results.items():
        print(f"{key}: {len(data) if isinstance(data, list) else data}")

    collector.export_json("social_data.json")
    collector.export_csv("social_posts.csv")

if __name__ == "__main__":
    asyncio.run(main())

FAQ

Is social media data collection legal?

Publicly available data is generally accessible, but check each platform’s Terms of Service. Use official APIs when available (Reddit, Twitter). Avoid collecting private or personally identifiable information without consent. GDPR and similar regulations apply to data containing personal information.

Why do social media sites need proxies the most?

Social platforms have the most aggressive anti-bot systems. They track request patterns, fingerprint browsers, and rate-limit aggressively by IP. Without proxies, you get blocked after a few dozen requests. Residential proxies are strongly recommended for social media.
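A minimal sketch of that recovery pattern, building on the PlatformAdapter above: retry through successive proxies whenever the platform signals a block. The status codes treated as blocks here are an assumption and vary by platform.

BLOCK_CODES = {403, 407, 429}  # assumed block signals; adjust per platform

async def fetch_with_rotation(adapter, url, max_attempts=3, **kwargs):
    """Retry through successive proxies when a block is detected."""
    last_response = None
    for attempt in range(max_attempts):
        # _fetch rotates to the next proxy in the pool on every call
        response = await adapter._fetch(url, **kwargs)
        if response.status_code not in BLOCK_CODES:
            return response
        last_response = response
        # Back off before retrying so the pool is not burned through at once
        await asyncio.sleep(2 ** attempt)
    return last_response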

How do I handle platform API rate limits?

Each platform has different limits. Reddit allows 60 requests per minute with OAuth. Twitter’s free API tier is very limited. Instagram has no public API for most data. Build per-platform rate limiters and respect published limits to avoid account bans.
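One way to keep those limits in one place is a per-platform limiter registry, sketched below. The request rates are illustrative placeholders, not published limits; tune them to each platform and your account tier.

# Illustrative pacing (requests per second); replace with real limits
PLATFORM_RPS = {"reddit": 1.0, "twitter": 0.2, "instagram": 0.1}

class PlatformRateLimiter:
    """Serializes requests per platform so each stays under its own cap."""

    def __init__(self, rps_map: Dict[str, float]):
        self.rps_map = rps_map
        self.last_call: Dict[str, float] = {}
        self.locks = {name: asyncio.Lock() for name in rps_map}

    async def wait(self, platform: str):
        async with self.locks[platform]:
            min_interval = 1.0 / self.rps_map.get(platform, 1.0)
            elapsed = time.monotonic() - self.last_call.get(platform, 0.0)
            if elapsed < min_interval:
                await asyncio.sleep(min_interval - elapsed)
            self.last_call[platform] = time.monotonic()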

Should I use APIs or web scraping?

Use APIs when available — they are faster, more reliable, and less likely to break. Fall back to web scraping when APIs do not provide the data you need (Instagram public profiles, TikTok videos). The adapter pattern in this code supports both approaches per platform.
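As a hedged skeleton of that pattern (the endpoint URLs and parser bodies below are placeholders, not real platform APIs), an adapter can try the API first and fall back to scraping:

class HybridAdapter(PlatformAdapter):
    """API-first adapter that falls back to scraping the public page."""

    API_URL = "https://api.example.com/v1/users/{user}/posts"  # placeholder
    WEB_URL = "https://www.example.com/{user}"  # placeholder

    async def get_posts(self, username: str, count: int = 20) -> List[SocialPost]:
        # Prefer the API: structured JSON and a stable schema
        response = await self._fetch(
            self.API_URL.format(user=username), params={"limit": count}
        )
        if response.status_code == 200:
            return self._parse_api(response.json())
        # API unavailable or rate-limited: scrape the public page instead
        response = await self._fetch(self.WEB_URL.format(user=username))
        return self._parse_html(response.text)

    def _parse_api(self, data: dict) -> List[SocialPost]:
        ...  # map API JSON fields onto SocialPost

    def _parse_html(self, html: str) -> List[SocialPost]:
        ...  # extract embedded JSON or parse the DOM

    async def get_profile(self, username: str) -> Optional[SocialProfile]:
        ...

    async def search(self, query: str, count: int = 50) -> List[SocialPost]:
        ...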

How do I store large amounts of social media data?

SQLite handles up to a few million posts. For larger datasets, use PostgreSQL with proper indexing on platform, author, and timestamp columns. For full-text search across posts, add a search index with Elasticsearch or PostgreSQL’s built-in full-text search.
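The indexes that answer describes can be added with standard DDL; the statements below work in both SQLite and PostgreSQL (the index names are arbitrary):

POST_INDEXES = [
    "CREATE INDEX IF NOT EXISTS idx_posts_platform ON posts (platform)",
    "CREATE INDEX IF NOT EXISTS idx_posts_author ON posts (author)",
    "CREATE INDEX IF NOT EXISTS idx_posts_timestamp ON posts (timestamp)",
]

def add_indexes(db_path: str = "social_data.db"):
    """Speed up the most common filters: platform, author, and time range."""
    conn = sqlite3.connect(db_path)
    for ddl in POST_INDEXES:
        conn.execute(ddl)
    conn.commit()
    conn.close()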

