Python异步IO:asyncio深度解析
Python异步IO:asyncio深度解析
引言
异步编程是现代后端开发的核心技能,它能够显著提升应用程序的性能和并发处理能力。作为一名从Python转向Rust的后端开发者,我在实践中总结了异步编程的最佳实践。本文将深入探讨Python的asyncio库,帮助你掌握异步编程的核心技术。
一、异步编程基础概念
1.1 同步与异步
| 特性 | 同步 | 异步 |
|---|---|---|
| 执行方式 | 阻塞等待 | 非阻塞继续 |
| 资源利用 | 低 | 高 |
| 编程复杂度 | 简单 | 较高 |
| 适用场景 | CPU密集型 | IO密集型 |
1.2 协程概念
协程是一种轻量级的并发编程方式,允许在单个线程中实现多个任务的并发执行。
1.3 asyncio核心组件
- Event Loop:事件循环,负责调度协程
- Coroutine:协程,可暂停的函数
- Task:任务,封装协程的对象
- Future:未来对象,表示异步操作的结果
二、asyncio入门
2.1 基本协程定义
import asyncio


async def hello():
    """Print "Hello", suspend for one second, then print "World"."""
    print("Hello")
    # Awaiting hands control back to the event loop instead of blocking the thread.
    await asyncio.sleep(1)
    print("World")


async def main():
    """Top-level coroutine: await hello() until it finishes."""
    await hello()


asyncio.run(main())
# 2.2 协程调度
async def task1():
    """Print three numbered "Task 1" lines, sleeping 0.5 s after each one."""
    for step in range(3):
        print(f"Task 1: {step}")
        await asyncio.sleep(0.5)


async def task2():
    """Print three numbered "Task 2" lines, sleeping 0.7 s after each one."""
    for step in range(3):
        print(f"Task 2: {step}")
        await asyncio.sleep(0.7)


async def main():
    """Run task1() and task2() concurrently and wait for both to finish."""
    # gather() schedules both coroutines, so their output interleaves.
    await asyncio.gather(task1(), task2())


asyncio.run(main())
# 2.3 创建Task
async def coro():
    """Sleep for one second, then return 42."""
    await asyncio.sleep(1)
    return 42


async def main():
    """Schedule coro() as a Task, await it, and print its result."""
    # create_task() schedules the coroutine on the running loop right away.
    pending = asyncio.create_task(coro())
    answer = await pending
    print(f"Result: {answer}")


asyncio.run(main())
# 三、并发模式
3.1 并发执行多个任务
async def fetch_data(url):
    """Simulate a one-second download of url and return a placeholder payload."""
    print(f"Fetching {url}")
    await asyncio.sleep(1)
    return f"Data from {url}"


async def main():
    """Start one Task per URL, wait for all of them, and print each payload."""
    urls = ["https://example.com", "https://google.com", "https://github.com"]
    pending = [asyncio.create_task(fetch_data(address)) for address in urls]
    # gather() returns results in the order the awaitables were passed in.
    payloads = await asyncio.gather(*pending)
    for payload in payloads:
        print(payload)


asyncio.run(main())
# 3.2 带超时的任务
async def long_running_task():
    """Sleep for five seconds, then return "Done"."""
    await asyncio.sleep(5)
    return "Done"


async def main():
    """Await long_running_task() with a 2-second limit and report the outcome."""
    try:
        # wait_for() cancels the inner coroutine once the timeout expires.
        outcome = await asyncio.wait_for(long_running_task(), timeout=2)
    except asyncio.TimeoutError:
        print("Task timed out")
    else:
        print(outcome)


asyncio.run(main())
# 3.3 任务取消
async def task():
    """Print "Running..." every 0.5 s until cancelled, then report and re-raise."""
    try:
        while True:
            print("Running...")
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("Task cancelled")
        # Re-raise so the awaiting side sees the task as cancelled.
        raise


async def main():
    """Start task(), cancel it after two seconds, and confirm the cancellation."""
    worker = asyncio.create_task(task())
    await asyncio.sleep(2)
    worker.cancel()
    try:
        await worker
    except asyncio.CancelledError:
        print("Main caught cancelled error")


asyncio.run(main())
# 四、异步IO操作
4.1 文件操作
def _read_text(filepath):
    """Blocking helper: return the whole text content of filepath."""
    with open(filepath, 'r') as f:
        return f.read()


def _write_text(filepath, content):
    """Blocking helper: overwrite filepath with content."""
    with open(filepath, 'w') as f:
        f.write(content)


async def read_file(filepath):
    """Read a text file without blocking the event loop.

    The blocking open()/read() runs in a worker thread, so other coroutines
    keep running and the 10-second limit can actually fire. Calling open()
    directly inside the coroutine would stall the loop and, having no await
    point, could never time out.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file content as a string.

    Raises:
        asyncio.TimeoutError: The read did not finish within 10 seconds. The
            worker thread cannot be interrupted and may still complete later.
        OSError: The file could not be opened or read.
    """
    return await asyncio.wait_for(
        asyncio.to_thread(_read_text, filepath), timeout=10
    )


