Skip to main content

Python 进程与线程

Python 分布式进程

分布式进程是指将一个程序的不同部分运行在不同的计算机或进程中,通过网络通信来协调工作。在Python中,我们可以使用multiprocessing模块的managers功能来实现分布式进程,让不同机器上的Python进程能够共享数据和协同工作。

核心原理

1. 进程间通信(IPC)

传统的多进程只能在同一台机器上运行,而分布式进程突破了这个限制。它通过以下机制实现:

  • 网络通信:使用TCP/IP协议在网络中传输数据
  • 序列化:将Python对象转换为字节流进行网络传输
  • 代理对象:在客户端创建远程对象的本地代理

2. Manager模式

Python的分布式进程采用Manager-Worker模式:

  • Manager进程:作为服务器,管理共享资源(队列、列表、字典等)
  • Worker进程:作为客户端,连接到Manager并使用共享资源
  • 注册机制:通过注册让Worker能够访问Manager的资源

3. 网络架构

Manager进程(服务器端)
    ├── 任务分发器:管理任务队列
    ├── 结果收集器:处理计算结果
    └── 网络服务:处理客户端连接
         ↓ TCP/IP通信
Worker进程(客户端)
    ├── 连接管理器:维持与服务器连接  
    ├── 任务处理器:执行具体计算
    └── 结果提交器:回传处理结果

基本组件详解

BaseManager类

BaseManager是分布式进程的核心类,它负责:

  • 启动服务器进程
  • 管理共享对象的注册
  • 处理客户端连接请求
  • 维护网络通信

共享对象类型

常用的共享对象包括:

  • Queue:进程安全的队列,用于任务分发
  • list:共享列表,存储结果数据
  • dict:共享字典,存储配置或状态信息
  • 自定义类:通过注册机制支持自定义对象

完整示例:分布式任务处理系统

下面通过一个计算质数的分布式系统来演示完整的实现过程。

服务端代码(manager.py)

import multiprocessing
import time
import threading
from multiprocessing.managers import BaseManager

# 创建任务队列和结果队列
task_queue = multiprocessing.Queue()
result_queue = multiprocessing.Queue()

# 定义获取队列的函数
def get_task_queue():
    return task_queue

def get_result_queue():
    return result_queue

# 自定义Manager类
class QueueManager(BaseManager):
    pass

def collect_results():
    """结果收集线程函数"""
    print("\n=== 开始收集计算结果 ===")
    total_primes = []
    total_time = 0
    completed_tasks = 0
    expected_tasks = 10  # 预期的任务数量

    while completed_tasks < expected_tasks:
        try:
            result = result_queue.get(timeout=60)  # 等待60秒
            completed_tasks += 1

            print(f"\n收到结果 {completed_tasks}/{expected_tasks}:")
            print(f"  范围: {result['range'][0]} - {result['range'][1]}")
            print(f"  质数个数: {result['count']}")
            print(f"  质数列表: {result['primes']}")
            print(f"  处理时间: {result['processing_time']} 秒")

            total_primes.extend(result['primes'])
            total_time += result['processing_time']

        except:
            print(f"等待第 {completed_tasks + 1} 个结果超时")
            break

    # 输出汇总结果
    print(f"\n=== 最终统计 ===")
    print(f"完成任务数: {completed_tasks}/{expected_tasks}")
    print(f"总共找到质数: {len(total_primes)} 个")
    print(f"质数列表: {sorted(total_primes)}")
    print(f"总处理时间: {round(total_time, 3)} 秒")
    print(f"平均每个任务耗时: {round(total_time/completed_tasks, 3)} 秒" if completed_tasks > 0 else "")

