Skip to main content

Python 异步 IO

Python 协程

本章节主要讲解 什么是协程?,协程的核心概念,async/await 关键字,协程的状态转换,协程的优势,高效的资源利用,内存占用低,避免线程安全问题,协程的使用场景,网络请求和API调用,文件I/O操作,数据库操作,实时应用和WebSocket。

生活中的例子:洗衣做饭的智慧

想象一下,你回到家准备洗衣服和做饭。如果按照传统的方式:

传统方式(同步):

  1. 把衣服放进洗衣机,然后站在那里等30分钟直到洗完
  2. 洗完衣服后,开始做饭,花费40分钟
  3. 总共用时:30 + 40 = 70分钟

聪明的方式(协程):

  1. 把衣服放进洗衣机(2分钟)
  2. 趁洗衣机工作时去做饭(40分钟)
  3. 做饭期间,洗衣机洗完了,去晾衣服(3分钟)
  4. 总共用时:大约45分钟

这就是协程的核心思想:在等待的时候,不要闲着,去做其他事情!

什么是协程?

协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,让出 CPU 的控制权给其他协程,之后再从暂停的地方继续执行。

用 Python 内置的 asyncio 模块 来表示上面的例子:

import asyncio

async def wash_clothes():
    print("🧺 开始洗衣服...")
    await asyncio.sleep(30)  # 模拟洗衣服需要30秒
    print("✅ 衣服洗完了!")

async def cook_dinner():
    print("🍳 开始做饭...")
    await asyncio.sleep(40)  # 模拟做饭需要40秒
    print("🍽️ 饭做好了!")

async def main():
    # 同时启动洗衣服和做饭
    await asyncio.gather(
        wash_clothes(),
        cook_dinner()
    )

# 运行协程
asyncio.run(main())

下面用 生成器函数来实现 2 个协程(生产者、消费组如何协调工作)

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print('[C] Consuming %s...' % n)
        r = '200 OK'


def produce(c):
    c.send(None)  # 首次调用,运行到 yield 出返回 r,并不会赋值给 n
    n = 0
    while n < 5:
        n = n + 1
        print('[P] Producing %s...' % n)
        r = c.send(n)
        print('[P] Consumer return: %s' % r)
    c.close()


c = consumer()
produce(c)
  • consumer():一个生成器函数,负责“消费”接收到的值并返回响应。
    • 因为它使用了 yield 关键字。生成器允许函数暂停执行,向调用者返回(yield)一个值,并在下次被调用时从暂停处继续执行。
    • n = yield r 会暂停函数并返回 r 的值给调用者。n 接收外部通过 send() 方法传入的值。
  • produce(c):一个普通函数,驱动生成器,通过向它发送值并打印结果。
    • c.send(None):初始化生成器。对于一个新创建的生成器,必须先调用 next() send(None) 来启动它,使其执行到第一个 yield 语句并暂停。send(None) 的作用是让生成器运行到 n = yield r 处,暂停并返回初始的 r(即空字符串 '')。为什么必须是 None?因为生成器刚启动时,还没有值可以赋给 n,所以只能发送 None
  • 就这样,只要生产者生成一个数据,就发送给消费组消费,整个流程无锁,由一个线程执行,produceconsumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。
  • 注意:在实际开发过程中,我们会使用现成的框架,而不是自己来写生成器函数,比如前面的 asyncio 就是现有框架。

协程的核心概念

1. async/await 关键字

  • async def 定义一个协程函数
  • await 用于等待另一个协程完成,期间可以让出CPU控制权
  • 这两个关键字,可以让写异步代码像写同步代码一样,非常方便。

2. 协程的状态转换

async def example():
    print("开始执行")           # 运行状态
    await asyncio.sleep(1)     # 暂停状态(让出控制权)
    print("继续执行")           # 恢复运行状态

协程的优势

1. 高效的资源利用

协程可以在一个线程内处理成千上万个并发任务,因为它们在等待I/O操作时会主动让出CPU。

