Python 进程间通信
在使用 Python 的 multiprocessing 模块进行多进程开发时,多个进程之间的数据共享和通信成为一个重要问题。由于进程间彼此独立,不能像线程那样共享内存,因此需要使用特定的通信机制。本章节将依次介绍四种常见的进程通信方式:队列(Queue)、管道(Pipe)、共享内存(Shared Memory) 和 管理器(Manager)。
一. 使用队列(Queue)进行通信
multiprocessing.Queue
是最常见的进程通信方式之一,底层基于管道和锁实现,适合用于多个生产者或消费者的场景。
示例:一个进程写入数据,另一个进程读取数据
from multiprocessing import Process, Queue
import time
def producer(q):
for i in range(5):
print(f'生产数据:{i}')
q.put(i)
time.sleep(0.5)
def consumer(q):
while True:
if not q.empty():
item = q.get()
print(f'消费数据:{item}')
if item == 4:
break
if __name__ == '__main__':
q = Queue()
p1 = Process(target=producer, args=(q,))
p2 = Process(target=consumer, args=(q,))
p1.start()
p2.start()
p1.join()
p2.join()
二. 使用管道(Pipe)进行通信
Pipe()
提供了两个连接端,可以双向发送和接收消息。适合两个进程之间的点对点通信。
示例:父进程发送消息,子进程接收并回应
from multiprocessing import Process, Pipe
def worker(conn):
msg = conn.recv()
print(f'子进程收到:{msg}')
conn.send(f'你好,父进程,我收到消息:{msg}')
if __name__ == '__main__':
parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
parent_conn.send('你好,子进程!')
response = parent_conn.recv()
print(f'父进程收到回应:{response}')
p.join()
三. 使用共享内存
共享内存提供了一种比 Queue 更高效的数据共享方式,适合传输大量数值数据,如图像、数组等。
示例:多个进程共享一个数字,注意对数字相加时要加锁。
import multiprocessing as mp
def update_counter(val, lock):
for _ in range(100):
# 获取锁,确保对共享变量的独占访问
with lock:
val.value += 1
if __name__ == "__main__":
# 创建共享整数,并获取关联的锁
counter = mp.Value('i', 0)
lock = counter.get_lock()
# 创建多个进程,传递共享变量和锁
processes = [mp.Process(target=update_counter, args=(counter, lock)) for _ in range(10)]
# 启动所有进程
for p in processes:
p.start()
# 等待所有进程完成
for p in processes:
p.join()
# 输出最终计数结果
print("Final counter:", counter.value)
四. 使用管理器
multiprocessing.Manager
提供一种简单方式来共享列表、字典、队列等 Python 对象,适合在多个进程中读写复杂数据结构。
示例:共享一个字典进行读写,注意,下面的代码加入了锁,确保结果正确,关于锁的部分,在后面章节会详细讲解。
from multiprocessing import Process, Manager, Lock
def writer(shared_dict, lock):
with lock:
shared_dict['pid'] = '子进程'
shared_dict['status'] = '已写入'
if 'cnt' not in shared_dict:
shared_dict['cnt'] = 0
for i in range(10):
shared_dict['cnt'] += 1
def reader(shared_dict):
print('主进程读取数据:', dict(shared_dict))
if __name__ == '__main__':
with Manager() as manager:
shared_dict = manager.dict()
lock = Lock()
processes = [Process(target=writer, args=(shared_dict, lock)) for _ in range(10)]
# 启动所有进程
for p in processes:
p.start()
# 等待所有进程完成
for p in processes:
p.join()
reader(shared_dict)