def start_server(address, port):
    # 注册队列到Manager
    QueueManager.register('get_task_queue', callable=get_task_queue)
    QueueManager.register('get_result_queue', callable=get_result_queue)

    # 启动Manager服务器,监听端口5000
    manager = QueueManager(address=(address, port), authkey=b'secret_key')
    server = manager.get_server()

    print("Manager服务器已启动,等待Worker连接...")
    print("服务器地址: localhost:5000")

    # 添加一些计算任务(寻找指定范围内的质数)
    print("正在添加任务...")
    for i in range(100, 200, 10):
        task_queue.put((i, i + 10))
        print(f"添加任务: 计算 {i} 到 {i+10} 范围内的质数")

    # 启动结果收集线程
    collector_thread = threading.Thread(target=collect_results, daemon=True)
    collector_thread.start()

    # 启动服务器(会阻塞)
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\n服务器正在关闭...")
        server.shutdown()

if __name__ == '__main__':
    start_server('127.0.0.1', 5000)

客户端代码(worker.py)

质数(也称为素数) 是指大于1的自然数,且除了1和它本身以外,不能被其他自然数整除的数。

import time
from multiprocessing.managers import BaseManager

# 定义计算质数的函数
def is_prime(n):
    """判断一个数是否为质数"""
    if n < 2:
        return False
    for i in range(2, int(n ** 0.5) + 1):
        if n % i == 0:
            return False
    return True

def find_primes_in_range(start, end):
    """在指定范围内寻找质数"""
    primes = []
    for num in range(start, end):
        if is_prime(num):
            primes.append(num)
    return primes

# 客户端Manager类
class QueueManager(BaseManager):
    pass

def start_worker(server_host, sever_port):
    # 注册队列(客户端不需要提供callable)
    QueueManager.register('get_task_queue')
    QueueManager.register('get_result_queue')

    # 连接到服务器
    print("正在连接到Manager服务器...")
    manager = QueueManager(address=(server_host, sever_port), authkey=b'secret_key')
    manager.connect()

    # 获取队列对象
    task_queue = manager.get_task_queue()
    result_queue = manager.get_result_queue()
    print("成功连接到服务器,开始处理任务...")

    # 持续处理任务
    while True:
        try:
            # 从任务队列获取任务(超时5秒)
            task = task_queue.get(timeout=5)
            start_num, end_num = task

            print(f"Worker正在处理任务: {start_num} 到 {end_num}")

            # 计算该范围内的质数
            start_time = time.time()
            primes = find_primes_in_range(start_num, end_num)
            end_time = time.time()

            # 准备结果数据
            result = {
                'range': (start_num, end_num),
                'primes': primes,
                'count': len(primes),
                'processing_time': round(end_time - start_time, 3)
            }

            # 将结果放入结果队列
            result_queue.put(result)
            print(f"任务完成: 找到 {len(primes)} 个质数,耗时 {result['processing_time']} 秒")

        except:
            # 没有更多任务,退出循环
            print("没有更多任务,Worker退出")
            break

if __name__ == '__main__':
    start_worker('127.0.0.1', 5000)

第一步:启动Manager服务器

python manager.py

服务器启动后会:

  1. 创建任务队列和结果队列
  2. 注册队列到Manager
  3. 在端口5000上启动TCP服务器
  4. 向任务队列添加10个计算任务
  5. 启动后台结果收集线程
  6. 等待客户端连接

第二步:启动Worker客户端

python worker.py

可以同时启动多个Worker实例来并行处理:

python worker.py &
python worker.py &
python worker.py &

每个Worker会:

  1. 连接到Manager服务器
  2. 获取任务队列和结果队列的代理对象
  3. 循环从任务队列取任务
  4. 计算指定范围内的质数
  5. 将结果放入结果队列

第三步:自动结果收集

Manager服务器会自动:

  1. 通过后台线程从结果队列收集数据
  2. 实时显示每个任务的处理结果
  3. 在所有任务完成后展示汇总统计
  4. 无需手动启动额外的收集程序

关键技术点说明

1. 认证机制

authkey=b'secret_key'

authkey确保只有知道密钥的客户端才能连接,提供基本的安全保护。

2. 序列化处理

Python自动处理对象的序列化和反序列化,支持大多数内置类型和简单的自定义对象。

3. 超时机制

task = task_queue.get(timeout=5)

设置超时避免程序无限等待,当没有任务时能够优雅退出。

4. 异常处理

网络程序需要处理连接断开、超时等各种异常情况,确保程序的稳定性。