Skip to main content

Python 异步 IO

Python 使用 asyncio

asyncio 是 Python 标准库中用于异步编程的模块,适用于需要高并发处理的任务,例如网络编程、文件操作或异步 I/O 操作。异步编程允许程序在等待 I/O 操作(如网络请求、文件读写)时继续让 CPU 执行其他任务,从而提高效率。asyncio 提供了一种基于协程(coroutine)的异步编程方式,通过事件循环(Event Loop)协调协程的执行。

1. 核心概念

  • 协程(Coroutine):使用 async def 定义的函数,运行时可以暂停和恢复,允许其他任务在等待时执行。
  • 事件循环(Event Loop)asyncio 的核心,负责调度和运行协程、任务以及回调。
  • 任务(Task):协程的封装,用于在事件循环中调度执行。
  • Future:表示尚未完成的操作,TaskFuture 的子类。
  • await:用于暂停协程,等待某个异步操作完成。

2. 基本用法

2.1 定义协程

使用 async def 定义一个协程函数,内部使用 await 暂停执行以等待异步操作。

import asyncio

async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步 I/O 操作
    print("World")

# 获取事件循环并运行协程
asyncio.run(say_hello())
  • asyncio.sleep(1):模拟异步等待 1 秒,类似同步的 time.sleep,但不会阻塞事件循环。
  • asyncio.run():运行协程的入口,负责创建事件循环并执行协程。

2.2 运行多个协程

可以通过 asyncio.gather() 并发运行多个协程:

import asyncio

async def task1():
    print("Task 1 开始")
    await asyncio.sleep(2)
    print("Task 1 结束")
    return "task1"

async def task2():
    print("Task 2 开始")
    await asyncio.sleep(1)
    print("Task 2 结束")
    return "task2"

async def main():
    results = await asyncio.gather(task1(), task2())
    print(results)

asyncio.run(main())

输出

Task 1 开始
Task 2 开始
Task 2 结束
Task 1 结束
['task1', 'task2']
  • asyncio.gather():并发运行多个协程,并等待所有协程完成。
  • 协程按照事件循环调度,可能交错执行。

2.3 使用任务(Task)

任务是对协程的封装,允许更细粒度的控制:

import asyncio

async def task(name, delay):
    print(f"{name} 开始")
    await asyncio.sleep(delay)
    print(f"{name} 结束")

async def main():
    # 创建任务
    t1 = asyncio.create_task(task("Task 1", 2))
    t2 = asyncio.create_task(task("Task 2", 1))
    await t1  # 等待任务 1 完成
    await t2  # 等待任务 2 完成

asyncio.run(main())
  • asyncio.create_task():将协程包装为任务,立即调度到事件循环。
  • 任务允许手动控制执行顺序或取消任务。

3. 进阶用法

3.1 异步上下文管理器

使用 async with 实现异步上下文管理器,适合管理异步资源的生命周期:

import asyncio
import aiohttp

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    content = await fetch_url("https://example.com")
    print(content[:100])  # 打印前 100 个字符

asyncio.run(main())
  • aiohttp.ClientSession:异步 HTTP 客户端,用于发送网络请求,aiohttp需要单独安装。
  • async with:确保资源(如网络连接)在使用后正确关闭。

3.2 异步迭代器

使用 async for 实现异步迭代器,适合处理异步数据流:

import asyncio

async def async_generator():
    for i in range(3):
        await asyncio.sleep(1)
        yield i

async def main():
    async for value in async_generator():
        print(f"Received: {value}")

asyncio.run(main())

输出

Received: 0
Received: 1
Received: 2
  • async def async_generator():定义异步生成器。
  • async for:用于迭代异步生成器的结果。

3.3 任务取消与超时

asyncio 提供机制来取消任务或设置超时:

import asyncio

async def long_running_task():
    print("任务开始")
    try:
        await asyncio.sleep(10)
        print("任务完成")
    except asyncio.CancelledError:
        print("任务被取消")

async def main():
    task = asyncio.create_task(long_running_task())
    await asyncio.sleep(2)
    task.cancel()  # 取消任务
    try:
        await task
    except asyncio.CancelledError:
        print("主程序捕获到取消异常")

asyncio.run(main())

输出

任务开始
任务被取消
主程序捕获到取消异常
  • task.cancel():取消正在运行的任务。
  • 使用 asyncio.wait_for() 可以设置超时:
import asyncio

async def slow_task():
    await asyncio.sleep(5)
    return "完成"

async def main():
    try:
        result = await asyncio.wait_for(slow_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时")

asyncio.run(main())

输出

任务超时

4. 实际应用示例:异步爬虫

以下是一个使用 aiohttp 实现的异步网页爬虫,批量获取多个网页内容:

import asyncio
import aiohttp

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                print(f"正在获取 {url}")
                return await response.text()
        except Exception as e:
            return f"获取 {url} 失败: {e}"

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://x.ai"
    ]
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for url, result in zip(urls, results):
        print(f"{url}: {len(str(result))} 字符")

asyncio.run(main())

输出示例

正在获取 https://example.com
正在获取 https://python.org
正在获取 https://x.ai
https://example.com: 1256 字符
https://python.org: 50321 字符
https://x.ai: 7894 字符
  • 使用 aiohttp 进行异步 HTTP 请求。
  • return_exceptions=True 确保即使某些请求失败,其他任务仍能继续执行。

5. 注意事项

  1. 使用 asyncio.run()
    • 仅在程序入口调用 asyncio.run(),不要在已运行的事件循环中调用。
    • 每个 asyncio.run() 创建一个新的事件循环。
  2. 避免阻塞操作
    • 不要在协程中使用同步的阻塞操作(如 time.sleep 或同步网络请求),否则会阻塞事件循环。
    • 使用 asyncio.sleep 或异步库(如 aiohttp)。
  3. 线程安全
    • asyncio 的事件循环不是线程安全的。不要在多个线程中操作同一个事件循环。
    • 如果需要多线程,考虑使用 loop.run_in_executor 执行阻塞任务。

6. 总结

asyncio 是 Python 异步编程的强大工具,适合处理高并发场景。通过协程、事件循环和任务,开发者可以高效地管理异步操作。重要关键点:

  • 使用 async defawait 定义和调用协程。
  • 使用 asyncio.gatherasyncio.create_task 并发运行多个协程。
  • 使用 async withasync for 处理异步资源和迭代。
  • 注意异常处理、任务取消和超时机制。
  • 结合异步库(如 aiohttp)实现实际应用。