Skip to main content

Python 进程与线程

Python 进程间通信

在使用 Python 的 multiprocessing 模块进行多进程开发时,多个进程之间的数据共享和通信成为一个重要问题。由于进程间彼此独立,不能像线程那样共享内存,因此需要使用特定的通信机制。本章节将依次介绍四种常见的进程通信方式:队列(Queue)、管道(Pipe)、共享内存(Shared Memory) 和 管理器(Manager)。

一. 使用队列(Queue)进行通信

multiprocessing.Queue 是最常见的进程通信方式之一,底层基于管道和锁实现,适合用于多个生产者或消费者的场景。

示例:一个进程写入数据,另一个进程读取数据

from multiprocessing import Process, Queue
import time

def producer(q):
    for i in range(5):
        print(f'生产数据:{i}')
        q.put(i)
        time.sleep(0.5)

def consumer(q):
    while True:
        if not q.empty():
            item = q.get()
            print(f'消费数据:{item}')
            if item == 4:
                break

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=producer, args=(q,))
    p2 = Process(target=consumer, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

二. 使用管道(Pipe)进行通信

Pipe() 提供了两个连接端,可以双向发送和接收消息。适合两个进程之间的点对点通信。

示例:父进程发送消息,子进程接收并回应

from multiprocessing import Process, Pipe

def worker(conn):
    msg = conn.recv()
    print(f'子进程收到:{msg}')
    conn.send(f'你好,父进程,我收到消息:{msg}')

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=worker, args=(child_conn,))
    p.start()
    parent_conn.send('你好,子进程!')
    response = parent_conn.recv()
    print(f'父进程收到回应:{response}')
    p.join()

三. 使用共享内存

共享内存提供了一种比 Queue 更高效的数据共享方式,适合传输大量数值数据,如图像、数组等。

示例:多个进程共享一个数字,注意对数字相加时要加锁。

import multiprocessing as mp

def update_counter(val, lock):
    for _ in range(100):
        # 获取锁,确保对共享变量的独占访问
        with lock:
            val.value += 1

if __name__ == "__main__":
    # 创建共享整数,并获取关联的锁
    counter = mp.Value('i', 0)
    lock = counter.get_lock()

    # 创建多个进程,传递共享变量和锁
    processes = [mp.Process(target=update_counter, args=(counter, lock)) for _ in range(10)]

    # 启动所有进程
    for p in processes:
        p.start()

    # 等待所有进程完成
    for p in processes:
        p.join()

    # 输出最终计数结果
    print("Final counter:", counter.value)
    

四. 使用管理器

multiprocessing.Manager 提供一种简单方式来共享列表、字典、队列等 Python 对象,适合在多个进程中读写复杂数据结构。

示例:共享一个字典进行读写,注意,下面的代码加入了锁,确保结果正确,关于锁的部分,在后面章节会详细讲解。

from multiprocessing import Process, Manager, Lock

def writer(shared_dict, lock):
    with lock:
        shared_dict['pid'] = '子进程'
        shared_dict['status'] = '已写入'
        if 'cnt' not in shared_dict:
            shared_dict['cnt'] = 0
        for i in range(10):
            shared_dict['cnt'] += 1

def reader(shared_dict):
    print('主进程读取数据:', dict(shared_dict))

if __name__ == '__main__':
    with Manager() as manager:
        shared_dict = manager.dict()
        lock = Lock()

        processes = [Process(target=writer, args=(shared_dict, lock)) for _ in range(10)]

        # 启动所有进程
        for p in processes:
            p.start()

        # 等待所有进程完成
        for p in processes:
            p.join()

        reader(shared_dict)