Skip to main content

Python 异步 IO

Python 使用 aiohttp

aiohttp 基于 asyncio 异步 HTTP 框架,适用于需要高并发网络请求的场景,如网页爬虫、API 调用或构建异步 Web 应用,结合 asyncio 的事件循环,提供高效的异步网络请求和服务器功能,aiohttp 在处理大量并发请求时性能更优。

1. 什么是 aiohttp?

aiohttp 是 Python 的异步 HTTP 框架,结合 asyncio 的事件循环,提供高效的异步网络请求和服务器功能。相比同步库(如 requests),aiohttp 在处理大量并发请求时性能更优。

核心特性

  • 异步客户端:支持异步 GET、POST 等 HTTP 请求。
  • 异步服务器:支持构建异步 Web 服务器。
  • WebSocket 支持:实现实时双向通信。
  • 中间件支持:便于处理请求和响应的预处理。
  • 高效并发:结合 asyncio 实现高并发网络操作。

2. 安装 aiohttp

安装 aiohttp 需要 Python 3.6+,使用 pip 安装:

pip install aiohttp

如果需要支持异步 DNS 解析(推荐),安装 aiohttpaiodns

pip install aiohttp aiodns

3. 基本用法

3.1 异步 HTTP GET 请求

使用 aiohttp.ClientSession 发起异步 GET 请求:

import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

async def main():
    url = "https://example.com"
    content = await fetch_url(url)
    print(content[:100])  # 打印前 100 个字符

asyncio.run(main())
  • aiohttp.ClientSession:管理 HTTP 连接,推荐使用 async with 确保正确关闭。
  • session.get:发起异步 GET 请求。
  • response.text():异步读取响应内容为字符串。

3.2 异步 HTTP POST 请求

发送 POST 请求并携带数据:

import aiohttp
import asyncio

async def post_data(url, data):
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=data) as response:
            return await response.json()

async def main():
    url = "https://httpbin.org/post"
    data = {"name": "Alice", "age": 25}
    result = await post_data(url, data)
    print(result)

asyncio.run(main())
  • session.post:发送 POST 请求,支持 JSON 数据(json=)或表单数据(data=)。
  • response.json():异步解析响应为 JSON 格式。

3.3 处理响应

aiohttp 提供多种方式处理 HTTP 响应:

import aiohttp
import asyncio

async def fetch_details(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            print(f"状态码: {response.status}")
            print(f"内容类型: {response.headers['content-type']}")
            if response.status == 200:
                content = await response.text()
                return content[:100]
            return None

async def main():
    url = "https://example.com"
    content = await fetch_details(url)
    print(f"内容前 100 字符: {content}")

asyncio.run(main())

输出示例

状态码: 200
内容类型: text/html; charset=UTF-8
内容前 100 字符: <!doctype html>
<html>
<head>
    <title>Example Domain</title>
  • response.status:获取 HTTP 状态码。
  • response.headers:访问响应头。
  • 其他方法:response.read()(读取字节)、response.json()(解析 JSON)。

4. 进阶用法

4.1 并发请求

使用 asyncio.gather 并发发送多个请求:

import aiohttp
import asyncio

async def fetch_url(url):
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(url) as response:
                print(f"正在获取 {url}")
                return await response.text()
        except Exception as e:
            return f"获取 {url} 失败: {e}"

async def main():
    urls = [
        "https://example.com",
        "https://python.org",
        "https://x.ai"
    ]
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for url, result in zip(urls, results):
        print(f"{url}: {len(str(result))} 字符")

asyncio.run(main())

输出示例

正在获取 https://example.com
正在获取 https://python.org
正在获取 https://x.ai
https://example.com: 1256 字符
https://python.org: 50321 字符
https://x.ai: 7894 字符
  • return_exceptions=True:确保部分请求失败不会影响其他任务。
  • 并发请求显著提高效率,尤其在爬取多个网页时。

4.2 文件上传

使用 aiohttp 异步上传文件:

import aiohttp
import asyncio
from aiohttp import FormData

async def upload_file(url):
    data = FormData()
    data.add_field('file',
                   b"Hello, aiohttp!",
                   filename='test.txt',
                   content_type='text/plain')
    
    async with aiohttp.ClientSession() as session:
        async with session.post(url, data=data) as response:
            return await response.json()

async def main():
    url = "https://httpbin.org/post"
    result = await upload_file(url)
    print(result['files'])

asyncio.run(main())

输出示例

{'file': 'Hello, aiohttp!'}
  • FormData:用于构造 multipart/form-data 数据,适合文件上传。
  • data.add_field:添加文件或表单字段。

4.3 WebSocket 通信

aiohttp 支持异步 WebSocket 通信,适合实时应用:

import aiohttp
import asyncio

async def websocket_client(url):
    async with aiohttp.ClientSession() as session:
        async with session.ws_connect(url) as ws:
            # 发送消息
            await ws.send_str("Hello, WebSocket!")
            # 接收消息
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"收到: {msg.data}")
                    break
                elif msg.type == aiohttp.WSMsgType.CLOSED:
                    break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break

async def main():
    url = "wss://echo.websocket.org"
    await websocket_client(url)

asyncio.run(main())

输出示例

收到: Request served by e286e427f61668
  • session.ws_connect:建立 WebSocket 连接。
  • ws.send_str:发送文本消息。
  • async for msg in ws:异步迭代接收消息。

4.4 设置超时

为请求设置超时以避免长时间等待:

import aiohttp
import asyncio

async def fetch_with_timeout(url):
    timeout = aiohttp.ClientTimeout(total=5)  # 总超时 5 秒
    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get(url) as response:
                return await response.text()
        except asyncio.TimeoutError:
            return f"请求 {url} 超时"

async def main():
    url = "https://httpbin.org/delay/10"  # 模拟慢响应
    result = await fetch_with_timeout(url)
    print(result)

asyncio.run(main())

输出

请求 https://httpbin.org/delay/10 超时
  • aiohttp.ClientTimeout:设置请求超时,包括连接、读取等。
  • 超时机制适合处理不稳定的网络环境。

5. 构建异步 Web 服务器

aiohttp 也可以用来创建异步 Web 服务器:

from aiohttp import web
import asyncio

async def handle(request):
    name = request.match_info.get('name', "World")
    text = f"Hello, {name}!"
    return web.Response(text=text)

async def main():
    app = web.Application()
    app.add_routes([web.get('/', handle),
                    web.get('/{name}', handle)])
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, 'localhost', 8080)
    await site.start()
    print("服务器运行在 http://localhost:8080")
    await asyncio.Event().wait()  # 保持服务器运行

asyncio.run(main())
  • 访问 http://localhost:8080http://localhost:8080/Alice,将返回 Hello, World!Hello, Alice!
  • web.Application:创建 Web 应用。
  • app.add_routes:添加路由。
  • web.TCPSite:运行 HTTP 服务器。

6. 注意事项

  1. 始终使用 ClientSession
    • 不要为每次请求创建新会话,使用 async with aiohttp.ClientSession() 复用连接。
    • 会话管理连接池,提高性能并避免资源泄漏。
  2. 异常处理
    • 处理网络异常(如 aiohttp.ClientError)和超时(asyncio.TimeoutError)。
    • 使用 return_exceptions=True 在并发请求中捕获异常。
  3. 避免阻塞操作
    • 不要在异步代码中使用同步阻塞操作(如 requests.get),否则会阻塞事件循环。