12_异步编程

12_异步编程

Python 异步编程(async/await)


一、同步 vs 异步

同步的问题

同步代码按顺序执行,遇到 I/O 阻塞时,整个线程干等着。

importtimedeftask_sync(name,seconds):print(f"同步任务{name}开始")time.sleep(seconds)# 模拟 I/O 阻塞print(f"同步任务{name}结束")# 顺序执行:总耗时 = 2 + 3 = 5 秒task_sync("A",2)task_sync("B",3)

异步的解决

异步用await asyncio.sleep()代替time.sleep(),让出控制权给其他协程。

importasyncioasyncdeftask_async(name,seconds):print(f"异步任务{name}开始")awaitasyncio.sleep(seconds)# 非阻塞等待print(f"异步任务{name}结束")asyncdefmain():# 创建两个任务并发执行task_a=asyncio.create_task(task_async("A",2))task_b=asyncio.create_task(task_async("B",3))awaittask_aawaittask_b asyncio.run(main())# 异步总耗时 ≈ 3 秒(取最长),而非 5 秒

核心区别

同步异步
等待方式time.sleep()阻塞整个线程await asyncio.sleep()让出控制权
多个任务顺序执行,总耗时 = 和并发执行,总耗时 ≈ 最长
关键字async def/await

Python 异步编程(async/await)

二、协程基础

2.1 async def — 定义协程函数

importasyncioasyncdefhello():print("hello")awaitasyncio.sleep(1)print("world")print(hello())# <coroutine object hello at ...> 不会立即执行

调用协程函数返回协程对象,必须放入事件循环才能执行。

2.2 await — 等待可等待对象

asyncdefgreet():print("开始打招呼")awaithello()# 等待 hello() 执行完print("打招呼结束")importasyncio asyncio.run(greet())

规则await只能在async def函数内部使用。普通函数里不能写await

2.3 协程的返回值

asyncdefadd(a,b):awaitasyncio.sleep(0.5)returna+basyncdefmain():result=awaitadd(3,5)print("相加结果:",result)# 8asyncio.run(main())

Python 异步编程(async/await)


三、事件循环与运行方式

3.1 asyncio.run() — 推荐方式

import asyncio

pythonpython
async def main():
await asyncio.sleep(1)
print(“完成”)

asyncio.run(main()) # 创建事件循环,运行协程,关闭循环

### 3.2 手动管理事件循环(旧方式) 极少使用,但了解即可: ```python import asyncio loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close() ``````python loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) loop.run_until_complete(main()) loop.close()

3.3 create_task — 并发执行

importasyncioasyncdefsay_after(delay,what):awaitasyncio.sleep(delay)print(what)asyncdefmain():task1=asyncio.create_task(say_after(1,"任务一"))task2=asyncio.create_task(say_after(1,"任务二"))awaittask1awaittask2# 两个任务并发执行,总耗时 ≈ 1 秒asyncio.run(main())

asyncio.create_task()将协程包装成 Task 对象,立即调度执行,不等待。


Python 异步编程(async/await)


四、并发执行工具

4.1 asyncio.gather() — 并发等所有结果