async def write_file(filepath, content):
    """Write content to a text file without blocking the event loop.

    Args:
        filepath: Path of the file to create or overwrite.
        content: Text to write.

    Raises:
        asyncio.TimeoutError: The write did not finish within 10 seconds. The
            worker thread cannot be interrupted and may still complete later.
        OSError: The file could not be opened or written.
    """
    await asyncio.wait_for(
        asyncio.to_thread(_write_text, filepath, content), timeout=10
    )
# 4.2 网络请求
import aiohttp


async def fetch(session, url):
    """GET url through the given aiohttp session and return the body as text."""
    async with session.get(url) as response:
        body = await response.text()
        return body


async def main():
    """Download https://example.com and print its first 100 characters."""
    # One ClientSession is opened for the request and closed on exit.
    async with aiohttp.ClientSession() as client:
        page = await fetch(client, 'https://example.com')
        print(page[:100])


asyncio.run(main())
# 4.3 数据库操作
import asyncpg


async def query_database():
    """Fetch the users rows whose id is 1 from a local PostgreSQL database.

    Returns:
        Whatever conn.fetch() returns for the query.
    """
    conn = await asyncpg.connect(
        user='user', password='pass', database='db', host='localhost'
    )
    try:
        # The id is passed as the $1 argument instead of being built into the SQL string.
        return await conn.fetch('SELECT * FROM users WHERE id = $1', 1)
    finally:
        # Always release the connection, even when the query raises.
        await conn.close()


asyncio.run(query_database())
# 五、同步代码与异步代码互操作
5.1 同步转异步
import time


def blocking_task():
    """Block the calling thread for two seconds, then return a fixed string."""
    time.sleep(2)
    return "Blocking result"


async def async_wrapper():
    """Run blocking_task() in the default executor and return its result."""
    # Inside a coroutine a loop is always running; get_running_loop() returns
    # it directly, whereas get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor.
    return await loop.run_in_executor(None, blocking_task)


async def main():
    """Await async_wrapper() and print what the blocking call returned."""
    result = await async_wrapper()
    print(result)


asyncio.run(main())
# 5.2 异步转同步
async def async_task():
    """Sleep for one second, then return "Async result"."""
    await asyncio.sleep(1)
    return "Async result"


def sync_wrapper():
    """Synchronous entry point: drive async_task() to completion and return its value."""
    # asyncio.run() creates and closes its own event loop, so this must not be
    # called from code that is already running inside a loop.
    outcome = asyncio.run(async_task())
    return outcome


result = sync_wrapper()
print(result)
# 5.3 使用线程池
async def cpu_bound_task(data):
    """Sum data in the default executor so the event loop stays responsive.

    Args:
        data: An iterable of numbers accepted by sum().

    Returns:
        The sum of data (0 for an empty iterable).
    """
    def process_data(data):
        # CPU-bound work
        return sum(data)

    # Inside a coroutine a loop is always running; get_running_loop() returns
    # it directly, whereas get_event_loop() has policy-dependent behaviour.
    loop = asyncio.get_running_loop()
    # None selects the loop's default executor. NOTE: worker threads still
    # share the GIL, so this keeps the loop free but does not add parallelism.
    return await loop.run_in_executor(None, process_data, data)
