Creating a Social Media Data Collector
Social media data collection powers market research, brand monitoring, sentiment analysis, and competitive intelligence. Building a unified collector that handles multiple platforms — each with different APIs, rate limits, and anti-bot measures — requires careful architecture with proxy rotation at its core.
What We Collect
- Posts/content — text, media URLs, timestamps, engagement counts
- Profiles — bios, follower counts, posting frequency
- Comments — replies, threads, conversation context
- Hashtags/trends — trending topics, hashtag volume over time
- Engagement metrics — likes, shares, views, comment counts
Architecture
Data Collector Service
├── Platform Adapters
│   ├── Twitter/X Adapter (API + scraping fallback)
│   ├── Instagram Adapter (web scraping)
│   ├── LinkedIn Adapter (web scraping)
│   ├── Reddit Adapter (API)
│   └── TikTok Adapter (web scraping)
├── Proxy Manager (rotation per platform)
├── Rate Limiter (per-platform limits)
├── Storage (PostgreSQL/SQLite)
└── Export (CSV, JSON, API)

Core Framework
import asyncio
import httpx
import json
import time
import hashlib
import sqlite3
from abc import ABC, abstractmethod
from dataclasses import dataclass, field, asdict
from typing import List, Dict, Optional, Any
from datetime import datetime
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class Platform(Enum):
TWITTER = "twitter"
INSTAGRAM = "instagram"
REDDIT = "reddit"
LINKEDIN = "linkedin"
TIKTOK = "tiktok"
@dataclass
class SocialPost:
platform: str
post_id: str
author: str
author_id: str = ""
text: str = ""
media_urls: List[str] = field(default_factory=list)
likes: int = 0
comments: int = 0
shares: int = 0
views: int = 0
timestamp: str = ""
url: str = ""
hashtags: List[str] = field(default_factory=list)
mentions: List[str] = field(default_factory=list)
raw_data: Dict = field(default_factory=dict)
@dataclass
class SocialProfile:
platform: str
username: str
user_id: str = ""
display_name: str = ""
bio: str = ""
followers: int = 0
following: int = 0
post_count: int = 0
verified: bool = False
avatar_url: str = ""
url: str = ""
collected_at: str = ""
@dataclass
class CollectionTask:
platform: Platform
task_type: str # "posts", "profile", "search", "hashtag"
target: str # username, search query, or hashtag
max_results: int = 100
since: Optional[str] = None # ISO timestamp
class PlatformAdapter(ABC):
"""Base class for platform-specific data collection."""
def __init__(self, proxies: List[str], rate_limit_rps: float = 1.0):
self.proxies = proxies
self.proxy_index = 0
self.rate_limit_rps = rate_limit_rps
self.last_request_time = 0
def _get_proxy(self) -> str:
proxy = self.proxies[self.proxy_index % len(self.proxies)]
self.proxy_index += 1
return proxy
async def _rate_limit(self):
now = time.monotonic()
min_interval = 1.0 / self.rate_limit_rps
elapsed = now - self.last_request_time
if elapsed < min_interval:
await asyncio.sleep(min_interval - elapsed)
self.last_request_time = time.monotonic()
async def _fetch(
self,
url: str,
headers: dict = None,
params: dict = None,
) -> httpx.Response:
await self._rate_limit()
proxy = self._get_proxy()
default_headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
'AppleWebKit/537.36 Chrome/120.0.0.0 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/json',
'Accept-Language': 'en-US,en;q=0.9',
}
if headers:
default_headers.update(headers)
async with httpx.AsyncClient(
proxy=proxy,
timeout=30,
follow_redirects=True,
) as client:
response = await client.get(
url, headers=default_headers, params=params,
)
return response
@abstractmethod
async def get_posts(
self, username: str, count: int = 20
) -> List[SocialPost]:
pass
@abstractmethod
async def get_profile(self, username: str) -> Optional[SocialProfile]:
pass
@abstractmethod
async def search(
self, query: str, count: int = 50
) -> List[SocialPost]:
pass
class RedditAdapter(PlatformAdapter):
"""Reddit data collection via public JSON API."""
def __init__(self, proxies: List[str]):
super().__init__(proxies, rate_limit_rps=0.5)
async def get_posts(
self, subreddit: str, count: int = 25, sort: str = "hot"
) -> List[SocialPost]:
posts = []
after = None
while len(posts) < count:
params = {"limit": min(100, count - len(posts)), "raw_json": 1}
if after:
params["after"] = after
try:
response = await self._fetch(
f"https://www.reddit.com/r/{subreddit}/{sort}.json",
params=params,
)
if response.status_code != 200:
logger.warning(f"Reddit returned {response.status_code}")
break
data = response.json()
children = data.get("data", {}).get("children", [])
if not children:
break
for child in children:
d = child.get("data", {})
post = SocialPost(
platform="reddit",
post_id=d.get("id", ""),
author=d.get("author", ""),
text=d.get("title", "") + "\n" + d.get("selftext", ""),
likes=d.get("ups", 0),
comments=d.get("num_comments", 0),
shares=0,
timestamp=datetime.fromtimestamp(
d.get("created_utc", 0)
).isoformat(),
url=f"https://reddit.com{d.get('permalink', '')}",
hashtags=[],
raw_data=d,
)
if d.get("url") and not d["url"].startswith("https://www.reddit.com"):
post.media_urls.append(d["url"])
posts.append(post)
after = data.get("data", {}).get("after")
if not after:
break
except Exception as e:
logger.error(f"Reddit fetch error: {e}")
break
return posts[:count]
async def get_profile(self, username: str) -> Optional[SocialProfile]:
try:
response = await self._fetch(
f"https://www.reddit.com/user/{username}/about.json",
params={"raw_json": 1},
)
if response.status_code != 200:
return None
data = response.json().get("data", {})
return SocialProfile(
platform="reddit",
username=username,
user_id=data.get("id", ""),
display_name=data.get("subreddit", {}).get("title", username),
bio=data.get("subreddit", {}).get("public_description", ""),
followers=data.get("subreddit", {}).get("subscribers", 0),
post_count=data.get("link_karma", 0),
verified=data.get("verified", False),
avatar_url=data.get("icon_img", ""),
url=f"https://reddit.com/user/{username}",
collected_at=datetime.utcnow().isoformat(),
)
except Exception as e:
logger.error(f"Reddit profile error: {e}")
return None
async def search(
self, query: str, count: int = 50
) -> List[SocialPost]:
posts = []
try:
response = await self._fetch(
"https://www.reddit.com/search.json",
params={
"q": query,
"limit": min(100, count),
"sort": "relevance",
"raw_json": 1,
},
)
if response.status_code == 200:
data = response.json()
for child in data.get("data", {}).get("children", []):
d = child.get("data", {})
post = SocialPost(
platform="reddit",
post_id=d.get("id", ""),
author=d.get("author", ""),
text=d.get("title", ""),
likes=d.get("ups", 0),
comments=d.get("num_comments", 0),
timestamp=datetime.fromtimestamp(
d.get("created_utc", 0)
).isoformat(),
url=f"https://reddit.com{d.get('permalink', '')}",
)
posts.append(post)
except Exception as e:
logger.error(f"Reddit search error: {e}")
        return posts[:count]

Unified Collector
class SocialMediaCollector:
def __init__(self, proxies: List[str], db_path: str = "social_data.db"):
self.proxies = proxies
self.db_path = db_path
self.adapters: Dict[Platform, PlatformAdapter] = {}
self._init_db()
def register_adapter(self, platform: Platform, adapter: PlatformAdapter):
self.adapters[platform] = adapter
def _init_db(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS posts (
id TEXT PRIMARY KEY,
platform TEXT,
post_id TEXT,
author TEXT,
text TEXT,
likes INTEGER,
comments INTEGER,
shares INTEGER,
views INTEGER,
timestamp TEXT,
url TEXT,
hashtags TEXT,
collected_at TEXT
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS profiles (
id TEXT PRIMARY KEY,
platform TEXT,
username TEXT,
display_name TEXT,
bio TEXT,
followers INTEGER,
following INTEGER,
post_count INTEGER,
verified INTEGER,
collected_at TEXT
)
""")
conn.commit()
conn.close()
async def collect(self, task: CollectionTask) -> List:
adapter = self.adapters.get(task.platform)
if not adapter:
raise ValueError(f"No adapter for {task.platform}")
results = []
if task.task_type == "posts":
posts = await adapter.get_posts(task.target, task.max_results)
self._save_posts(posts)
results = posts
elif task.task_type == "profile":
profile = await adapter.get_profile(task.target)
if profile:
self._save_profile(profile)
results = [profile]
elif task.task_type == "search":
posts = await adapter.search(task.target, task.max_results)
self._save_posts(posts)
            results = posts
        else:
            raise ValueError(f"Unsupported task type: {task.task_type}")
        print(f"Collected {len(results)} items from {task.platform.value}")
return results
async def collect_batch(self, tasks: List[CollectionTask]) -> Dict:
all_results = {}
for task in tasks:
try:
results = await self.collect(task)
key = f"{task.platform.value}_{task.task_type}_{task.target}"
all_results[key] = results
except Exception as e:
logger.error(f"Task failed: {e}")
all_results[f"error_{task.target}"] = str(e)
return all_results
def _save_posts(self, posts: List[SocialPost]):
conn = sqlite3.connect(self.db_path)
for post in posts:
unique_id = f"{post.platform}_{post.post_id}"
conn.execute(
"""INSERT OR REPLACE INTO posts
(id, platform, post_id, author, text, likes, comments,
shares, views, timestamp, url, hashtags, collected_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
unique_id, post.platform, post.post_id, post.author,
post.text, post.likes, post.comments, post.shares,
post.views, post.timestamp, post.url,
json.dumps(post.hashtags),
datetime.utcnow().isoformat(),
)
)
conn.commit()
conn.close()
def _save_profile(self, profile: SocialProfile):
conn = sqlite3.connect(self.db_path)
unique_id = f"{profile.platform}_{profile.username}"
conn.execute(
"""INSERT OR REPLACE INTO profiles
(id, platform, username, display_name, bio, followers,
following, post_count, verified, collected_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(
unique_id, profile.platform, profile.username,
profile.display_name, profile.bio, profile.followers,
profile.following, profile.post_count, int(profile.verified),
profile.collected_at,
)
)
conn.commit()
conn.close()
def export_json(self, output_path: str):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
posts = [dict(row) for row in conn.execute("SELECT * FROM posts")]
profiles = [dict(row) for row in conn.execute("SELECT * FROM profiles")]
conn.close()
with open(output_path, 'w') as f:
json.dump(
{"posts": posts, "profiles": profiles},
f, indent=2,
)
print(f"Exported to {output_path}")
def export_csv(self, output_path: str):
import csv
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
posts = [dict(row) for row in conn.execute("SELECT * FROM posts")]
conn.close()
if posts:
with open(output_path, 'w', newline='') as f:
writer = csv.DictWriter(f, fieldnames=posts[0].keys())
writer.writeheader()
writer.writerows(posts)
print(f"Exported {len(posts)} posts to {output_path}")
# Usage
async def main():
proxies = [
"http://user:pass@proxy1.example.com:8080",
"http://user:pass@proxy2.example.com:8080",
]
collector = SocialMediaCollector(proxies)
collector.register_adapter(
Platform.REDDIT,
RedditAdapter(proxies),
)
tasks = [
CollectionTask(
platform=Platform.REDDIT,
task_type="posts",
target="webdev",
max_results=50,
),
CollectionTask(
platform=Platform.REDDIT,
task_type="search",
target="web scraping proxy",
max_results=25,
),
CollectionTask(
platform=Platform.REDDIT,
task_type="profile",
target="spez",
),
]
results = await collector.collect_batch(tasks)
for key, data in results.items():
print(f"{key}: {len(data) if isinstance(data, list) else data}")
collector.export_json("social_data.json")
collector.export_csv("social_posts.csv")
if __name__ == "__main__":
    asyncio.run(main())

Internal Links
- Building a Proxy Rotation Library in Python — proxy rotation
- Building a Rate-Limited Scraper with Asyncio — rate limiting
- How to Scrape Instagram with Proxies — Instagram specifics
- How to Scrape Twitter/X with Proxies — Twitter specifics
- Social Media Proxies Guide — proxy recommendations
FAQ
Is social media data collection legal?
Collecting publicly available data is generally permissible, but check each platform's Terms of Service before you start. Use official APIs when available (Reddit, Twitter). Avoid collecting private or personally identifiable information without consent. GDPR and similar regulations apply to any data containing personal information.
Why does social media collection need proxies more than other scraping targets?
Social platforms have the most aggressive anti-bot systems. They track request patterns, fingerprint browsers, and rate-limit aggressively by IP. Without proxies, you get blocked after a few dozen requests. Residential proxies are strongly recommended for social media.
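Because each adapter receives its own proxy list, you can route scraping-heavy platforms through residential IPs while keeping cheaper datacenter IPs for API-friendly ones. A minimal sketch using the classes above; the proxy URLs and pool split are placeholders, not provider recommendations:

# Hypothetical proxy pools -- substitute your provider's endpoints.
residential_proxies = [
    "http://user:pass@res1.example.com:8080",
    "http://user:pass@res2.example.com:8080",
]
datacenter_proxies = ["http://user:pass@dc1.example.com:8080"]

collector = SocialMediaCollector(residential_proxies)

# Reddit's public JSON API is relatively tolerant, so datacenter IPs may be
# enough for it; a scraping-based adapter (Instagram, TikTok) would get the
# residential pool instead.
collector.register_adapter(Platform.REDDIT, RedditAdapter(datacenter_proxies))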
How do I handle platform API rate limits?
Each platform has different limits. Reddit allows 60 requests per minute with OAuth. Twitter’s free API tier is very limited. Instagram has no public API for most data. Build per-platform rate limiters and respect published limits to avoid account bans.
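Since every PlatformAdapter takes a rate_limit_rps value, per-platform budgets can live in one table. A rough sketch with illustrative numbers rather than official quotas; RATE_LIMITS and make_reddit_adapter are names introduced here, not part of the collector above:

# Illustrative per-platform budgets in requests per second -- placeholders,
# not official quotas; check each platform's current published limits.
RATE_LIMITS = {
    Platform.REDDIT: 0.5,
    Platform.INSTAGRAM: 0.2,
    Platform.TIKTOK: 0.2,
}

def make_reddit_adapter(proxies: List[str]) -> RedditAdapter:
    """Build a RedditAdapter whose pace comes from the shared table."""
    adapter = RedditAdapter(proxies)
    adapter.rate_limit_rps = RATE_LIMITS[Platform.REDDIT]
    return adapter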
Should I use APIs or web scraping?
Use APIs when available — they are faster, more reliable, and less likely to break. Fall back to web scraping when APIs do not provide the data you need (Instagram public profiles, TikTok videos). The adapter pattern in this code supports both approaches per platform.
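One way to express that fallback inside a single adapter is to try the official API first and scrape only when it fails. A skeleton under that assumption; _fetch_via_api and _scrape_posts are hypothetical helpers you would implement per platform, and the remaining abstract methods are omitted:

class HybridAdapter(PlatformAdapter):
    """Illustrative adapter that prefers an official API and falls back to
    scraping when the API call fails or returns nothing. _fetch_via_api()
    and _scrape_posts() are hypothetical helpers; get_profile() and
    search() are omitted for brevity."""

    async def get_posts(self, username: str, count: int = 20) -> List[SocialPost]:
        try:
            posts = await self._fetch_via_api(username, count)
            if posts:
                return posts
        except Exception as e:
            logger.warning(f"API path failed, falling back to scraping: {e}")
        return await self._scrape_posts(username, count)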
How do I store large amounts of social media data?
SQLite handles up to a few million posts. For larger datasets, use PostgreSQL with proper indexing on platform, author, and timestamp columns. For full-text search across posts, add a search index with Elasticsearch or PostgreSQL’s built-in full-text search.
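When you do move to PostgreSQL, "proper indexing" might look roughly like the sketch below, assuming psycopg2 as the driver; the function and index names are illustrative:

import psycopg2  # assumed driver; any PostgreSQL client works the same way

def create_post_indexes(dsn: str):
    """Add the indexes most queries against the posts table will use."""
    conn = psycopg2.connect(dsn)
    with conn, conn.cursor() as cur:
        cur.execute("CREATE INDEX IF NOT EXISTS idx_posts_platform ON posts (platform)")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_posts_author ON posts (author)")
        cur.execute("CREATE INDEX IF NOT EXISTS idx_posts_timestamp ON posts (timestamp)")
        # Optional: full-text search over post text with PostgreSQL's built-in FTS.
        cur.execute(
            "CREATE INDEX IF NOT EXISTS idx_posts_text_fts "
            "ON posts USING GIN (to_tsvector('english', text))"
        )
    conn.close()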
Related Reading
- Build an Anti-Detection Test Suite: Verify Browser Stealth
- Build a News Crawler in Python: Step-by-Step Tutorial
- AJAX Request Interception: Scraping API Calls Directly
- Azure Functions for Serverless Web Scraping: the Complete Guide
- How to Configure Proxies on iPhone and Android
- How to Use Proxies in Node.js (Axios, Fetch, Puppeteer)