Python 协程
本章节主要讲解 什么是协程?,协程的核心概念,async/await 关键字,协程的状态转换,协程的优势,高效的资源利用,内存占用低,避免线程安全问题,协程的使用场景,网络请求和API调用,文件I/O操作,数据库操作,实时应用和WebSocket。
生活中的例子:洗衣做饭的智慧
想象一下,你回到家准备洗衣服和做饭。如果按照传统的方式:
传统方式(同步):
- 把衣服放进洗衣机,然后站在那里等30分钟直到洗完
- 洗完衣服后,开始做饭,花费40分钟
- 总共用时:30 + 40 = 70分钟
聪明的方式(协程):
- 把衣服放进洗衣机(2分钟)
- 趁洗衣机工作时去做饭(40分钟)
- 做饭期间,洗衣机洗完了,去晾衣服(3分钟)
- 总共用时:大约45分钟
这就是协程的核心思想:在等待的时候,不要闲着,去做其他事情!
什么是协程?
协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,让出 CPU 的控制权给其他协程,之后再从暂停的地方继续执行。
用 Python 内置的 asyncio 模块 来表示上面的例子:
import asyncio
async def wash_clothes():
print("🧺 开始洗衣服...")
await asyncio.sleep(30) # 模拟洗衣服需要30秒
print("✅ 衣服洗完了!")
async def cook_dinner():
print("🍳 开始做饭...")
await asyncio.sleep(40) # 模拟做饭需要40秒
print("🍽️ 饭做好了!")
async def main():
# 同时启动洗衣服和做饭
await asyncio.gather(
wash_clothes(),
cook_dinner()
)
# 运行协程
asyncio.run(main())
下面用 生成器函数来实现 2 个协程(生产者、消费组如何协调工作)
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('[C] Consuming %s...' % n)
r = '200 OK'
def produce(c):
c.send(None) # 首次调用,运行到 yield 出返回 r,并不会赋值给 n
n = 0
while n < 5:
n = n + 1
print('[P] Producing %s...' % n)
r = c.send(n)
print('[P] Consumer return: %s' % r)
c.close()
c = consumer()
produce(c)
consumer()
:一个生成器函数,负责“消费”接收到的值并返回响应。- 因为它使用了
yield
关键字。生成器允许函数暂停执行,向调用者返回(yield
)一个值,并在下次被调用时从暂停处继续执行。 n = yield r
会暂停函数并返回 r 的值给调用者。n
接收外部通过send()
方法传入的值。
- 因为它使用了
produce(c)
:一个普通函数,驱动生成器,通过向它发送值并打印结果。c.send(None)
:初始化生成器。对于一个新创建的生成器,必须先调用next()
或send(None)
来启动它,使其执行到第一个yield
语句并暂停。send(None)
的作用是让生成器运行到n = yield r
处,暂停并返回初始的r
(即空字符串 '')。为什么必须是None
?因为生成器刚启动时,还没有值可以赋给n
,所以只能发送None
。
- 就这样,只要生产者生成一个数据,就发送给消费组消费,整个流程无锁,由一个线程执行,
produce
和consumer
协作完成任务,所以称为“协程”,而非线程的抢占式多任务。 - 注意:在实际开发过程中,我们会使用现成的框架,而不是自己来写生成器函数,比如前面的 asyncio 就是现有框架。
协程的核心概念
1. async/await 关键字
async def
定义一个协程函数await
用于等待另一个协程完成,期间可以让出CPU控制权- 这两个关键字,可以让写异步代码像写同步代码一样,非常方便。
2. 协程的状态转换
async def example():
print("开始执行") # 运行状态
await asyncio.sleep(1) # 暂停状态(让出控制权)
print("继续执行") # 恢复运行状态
协程的优势
1. 高效的资源利用
协程可以在一个线程内处理成千上万个并发任务,因为它们在等待I/O操作时会主动让出CPU。
import asyncio
import time
# 传统同步方式
def sync_download(url_id):
print(f"开始下载 {url_id}")
time.sleep(2) # 模拟网络延迟
print(f"下载完成 {url_id}")
# 协程方式
async def async_download(url_id):
print(f"开始下载 {url_id}")
await asyncio.sleep(2) # 模拟网络延迟
print(f"下载完成 {url_id}")
async def test_performance():
start_time = time.time()
# 并发下载5个文件
tasks = [async_download(i) for i in range(5)]
await asyncio.gather(*tasks)
end_time = time.time()
print(f"协程方式用时: {end_time - start_time:.2f}秒")
# 输出约2秒,而同步方式需要10秒
asyncio.run(test_performance())
2. 内存占用低
- 每个线程需要几MB内存
- 每个协程只需要几KB内存
- 可以轻松创建数万个协程
3. 避免线程安全问题
协程在单线程中运行,天然避免了竞态条件和锁的问题。
import asyncio
# 不需要担心线程安全
counter = 0
async def increment():
global counter
await asyncio.sleep(0.1) # 模拟一些异步操作
counter += 1 # 不会有竞态条件
async def main():
tasks = [increment() for _ in range(100)]
await asyncio.gather(*tasks)
print(f"最终计数: {counter}") # 结果是确定的
asyncio.run(main())
协程的使用场景
1. 网络请求和API调用
pip install aiohttp
示例代码
import aiohttp
import asyncio
async def fetch_data(session, url):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
'http://httpbin.org/delay/1',
'http://httpbin.org/delay/2',
'http://httpbin.org/delay/1'
]
async with aiohttp.ClientSession() as session:
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"获取了 {len(results)} 个响应")
# 适用场景:爬虫、API聚合、微服务调用
asyncio.run(main())
2. 文件I/O操作
import aiofiles
import asyncio
async def read_file(filename):
async with aiofiles.open(filename, 'r') as file:
content = await file.read()
return len(content)
async def process_files():
files = ['file1.txt', 'file2.txt', 'file3.txt']
tasks = [read_file(f) for f in files]
sizes = await asyncio.gather(*tasks)
for file, size in zip(files, sizes):
print(f"{file}: {size} 字符")
# 适用场景:批量文件处理、日志分析
3. 数据库操作
import asyncpg
import asyncio
async def query_user(pool, user_id):
async with pool.acquire() as conn:
result = await conn.fetchrow(
"SELECT * FROM users WHERE id = $1", user_id
)
return result
async def batch_query_users(user_ids):
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db"
)
tasks = [query_user(pool, uid) for uid in user_ids]
users = await asyncio.gather(*tasks)
await pool.close()
return users
# 适用场景:高并发Web应用、数据分析
4. 实时应用和WebSocket
import websockets
import asyncio
async def handle_client(websocket, path):
print(f"新客户端连接: {websocket.remote_address}")
async for message in websocket:
# 处理消息并广播给其他客户端
response = f"收到消息: {message}"
await websocket.send(response)
async def start_server():
server = await websockets.serve(handle_client, "localhost", 8765)
print("WebSocket服务器启动在 ws://localhost:8765")
await server.wait_closed()
# 适用场景:聊天应用、实时通知、游戏服务器
总结
协程就像是一个善于时间管理的人,不会在等待的时候浪费时间,而是去做其他有意义的事情。在Python中,当你需要处理大量I/O密集型任务时,协程能显著提升程序的性能和响应速度。
记住协程的三个关键点:
- 并发不是并行:协程是在单线程中切换执行
- 主动让出控制权:使用
await
关键字 - 适合I/O密集型任务:网络请求、文件操作、数据库查询等