分布式进程
在Thread和Process中,应该首选Process,其中不止因为Process更加稳定,而且Process可以分布到多台机器上,而Thread只能局限在一台机器上。Python的multiprocess
模块中managers
子模块支持把多进程分布到多台机器上,一个服务进程可以作为调度器将任务依靠网络通信分布到其他多个进程中。
首先看服务进程的示例。
import random, time, queue
from multiprocess.managers import BaseManager
# 设置发送和接受的队列
task_queue = queue.Queue()
result_queue = queue.Queue()
class QueueManager(BaseManager):
pass
# 注册两个队列
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.refister('get_result_queue', callable=lambda: result_queue)
# 绑定端口并设置验证
manager = QueueManager(address=('', 5000), authkey=b'abc')
# 启动队列
manager.start()
# 获取网络访问的对象
task = manager.get_task_queue()
result = manager.get_result_queuq()
# 部署任务
for i in range(10):
n = random.randint(0, 10000)
print('Put task {}'.format(n))
task.put(n)
# 从接受队列读结果
print('Try get result')
for i in range(10):
r = result.get(timeout=10)
print('Result {}'.format(r))
manager.shutdown()
print('master exit')
之后可以在另一台机器或者本机运行以下子进程。
import time, sys, queue
from multiprocess.managers import BaseManager
class QueueManager(BaseManager):
pass
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
server_addr = '127.0.0.1'
print('Connect to {}'.format(server_addr))
m = QueueManager(address=(server_addr, 5000), authkey=b'abc')
m.connect()
task = get_task_queue()
result = get_result_queue()
for i in range(10):
try:
n = task.get(timeout=1)
print('run task {0} * {0}'.format(n))
r = '{0} * {0} = {1}'.format(n, n * n)
time.sleep(n)
result.put(r)
except Queue.Empty:
print('task queue is empty')
print('worker exit')
此时就可以在两个进程上观察任务的分配和运行结果了。