Es scheint, dass wenn eine Ausnahme von einem Multiprocessing.Pool-Prozess ausgelöst wird, keine Stack-Ablaufverfolgung oder andere Anzeichen für einen Fehler vorliegen. Beispiel:
from multiprocessing import Pool
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go)
p.close()
p.join()
druckt 1 und stoppt lautlos. Interessanterweise funktioniert das Anheben einer BaseException stattdessen. Gibt es eine Möglichkeit, das Verhalten für alle Ausnahmen mit BaseException gleichzusetzen?
Ich habe eine vernünftige Lösung für das Problem, zumindest für Debugging-Zwecke. Ich habe derzeit keine Lösung, die die Ausnahme in den Hauptprozessen zurückwirft. Mein erster Gedanke war, einen Dekorateur zu verwenden, aber Sie können nur Pickle Funktionen definieren, die auf der obersten Ebene eines Moduls definiert sind .
Stattdessen eine einfache Wrapping-Klasse und eine Pool-Unterklasse, die diese für apply_async
(und damit apply
) verwendet. Ich lasse map_async
als Übung für den Leser.
import traceback
from multiprocessing.pool import Pool
import multiprocessing
# Shortcut to multiprocessing's logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
def go():
print(1)
raise Exception()
print(2)
multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)
p.apply_async(go)
p.close()
p.join()
Das gibt mir:
1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception
Vielleicht fehlt mir etwas, aber ist es nicht das, was die get
-Methode des Result-Objekts zurückgibt? Siehe Prozesspools .
klasse Multiprocessing.pool.AsyncResult
Die von Pool.apply_async () und Pool.map_async () zurückgegebene Ergebnisklasse. Get ([timeout])
Gib das Ergebnis zurück, wenn es ankommt. Wenn Timeout nicht None ist und das Ergebnis nicht innerhalb von .__ ankommt. Timeout Sekunden, dann Multiprocessing.TimeoutError wird ausgelöst. Wenn die Fernbedienung call hat eine Ausnahme ausgelöst, dann wird diese Ausnahme von get () erneut ausgelöst.
Wenn Sie Ihr Beispiel leicht modifizieren, können Sie dies tun
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Was gibt als Ergebnis
1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar
Dies ist nicht völlig zufriedenstellend, da der Traceback nicht gedruckt wird, aber besser als nichts ist.
UPDATE: Dieser Fehler wurde in Python 3.4 mit freundlicher Genehmigung von Richard Oudkerk behoben. Siehe die issue get-Methode von multiprocessing.pool.Async sollte den vollständigen Traceback zurückgeben.
Die Lösung mit den meisten Stimmen zum Zeitpunkt des Schreibens hat ein Problem:
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get() ## waiting here for go() to complete...
p.close()
p.join()
Wie @dfrankow feststellte, wird auf x.get()
gewartet, was die asynchrone Ausführung einer Task ruiniert. Um die Effizienz zu verbessern (insbesondere, wenn Ihre Worker-Funktion go
sehr lange dauert), würde ich sie folgendermaßen ändern:
from multiprocessing import Pool
def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can't go anywhere.")
print(2)
return x**2
p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )
p.close()
for r in results:
r.get()
Vorteile : Die Worker-Funktion wird asynchron ausgeführt. Wenn Sie beispielsweise viele Aufgaben auf mehreren Kernen ausführen, ist die Arbeitsweise wesentlich effizienter als bei der ursprünglichen Lösung.
Nachteile : Wenn es in der Worker-Funktion eine Ausnahme gibt, wird diese nur angehoben nachdem der Pool alle Aufgaben ausgeführt hat. Dies kann das erwünschte Verhalten sein oder nicht. BEARBEITET gemäß dem Kommentar von @ colinfang, der dieses Problem behoben hat.
Ich habe mit diesem Dekorateur Ausnahmen beim Protokollieren gehabt:
import traceback, functools, multiprocessing
def trace_unhandled_exceptions(func):
@functools.wraps(func)
def wrapped_func(*args, **kwargs):
try:
func(*args, **kwargs)
except:
print 'Exception in '+func.__name__
traceback.print_exc()
return wrapped_func
mit dem Code in der Frage ist es
@trace_unhandled_exceptions
def go():
print(1)
raise Exception()
print(2)
p = multiprocessing.Pool(1)
p.apply_async(go)
p.close()
p.join()
Dekorieren Sie einfach die Funktion, die Sie an Ihren Prozesspool übergeben. Der Schlüssel für dieses Arbeiten ist @functools.wraps(func)
, ansonsten löst Multiprocessing eine PicklingError
aus.
code oben gibt
1
Exception in go
Traceback (most recent call last):
File "<stdin>", line 5, in wrapped_func
File "<stdin>", line 4, in go
Exception
import logging
from multiprocessing import Pool
def proc_wrapper(func, *args, **kwargs):
"""Print exception because multiprocessing lib doesn't return them right."""
try:
return func(*args, **kwargs)
except Exception as e:
logging.exception(e)
raise
def go(x):
print x
raise Exception("foobar")
p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
Ich habe ein Modul RemoteException.py erstellt, das den vollständigen Traceback einer Ausnahme in einem Prozess zeigt. Python2. Lade es herunter und füge dies deinem Code hinzu:
import RemoteException
@RemoteException.showError
def go():
raise Exception('Error!')
if __== '__main__':
import multiprocessing
p = multiprocessing.Pool(processes = 1)
r = p.apply(go) # full traceback is shown here
Da Sie apply_sync
verwendet haben, ist es wahrscheinlich der Zweck, einige Synchronisierungsaufgaben auszuführen. Callback zur Behandlung verwenden ist eine weitere Option. Bitte beachten Sie, dass diese Option nur für Python3.2 und höher und nicht für Python2.7 verfügbar ist.
from multiprocessing import Pool
def callback(result):
print('success', result)
def callback_error(result):
print('error', result)
def go():
print(1)
raise Exception()
print(2)
p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)
# You can do another things
p.close()
p.join()
Ich würde versuchen, pdb zu verwenden:
import pdb
import sys
def handler(type, value, tb):
pdb.pm()
sys.excepthook = handler
Da es bereits gute Antworten für multiprocessing.Pool
gibt, werde ich eine Lösung mit einem anderen Ansatz zur Vollständigkeit anbieten.
Für python >= 3.2
scheint die folgende Lösung die einfachste zu sein:
from concurrent.futures import ProcessPoolExecutor, wait
def go():
print(1)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor() as p:
for i in range(10):
futures.append(p.submit(go))
results = [f.result() for f in futures]
Vorteile:
Weitere Informationen zur API finden Sie unter: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor
Wenn Sie eine große Anzahl von Aufgaben übermitteln und möchten, dass Ihr Hauptprozess fehlschlägt, sobald eine Ihrer Aufgaben fehlschlägt, können Sie das folgende Snippet verwenden:
from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time
def go():
print(1)
time.sleep(0.3)
raise Exception()
print(2)
futures = []
with ProcessPoolExecutor(1) as p:
for i in range(10):
futures.append(p.submit(go))
for f in as_completed(futures):
if f.exception() is not None:
for f in futures:
f.cancel()
break
[f.result() for f in futures]
Alle anderen Antworten schlagen nur dann fehl, wenn alle Aufgaben ausgeführt wurden.