wake-up-neo.com

Ausnahme im Multiprocessing-Pool wurde nicht erkannt

Es scheint, dass wenn eine Ausnahme von einem Multiprocessing.Pool-Prozess ausgelöst wird, keine Stack-Ablaufverfolgung oder andere Anzeichen für einen Fehler vorliegen. Beispiel: 

from multiprocessing import Pool 

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go)
p.close()
p.join()

druckt 1 und stoppt lautlos. Interessanterweise funktioniert das Anheben einer BaseException stattdessen. Gibt es eine Möglichkeit, das Verhalten für alle Ausnahmen mit BaseException gleichzusetzen?

58
Rob Lourens

Ich habe eine vernünftige Lösung für das Problem, zumindest für Debugging-Zwecke. Ich habe derzeit keine Lösung, die die Ausnahme in den Hauptprozessen zurückwirft. Mein erster Gedanke war, einen Dekorateur zu verwenden, aber Sie können nur Pickle Funktionen definieren, die auf der obersten Ebene eines Moduls definiert sind .

Stattdessen eine einfache Wrapping-Klasse und eine Pool-Unterklasse, die diese für apply_async (und damit apply) verwendet. Ich lasse map_async als Übung für den Leser.

import traceback
from multiprocessing.pool import Pool
import multiprocessing

# Shortcut to multiprocessing's logger
def error(msg, *args):
    return multiprocessing.get_logger().error(msg, *args)

class LogExceptions(object):
    def __init__(self, callable):
        self.__callable = callable

    def __call__(self, *args, **kwargs):
        try:
            result = self.__callable(*args, **kwargs)

        except Exception as e:
            # Here we add some debugging help. If multiprocessing's
            # debugging is on, it will arrange to log the traceback
            error(traceback.format_exc())
            # Re-raise the original exception so the Pool worker can
            # clean up
            raise

        # It was fine, give a normal answer
        return result

class LoggingPool(Pool):
    def apply_async(self, func, args=(), kwds={}, callback=None):
        return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)

def go():
    print(1)
    raise Exception()
    print(2)

multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)

p.apply_async(go)
p.close()
p.join()

Das gibt mir:

1
[ERROR/PoolWorker-1] Traceback (most recent call last):
  File "mpdebug.py", line 24, in __call__
    result = self.__callable(*args, **kwargs)
  File "mpdebug.py", line 44, in go
    raise Exception()
Exception
25
Rupert Nash

Vielleicht fehlt mir etwas, aber ist es nicht das, was die get-Methode des Result-Objekts zurückgibt? Siehe Prozesspools .

klasse Multiprocessing.pool.AsyncResult

Die von Pool.apply_async () und Pool.map_async () zurückgegebene Ergebnisklasse. Get ([timeout])
Gib das Ergebnis zurück, wenn es ankommt. Wenn Timeout nicht None ist und das Ergebnis nicht innerhalb von .__ ankommt. Timeout Sekunden, dann Multiprocessing.TimeoutError wird ausgelöst. Wenn die Fernbedienung call hat eine Ausnahme ausgelöst, dann wird diese Ausnahme von get () erneut ausgelöst.

Wenn Sie Ihr Beispiel leicht modifizieren, können Sie dies tun

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()

Was gibt als Ergebnis

1
Traceback (most recent call last):
  File "rob.py", line 10, in <module>
    x.get()
  File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
    raise self._value
Exception: foobar

Dies ist nicht völlig zufriedenstellend, da der Traceback nicht gedruckt wird, aber besser als nichts ist.

UPDATE: Dieser Fehler wurde in Python 3.4 mit freundlicher Genehmigung von Richard Oudkerk behoben. Siehe die issue get-Methode von multiprocessing.pool.Async sollte den vollständigen Traceback zurückgeben.

47
Faheem Mitha

Die Lösung mit den meisten Stimmen zum Zeitpunkt des Schreibens hat ein Problem:

from multiprocessing import Pool

def go():
    print(1)
    raise Exception("foobar")
    print(2)

p = Pool()
x = p.apply_async(go)
x.get()  ## waiting here for go() to complete...
p.close()
p.join()

Wie @dfrankow feststellte, wird auf x.get() gewartet, was die asynchrone Ausführung einer Task ruiniert. Um die Effizienz zu verbessern (insbesondere, wenn Ihre Worker-Funktion go sehr lange dauert), würde ich sie folgendermaßen ändern:

from multiprocessing import Pool

def go(x):
    print(1)
    # task_that_takes_a_long_time()
    raise Exception("Can't go anywhere.")
    print(2)
    return x**2

p = Pool()
results = []
for x in range(1000):
    results.append( p.apply_async(go, [x]) )

p.close()

for r in results:
     r.get()

