Python 深入理解 并行并发 同步异步 阻塞非阻塞
本章节使用具体的例子,完整的代码演示了 Python 的 并行并发、Python 的 同步异步、Python 的 阻塞非阻塞 他们之间的区别,以及如何更好的理解它们。其中涉及进程、线程、协程相关的概念。
一. 并行并发
- 并行是指在同一时刻,多个任务(进程、线程等)同时执行,互不干扰,通常会是多核 CPU 实现。
- 并发是指在一个时间段内多个任务都处于运行状态,但同一时刻只有一个任务处于占用 CPU 运行状态,在这个时间段内多个任务可能根据实际情况交替执行任务。
- 总结:体现到具体的实现过程,并发指的是程序的结构(Python中也就是协程的实现方式),并行指的是程序运行时的状态。
并行例子
下面是并行例子,task是一个耗时操作,如果电脑有4核CPU的话,这4个任务会同时完成。
import multiprocessing
import time
import datetime
def worker(name):
print(f"{str(datetime.datetime.now())} 进程 {name} 开始执行")
time.sleep(2)
print(f"{str(datetime.datetime.now())} 进程 {name} 执行结束")
if __name__ == '__main__':
processes = []
for i in range(4):
p = multiprocessing.Process(target=worker, args=(f"Worker{i+1}",))
processes.append(p)
p.start()
for p in processes:
p.join()
print("所有进程执行完毕")
输出
2025-07-26 12:19:52.447111 进程 Worker4 开始执行
2025-07-26 12:19:52.447763 进程 Worker1 开始执行
2025-07-26 12:19:52.448127 进程 Worker3 开始执行
2025-07-26 12:19:52.448375 进程 Worker2 开始执行
2025-07-26 12:19:54.451243 进程 Worker4 执行结束
2025-07-26 12:19:54.451243 进程 Worker1 执行结束
2025-07-26 12:19:54.451242 进程 Worker3 执行结束
2025-07-26 12:19:54.451297 进程 Worker2 执行结束
所有进程执行完毕
并发例子
下面是并发例子: 生产者消费者模型,这里把生产者和消费者看成是并发的两个任务,“看起来” 他们都在同时运行,实际上他们是在交替运行,实际的并发运用在 web 高并发中用得多,比如tornado
异步框架。
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print(f'Consuming {n}')
r = 'OK'
def produce(c):
c.send(None)
n = 0
while n < 5:
n = n + 1
print(f'Producing {n}')
r = c.send(n)
print(f'Consumer return: {r}')
c.close()
c = consumer()
produce(c)
如果对此例子不明白的,请看前面的文章:

