前面的爬虫都是同步的——发一个请求等着回来,再发下一个。遇到大规模采集时,同步 IO 等在网络上的时间占了 90%,CPU 一直在空闲。
用aiohttp+asyncio做异步爬虫,同时发出几十个请求,总时间从几小时压缩到十几分钟。
一、同步 vs 异步 的核心区别
# 同步:一个一个来,总共 10 秒# 请求1 → 等1秒 → 请求2 → 等1秒 → ... → 请求10 → 等1秒# 总时间 = 10秒# 异步:同时发出,总共 1 秒# 请求1 → 等1秒 → 返回# 请求2 → 等1秒 → 返回# ... (1秒后全部返回)# 请求10 → 等1秒 → 返回# 总时间 ≈ 1秒异步适合 IO 密集型任务(网络请求、文件读写),不适合 CPU 密集型(图片处理、数据计算)。
二、aiohttp 基础
1. 安装
pipinstallaiohttp2. 最简单的异步请求
importaiohttpimportasyncioasyncdeffetch(url):"""异步请求一个 URL"""asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresp:# 返回文本内容returnawaitresp.text()# 执行html=asyncio.run(fetch("https://example.com"))print(html[:200])3. 并发请求多个 URL
importaiohttpimportasyncioimporttimeasyncdeffetch_one(session,url):"""单个请求"""try:asyncwithsession.get(url,timeout=10)asresp:returnawaitresp.text()exceptExceptionase:returnf"请求失败:{e}"asyncdeffetch_all(urls):"""并发请求所有 URL"""asyncwithaiohttp.ClientSession()assession:tasks=[fetch_one(session,url)forurlinurls]results=awaitasyncio.gather(*tasks)returnresults# 使用urls=[f"https://example.com/page/{i}"foriinrange(1,21)]start=time.time()results=asyncio.run(fetch_all(urls))print(f"总耗时:{time.time()-start:.2f}秒")print(f"共获取{len(results)}个页面")同步写法跑 20 个页面要 20 秒以上,异步跑大约 1-2 秒(取决于网络)。
三、控制并发数——信号量
如果不控制并发,一下子发出几百个请求,很可能被网站封 IP 或自己电脑连接数不够。
importaiohttpimportasyncioclassAsyncCrawler:"""带并发控制的异步爬虫"""def__init__(self,max_concurrency=10):# 信号量:控制最大并发数self.semaphore=asyncio.Semaphore(max_concurrency)self.results=[]asyncdeffetch(self,session,url):"""带并发限制的请求"""asyncwithself.semaphore:# 超过 max_concurrency 会等待try:asyncwithsession.get(url,timeout=10)asresp:text=awaitresp.text()print(f"完成:{url}({len(text)}字符)")return(url,text)exceptExceptionase:print(f"失败:{url}-{e}")return(url,None)asyncdefcrawl(self,urls):"""批量爬取"""asyncwithaiohttp.ClientSession()assession:tasks=[self.fetch(session,url)forurlinurls]self.results=awaitasyncio.gather(*tasks)returnself.resultsdefsave_results(self,filename="results.json"):"""保存结果"""importjson data=[]forurl,contentinself.results:ifcontent:data.append({"url":url,"length":len(content)})withopen(filename,"w",encoding="utf-8")asf:json.dump(data,f,ensure_ascii=False,indent=2)print(f"已保存{len(data)}条结果到{filename}")# 使用:同时最多 10 个请求crawler=AsyncCrawler(max_concurrency=10)urls=[f"https://example.com/page/{i}"foriinrange(1,101)]importtime start=time.time()results=asyncio.run(crawler.crawl(urls))print(f"总耗时:{time.time()-start:.2f}s")crawler.save_results()四、异步爬取 + 解析
可以用asyncio.Queue做生产者-消费者模式:
importaiohttpimportasynciofrombs4importBeautifulSoupasyncdefworker(name,queue,session,results):"""消费者:从队列取 URL 并爬取"""whileTrue:url=awaitqueue.get()try:asyncwithsession.get(url)asresp:html=awaitresp.text()# 解析soup=BeautifulSoup(html,"html.parser")title=soup.title.stringifsoup.titleelse"无标题"results.append({"url":url,"title":title})print(f"[{name}] 完成:{url}→{title}")exceptExceptionase:print(f"[{name}] 失败:{url}-{e}")finally:queue.task_done()asyncdefmain(urls,concurrency=10):"""主入口:生产者+消费者模式"""queue=asyncio.Queue()results=[]# 生产者:往队列放 URLforurlinurls:awaitqueue.put(url)asyncwithaiohttp.ClientSession()assession:# 创建 N 个消费者协程workers=[asyncio.create_task(worker(f"worker-{i}",queue,session,results))foriinrange(concurrency)]# 等待队列处理完毕awaitqueue.join()# 取消所有 workerforwinworkers:w.cancel()returnresults# 使用urls=[f"https://example.com/page/{i}"foriinrange(1,51)]results=asyncio.run(main(urls,concurrency=10))print(f"\n共爬取{len(results)}个页面")forrinresults[:5]:print(f"{r['url']}→{r['title']}")五、超时与重试
1. 设置超时
asyncdeffetch_with_timeout(session,url):"""带超时的请求"""try:# 总超时30秒,连接超时10秒timeout=aiohttp.ClientTimeout(total=30,connect=10)asyncwithsession.get(url,timeout=timeout)asresp:returnawaitresp.text()exceptasyncio.TimeoutError:print(f"超时:{url}")returnNone2. 自动重试
asyncdeffetch_with_retry(session,url,max_retries=3):"""带重试的请求"""forattemptinrange(max_retries):try:asyncwithsession.get(url,timeout=10)asresp:ifresp.status==200:returnawaitresp.text()else:print(f"状态码异常{resp.status}:{url}")exceptExceptionase:print(f"第{attempt+1}次失败:{url}-{e}")awaitasyncio.sleep(2**attempt)# 指数退避:1s、2s、4sreturnNone六、异步 + 代理
asyncdeffetch_with_proxy(session,url,proxy):"""使用代理"""try:asyncwithsession.get(url,proxy=proxy,timeout=10)asresp:returnawaitresp.text()exceptExceptionase:print(f"代理{proxy}请求失败:{e}")returnNoneasyncdefcrawl_with_proxies(urls,proxies):"""使用代理池并发爬取"""asyncwithaiohttp.ClientSession()assession:tasks=[]fori,urlinenumerate(urls):proxy=proxies[i%len(proxies)]tasks.append(fetch_with_proxy(session,url,proxy))returnawaitasyncio.gather(*tasks)七、异步爬虫的最佳实践
并发数设置
10 个并发 → 阿里云等大网站基本没压力 20 个并发 → 多数小网站也扛得住 50 个并发 → 可能触发反爬 100+ 个并发 → 被 ban 概率极高,且本地连接数可能不够用建议从 5-10 个并发开始,慢慢往上加。
完整模板
importaiohttpimportasyncioimporttimefromtypingimportList,DictclassBaseAsyncCrawler:"""异步爬虫基类"""def__init__(self,max_concurrency=10,delay=0):self.max_concurrency=max_concurrency self.delay=delay# 请求间隔(秒)self.semaphore=asyncio.Semaphore(max_concurrency)self.session=Noneasyncdef__aenter__(self):self.session=aiohttp.ClientSession()returnselfasyncdef__aexit__(self,*args):awaitself.session.close()asyncdeffetch(self,url:str)->str:"""单个请求"""asyncwithself.semaphore:try:asyncwithself.session.get(url,timeout=10)asresp:ifself.delay:awaitasyncio.sleep(self.delay)returnawaitresp.text()exceptExceptionase:print(f"请求失败{url}:{e}")return""asyncdefcrawl(self,urls:List[str])->List[str]:"""批量爬取"""tasks=[self.fetch(url)forurlinurls]returnawaitasyncio.gather(*tasks)# 使用asyncdefmain():urls=[f"https://example.com/page/{i}"foriinrange(10)]asyncwithBaseAsyncCrawler(max_concurrency=5)ascrawler:start=time.time()results=awaitcrawler.crawl(urls)print(f"完成{len(results)}个请求,耗时{time.time()-start:.2f}s")asyncio.run(main())八、异步 vs 多线程 怎么选
| 对比 | 异步(aiohttp) | 多线程(requests+ThreadPool) |
|---|---|---|
| 性能 | ✅ 极高,几千并发没问题 | ❌ 受限于 GIL 和线程切换 |
| 代码 | ⭐⭐ 需要 async/await 语法 | ⭐ 简单,不用学新语法 |
| 调试 | ⭐⭐ 稍麻烦 | ⭐ 容易 |
| 适用 | 大规模采集(上万条) | 中小规模(几千条) |
建议:
- 爬几千条数据,用
requests + ThreadPoolExecutor就够了 - 爬几万条以上,上
aiohttp异步 - 不要为了异步而异步,简单够用优先
💡 觉得有用的话,点赞 + 关注【张老师技术栈】吧!每周更新 Java/Python/爬虫 实战干货,不让你白来。