Vorteile : Die Worker-Funktion wird asynchron ausgeführt. Wenn Sie beispielsweise viele Aufgaben auf mehreren Kernen ausführen, ist die Arbeitsweise wesentlich effizienter als bei der ursprünglichen Lösung.

Nachteile : Wenn es in der Worker-Funktion eine Ausnahme gibt, wird diese nur angehoben nachdem der Pool alle Aufgaben ausgeführt hat. Dies kann das erwünschte Verhalten sein oder nicht. BEARBEITET gemäß dem Kommentar von @ colinfang, der dieses Problem behoben hat. 

18
gozzilli

Ich habe mit diesem Dekorateur Ausnahmen beim Protokollieren gehabt:

import traceback, functools, multiprocessing

def trace_unhandled_exceptions(func):
    @functools.wraps(func)
    def wrapped_func(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print 'Exception in '+func.__name__
            traceback.print_exc()
    return wrapped_func

mit dem Code in der Frage ist es

@trace_unhandled_exceptions
def go():
    print(1)
    raise Exception()
    print(2)

p = multiprocessing.Pool(1)

p.apply_async(go)
p.close()
p.join()

Dekorieren Sie einfach die Funktion, die Sie an Ihren Prozesspool übergeben. Der Schlüssel für dieses Arbeiten ist @functools.wraps(func), ansonsten löst Multiprocessing eine PicklingError aus.

code oben gibt

1
Exception in go
Traceback (most recent call last):
  File "<stdin>", line 5, in wrapped_func
  File "<stdin>", line 4, in go
Exception
9
Mark Foreman
import logging
from multiprocessing import Pool

def proc_wrapper(func, *args, **kwargs):
    """Print exception because multiprocessing lib doesn't return them right."""
    try:
        return func(*args, **kwargs)
    except Exception as e:
        logging.exception(e)
        raise

def go(x):
    print x
    raise Exception("foobar")

p = Pool()
p.apply_async(proc_wrapper, (go, 5))
p.join()
p.close()
2
erezarnon

Ich habe ein Modul RemoteException.py erstellt, das den vollständigen Traceback einer Ausnahme in einem Prozess zeigt. Python2. Lade es herunter und füge dies deinem Code hinzu:

import RemoteException

@RemoteException.showError
def go():
    raise Exception('Error!')

if __== '__main__':
    import multiprocessing
    p = multiprocessing.Pool(processes = 1)
    r = p.apply(go) # full traceback is shown here
1
User

Da Sie apply_sync verwendet haben, ist es wahrscheinlich der Zweck, einige Synchronisierungsaufgaben auszuführen. Callback zur Behandlung verwenden ist eine weitere Option. Bitte beachten Sie, dass diese Option nur für Python3.2 und höher und nicht für Python2.7 verfügbar ist.

from multiprocessing import Pool

def callback(result):
    print('success', result)

def callback_error(result):
    print('error', result)

def go():
    print(1)
    raise Exception()
    print(2)

p = Pool()
p.apply_async(go, callback=callback, error_callback=callback_error)

# You can do another things

p.close()
p.join()
0
Asoul

Ich würde versuchen, pdb zu verwenden:

import pdb
import sys
def handler(type, value, tb):
  pdb.pm()
sys.excepthook = handler
0
Claris

Da es bereits gute Antworten für multiprocessing.Pool gibt, werde ich eine Lösung mit einem anderen Ansatz zur Vollständigkeit anbieten.

Für python >= 3.2 scheint die folgende Lösung die einfachste zu sein:

from concurrent.futures import ProcessPoolExecutor, wait

def go():
    print(1)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor() as p:
    for i in range(10):
        futures.append(p.submit(go))

results = [f.result() for f in futures]

Vorteile:

  • sehr wenig Code
  • löst eine Ausnahme im Hauptprozess aus
  • stellt eine Stapelablaufverfolgung bereit
  • keine externen Abhängigkeiten

Weitere Informationen zur API finden Sie unter: https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ProcessPoolExecutor

Wenn Sie eine große Anzahl von Aufgaben übermitteln und möchten, dass Ihr Hauptprozess fehlschlägt, sobald eine Ihrer Aufgaben fehlschlägt, können Sie das folgende Snippet verwenden:

from concurrent.futures import ProcessPoolExecutor, wait, FIRST_EXCEPTION, as_completed
import time


def go():
    print(1)
    time.sleep(0.3)
    raise Exception()
    print(2)


futures = []
with ProcessPoolExecutor(1) as p:
    for i in range(10):
        futures.append(p.submit(go))

    for f in as_completed(futures):
        if f.exception() is not None:
            for f in futures:
                f.cancel()
            break

[f.result() for f in futures]

Alle anderen Antworten schlagen nur dann fehl, wenn alle Aufgaben ausgeführt wurden.

0
Vlad