Python小红书数据采集终极指南:xhs库完整使用教程与实战应用
Python小红书数据采集终极指南:xhs库完整使用教程与实战应用
【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs
小红书作为国内领先的生活方式分享平台,每天产生海量的用户生成内容,这些数据对市场分析、内容研究和用户洞察具有重要价值。xhs是一个强大的Python爬虫库,专门用于小红书Web端API的封装和数据采集,为开发者提供了一套完整、高效的数据获取解决方案。
🔍 核心关键词与使用场景
核心关键词:小红书数据采集、Python爬虫、xhs库、API封装、内容分析
长尾关键词:小红书笔记采集、用户数据分析、签名机制破解、批量数据抓取、内容趋势分析、竞品监控、社交媒体爬虫
为什么选择xhs库?
xhs库通过智能签名机制和完整的API封装,解决了小红书反爬虫策略带来的挑战。相比传统爬虫方案,它提供了:
- 完整的API覆盖:支持笔记、用户、评论、搜索等全方位数据获取
- 智能签名机制:自动处理复杂的x-s/x-t签名验证
- 稳定可靠:内置重试机制和错误处理
- 易于扩展:模块化设计,支持自定义扩展
🚀 快速安装与环境配置
基础安装
# 使用pip直接安装 pip install xhs # 或从源码安装最新版本 git clone https://gitcode.com/gh_mirrors/xh/xhs cd xhs python setup.py install环境依赖
xhs库需要以下依赖环境:
# 安装Playwright用于签名服务 pip install playwright playwright install # 下载反检测脚本 curl -O https://cdn.jsdelivr.net/gh/requireCool/stealth.min.js/stealth.min.js获取必要凭证
使用xhs库需要小红书的cookie信息,关键字段包括:
| Cookie字段 | 说明 | 获取方式 |
|---|---|---|
a1 | 用户身份标识 | 浏览器登录后从开发者工具获取 |
web_session | 会话信息 | 同上 |
webId | 设备标识 | 同上 |
📊 xhs库核心功能详解
1. 客户端初始化与基础配置
from xhs import XhsClient # 基础初始化 client = XhsClient(cookie="your_cookie_here") # 带签名功能的初始化 def custom_sign(uri, data=None, a1="", web_session=""): # 自定义签名逻辑 return {"x-s": "signature", "x-t": "timestamp"} client = XhsClient(cookie="your_cookie", sign=custom_sign)2. 笔记数据采集
xhs库提供了多种笔记获取方式:
# 根据笔记ID获取笔记详情 note_detail = client.get_note_by_id( note_id="6505318c000000001f03c5a6", xsec_token="your_xsec_token" ) # 从HTML页面解析笔记信息 note_from_html = client.get_note_by_id_from_html( note_id="6505318c000000001f03c5a6", xsec_token="your_xsec_token" ) # 提取笔记中的图片链接 from xhs import help image_urls = help.get_imgs_url_from_note(note_detail)3. 用户数据分析
获取用户信息和内容发布情况:
# 搜索用户 search_results = client.get_user_by_keyword(keyword="美食博主") # 获取用户详细信息 user_info = client.get_user_info(user_id="user_id_here") # 获取用户发布的笔记 user_notes = client.get_user_notes(user_id="user_id_here", cursor="") # 获取用户所有笔记(自动翻页) all_notes = client.get_user_all_notes( user_id="user_id_here", crawl_interval=1 # 爬取间隔(秒) )4. 智能搜索功能
支持多种搜索参数和排序方式:
# 基础关键词搜索 search_results = client.get_note_by_keyword( keyword="旅行攻略", page=1, page_size=20, sort="general", # 综合排序 note_type=0 # 0:全部, 1:图文, 2:视频 ) # 按热度排序搜索 hot_notes = client.get_note_by_keyword( keyword="美妆教程", sort="hotness", # 热度排序 page=1, page_size=15 ) # 按时间排序搜索 new_notes = client.get_note_by_keyword( keyword="健身", sort="time", # 时间排序 page=1, page_size=15 )5. 评论数据获取
分析用户互动情况:
# 获取笔记评论 comments = client.get_note_comments( note_id="note_id_here", cursor="", # 分页游标 xsec_token="your_xsec_token" ) # 获取子评论 sub_comments = client.get_note_sub_comments( note_id="note_id_here", root_comment_id="root_comment_id", num=10, # 获取数量 cursor="" ) # 获取所有评论(自动翻页) all_comments = client.get_note_all_comments( note_id="note_id_here", crawl_interval=1, xsec_token="your_xsec_token" )🛠️ 高级功能:签名服务部署
独立签名服务架构
对于大规模数据采集,建议部署独立的签名服务:
# 签名服务端示例 (xhs-api/app.py) from flask import Flask, request from playwright.sync_api import sync_playwright app = Flask(__name__) def sign(uri, data, a1, web_session): # 使用Playwright模拟浏览器环境进行签名 with sync_playwright() as playwright: browser = playwright.chromium.launch(headless=True) context = browser.new_context() page = context.new_page() page.goto("https://www.xiaohongshu.com") # 设置cookie并获取签名 encrypt_params = page.evaluate( "([url, data]) => window._webmsxyw(url, data)", [uri, data] ) return { "x-s": encrypt_params["X-s"], "x-t": str(encrypt_params["X-t"]) } @app.route("/sign", methods=["POST"]) def sign_endpoint(): data = request.json return sign(data["uri"], data["data"], data["a1"], data["web_session"])Docker部署签名服务
# Dockerfile FROM python:3.9-slim WORKDIR /app COPY requirements.txt . RUN pip install -r requirements.txt COPY . . CMD ["python", "app.py"]部署命令:
docker build -t xhs-sign-server . docker run -d -p 5005:5005 xhs-sign-server📈 实战应用场景
场景一:市场趋势分析
import pandas as pd from datetime import datetime, timedelta class MarketAnalyzer: def __init__(self, client): self.client = client def analyze_trends(self, keyword, days=7): """分析关键词趋势""" trends_data = [] for day in range(days): date = datetime.now() - timedelta(days=day) notes = self.client.get_note_by_keyword( keyword=keyword, sort="time", page=1, page_size=50 ) if notes and 'items' in notes: daily_stats = { 'date': date.date(), 'total_notes': len(notes['items']), 'avg_likes': sum(n.get('likes', 0) for n in notes['items']) / len(notes['items']), 'avg_comments': sum(n.get('comments', 0) for n in notes['items']) / len(notes['items']) } trends_data.append(daily_stats) return pd.DataFrame(trends_data)场景二:竞品内容监控
class CompetitorMonitor: def __init__(self, client, competitor_ids): self.client = client self.competitor_ids = competitor_ids def monitor_competitor_content(self): """监控竞品内容发布情况""" competitor_data = {} for user_id in self.competitor_ids: try: user_info = self.client.get_user_info(user_id) user_notes = self.client.get_user_notes(user_id) competitor_data[user_id] = { 'username': user_info.get('nickname'), 'followers': user_info.get('fans'), 'total_notes': len(user_notes.get('notes', [])), 'recent_engagement': self.calculate_engagement(user_notes) } except Exception as e: print(f"Error monitoring user {user_id}: {e}") return competitor_data场景三:内容质量评估
class ContentQualityAnalyzer: def __init__(self, client): self.client = client def analyze_content_quality(self, note_id, xsec_token): """分析内容质量指标""" note_detail = self.client.get_note_by_id(note_id, xsec_token) quality_score = 0 metrics = {} # 计算互动率 engagement_rate = ( note_detail.get('likes', 0) + note_detail.get('comments', 0) * 2 + note_detail.get('collects', 0) * 3 ) / max(note_detail.get('views', 1), 1) metrics['engagement_rate'] = engagement_rate quality_score += engagement_rate * 40 # 内容完整性评分 has_images = len(note_detail.get('image_list', [])) > 0 has_video = note_detail.get('type') == 'video' has_description = bool(note_detail.get('desc', '').strip()) completeness_score = sum([has_images, has_video, has_description]) / 3 * 30 metrics['completeness_score'] = completeness_score quality_score += completeness_score # 时效性评分 publish_time = note_detail.get('time', 0) current_time = datetime.now().timestamp() recency = max(0, 1 - (current_time - publish_time) / (30 * 24 * 3600)) # 30天衰减 metrics['recency_score'] = recency * 30 quality_score += recency * 30 return { 'quality_score': min(100, quality_score), 'metrics': metrics, 'note_detail': note_detail }⚡ 性能优化与最佳实践
1. 并发处理优化
import concurrent.futures from functools import partial class BatchProcessor: def __init__(self, client, max_workers=5): self.client = client self.max_workers = max_workers def batch_get_notes(self, note_ids, xsec_tokens): """批量获取笔记信息""" with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [] for note_id, xsec_token in zip(note_ids, xsec_tokens): future = executor.submit( self.client.get_note_by_id, note_id=note_id, xsec_token=xsec_token ) futures.append(future) results = [] for future in concurrent.futures.as_completed(futures): try: results.append(future.result()) except Exception as e: print(f"Error fetching note: {e}") results.append(None) return results2. 缓存机制实现
import json import os from datetime import datetime, timedelta from hashlib import md5 class DataCache: def __init__(self, cache_dir=".cache", ttl_hours=24): self.cache_dir = cache_dir self.ttl = timedelta(hours=ttl_hours) os.makedirs(cache_dir, exist_ok=True) def get_cache_key(self, func_name, *args, **kwargs): """生成缓存键""" key_str = f"{func_name}:{args}:{kwargs}" return md5(key_str.encode()).hexdigest() def get(self, cache_key): """获取缓存数据""" cache_file = os.path.join(self.cache_dir, f"{cache_key}.json") if os.path.exists(cache_file): with open(cache_file, 'r', encoding='utf-8') as f: cache_data = json.load(f) cache_time = datetime.fromisoformat(cache_data['timestamp']) if datetime.now() - cache_time < self.ttl: return cache_data['data'] return None def set(self, cache_key, data): """设置缓存数据""" cache_file = os.path.join(self.cache_dir, f"{cache_key}.json") cache_data = { 'timestamp': datetime.now().isoformat(), 'data': data } with open(cache_file, 'w', encoding='utf-8') as f: json.dump(cache_data, f, ensure_ascii=False, indent=2)3. 错误处理与重试机制
import time import random from functools import wraps def retry_on_failure(max_retries=3, base_delay=1, max_delay=10): """失败重试装饰器""" def decorator(func): @wraps(func) def wrapper(*args, **kwargs): last_exception = None for attempt in range(max_retries): try: return func(*args, **kwargs) except Exception as e: last_exception = e if attempt < max_retries - 1: # 指数退避策略 delay = min( base_delay * (2 ** attempt) + random.uniform(0, 1), max_delay ) print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s...") time.sleep(delay) raise last_exception return wrapper return decorator # 使用示例 @retry_on_failure(max_retries=3, base_delay=2) def safe_api_call(client, note_id, xsec_token): return client.get_note_by_id(note_id, xsec_token)🔧 配置与调优指南
请求频率控制
class RateLimitedClient: def __init__(self, client, requests_per_minute=30): self.client = client self.requests_per_minute = requests_per_minute self.min_interval = 60 / requests_per_minute self.last_request_time = 0 def rate_limited_request(self, func, *args, **kwargs): """带频率限制的请求""" current_time = time.time() elapsed = current_time - self.last_request_time if elapsed < self.min_interval: sleep_time = self.min_interval - elapsed time.sleep(sleep_time) result = func(*args, **kwargs) self.last_request_time = time.time() return result代理配置
# 配置代理 proxies = { "http": "http://your-proxy:port", "https": "http://your-proxy:port", } client = XhsClient( cookie="your_cookie", proxies=proxies, timeout=30 # 增加超时时间 )📋 常见问题解答(FAQ)
Q1: 如何获取有效的cookie?
A: 在Chrome浏览器中:
- 登录小红书网页版
- 按F12打开开发者工具
- 进入Application/Storage > Cookies > https://www.xiaohongshu.com
- 复制
a1、web_session、webId字段的值
Q2: 签名失败怎么办?
A: 检查以下事项:
- 确保
stealth.min.js文件已正确下载 - 确认cookie中的
a1值与签名服务中的一致 - 适当增加签名时的sleep时间
- 尝试使用独立的签名服务部署
Q3: 如何避免被封IP?
A: 建议采取以下措施:
- 控制请求频率(建议每秒不超过1-2次)
- 使用代理IP轮换
- 实现指数退避重试机制
- 遵守小红书的robots.txt规则
Q4: 数据采集的合法边界是什么?
A: 请遵守以下原则:
- 仅采集公开数据
- 不要绕过平台的安全机制
- 尊重用户隐私和版权
- 不要用于商业侵权或非法用途
- 控制数据采集规模,避免对服务器造成压力
Q5: 如何处理大量数据存储?
A: 建议的数据存储方案:
| 数据类型 | 存储方案 | 优化建议 |
|---|---|---|
| 笔记基本信息 | PostgreSQL/MySQL | 建立索引优化查询 |
| 图片/视频链接 | 对象存储(如S3) | 使用CDN加速访问 |
| 用户关系数据 | Neo4j图数据库 | 优化图遍历查询 |
| 实时监控数据 | Redis/Elasticsearch | 设置合适的TTL |
🎯 总结与最佳实践
xhs库为小红书数据采集提供了一个强大而灵活的解决方案。通过合理的配置和使用,你可以:
- 高效获取数据:利用完整的API封装快速获取所需信息
- 稳定运行:通过签名服务和错误处理机制确保稳定性
- 灵活扩展:模块化设计支持自定义功能扩展
- 合规使用:遵循平台规则,合理控制采集频率
推荐的项目结构
xhs-project/ ├── src/ │ ├── crawlers/ # 爬虫模块 │ ├── processors/ # 数据处理模块 │ ├── utils/ # 工具函数 │ └── config.py # 配置文件 ├── cache/ # 缓存数据 ├── logs/ # 日志文件 ├── requirements.txt # 依赖文件 └── README.md # 项目说明性能监控建议
import logging from datetime import datetime class PerformanceMonitor: def __init__(self): self.logger = logging.getLogger(__name__) self.request_count = 0 self.error_count = 0 self.start_time = datetime.now() def log_request(self, success=True, duration=None): self.request_count += 1 if not success: self.error_count += 1 success_rate = (self.request_count - self.error_count) / self.request_count * 100 self.logger.info( f"请求统计: 总数={self.request_count}, " f"成功率={success_rate:.2f}%, " f"时长={duration}s" if duration else "" )通过合理使用xhs库,你可以构建稳定可靠的小红书数据采集系统,为市场分析、内容研究和业务决策提供有力支持。记得始终遵守平台规则,合理使用数据资源。
【免费下载链接】xhs基于小红书 Web 端进行的请求封装。https://reajason.github.io/xhs/项目地址: https://gitcode.com/gh_mirrors/xh/xhs
创作声明:本文部分内容由AI辅助生成(AIGC),仅供参考
