Skip to main content

Python 异步 IO

Python 深入理解 并行并发 同步异步 阻塞非阻塞

本章节使用具体的例子,完整的代码演示了 Python 的 并行并发、Python 的 同步异步、Python 的 阻塞非阻塞 他们之间的区别,以及如何更好的理解它们。其中涉及进程、线程、协程相关的概念。

一. 并行并发

  • 并行是指在同一时刻,多个任务(进程、线程等)同时执行,互不干扰,通常会是多核 CPU 实现。
  • 并发是指在一个时间段内多个任务都处于运行状态,但同一时刻只有一个任务处于占用 CPU 运行状态,在这个时间段内多个任务可能根据实际情况交替执行任务。
  • 总结:体现到具体的实现过程,并发指的是程序的结构(Python中也就是协程的实现方式),并行指的是程序运行时的状态。

并行例子

下面是并行例子,task是一个耗时操作,如果电脑有4核CPU的话,这4个任务会同时完成。

import multiprocessing
import time
import datetime

def worker(name):
    print(f"{str(datetime.datetime.now())} 进程 {name} 开始执行")
    time.sleep(2)
    print(f"{str(datetime.datetime.now())} 进程 {name} 执行结束")

if __name__ == '__main__':
    processes = []
    for i in range(4):
        p = multiprocessing.Process(target=worker, args=(f"Worker{i+1}",))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print("所有进程执行完毕")

输出

2025-07-26 12:19:52.447111 进程 Worker4 开始执行
2025-07-26 12:19:52.447763 进程 Worker1 开始执行
2025-07-26 12:19:52.448127 进程 Worker3 开始执行
2025-07-26 12:19:52.448375 进程 Worker2 开始执行
2025-07-26 12:19:54.451243 进程 Worker4 执行结束
2025-07-26 12:19:54.451243 进程 Worker1 执行结束
2025-07-26 12:19:54.451242 进程 Worker3 执行结束
2025-07-26 12:19:54.451297 进程 Worker2 执行结束
所有进程执行完毕

并发例子

下面是并发例子: 生产者消费者模型,这里把生产者和消费者看成是并发的两个任务,“看起来” 他们都在同时运行,实际上他们是在交替运行,实际的并发运用在 web 高并发中用得多,比如tornado异步框架。

def consumer():
    r = ''
    while True:
        n = yield r
        if not n:
            return
        print(f'Consuming {n}')
        r = 'OK'

def produce(c):
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print(f'Producing {n}')
        r = c.send(n)
        print(f'Consumer return: {r}')
    c.close()

c = consumer()
produce(c)

如果对此例子不明白的,请看前面的文章:

Python 协程
生活中的例子:洗衣做饭的智慧 想象一下,你回到家准备洗衣服和做饭。如果按照传统的方式: 传统方式(同步): 1. 把衣服放进洗衣机,然后站在那里等30分钟直到洗完 2. 洗完衣服后,开始做饭,花费40分钟 3. 总共用时:30 + 40 = 70分钟 聪明的方式(协程): 1. 把衣服放进洗衣机(2分钟) 2. 趁洗衣机工作时去做饭(40分钟) 3. 做饭期间,洗衣机洗完了,去晾衣服(3分钟) 4. 总共用时:大约45分钟 这就是协程的核心思想:在等待的时候,不要闲着,去做其他事情! 什么是协程? 协程(Coroutine)是一种特殊的函数,它可以在执行过程中暂停,让出 CPU 的控制权给其他协程,之后再从暂停的地方继续执行。 用 Python

二. 同步异步

同步异步关注的是消息通知机制。

  • 同步,就是在发出一个调用时,在没有得到结果之,该调用就没有结束不返回,但一旦调用返回结构就得到返回值了,调用者需主等待(读写)结果。
  • 异步,与同步相反,在调用发出之后,这个调用就直接返回了,所以没有返回结果,返回结果需要在后期通过状态、通知、回调函数来返回结果(不需要调用者主动等待结果)。

同步例子

下面是同步例子: 这是一个同步爬虫,多个URL顺序爬取,后一个URL的爬取必须要等到前一个执行完成后才能执行。

import re
import requests

urls = [f'https://www.baidu.com/s?wd={i}' for i in range(5)]

def parse(url, response):
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    print((url, title))

def fetch(url):
    response = requests.get(url)
    parse(url, response.content.decode("utf-8"))

if __name__ == '__main__':
    for target in urls:
        fetch(target)

异步例子

下面是异步例子: 通过回调和事件循环的方式实现异步处理,把判断读就绪、写就绪的任务交给OS,主程序不断的创建新的任务,有耗时操作就让OS来监听是否就绪,就绪以后就通知应用程序来读取或者写入数据。

异步例子中没有使用 asyncio 或者 aiohttp 这些模块,使用最原始的 selector 方式,为了让大家看得更清楚一些。