import asyncio
import time

# 传统同步方式
def sync_download(url_id):
    print(f"开始下载 {url_id}")
    time.sleep(2)  # 模拟网络延迟
    print(f"下载完成 {url_id}")

# 协程方式
async def async_download(url_id):
    print(f"开始下载 {url_id}")
    await asyncio.sleep(2)  # 模拟网络延迟
    print(f"下载完成 {url_id}")

async def test_performance():
    start_time = time.time()

    # 并发下载5个文件
    tasks = [async_download(i) for i in range(5)]
    await asyncio.gather(*tasks)

    end_time = time.time()
    print(f"协程方式用时: {end_time - start_time:.2f}秒")
    # 输出约2秒,而同步方式需要10秒

asyncio.run(test_performance())

2. 内存占用低

  • 每个线程需要几MB内存
  • 每个协程只需要几KB内存
  • 可以轻松创建数万个协程

3. 避免线程安全问题

协程在单线程中运行,天然避免了竞态条件和锁的问题。

import  asyncio

# 不需要担心线程安全
counter = 0

async def increment():
    global counter
    await asyncio.sleep(0.1)  # 模拟一些异步操作
    counter += 1    # 不会有竞态条件

async def main():
    tasks = [increment() for _ in range(100)]
    await asyncio.gather(*tasks)
    print(f"最终计数: {counter}")  # 结果是确定的

asyncio.run(main())

协程的使用场景

1. 网络请求和API调用

pip install aiohttp

示例代码

import aiohttp
import asyncio

async def fetch_data(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'http://httpbin.org/delay/1',
        'http://httpbin.org/delay/2',
        'http://httpbin.org/delay/1'
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_data(session, url) for url in urls]
        results = await asyncio.gather(*tasks)

    print(f"获取了 {len(results)} 个响应")

# 适用场景:爬虫、API聚合、微服务调用
asyncio.run(main())

2. 文件I/O操作

import aiofiles
import asyncio

async def read_file(filename):
    async with aiofiles.open(filename, 'r') as file:
        content = await file.read()
        return len(content)

async def process_files():
    files = ['file1.txt', 'file2.txt', 'file3.txt']
    tasks = [read_file(f) for f in files]
    sizes = await asyncio.gather(*tasks)
    
    for file, size in zip(files, sizes):
        print(f"{file}: {size} 字符")

# 适用场景:批量文件处理、日志分析

3. 数据库操作

import asyncpg
import asyncio

async def query_user(pool, user_id):
    async with pool.acquire() as conn:
        result = await conn.fetchrow(
            "SELECT * FROM users WHERE id = $1", user_id
        )
        return result

async def batch_query_users(user_ids):
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db"
    )
    
    tasks = [query_user(pool, uid) for uid in user_ids]
    users = await asyncio.gather(*tasks)
    
    await pool.close()
    return users

# 适用场景:高并发Web应用、数据分析

4. 实时应用和WebSocket

import websockets
import asyncio

async def handle_client(websocket, path):
    print(f"新客户端连接: {websocket.remote_address}")
    
    async for message in websocket:
        # 处理消息并广播给其他客户端
        response = f"收到消息: {message}"
        await websocket.send(response)

async def start_server():
    server = await websockets.serve(handle_client, "localhost", 8765)
    print("WebSocket服务器启动在 ws://localhost:8765")
    await server.wait_closed()

# 适用场景:聊天应用、实时通知、游戏服务器

总结

协程就像是一个善于时间管理的人,不会在等待的时候浪费时间,而是去做其他有意义的事情。在Python中,当你需要处理大量I/O密集型任务时,协程能显著提升程序的性能和响应速度。

记住协程的三个关键点:

  1. 并发不是并行:协程是在单线程中切换执行
  2. 主动让出控制权:使用await关键字
  3. 适合I/O密集型任务:网络请求、文件操作、数据库查询等