二. 同步异步
同步异步关注的是消息通知机制。
- 同步,就是在发出一个调用时,在没有得到结果之,该调用就没有结束不返回,但一旦调用返回结构就得到返回值了,调用者需主等待(读写)结果。
- 异步,与同步相反,在调用发出之后,这个调用就直接返回了,所以没有返回结果,返回结果需要在后期通过状态、通知、回调函数来返回结果(不需要调用者主动等待结果)。
同步例子
下面是同步例子: 这是一个同步爬虫,多个URL顺序爬取,后一个URL的爬取必须要等到前一个执行完成后才能执行。
import re
import requests
urls = [f'https://www.baidu.com/s?wd={i}' for i in range(5)]
def parse(url, response):
search = re.search(r'<title>(.*)</title>', response)
title = search.group(1) if search else ''
print((url, title))
def fetch(url):
response = requests.get(url)
parse(url, response.content.decode("utf-8"))
if __name__ == '__main__':
for target in urls:
fetch(target)
异步例子
下面是异步例子: 通过回调和事件循环的方式实现异步处理,把判断读就绪、写就绪的任务交给OS,主程序不断的创建新的任务,有耗时操作就让OS来监听是否就绪,就绪以后就通知应用程序来读取或者写入数据。
异步例子中没有使用 asyncio 或者 aiohttp 这些模块,使用最原始的 selector 方式,为了让大家看得更清楚一些。
from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ
from urllib.parse import urlparse
import re
import socket
selector = DefaultSelector()
stopped = False
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
class Spider:
results = []
def __init__(self, url):
self.url = url
r = urlparse(url)
self.hostname = r.hostname
self.port = r.port if r.port else 80
self.path = r.path if r.path else '/'
self.query = r.query
self.response = b''
self.buffersize = 4096
self.sock = socket.socket()
def parse(self):
response = self.response.decode('utf8', errors='ignore')
search = re.search(r'<title>(.*)</title>', response)
title = search.group(1) if search else ''
self.results.append((self.url, title))
def fetch(self):
global selector
self.sock.setblocking(False)
try:
self.sock.connect((self.hostname, self.port))
except BlockingIOError:
pass
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
def connected(self, key):
global selector
selector.unregister(key.fd)
get = (f'GET {self.path}?{self.query} HTTP/1.1\r\n'
f'Connection: close\r\nHost: {self.hostname}\r\n\r\n')
self.sock.send(get.encode())
selector.register(key.fd, EVENT_READ, self.read_response)
def read_response(self, key):
global selector
global stopped
chunk = self.sock.recv(self.buffersize)
if chunk:
self.response += chunk
else:
selector.unregister(key.fd)
urls.remove(self.url)
stopped = True if not urls else False
self.parse()
def event_loop():
global selector
global stopped
while not stopped:
events = selector.select()
for key, mask in events:
callback = key.data
callback(key)
if __name__ == '__main__':
for target in urls:
spider = Spider(target)
spider.fetch()
event_loop()
print(Spider.results)
下面是现成模块实现异步的代码:
from tornado.httpclient import AsyncHTTPClient
import asyncio
import aiohttp
import requests
import uvloop
async def aiohttp_download(url):
print(f'getting url: {url}')
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
print(f'{url} status: {resp.status}')
async def tornado_download(url):
print(f'getting url: {url}')
http_client = AsyncHTTPClient()
resp = await http_client.fetch(url)
print(f'{url} status: {resp.code}')
async def requests_download(url):
print(f'getting url: {url}')
resp = requests.get(url)
print(f'{url} status: {resp.status_code}')
async def main():
urls = ['https://www.mi.com/',
'https://www.baidu.com/',
'https://note.yuchaoshui.com/',
'https://www.jianshu.com/',
'https://www.csdn.net/',
'https://www.liaoxuefeng.com/']
# tasks = [requests_download(url) for url in urls]
tasks = [tornado_download(url) for url in urls]
# tasks = [aiohttp_download(url) for url in urls]
await asyncio.wait(tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
# 或者使用uvloop
# uvloop.install()
# asyncio.run(main())
三. 阻塞非阻塞
阻塞与非阻塞关注的是当前程序在等待调用结果时,当前线程或进程的状态,关注的是当前线程或进程的情况。
- 阻塞是指调用结果返回之,当前线程会被挂起(不会消耗CPU资源,而是等待状态),调用线程只有在得到结果之后才会返回。
- 非阻塞是指在不能得到结果之前,该调用不会挂起当前线程。
例子
问: 同步非阻塞这种场景是否存在?
答: 存在,下面的例子很好的说明了这一点,一个 socket 的连接建立、等待数据读写这些耗时操作都可以是非阻塞的,但是在不做其他任何处理的情况下,用 try except 来判断是否读、写就绪来实现同步的多个任务处理。
下面的这个例子在实际应用中不会用到,因为实际它在等待每一个任务的结果,只不过是通过while循环的方式来判断是否有结果了而已。
from urllib.parse import urlparse
import re
import socket
urls = [f'http://www.baidu.com/s?wd={i}' for i in range(10)]
results = []
def parse(url, response):
global results
response = response.decode('utf8', errors='ignore')
search = re.search(r'<title>(.*)</title>', response)
title = search.group(1) if search else ''
results.append((url, title))
def fetch(url):
r = urlparse(url)
hostname = r.hostname
port = r.port if r.port else 80
path = r.path if r.path else '/'
query = r.query
buffersize = 4096
sock = socket.socket()
sock.setblocking(False)
try:
sock.connect((hostname, port))
except BlockingIOError:
pass
get = (f'GET {path}?{query} HTTP/1.1\r\n'
f'Connection: close\r\nHost: {hostname}\r\n\r\n')
get = get.encode('utf8')
while True:
try:
sock.send(get)
break
except OSError:
pass
response = b''
while True:
try:
chunk = sock.recv(buffersize)
while chunk:
response += chunk
chunk = sock.recv(buffersize)
break
except OSError:
pass
parse(url, response)
if __name__ == '__main__':
for target in urls:
fetch(target)
print(results)
四. 总结
- 同步异步表示一种协作方式,是从全局更高的角度来看待(多进程、多线程、协程的工作方式),如果程序使用了同步方式,那么就不存在非阻塞这种状态了。
- 阻塞非阻塞是从单进程、单线程、协程的角度来看待的,也即当前的状态。