# 六、异步上下文管理器
6.1 自定义异步上下文管理器
class AsyncResource:
    """Demo resource whose setup and teardown each take half a second."""

    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        """Announce entry, simulate 0.5 s of setup, and return this object."""
        print(f"Entering {self.name}")
        await asyncio.sleep(0.5)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Announce exit and simulate 0.5 s of teardown.

        Returns None, so an exception raised in the with-body is not suppressed.
        """
        print(f"Exiting {self.name}")
        await asyncio.sleep(0.5)


async def main():
    """Enter an AsyncResource named "database" and print its name while inside."""
    async with AsyncResource("database") as handle:
        print(f"Using {handle.name}")


asyncio.run(main())
# 6.2 异步迭代器
class AsyncDataStream:
    """Async iterator that yields 1..max_items, pausing 0.3 s before each item."""

    def __init__(self, max_items):
        self.max_items = max_items
        self.current = 0

    def __aiter__(self):
        """Return self: the stream is its own iterator."""
        return self

    async def __anext__(self):
        """Return the next counter value, or end iteration once max_items is reached."""
        if self.current < self.max_items:
            await asyncio.sleep(0.3)
            self.current += 1
            return self.current
        raise StopAsyncIteration


async def main():
    """Iterate over a five-item stream and print each item."""
    async for value in AsyncDataStream(5):
        print(f"Item: {value}")


asyncio.run(main())
# 七、异步编程模式
7.1 Producer-Consumer模式
async def producer(queue):
    """Put the integers 0-4 on queue, one every 0.5 s."""
    for i in range(5):
        await asyncio.sleep(0.5)
        # put() suspends while the bounded queue is full.
        await queue.put(i)
        print(f"Produced: {i}")


async def consumer(queue):
    """Consume items forever; the caller stops this by cancelling the task."""
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        # Tells queue.join() that this item has been fully handled.
        queue.task_done()


async def main():
    """Run one producer and one consumer over a queue bounded at two items."""
    queue = asyncio.Queue(maxsize=2)
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await producer_task
    # Wait until every produced item has been marked done by the consumer.
    await queue.join()
    consumer_task.cancel()
    try:
        await consumer_task
    except asyncio.CancelledError:
        # Expected: awaiting a cancelled task re-raises CancelledError, which
        # would otherwise escape main() and make asyncio.run() fail.
        pass


asyncio.run(main())
# 7.2 异步锁
async def update_counter(lock, counter): async with lock: current = counter['value'] await asyncio.sleep(0.1) counter['value'] = current + 1 async def main(): lock = asyncio.Lock() counter = {'value': 0} tasks = [update_counter(lock, counter) for _ in range(10)] await asyncio.gather(*tasks) print(f"Final counter: {counter['value']}") asyncio.run(main())7.3 信号量控制并发
async def fetch_url(semaphore, url):
    """Simulate a one-second fetch of url while holding one semaphore permit."""
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Done: {url}"


async def main():
    """Fetch ten URLs with at most three in flight at once and print the results."""
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://example.com/{i}" for i in range(10)]
    outcomes = await asyncio.gather(*(fetch_url(semaphore, target) for target in urls))
    for outcome in outcomes:
        print(outcome)


asyncio.run(main())
# 八、实战案例:异步Web爬虫
import asyncio

import aiohttp
from bs4 import BeautifulSoup


async def fetch_page(session, url):
    """Download url and return its body text, or None on any failure.

    Args:
        session: The aiohttp client session used for the request.
        url: Address to download.

    Returns:
        The response text for a 200 response; None for any other status or
        when the request raises (the error is printed, not propagated).
    """
    try:
        # aiohttp expects a ClientTimeout object; a bare number is deprecated.
        timeout = aiohttp.ClientTimeout(total=10)
        async with session.get(url, timeout=timeout) as response:
            if response.status == 200:
                return await response.text()
            return None
    except Exception as e:
        # Deliberately broad: one bad page must not abort the whole crawl.
        print(f"Error fetching {url}: {e}")
        return None


async def parse_page(html):
    """Return every <a href> value in html that starts with 'http'.

    Returns an empty list when html is empty or None.
    """
    if not html:
        return []
    soup = BeautifulSoup(html, 'html.parser')
    return [
        a_tag['href']
        for a_tag in soup.find_all('a', href=True)
        if a_tag['href'].startswith('http')
    ]


async def crawl(url, max_depth=3, visited=None, semaphore=None, session=None):
    """Recursively crawl url and return the list of pages visited.

    Args:
        url: Page to start from.
        max_depth: Remaining recursion depth; 0 or less stops the crawl.
        visited: Set of URLs already crawled, shared across the recursion.
        semaphore: Limits concurrent downloads; defaults to 5 permits.
        session: aiohttp session to reuse. When omitted, a session is opened
            for this crawl and closed when it finishes.

    Returns:
        A list starting with url followed by the URLs crawled beneath it, or
        an empty list if url was already visited or max_depth is exhausted.
    """
    if visited is None:
        visited = set()
    if semaphore is None:
        semaphore = asyncio.Semaphore(5)
    if url in visited or max_depth <= 0:
        return []
    if session is None:
        async with aiohttp.ClientSession() as own_session:
            return await crawl(url, max_depth, visited, semaphore, own_session)

    # Mark before the first await so concurrent branches skip this URL.
    visited.add(url)
    print(f"Crawling: {url} (depth: {max_depth})")

    # Hold a permit only for the download. Keeping it across the recursive
    # gather() below would let parents starve their children of permits.
    async with semaphore:
        html = await fetch_page(session, url)
    links = await parse_page(html)

    # Follow at most the first ten links of each page.
    tasks = [
        crawl(link, max_depth - 1, visited, semaphore, session)
        for link in links[:10]
        if link not in visited
    ]
    results = await asyncio.gather(*tasks)

    all_links = [url]
    for result in results:
        all_links.extend(result)
    return all_links


async def main():
    """Crawl https://example.com two levels deep and print how many pages were visited."""
    async with aiohttp.ClientSession() as session:
        result = await crawl("https://example.com", max_depth=2, session=session)
        print(f"Total links crawled: {len(result)}")


if __name__ == "__main__":
    asyncio.run(main())
# 总结
异步编程是构建高性能后端系统的关键技术。通过本文的学习,你应该掌握了以下核心要点:
- 异步基础:同步与异步的区别、协程概念
- asyncio核心:Event Loop、Coroutine、Task、Future
- 并发模式:并发执行、超时控制、任务取消
- 异步IO:文件操作、网络请求、数据库操作
- 同步异步互操作:同步转异步、异步转同步、线程池
- 异步上下文管理器:异步with、异步迭代器
- 异步模式:Producer-Consumer、异步锁、信号量
- 实战案例:异步Web爬虫
作为从Python转向Rust的后端开发者,掌握异步编程对于构建高并发系统至关重要。后续文章将深入探讨Rust中的异步运行时Tokio。