from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket

selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]

class Spider:
    results = []

    def __init__(self, url):
        self.url = url
        r = urlparse(url)
        self.hostname = r.hostname
        self.port = r.port if r.port else 80
        self.path = r.path if r.path else '/'
        self.query = r.query
        self.response = b''
        self.buffersize = 4096
        self.sock = socket.socket()

    def parse(self):
        response = self.response.decode('utf8', errors='ignore')
        search = re.search(r'<title>(.*)</title>', response)
        title = search.group(1) if search else ''
        self.results.append((self.url, title))

    def fetch(self):
        global selector
        self.sock.setblocking(False)
        try:
            self.sock.connect((self.hostname, self.port))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)

    def connected(self, key):
        global selector
        selector.unregister(key.fd)
        get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
               f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
        self.sock.send(get.encode())
        selector.register(key.fd, EVENT_READ, self.read_response)

    def read_response(self, key):
        global selector
        global stopped
        chunk = self.sock.recv(self.buffersize)
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)
            urls.remove(self.url)
            stopped = True if not urls else False
            self.parse()

def event_loop():
    global selector
    global stopped
    while not stopped:
        events = selector.select()
        for key, mask in events:
            callback = key.data
            callback(key)

if __name__ == '__main__':
    for target in urls:
        spider = Spider(target)
        spider.fetch()
    event_loop()
    print(Spider.results)

下面是现成模块实现异步的代码:

from tornado.httpclient import AsyncHTTPClient
import asyncio
import aiohttp
import requests
import uvloop


async def aiohttp_download(url):
    print(f'getting url: {url}')
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            print(f'{url} status: {resp.status}')


async def tornado_download(url):
    print(f'getting url: {url}')
    http_client = AsyncHTTPClient()
    resp = await http_client.fetch(url)
    print(f'{url} status: {resp.code}')


async def requests_download(url):
    print(f'getting url: {url}')
    resp = requests.get(url)
    print(f'{url} status: {resp.status_code}')


async def main():
    urls = ['https://www.mi.com/',
            'https://www.baidu.com/',
            'https://note.yuchaoshui.com/',
            'https://www.jianshu.com/',
            'https://www.csdn.net/',
            'https://www.liaoxuefeng.com/']

    # tasks = [requests_download(url) for url in urls]
    tasks = [tornado_download(url) for url in urls]
    # tasks = [aiohttp_download(url) for url in urls]
    await asyncio.wait(tasks)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

# 或者使用uvloop
# uvloop.install()
# asyncio.run(main())

三. 阻塞非阻塞

阻塞与非阻塞关注的是当前程序在等待调用结果时,当前线程或进程的状态,关注的是当前线程或进程的情况。

  • 阻塞是指调用结果返回之,当前线程会被挂起(不会消耗CPU资源,而是等待状态),调用线程只有在得到结果之后才会返回。
  • 非阻塞是指在不能得到结果之前,该调用不会挂起当前线程。

例子

问: 同步非阻塞这种场景是否存在?

答: 存在,下面的例子很好的说明了这一点,一个 socket 的连接建立、等待数据读写这些耗时操作都可以是非阻塞的,但是在不做其他任何处理的情况下,用 try except 来判断是否读、写就绪来实现同步的多个任务处理。

下面的这个例子在实际应用中不会用到,因为实际它在等待每一个任务的结果,只不过是通过while循环的方式来判断是否有结果了而已。

from urllib.parse import urlparse
import re
import socket

urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []

def parse(url, response):
    global results
    response = response.decode('utf8', errors='ignore')
    search = re.search(r'<title>(.*)</title>', response)
    title = search.group(1) if search else ''
    results.append((url, title))

def fetch(url):
    r = urlparse(url)
    hostname = r.hostname
    port = r.port if r.port else 80
    path = r.path if r.path else '/'
    query = r.query
    buffersize = 4096

    sock = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((hostname, port))
    except BlockingIOError:
        pass

    get = (f'GET {path}?{query} HTTP/1.1\r\n'
           f'Connection: close\r\nHost: {hostname}\r\n\r\n')
    get = get.encode('utf8')

    while True:
        try:
            sock.send(get)
            break
        except OSError:
            pass

    response = b''
    while True:
        try:
            chunk = sock.recv(buffersize)
            while chunk:
                response += chunk
                chunk = sock.recv(buffersize)
            break
        except OSError:
            pass

    parse(url, response)

if __name__ == '__main__':
    for target in urls:
        fetch(target)
    print(results)

四. 总结

  • 同步异步表示一种协作方式,是从全局更高的角度来看待(多进程、多线程、协程的工作方式),如果程序使用了同步方式,那么就不存在非阻塞这种状态了。
  • 阻塞非阻塞是从单进程、单线程、协程的角度来看待的,也即当前的状态。