multiprocessing ——基于过程的并行性¶
源代码: Lib/multiprocessing/
介绍¶
multiprocessing 是一个支持使用类似于 threading 模块。这个 multiprocessing 包提供本地和远程并发,有效地避免了 Global Interpreter Lock 通过使用子进程而不是线程。因此, multiprocessing 模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。
这个 multiprocessing 模块还引入了API,这些API在 threading 模块。一个主要的例子是 Pool 对象,它提供了一种方便的方法,可以跨多个输入值并行执行一个函数,并跨进程分布输入数据(数据并行)。下面的示例演示了在模块中定义此类函数以便子进程能够成功导入该模块的常见做法。此数据并行的基本示例使用 Pool ,:
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
将打印到标准输出:
[1, 4, 9]
这个 Process 类¶
在 multiprocessing ,通过创建 Process 对象,然后调用其 start() 方法。 Process 遵循的API threading.Thread . 多进程程序的一个简单示例是:
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
要显示涉及的各个进程ID,下面是一个扩展的示例:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
为了解释为什么 if __name__ == '__main__' 部分是必需的,请参见 程序设计指南 .
上下文和启动方法¶
取决于平台, multiprocessing 支持三种启动流程的方法。这些 启动方法 是
- 产卵
父进程启动一个新的Python解释器进程。子进程将仅继承运行Process对象的
run()方法。特别是,不会继承来自父进程的不必要的文件描述符和句柄。与使用以下方法启动进程相比,使用此方法启动进程相当慢 fork 或 分叉服务器 。在Unix和Windows上可用。Windows和MacOS上的默认设置。
- fork
父进程使用
os.fork()以派生python解释器。子进程开始时,实际上与父进程相同。父进程的所有资源都由子进程继承。注意,安全地复刻多线程进程是有问题的。仅在Unix上可用。Unix上的默认值。
- 福克斯服务器
当程序启动并选择 福克斯服务器 启动方法,启动服务器进程。从那时起,每当需要一个新进程时,父进程就会连接到服务器并请求它复刻一个新进程。fork服务器进程是单线程的,因此使用它是安全的
os.fork(). 不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。
在 3.8 版更改: 在MacOS上, 产卵 Start方法现在是默认方法。这个 fork Start方法应该被认为是不安全的,因为它可能导致子进程崩溃。见 bpo-33725 .
在 3.4 版更改: 产卵 在所有Unix平台上添加,以及 福克斯服务器 为某些Unix平台添加。子进程不再继承Windows上所有父进程可继承的句柄。
在Unix上使用 产卵 或 福克斯服务器 Start方法也将启动 资源跟踪器 跟踪未链接的命名系统资源(如命名信号量或 SharedMemory 对象)由程序进程创建。当所有进程退出时,资源跟踪器将取消链接任何剩余的跟踪对象。通常应该没有,但如果一个进程被一个信号杀死,可能会有一些“泄漏”的资源。(泄漏的信号灯和共享内存段在下次重新启动之前都不会自动解除链接。这对于两个对象都是有问题的,因为系统只允许有限数量的命名信号量,并且共享内存段在主内存中占用了一些空间。)
要选择开始方法,请使用 set_start_method() 在 if __name__ == '__main__' 主模块的子句。例如::
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method() 不应在程序中多次使用。
或者,您可以使用 get_context() 获取上下文对象。上下文对象与多处理模块具有相同的API,并且允许在同一程序中使用多个启动方法。::
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
请注意,与一个上下文相关的对象可能与其他上下文的进程不兼容。尤其是,使用 fork 无法将上下文传递给使用 产卵 或 福克斯服务器 启动方法。
希望使用特定start方法的库可能应该使用 get_context() 以避免干扰库用户的选择。
警告
这个 'spawn' 和 'forkserver' Start方法当前不能与“冻结的”可执行文件(即由类似包生成的二进制文件)一起使用 PyInstaller 和 cx_Freeze 在UNIX上。这个 'fork' Start方法确实有效。
在进程之间交换对象¶
multiprocessing 支持两种进程间的通信通道:
Queues
这个
Queue类是的近复制queue.Queue. 例如::from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()队列是线程和进程安全的。
Pipes
这个
Pipe()函数返回由管道连接的一对连接对象,默认情况下,管道是双向的。例如::from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()返回的两个连接对象
Pipe()表示管道的两端。每个连接对象都有send()和recv()方法(等等)。请注意,如果两个进程(或线程)尝试读取或写入 same 同时结束管道。当然,同时使用不同管端的过程不会有损坏的风险。
进程之间的同步¶
multiprocessing 包含来自的所有同步原语的等价物 threading . 例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
如果不使用来自不同进程的锁输出,则很容易混淆所有内容。
使用一批工人¶
这个 Pool 类表示工作进程池。它有一些方法可以以几种不同的方式将任务卸载到工作进程中。
例如::
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
请注意,池的方法只能由创建它的进程使用。
注解
此包中的功能要求 __main__ 模块可由子项导入。这包括在 程序设计指南 但是这里值得指出。这意味着一些例子,例如 multiprocessing.pool.Pool 示例在交互式解释器中不起作用。例如::
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(如果您尝试这样做,它实际上将以半随机方式交错输出三个完整的回溯,然后您可能必须以某种方式停止父进程。)
参考文献¶
这个 multiprocessing 包主要复制 threading 模块。
Process 和例外¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
流程对象表示在单独流程中运行的活动。这个
Process类的所有方法都具有等价的threading.Thread.应始终使用关键字参数调用构造函数。 group 应该永远是
None;它的存在只是为了与threading.Thread. 目标 可调用对象是否由run()方法。它默认为None,表示没有调用任何内容。 name 是进程名(请参见name了解更多详细信息)。 args 是目标调用的参数元组。 关键字参数 是用于目标调用的关键字参数字典。如果提供,则仅关键字 守护进程 参数设置进程daemon旗到True或False. 如果None(默认),此标志将从创建过程继承。默认情况下,不会将任何参数传递给 目标 .
如果子类重写构造函数,它必须确保它调用基类构造函数 (
Process.__init__())在对过程进行任何其他操作之前。在 3.3 版更改: 增加了 守护进程 参数。
- run()¶
表示进程活动的方法。
可以在子类中重写此方法。标准
run()方法调用作为目标参数(如果有)传递给对象构造函数的可调用对象,其中顺序参数和关键字参数取自 args 和 关键字参数 分别是参数。
- join([timeout])¶
如果可选参数 timeout 是
None(默认值),该方法将一直阻塞到join()方法被调用终止。如果 timeout 是正数,最多阻塞 timeout 秒。注意,该方法返回None如果其进程终止或方法超时。检查进程的exitcode以确定是否终止。一个进程可以连接多次。
进程无法联接自身,因为这将导致死锁。在进程启动之前尝试加入它是一个错误。
- name¶
进程的名称。名称是一个仅用于标识的字符串。它没有语义。可以为多个进程赋予相同的名称。
初始名称由构造函数设置。如果没有为构造函数提供显式名称,则为表单'process-n'的名称1 N: 2 ……: k '是构造的,其中每个nk 是其父级的第n个子级。
- daemon¶
进程的守护进程标志,一个布尔值。必须在之前设置
start()被称为。初始值从创建过程继承。
当一个进程退出时,它会尝试终止它的所有后台子进程。
请注意,不允许后台进程创建子进程。否则,如果在父进程退出时终止守护进程,则守护进程将使其子进程保持孤立状态。此外,这些是 not Unix守护进程或服务,它们是正常的进程,如果非守护进程退出,它们将被终止(而不是加入)。
除了
threading.Thread应用程序编程接口,Process对象还支持以下属性和方法:- pid¶
返回进程ID。在生成进程之前,将
None.
- exitcode¶
子项的退出代码。这将是
None如果进程尚未终止。负值 -N 指示子级已被信号终止 N .
- authkey¶
进程的身份验证密钥(字节字符串)。
什么时候?
multiprocessing已初始化,主进程将使用os.urandom().当A
Process对象已创建,它将继承其父进程的身份验证密钥,尽管可以通过设置更改此密钥authkey到另一个字节字符串。见 身份验证密钥 .
- sentinel¶
系统对象的一种数字句柄,当进程结束时,它将变为“就绪”。
如果要同时等待多个事件,可以使用此值
multiprocessing.connection.wait(). 否则调用join()更简单。在Windows上,这是一个操作系统句柄,可用于
WaitForSingleObject和WaitForMultipleObjectsAPI调用系列。在Unix上,这是一个文件描述符,可用于select模块。3.3 新版功能.
- terminate()¶
终止进程。在Unix上,可以使用
SIGTERM信号;在窗口TerminateProcess()使用。请注意,不会执行退出处理程序和finally子句等。请注意,进程的后代进程将 not 被终止——他们只会变成孤儿。
警告
如果在关联进程使用管道或队列时使用此方法,则该管道或队列可能会损坏,并且可能无法由其他进程使用。同样,如果进程获得了锁或信号量等,那么终止它可能会导致其他进程死锁。
- kill()¶
等同于
terminate()但是使用SIGKILLUNIX上的信号。3.7 新版功能.
- close()¶
关闭
Process对象,释放与其关联的所有资源。ValueError如果基础进程仍在运行,则引发。一次close()成功返回的大多数其他方法和属性Process对象将升高ValueError.3.7 新版功能.
请注意
start(),join(),is_alive(),terminate()和exitcode方法只能由创建流程对象的流程调用。一些方法的示例用法
Process:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
所有人的基本阶级
multiprocessing例外情况。
- exception multiprocessing.BufferTooShort¶
引发的异常
Connection.recv_bytes_into()当提供的缓冲区对象太小而无法读取消息时。如果
e是的实例BufferTooShort然后e.args[0]将以字节字符串形式给出消息。
- exception multiprocessing.AuthenticationError¶
出现身份验证错误时引发。
- exception multiprocessing.TimeoutError¶
由超时超时的方法引发。
管道和队列¶
当使用多个进程时,通常使用消息传递在进程之间进行通信,并避免使用任何同步原语(如锁)。
传递信息时,可以使用 Pipe() (用于两个进程之间的连接)或队列(允许多个生产者和消费者)。
这个 Queue , SimpleQueue 和 JoinableQueue 类型包括多生产者、多消费者 FIFO 在上建模的队列 queue.Queue 标准库中的类。他们在这方面不同 Queue 缺乏 task_done() 和 join() python 2.5中引入的方法 queue.Queue 类。
如果你使用 JoinableQueue 那么你 must 调用 JoinableQueue.task_done() 对于从队列中删除的每个任务,或者用于计算未完成任务数的信号量最终可能溢出,从而引发异常。
注意,还可以使用管理器对象创建共享队列——请参见 经理 .
注解
multiprocessing 使用常规 queue.Empty 和 queue.Full 发出超时信号的异常。它们在中不可用 multiprocessing 名称空间,因此需要从 queue .
注解
当一个对象被放入队列中时,该对象被pickle,后台线程随后将pickled数据刷新到底层管道中。这会产生一些结果,这有点令人吃惊,但不应造成任何实际困难——如果它们确实让您感到困扰,那么您可以使用用 manager .
将对象放入空队列后,队列的
empty()方法返回False和get_nowait()可以不升高而返回queue.Empty.如果多个进程将对象排队,则可能会导致对象在另一端接收不正常。但是,由同一进程排队的对象将始终按照彼此的预期顺序排列。
警告
如果使用 Process.terminate() 或 os.kill() 当它试图使用 Queue ,则队列中的数据可能会损坏。这可能会导致任何其他进程在稍后尝试使用队列时获得异常。
警告
如上所述,如果子进程已将项目放入队列中(并且未使用 JoinableQueue.cancel_join_thread ,则在将所有缓冲项刷新到管道之前,该进程不会终止。
这意味着,如果尝试加入该进程,可能会出现死锁,除非您确定已放入队列的所有项都已被占用。同样,如果子进程是非守护进程,那么当父进程尝试加入其所有非守护进程子进程时,它可能会挂起退出。
请注意,使用管理器创建的队列不存在此问题。见 程序设计指南 .
有关使用队列进行进程间通信的示例,请参阅 实例 .
- multiprocessing.Pipe([duplex])¶
返回一对
(conn1, conn2)属于Connection表示管道末端的对象。如果 双工 是
True(默认)则管道是双向的。如果 双工 是False那么管道是单向的:conn1只能用于接收消息和conn2只能用于发送消息。
- class multiprocessing.Queue([maxsize])¶
返回使用管道和一些锁/信号灯实现的进程共享队列。当一个进程第一次将一个项目放入队列时,就会启动一个feeder线程,该线程将对象从缓冲区传输到管道中。
通常
queue.Empty和queue.Full标准库的例外情况queue模块被引发到信号超时。Queue实现的所有方法queue.Queue除了task_done()和join().- qsize()¶
返回队列的大致大小。由于多线程/多处理语义,这个数字不可靠。
注意,这可能会提高
NotImplementedError在像Mac OS X这样的Unix平台上,sem_getvalue()未实现。
- empty()¶
返回
True如果队列为空,False否则。由于多线程/多处理语义,这是不可靠的。
- full()¶
返回
True如果队列已满,False否则。由于多线程/多处理语义,这是不可靠的。
- put(obj[, block[, timeout]])¶
将obj放入队列。如果可选参数 块 是
True(违约)和 timeout 是None(默认),必要时阻止,直到可用插槽。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高queue.Full如果在此时间内没有可用插槽,则为例外。否则( 块 是False,如果立即有可用的插槽,则将项目放入队列,否则将引发queue.Full例外(例外) timeout 在这种情况下被忽略)。在 3.8 版更改: 如果队列关闭,
ValueError被引发而不是AssertionError.
- put_nowait(obj)¶
相当于
put(obj, False).
- get([block[, timeout]])¶
从队列中移除并返回项目。如果可选参数 块 是
True(违约)和 timeout 是None(默认),必要时阻止,直到项目可用。如果 timeout 是正数,最多阻塞 timeout 秒,然后提高queue.Empty如果在该时间内没有可用的项目,则为例外。否则(块为False,如果一个项目立即可用,则返回该项目,否则将引发queue.Empty例外(例外) timeout 在这种情况下被忽略)。在 3.8 版更改: 如果队列关闭,
ValueError被引发而不是OSError.
- get_nowait()¶
相当于
get(False).
multiprocessing.Queue有一些其他方法在中找不到queue.Queue. 大多数代码通常不需要这些方法:- close()¶
指示当前进程不会再将数据放入此队列。后台线程将所有缓冲数据刷新到管道后将退出。当队列被垃圾收集时,将自动调用此函数。
- join_thread()¶
加入后台线程。只能在以下时间后使用
close()已被调用。它将一直阻塞,直到后台线程退出,确保缓冲区中的所有数据都已刷新到管道中。默认情况下,如果进程不是队列的创建者,那么在退出时,它将尝试加入队列的后台线程。进程可以调用
cancel_join_thread()使join_thread()什么也不做。
- cancel_join_thread()¶
预防
join_thread()阻止。特别是,这可以防止后台线程在进程退出时自动加入——请参见join_thread().此方法的更好名称可能是
allow_exit_without_flush(). 它很可能会导致排队的数据丢失,您几乎可以肯定不需要使用它。只有当您需要当前进程立即退出而不必等待将排队的数据刷新到底层管道中,并且您不关心丢失的数据时,才会出现这种情况。
注解
此类的功能要求在主机操作系统上实现一个正常工作的共享信号量。如果没有这个类,这个类中的功能将被禁用,并尝试实例化一个
Queue将导致ImportError. 见 bpo-3770 更多信息。下面列出的任何专用队列类型都是如此。
- class multiprocessing.SimpleQueue¶
它是一个简化的
Queue类型,非常接近锁定Pipe.- empty()¶
返回
True如果队列为空,False否则。
- get()¶
从队列中移除并返回项目。
- put(item)¶
放 item 进入队列。
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue,AQueue子类,是另外具有task_done()和join()方法。- task_done()¶
指示以前排队的任务已完成。由队列使用者使用。对于每一个
get()用于获取任务,随后调用task_done()告诉队列任务的处理已完成。如果A
join()当前正在阻止,它将在处理完所有项目后恢复(意味着task_done()每一件物品都接到了调用put()进入队列)。提出一个
ValueError如果调用次数超过了队列中放置的项目数。
- join()¶
阻止,直到队列中的所有项目都被获取和处理。
每当将项目添加到队列时,未完成任务的计数就会增加。每当消费者调用时,计数就会下降。
task_done()以指示已检索到该项,并且对其进行的所有工作都已完成。当未完成任务的计数降至零时,join()解除阻塞。
其他¶
- multiprocessing.active_children()¶
返回当前进程的所有活动子进程的列表。
调用它会产生“连接”任何已经完成的进程的副作用。
- multiprocessing.cpu_count()¶
返回系统中的CPU数。
这个数字不等于当前进程可以使用的CPU数量。可用CPU的数量可以通过
len(os.sched_getaffinity(0))可以提高
NotImplementedError.
- multiprocessing.parent_process()¶
返回
Process对象对应的父进程current_process(). 对于主要过程,parent_process将None.3.8 新版功能.
- multiprocessing.freeze_support()¶
当程序使用
multiprocessing已冻结以生成Windows可执行文件。(已用 PY2Exe公司 , PyInstaller 和 cx_Freeze )我们需要在
if __name__ == '__main__'主模块的线路。例如::from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
如果
freeze_support()行被省略,然后尝试运行冻结的可执行文件将引发RuntimeError.调用
freeze_support()在除Windows以外的任何操作系统上调用时不起作用。此外,如果模块在Windows上由python解释器正常运行(程序尚未冻结),则freeze_support()没有效果。
- multiprocessing.get_all_start_methods()¶
返回受支持的Start方法的列表,其中第一个方法是默认方法。可能的启动方法是
'fork','spawn'和'forkserver'. 仅在Windows上'spawn'是可用的。在UNIX上'fork'和'spawn'始终支持,与'fork'作为默认值。3.4 新版功能.
- multiprocessing.get_context(method=None)¶
返回与具有相同属性的上下文对象
multiprocessing模块。如果 方法 是
None然后返回默认上下文。否则 方法 应该是'fork','spawn','forkserver'.ValueError如果指定的Start方法不可用,则引发。3.4 新版功能.
- multiprocessing.get_start_method(allow_none=False)¶
返回用于启动进程的Start方法的名称。
如果启动方法尚未修复,并且 allow_none 如果为false,则start方法将固定为默认值,并返回名称。如果启动方法尚未修复,并且 allow_none 那么是真的
None返回。返回值可以是
'fork','spawn','forkserver'或None.'fork'是Unix上的默认值,而'spawn'是Windows上的默认设置。3.4 新版功能.
- multiprocessing.set_executable()¶
设置启动子进程时要使用的python解释器的路径。(默认情况下)
sys.executable使用)。嵌入程序可能需要执行以下操作:set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
在创建子进程之前。
在 3.4 版更改: 现在在Unix上支持
'spawn'使用Start方法。
- multiprocessing.set_start_method(method)¶
设置用于启动子进程的方法。 方法 可以是
'fork','spawn'或'forkserver'.请注意,最多应调用一次,并且应在
if __name__ == '__main__'主模块的子句。3.4 新版功能.
连接对象¶
连接对象允许发送和接收可选取对象或字符串。它们可以被认为是面向消息的连接套接字。
- class multiprocessing.connection.Connection¶
- send(obj)¶
将对象发送到连接的另一端,使用
recv().对象必须可拾取。非常大的腌菜(大约32 mib+,尽管这取决于操作系统)可能会导致
ValueError例外。
- fileno()¶
返回连接使用的文件描述符或句柄。
- close()¶
关闭连接。
当连接被垃圾收集时,将自动调用此函数。
- poll([timeout])¶
返回是否有可读取的数据。
如果 timeout 如果未指定,则它将立即返回。如果 timeout 是一个数字,然后指定要阻止的最长时间(秒)。如果 timeout 是
None然后使用无限超时。请注意,可以使用
multiprocessing.connection.wait().
- send_bytes(buffer[, offset[, size]])¶
从发送字节数据 bytes-like object 作为完整的信息。
如果 抵消 然后从该位置读取数据 缓冲区 . 如果 size 则从缓冲区中读取许多字节。非常大的缓冲区(约32 mib+,但取决于操作系统)可能会引发
ValueError例外
- recv_bytes([maxlength])¶
以字符串形式返回从连接另一端发送的字节数据的完整消息。直到有东西要接收。引发
EOFError如果没有东西可以接收,另一端已经关闭。如果 最大长度 已指定,消息长度超过 最大长度 然后
OSError引发,连接将不再可读。
- recv_bytes_into(buffer[, offset])¶
读入 缓冲区 从连接的另一端发送的字节数据的完整消息,并返回消息中的字节数。直到有东西要接收。引发
EOFError如果没有东西可以接收,另一端就关闭了。缓冲区 必须是可写的 bytes-like object . 如果 抵消 则消息将从该位置写入缓冲区。偏移量必须是小于的长度的非负整数 缓冲区 (以字节为单位)。
如果缓冲区太短,则
BufferTooShort引发异常,完整消息可用为e.args[0]在哪里?e是异常实例。
在 3.3 版更改: 连接对象本身现在可以在进程之间使用
Connection.send()和Connection.recv().3.3 新版功能: 连接对象现在支持上下文管理协议——请参见 上下文管理器类型 .
__enter__()返回连接对象,以及__exit__()调用close().
例如:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
警告
这个 Connection.recv() 方法会自动取消拾取它接收的数据,这可能会带来安全风险,除非您可以信任发送消息的进程。
警告
如果进程在试图读取或写入管道时被终止,那么管道中的数据可能会损坏,因为可能无法确定消息边界在哪里。
同步原语¶
通常,同步原语在多进程程序中不像在多线程程序中那样必要。参见文档 threading 模块。
注意,还可以使用管理器对象创建同步原语——请参见 经理 .
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
屏障对象:的复制
threading.Barrier.3.3 新版功能.
- class multiprocessing.BoundedSemaphore([value])¶
有界信号量对象:对
threading.BoundedSemaphore.它与同类产品的唯一区别在于:
acquire方法的第一个参数命名为 块 ,与Lock.acquire().注解
在Mac OS X上,这与
Semaphore因为sem_getvalue()未在该平台上实现。
- class multiprocessing.Condition([lock])¶
条件变量:的别名
threading.Condition.如果 lock 是指定的,那么它应该是
Lock或RLock对象从multiprocessing.在 3.3 版更改: 这个
wait_for()方法已添加。
- class multiprocessing.Event¶
一个复制
threading.Event.
- class multiprocessing.Lock¶
非递归锁对象:对
threading.Lock. 一旦一个进程或线程获得了一个锁,随后从任何进程或线程获取它的尝试将被阻塞,直到释放它;任何进程或线程都可能释放它。的概念和行为threading.Lock当它应用于线程时,在此处复制multiprocessing.Lock因为它适用于进程或线程,除非另有说明。注意
Lock实际上是一个factory函数,它返回multiprocessing.synchronize.Lock用默认上下文初始化。Lock支持 context manager 协议,因此可用于with声明。- acquire(block=True, timeout=None)¶
获取一个锁、阻塞或非阻塞。
与 块 参数设置为
True(默认),方法调用将阻塞,直到锁处于未锁定状态,然后将其设置为已锁定并返回True. 注意,第一个参数的名称与threading.Lock.acquire().与 块 参数设置为
False,方法调用不阻塞。如果锁当前处于锁定状态,则返回False;否则将锁设置为锁定状态并返回True.当使用正的浮点值调用时, timeout ,最多阻止指定的秒数 timeout 只要无法获取锁。具有负值的调用 timeout 相当于 timeout 为零。调用 timeout 价值
None(默认设置)将超时时间设置为无限。注意治疗阴性或None值为 timeout 不同于threading.Lock.acquire(). 这个 timeout 如果 块 参数设置为False因此被忽略。返回True如果已获取锁,或False如果超时时间已过。
- release()¶
释放锁。这可以从任何进程或线程调用,而不仅仅是最初获取锁的进程或线程。
行为与
threading.Lock.release()但当在未锁定的锁上调用时,ValueError提高了。
- class multiprocessing.RLock¶
递归锁对象:对
threading.RLock. 获取递归锁的进程或线程必须释放它。一旦一个进程或线程获得了递归锁,同一进程或线程就可以在不阻塞的情况下再次获取它;该进程或线程必须在每次获取它时释放一次。注意
RLock实际上是一个factory函数,它返回multiprocessing.synchronize.RLock用默认上下文初始化。RLock支持 context manager 协议,因此可用于with声明。- acquire(block=True, timeout=None)¶
获取一个锁、阻塞或非阻塞。
当使用 块 参数设置为
True,阻塞,直到锁处于未锁定状态(不属于任何进程或线程),除非该锁已属于当前进程或线程。然后,当前进程或线程取得锁的所有权(如果它还没有所有权),锁内的递归级别将增加一个,从而导致返回值为True.注意,第一个参数的行为与threading.RLock.acquire(),从参数本身的名称开始。当使用 块 参数设置为
False,不要阻塞。如果锁已经被另一个进程或线程获取(因此拥有),则当前进程或线程不拥有所有权,并且锁内的递归级别也不会更改,从而导致返回值为False. 如果锁处于未锁定状态,则当前进程或线程将取得所有权,并且递归级别将增加,从而导致返回值为True.使用和行为 timeout 参数与中的相同
Lock.acquire(). 注意,其中一些行为 timeout 不同于threading.RLock.acquire().
- release()¶
释放一个锁,减少递归级别。如果递减后递归级别为零,则将锁重置为未锁定(不属于任何进程或线程),如果阻止任何其他进程或线程等待锁解锁,则只允许其中一个进程或线程继续。如果递减后递归级别仍然为非零,则锁将保持锁定,并由调用进程或线程拥有。
只有在调用进程或线程拥有锁时才调用此方法。安
AssertionError如果此方法由所有者以外的进程或线程调用,或者锁处于未锁定(无主)状态,则引发。请注意,在这种情况下引发的异常类型与threading.RLock.release().
- class multiprocessing.Semaphore([value])¶
信号量对象:对
threading.Semaphore.它与同类产品的唯一区别在于:
acquire方法的第一个参数命名为 块 ,与Lock.acquire().
注解
在Mac OS X上, sem_timedwait 不支持,因此调用 acquire() 超时将使用休眠循环模拟该函数的行为。
注解
如果sigint信号由 Ctrl-C 当主线程被对的调用阻止时到达 BoundedSemaphore.acquire() , Lock.acquire() , RLock.acquire() , Semaphore.acquire() , Condition.acquire() 或 Condition.wait() 然后调用将立即中断,并且 KeyboardInterrupt 将被引发。
这与 threading 其中sigint将在进行等效的阻塞调用时被忽略。
注解
这个包的某些功能需要在主机操作系统上实现一个正常工作的共享信号量。没有一个, multiprocessing.synchronize 模块将被禁用,并且尝试导入它将导致 ImportError . 见 bpo-3770 更多信息。
经理¶
管理器提供了一种创建可以在不同进程之间共享的数据的方法,包括在不同计算机上运行的进程之间通过网络共享数据。管理器对象控制管理 共享对象 . 其他进程可以使用代理访问共享对象。
返回已开始的
SyncManager对象,可用于在进程之间共享对象。返回的管理器对象与生成的子进程相对应,并且具有创建共享对象并返回相应代理的方法。
一旦管理器进程被垃圾收集或其父进程退出,它们将立即关闭。管理器类在 multiprocessing.managers 模块:
- class multiprocessing.managers.BaseManager([address[, authkey]])¶
创建一个baseManager对象。
一旦创建,应该调用
start()或get_server().serve_forever()以确保管理器对象引用已启动的管理器进程。地址 是管理器进程侦听新连接的地址。如果 地址 是
None然后选择一个任意的。身份验证密钥 是用于检查服务器进程的传入连接的有效性的身份验证密钥。如果 身份验证密钥 是
None然后current_process().authkey使用。否则 身份验证密钥 必须是字节字符串。- start([initializer[, initargs]])¶
启动子流程以启动管理器。如果 初始化器 不是
None然后子进程将调用initializer(*initargs)当它开始的时候。
- get_server()¶
返回A
Server对象,表示由管理器控制的实际服务器。这个Server对象支持serve_forever()方法:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server另外还有一个address属性。
- connect()¶
将本地管理器对象连接到远程管理器进程::
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
可用于向Manager类注册类型或可调用的ClassMethod。
打字 是“类型标识符”,用于标识特定类型的共享对象。这必须是一个字符串。
可赎回的 可调用,用于为此类型标识符创建对象。如果管理器实例将使用
connect()方法,或者如果 create_method 论证是False那么这个可以留作None.脯氨酸型 是的子类
BaseProxy用于为此共享对象创建代理 打字 . 如果None然后会自动创建一个代理类。暴露的 用于指定方法名的序列,应允许此typeid的代理使用
BaseProxy._callmethod(). (如果 暴露的 是None然后proxytype._exposed_如果存在,则使用。)如果未指定公开列表,则共享对象的所有“公共方法”都将可访问。(此处,“公共方法”是指具有__call__()方法,其名称不以开头'_')method_to_typeid 是一个映射,用于指定那些应返回代理的公开方法的返回类型。它将方法名映射到typeid字符串。(如果 method_to_typeid 是
None然后proxytype._method_to_typeid_如果方法的名称不是此映射的键,或者如果映射是None然后,该方法返回的对象将按值进行复制。create_method 确定是否应使用名称创建方法 打字 它可以用来告诉服务器进程创建一个新的共享对象并返回其代理。默认情况下是
True.
BaseManager实例还具有一个只读属性:- address¶
经理使用的地址。
在 3.3 版更改: 管理器对象支持上下文管理协议——请参见 上下文管理器类型 .
__enter__()启动服务器进程(如果尚未启动),然后返回Manager对象。__exit__()调用shutdown().在以前的版本中
__enter__()未启动管理器的服务器进程(如果尚未启动)。
- class multiprocessing.managers.SyncManager¶
一个子类
BaseManager可用于进程同步。此类型的对象由返回multiprocessing.Manager().其方法创建和返回 代理对象 用于跨进程同步的许多常用数据类型。这尤其包括共享列表和字典。
- Barrier(parties[, action[, timeout]])¶
创建共享
threading.Barrier对象并返回其代理。3.3 新版功能.
- BoundedSemaphore([value])¶
创建共享
threading.BoundedSemaphore对象并返回其代理。
- Condition([lock])¶
创建共享
threading.Condition对象并返回其代理。如果 lock 则它应该是
threading.Lock或threading.RLock对象。在 3.3 版更改: 这个
wait_for()方法已添加。
- Event()¶
创建共享
threading.Event对象并返回其代理。
- Lock()¶
创建共享
threading.Lock对象并返回其代理。
- Queue([maxsize])¶
创建共享
queue.Queue对象并返回其代理。
- RLock()¶
创建共享
threading.RLock对象并返回其代理。
- Semaphore([value])¶
创建共享
threading.Semaphore对象并返回其代理。
- Array(typecode, sequence)¶
创建一个数组并返回它的代理。
- Value(typecode, value)¶
使用可写的
value属性并返回其代理。
在 3.6 版更改: 共享对象可以嵌套。例如,共享容器对象(如共享列表)可以包含其他共享对象,这些对象都将由
SyncManager.
- class multiprocessing.managers.Namespace¶
可以注册的类型
SyncManager.命名空间对象没有公共方法,但具有可写属性。它的表示形式显示了其属性的值。
但是,当为命名空间对象使用代理时,以
'_'将是代理的属性,而不是引用的属性:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
定制经理¶
要创建自己的管理器,可以创建 BaseManager 并使用 register() 用于向Manager类注册新类型或可调用项的ClassMethod。例如::
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
使用远程管理器¶
可以在一台计算机上运行管理服务器,并让客户机从其他计算机上使用它(假定涉及的防火墙允许)。
运行以下命令将为远程客户端可以访问的单个共享队列创建服务器:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
一个客户端可以访问服务器,如下所示:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
另一个客户端也可以使用它:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
本地进程也可以访问该队列,使用客户端上的上述代码远程访问该队列:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super(Worker, self).__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
代理对象¶
代理是一个对象, 指 一个共同的对象,它(大概)生活在一个不同的过程中。共享对象被称为 参照物 代理的。多个代理对象可以具有相同的引用。
代理对象具有调用其引用的相应方法的方法(尽管并非引用的每个方法都必须通过代理可用)。这样,代理可以像其引用一样使用:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
注意,申请 str() 委托人将返回被委托人的陈述,而 repr() 将返回代理的表示。
代理对象的一个重要特性是它们是可拾取的,因此可以在进程之间传递。因此,引用可以包含 代理对象 . 这允许嵌套这些托管列表、dict和其他 代理对象 :
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
类似地,dict和list代理可以相互嵌套:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
如果标准(非代理) list 或 dict 对象包含在引用中,对这些可变值的修改将不会通过管理器传播,因为代理无法知道何时修改其中包含的值。但是,在容器代理中存储值(这会触发 __setitem__ 在代理对象上)不会通过管理器传播,因此为了有效地修改此类项,可以将修改后的值重新分配给容器代理:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
这种方法可能比使用嵌套方法不方便 代理对象 对于大多数用例,还演示了对同步的控制级别。
注解
代理输入 multiprocessing 不支持按值进行比较。例如,我们有:
>>> manager.list([1,2,3]) == [1,2,3]
False
在进行比较时,应该只使用引用的副本。
- class multiprocessing.managers.BaseProxy¶
代理对象是
BaseProxy.- _callmethod(methodname[, args[, kwds]])¶
调用并返回代理引用方法的结果。
如果
proxy是其引用为obj然后表达式:proxy._callmethod(methodname, args, kwds)
将计算表达式::
getattr(obj, methodname)(*args, **kwds)
在经理的过程中。
返回的值将是调用结果的副本或新共享对象的代理——请参见 method_to_typeid 的参数
BaseManager.register().如果调用引发异常,则由
_callmethod().如果在管理器进程中引发其他异常,则此异常将转换为RemoteError异常,由引发_callmethod().请特别注意,如果 方法名称 还没有 暴露的 .
使用的示例
_callmethod():>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
返回引用文件的副本。
如果引用不可勾选,则会引发异常。
- __repr__()¶
返回代理对象的表示形式。
- __str__()¶
返回引用的表示形式。
清理¶
代理对象使用weakref回调,这样当它被垃圾收集时,它就会从拥有其引用的管理器中注销自己。
当不再有任何引用共享对象的代理时,共享对象将从管理器进程中删除。
进程池¶
您可以创建一个进程池,该池将执行提交给它的任务 Pool 类。
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
一个进程池对象,它控制可向其提交任务的工作进程池。它支持带超时和回调的异步结果,并具有并行映射实现。
过程 是要使用的工作进程数。如果 过程 是
None然后返回的号码os.cpu_count()使用。如果 初始化器 不是
None然后每个工作进程将调用initializer(*initargs)当它开始的时候。子级最大任务数 是指工作进程在退出并替换为新的工作进程之前可以完成的任务数,以便释放未使用的资源。默认值 子级最大任务数 是
None这意味着工作进程将和池一样长。context 可用于指定用于启动工作进程的上下文。通常使用函数创建池
multiprocessing.Pool()或Pool()上下文对象的方法。在两种情况下 context 设置正确。注意,pool对象的方法只能由创建pool的进程调用。
警告
multiprocessing.pool对象具有需要通过将池用作上下文管理器或通过调用close()和terminate()手动操作。如果不这样做,可能会导致进程在完成时挂起。注意它是 不正确 依赖垃圾收集器销毁池,因为CPython不能确保调用池的终结器(请参见
object.__del__()更多信息)。3.2 新版功能: 子级最大任务数
3.4 新版功能: context
注解
a中的工作进程
Pool通常在池工作队列的整个持续时间内活动。在其他系统(如apache、mod wsgi等)中发现的一种常见模式是,允许池中的工作人员在退出、清理和生成新进程以替换旧进程之前仅完成一定数量的工作。这个 子级最大任务数 参数Pool向最终用户公开此功能。- apply(func[, args[, kwds]])¶
调用 func 带着参数 args 和关键字参数 kwds . 它会一直阻塞,直到结果准备就绪。考虑到这一块,
apply_async()更适合并行执行工作。此外, func 只在池中的一个工作人员中执行。
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
的变体
apply()返回AsyncResult对象。如果 回调 则它应该是接受单个参数的可调用文件。当结果准备好时 回调 应用于它,即除非调用失败,在这种情况下, error_callback 而是应用。
如果 error_callback 则它应该是接受单个参数的可调用文件。如果目标函数失败,则 error_callback 使用异常实例调用。
回调应该立即完成,否则处理结果的线程将被阻塞。
- map(func, iterable[, chunksize])¶
平行等价于
map()内置功能(仅支持一个 可迭代的 但是,对于多个iterables,请参见starmap()). 它会阻塞直到结果就绪。此方法将ITerable剪切成若干块,作为单独的任务提交给流程池。这些块的(近似)大小可以通过设置 chunksize 为正整数。
请注意,它可能会导致非常长的iterables的高内存使用率。考虑使用
imap()或imap_unordered()显式 chunksize 提高效率的选项。
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
的变体
map()返回AsyncResult对象。如果 回调 则它应该是接受单个参数的可调用文件。当结果准备好时 回调 应用于它,即除非调用失败,在这种情况下, error_callback 而是应用。
如果 error_callback 则它应该是接受单个参数的可调用文件。如果目标函数失败,则 error_callback 使用异常实例调用。
回调应该立即完成,否则处理结果的线程将被阻塞。
- imap(func, iterable[, chunksize])¶
更懒惰的版本
map().这个 chunksize 参数与
map()方法。对于非常长的iterables,使用大值 chunksize 可以使任务完成 much 比使用默认值更快1.如果 chunksize 是
1然后next()由返回的迭代器的方法imap()方法具有可选的 timeout 参数:next(timeout)将提高multiprocessing.TimeoutError如果结果不能在 timeout 秒。
- imap_unordered(func, iterable[, chunksize])¶
一样
imap()除了返回的迭代器结果的顺序应该被认为是任意的。(只有在只有一个工作进程时,订单才保证“正确”。)
- starmap(func, iterable[, chunksize])¶
类似于
map()除了 可迭代的 应该是作为参数解包的iterables。因此,一个 可迭代的 属于
[(1,2), (3, 4)]结果在[func(1,2), func(3,4)].3.3 新版功能.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
结合
starmap()和map_async()重复 可迭代的 调用号码和调用号码 func 打开了iTerables。返回结果对象。3.3 新版功能.
- close()¶
阻止将更多任务提交到池。所有任务完成后,工作进程将退出。
- terminate()¶
立即停止工作进程,而不完成未完成的工作。当池对象被垃圾收集时
terminate()将立即调用。
- join()¶
等待工作进程退出。必须调用
close()或terminate()使用前join().
3.3 新版功能: 池对象现在支持上下文管理协议——请参见 上下文管理器类型 .
__enter__()返回pool对象,以及__exit__()调用terminate().
- class multiprocessing.pool.AsyncResult¶
返回的结果的类
Pool.apply_async()和Pool.map_async().- get([timeout])¶
返回结果。如果 timeout 不是
None结果不在 timeout 然后秒multiprocessing.TimeoutError提高了。如果远程调用引发异常,则该异常将由get().
- wait([timeout])¶
等待结果可用或直到 timeout 秒通过。
- ready()¶
返回调用是否已完成。
- successful()¶
返回呼叫是否已完成,但未引发异常。将提高
ValueError如果结果还没有准备好。在 3.7 版更改: 如果结果还没有准备好,
ValueError被提升而不是AssertionError.
下面的示例演示池的使用:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
听众和客户¶
通常,进程之间的消息传递是使用队列或 Connection 返回的对象 Pipe() .
然而, multiprocessing.connection 模块允许一些额外的灵活性。它基本上提供了一个用于处理套接字或窗口命名管道的高级面向消息的API。它还支持 摘要式身份验证 使用 hmac 模块,用于同时轮询多个连接。
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
将随机生成的消息发送到连接的另一端,然后等待答复。
如果回复与消息摘要匹配,则使用 身份验证密钥 作为密钥,欢迎消息将发送到连接的另一端。否则
AuthenticationError提高了。
- multiprocessing.connection.answer_challenge(connection, authkey)¶
接收消息,使用计算消息摘要 身份验证密钥 作为键,然后将摘要发回。
如果未收到欢迎信息,则
AuthenticationError提高了。
- multiprocessing.connection.Client(address[, family[, authkey]])¶
尝试建立到使用地址的侦听器的连接 地址 返回一个
Connection.连接类型由 家庭 参数,但这通常可以省略,因为它通常可以从 地址 . (见 地址格式 )
如果 身份验证密钥 是给定的而不是无的,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果 身份验证密钥 一个也没有。
AuthenticationError在身份验证失败时引发。见 身份验证密钥 .
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
“正在侦听”连接的绑定套接字或Windows命名管道的封装。
地址 侦听器对象的绑定套接字或命名管道要使用的地址。
注解
如果使用地址“0.0.0.0”,则该地址将不是Windows上可连接的终结点。如果需要可连接的端点,则应使用“127.0.0.1”。
家庭 是要使用的套接字(或命名管道)的类型。这可以是字符串之一
'AF_INET'(对于TCP套接字)'AF_UNIX'(对于UNIX域套接字)或'AF_PIPE'(对于名为pipe的窗口)。其中只有第一个是保证可用的。如果 家庭 是None然后根据 地址 . 如果 地址 也是None然后选择默认值。此默认值是假定为可用速度最快的族。见 地址格式 . 注意如果 家庭 是'AF_UNIX'地址是None然后将在使用创建的专用临时目录中创建套接字tempfile.mkstemp().如果侦听器对象使用套接字,则 积压 (默认为1)传递给
listen()套接字绑定后的方法。如果 身份验证密钥 是给定的而不是无的,它应该是一个字节字符串,并将用作基于HMAC的身份验证质询的密钥。如果 身份验证密钥 一个也没有。
AuthenticationError在身份验证失败时引发。见 身份验证密钥 .- accept()¶
接受侦听器对象的绑定套接字或命名管道上的连接,并返回
Connection对象。如果尝试身份验证但失败,则AuthenticationError提高了。
- close()¶
关闭侦听器对象的绑定套接字或命名管道。当侦听器被垃圾收集时,将自动调用此函数。但是,最好明确地调用它。
侦听器对象具有以下只读属性:
- address¶
侦听器对象正在使用的地址。
- last_accepted¶
上次接受的连接来自的地址。如果这个不可用,那么它是
None.
3.3 新版功能: 侦听器对象现在支持上下文管理协议——请参见 上下文管理器类型 .
__enter__()返回Listener对象,以及__exit__()调用close().
- multiprocessing.connection.wait(object_list, timeout=None)¶
等到有东西进来 object_list 准备好了。返回这些对象的列表 object_list 准备好了。如果 timeout 是一个浮点数,然后调用最多阻塞这么多秒。如果 timeout 是
None然后它将无限期阻塞。负超时等于零超时。对于Unix和Windows,对象可以出现在 object_list 如果是
可读的
Connection对象;一个连接和可读的
socket.socket对象;或
当可以从连接或套接字对象中读取数据,或者另一端已关闭时,连接或套接字对象已就绪。
Unix :
wait(object_list, timeout)几乎相等select.select(object_list, [], [], timeout).区别在于,如果select.select()被信号中断,它可以上升OSError错误号为EINTR而wait()不会。Windows :中的项目 object_list 必须是可等待的整数句柄(根据win32函数文档使用的定义)
WaitForMultipleObjects())或者它可以是一个带有fileno()返回套接字句柄或管道句柄的方法。(请注意,管道手柄和Socket手柄 not 可等待的手柄。)3.3 新版功能.
Examples
以下服务器代码创建使用 'secret password' 作为身份验证密钥。然后它等待连接并向客户机发送一些数据:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
以下代码连接到服务器并从服务器接收一些数据:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
以下代码使用 wait() 要同时等待来自多个进程的消息,请执行以下操作:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
地址格式¶
安
'AF_INET'地址是表单的元组(hostname, port)在哪里? 主机名 是一个字符串 port 是一个整数。安
'AF_UNIX'地址是表示文件系统上文件名的字符串。一个
'AF_PIPE'地址是以下形式的字符串r'\.\pipe{PipeName}'。要使用Client()要连接到名为的远程计算机上的命名管道,请执行以下操作 ServerName 您应该使用以下形式的地址r'\{ServerName}\pipe{PipeName}'取而代之的是。
注意,默认情况下,以两个反斜杠开头的任何字符串都假定为 'AF_PIPE' 地址而不是 'AF_UNIX' 地址。
身份验证密钥¶
当一个人使用 Connection.recv ,接收到的数据将自动取消勾选。不幸的是,从不受信任的源中提取数据存在安全风险。因此 Listener 和 Client() 使用 hmac 提供摘要式身份验证的模块。
身份验证密钥是一个字节字符串,可以认为是一个密码:一旦建立了连接,两端都需要证明对方知道身份验证密钥。(证明两端使用相同的键 not 包括通过连接发送密钥。)
如果请求身份验证但未指定身份验证密钥,则返回值为 current_process().authkey 使用(见) Process )此值将自动由任何 Process 当前进程创建的对象。这意味着(默认情况下)多进程程序的所有进程都将共享一个身份验证密钥,该密钥可以在它们之间建立连接时使用。
也可以通过使用 os.urandom() .
登录¶
提供了一些日志记录支持。但是,请注意, logging 包不使用进程共享锁,因此(取决于处理程序类型)可能会混淆来自不同进程的消息。
- multiprocessing.get_logger()¶
返回使用的记录器
multiprocessing. 如有必要,将创建一个新的。当第一次创建时,记录器具有级别
logging.NOTSET没有默认的处理程序。默认情况下,发送到此记录器的消息不会传播到根记录器。注意,在Windows上,子进程将只继承父进程记录器的级别——记录器的任何其他自定义都不会被继承。
- multiprocessing.log_to_stderr()¶
此函数执行对
get_logger()但是除了返回由get_logger创建的记录器之外,它还添加了一个将输出发送到sys.stderr使用格式'[%(levelname)s/%(processName)s] %(message)s'.
下面是打开日志记录的示例会话:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
有关日志记录级别的完整表,请参见 logging 模块。
这个 multiprocessing.dummy 模块¶
multiprocessing.dummy 复制的API multiprocessing 但仅仅是一个封装 threading 模块。
特别值得一提的是, Pool 由以下人员提供的功能 multiprocessing.dummy 返回 ThreadPool ,它是的子类 Pool 它支持所有相同的方法调用,但使用的是工作线程池,而不是工作进程。
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
控制可以向其提交作业的工作线程池的线程池对象。
ThreadPool实例与完全兼容接口Pool实例,并且还必须正确管理它们的资源,方法是将池用作上下文管理器或调用close()和terminate()手工操作。流程 要使用的工作线程数。如果 流程 是
None则由返回的数字os.cpu_count()是使用的。如果 初始化器 不是
None然后每个工作进程将调用initializer(*initargs)当它开始的时候。不像
Pool, maxtaskperChild 和 上下文 无法提供。注解
A
ThreadPool共享与相同的接口Pool,它是围绕进程池设计的,在引入concurrent.futures模块。因此,它继承了一些对于线程支持的池没有意义的操作,并且它有自己的类型来表示异步作业的状态,AsyncResult,这是任何其他库都不能理解的。用户通常应该更喜欢使用
concurrent.futures.ThreadPoolExecutor,它有一个从一开始就围绕线程设计的更简单的界面,它返回concurrent.futures.Future实例,这些实例与许多其他库兼容,包括asyncio。
程序设计指南¶
在使用时应遵守某些准则和习惯用法。 multiprocessing .
所有启动方法¶
以下内容适用于所有启动方法。
避免共享状态
尽可能避免在进程之间转移大量数据。
最好还是坚持使用队列或管道在进程之间进行通信,而不是使用较低级别的同步原语。
可摘性
确保代理方法的参数是可拾取的。
代理的线程安全性
不要使用来自多个线程的代理对象,除非用锁保护它。
(使用 same 代理服务器)
加入僵尸进程
在Unix上,当进程完成但尚未加入时,它将变成僵尸。永远不应该有太多,因为每次一个新进程启动(或
active_children()将联接所有尚未联接的已完成进程。同时调用已完成进程的Process.is_alive将加入进程。即使如此,显式地加入所有您开始的流程可能也是一个好的实践。
继承总比泡菜/松饼好。
当使用 产卵 或 福克斯服务器 从许多类型开始方法
multiprocessing需要可选择,以便子进程可以使用它们。但是,通常应该避免使用管道或队列将共享对象发送到其他进程。相反,您应该安排程序,以便需要访问在其他地方创建的共享资源的进程可以从祖先进程继承该资源。
避免终止进程
使用
Process.terminate停止进程的方法可能会导致该进程当前使用的任何共享资源(如锁、信号量、管道和队列)中断或对其他进程不可用。因此,最好只考虑使用
Process.terminate在从不使用任何共享资源的进程上。
连接使用队列的进程
请记住,将项目放入队列的进程将在终止之前等待,直到所有缓冲项目由“feeder”线程馈送到底层管道。(子进程可以调用
Queue.cancel_join_thread用于避免此行为的队列方法。)这意味着,无论何时使用队列,都需要确保在加入进程之前,已放入队列的所有项目最终都将被删除。否则,您无法确保已将项目放入队列的进程将终止。还请记住,非后台进程将自动加入。
死锁的例子如下:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()这里的一个解决方法是交换最后两行(或者简单地删除
p.join()线)。
显式地将资源传递给子进程
在Unix上使用 fork Start方法,子进程可以使用在父进程中使用全局资源创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与Windows和其他Start方法兼容之外,这还确保只要子进程仍然活动,就不会在父进程中对对象进行垃圾收集。如果在父进程中垃圾收集对象时释放某些资源,这可能很重要。
例如:
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()应重写为:
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
小心更换 sys.stdin 带有“类文件对象”
multiprocessing最初无条件调用:os.close(sys.stdin.fileno())在
multiprocessing.Process._bootstrap()方法---导致过程中的过程出现问题。已将此更改为:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)它解决了进程相互碰撞导致文件描述符错误的基本问题,但对替换
sys.stdin()带有“类似文件的对象”和输出缓冲。如果多个进程调用close()在这个类似文件的对象上,它可能导致同一数据多次刷新到该对象,从而导致损坏。如果编写一个类似对象的文件并实现自己的缓存,则可以通过在附加到缓存时存储pid和在pid更改时丢弃缓存来确保它的安全性。例如::
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cache
这个 产卵 和 福克斯服务器 启动方法¶
有一些额外的限制不适用于 fork 启动方法。
更挑剔
确保所有参数
Process.__init__()是可以腌制的。此外,如果您将Process然后确保在Process.start方法被调用。
全局变量
记住,如果在子进程中运行的代码试图访问全局变量,那么它看到的值(如果有)可能与父进程中在
Process.start被叫来。但是,仅作为模块级常量的全局变量不会导致任何问题。
主模块安全导入
确保新的python解释器可以安全地导入主模块,而不会造成意外的副作用(例如启动新的进程)。
例如,使用 产卵 或 福克斯服务器 运行以下模块的Start方法将失败
RuntimeError::from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()相反,应该使用
if __name__ == '__main__':如下:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(The
freeze_support()如果程序将正常运行而不是冻结,则可以省略行。)这允许新生成的python解释器安全地导入模块,然后运行模块的
foo()功能。如果在主模块中创建了池或管理器,则应用类似的限制。
实例¶
演示如何创建和使用定制的管理器和代理:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
使用 Pool :
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
显示如何使用队列将任务馈送到工作进程集合并收集结果的示例:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()