importasyncio```pythonasyncdeftask(name,delay):awaitasyncio.sleep(delay)returnf"{name}的结果"asyncdefdemo():results=awaitasyncio.gather(task("A",2),task("B",1),task("C",3),)print(results)# ['A的结果', 'B的结果', 'C的结果'] 保持顺序asyncio.run(demo())

适用场景:需要等所有任务完成,且关心返回值。

4.2 asyncio.wait() — 按条件等待

importasyncioasyncdeftask(name,delay):awaitasyncio.sleep(delay)print(f"任务{name}完成")asyncdefmain():tasks=[asyncio.create_task(task("X",2)),asyncio.create_task(task("Y",1)),]# 等任意一个完成就返回done,pending=awaitasyncio.wait(tasks,return_when=asyncio.FIRST_COMPLETED)print(f"完成:{len(done)}个任务")# 取消未完成的fortinpending:t.cancel()asyncio.run(main())

适用场景:只需要第一个完成的结果,其余可以取消。

4.3 asyncio.as_completed() — 按完成顺序获取

importasyncioasyncdeftask(name,delay):awaitasyncio.sleep(delay)returnf"任务{name}"tasks=[asyncio.create_task(task("P",3)),asyncio.create_task(task("Q",1)),asyncio.create_task(task("R",2)),]forcoroinasyncio.as_completed(tasks):result=awaitcoroprint("完成一个:",result)# 输出顺序:Q → R → P(按完成快慢)

适用场景:需要按完成顺序逐个处理结果。

三种方式对比

方式顺序等多久场景
gather保持传入顺序等全部需要所有结果
wait不保证等到条件满足只需部分结果
as_completed按完成快慢逐个获取流式处理

五、异步上下文管理器

async with管理异步资源(如网络连接)。

5.1 自定义异步上下文管理器

importasyncioclassAsyncResource:asyncdef__aenter__(self):print("异步进入资源")awaitasyncio.sleep(1)returnselfasyncdef__aexit__(self,exc_type,exc_val,exc_tb):print("异步退出资源")awaitasyncio.sleep(0.5)asyncdefdo_work(self):print("工作中...")asyncdefuse_resource():asyncwithAsyncResource()asres:awaitres.do_work()asyncio.run(use_resource())

和同步对比

同步异步
__enter__/__exit____aenter__/__aexit__
withasync with

5.2 http

实际应用中常用aiohttp

importaiohttpasyncdeffetch_url(url):asyncwithaiohttp.ClientSession()assession:asyncwithsession.get(url)asresponse:returnawaitresponse.text()asyncio.run(fetch_url("https://www.baidu.com"))

Python 异步编程(async/await)


六、异步迭代器与异步生成器

6.1 异步迭代器

手写__aiter__+__anext__

importasyncioclassAsyncCounter:def__init__(self,start,end):self.start=start self.end=enddef__aiter__(self):returnselfasyncdef__anext__(self):ifself.start>=self.end:raiseStopAsyncIterationawaitasyncio.sleep(0.5)# 模拟异步操作value=self.start self.start+=1returnvalueasyncdefuse():asyncfornuminAsyncCounter(1,5):print(f"异步迭代得到:{num}")asyncio.run(use())

和同步对比

同步异步
__iter__/__next____aiter__/__anext__
StopIterationStopAsyncIteration
forasync for

6.2 异步生成器

async def+yield,比手写迭代器简洁得多:

importasyncioasyncdefasync_range(start,end):foriinrange(start,end):awaitasyncio.sleep(0.3)yieldiasyncdefuse():asyncforvalinasync_range(10,15):print("异步生成器:",val)asyncio.run(use())

和同步生成器对比

同步生成器异步生成器
defasync def
yieldyield(不变)
forasync for

七、异步队列(生产者消费者)

importasyncio,randomasyncdefproducer(queue,id):foriinrange(3):item=f"生产者{id}的产品{i}"awaitqueue.put(item)print(f"生产者:{item}")awaitasyncio.sleep(random.uniform(0.5,1.5))asyncdefconsumer(queue,id):whileTrue:item=awaitqueue.get()ifitemisNone:# 结束信号queue.task_done()breakprint(f"消费者{id}消费:{item}")awaitasyncio.sleep(random.uniform(0.2,1.0))queue.task_done()asyncdefmain():queue=asyncio.Queue(maxsize=5)producers=[asyncio.create_task(producer(queue,i))foriinrange(2)]consumers=[asyncio.create_task(consumer(queue,i))foriinrange(3)]awaitasyncio.gather(*producers)# 等生产者完成for_inconsumers:awaitqueue.put(None)# 发送结束信号awaitasyncio.gather(*consumers)# 等消费者处理完asyncio.run(main())

关键 API

操作方法
入队await queue.put(item)
出队item = await queue.get()
标记完成queue.task_done()
最大容量asyncio.Queue(maxsize=N)

和同步 queue.Queue 对比

同步异步
queue.put(item)阻塞await queue.put(item)非阻塞
queue.get()阻塞await queue.get()非阻塞

八、异步 TCP 客户端(底层 I/O)

不用第三方库,用标准库asyncio.open_connection发 HTTP 请求:

asyncdeftcp_client(host,port,message):reader,writer=awaitasyncio.open_connection(host,port)writer.write(message.encode())awaitwriter.drain()# 刷新缓冲区data=awaitreader.read(1024)writer.close()awaitwriter.wait_closed()returndata.decode()# 发 HTTP GET 请求request="GET /get HTTP/1.1\r\nHost: httpbin.org\r\nConnection: close\r\n\r\n"response=asyncio.run(tcp_client("httpbin.org",80,request))

学习意义:理解底层writer(发数据)/reader(收数据)机制,P1 V3 的工具调用(Function Calling)本质上也是异步 HTTP 请求。


Python 异步编程(async/await)

九、异步 vs 多线程

async/await多线程(threading)
并发模型协程(单线程)多线程(OS 调度)
切换成本极低(用户态)较高(内核态)
适合场景I/O 密集型I/O 密集型
GIL 限制不受影响受影响(CPU 密集型慢)
调试难度较低较高(竞态条件)

结论:Python 里做 I/O 密集型并发,优先用 async/await。