wake-up-neo.com

Fangen Sie die Ausnahme eines Threads im Aufrufer-Thread in Python ab

Python und Multithread-Programmierung sind im Allgemeinen sehr neu. Im Grunde habe ich ein Skript, das Dateien an einen anderen Ort kopiert. Ich möchte, dass dies in einem anderen Thread platziert wird, damit ich .... ausgeben kann, um anzuzeigen, dass das Skript noch läuft.

Das Problem, das ich habe, ist, dass, wenn die Dateien nicht kopiert werden können, eine Ausnahme ausgelöst wird. Dies ist in Ordnung, wenn Sie im Haupt-Thread laufen. Der folgende Code funktioniert jedoch nicht:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In der Thread-Klasse selbst habe ich versucht, die Ausnahme erneut zu werfen, aber sie funktioniert nicht. Ich habe gesehen, dass die Leute hier ähnliche Fragen stellen, aber alle scheinen etwas spezifischeres zu tun als das, was ich versuche (und ich verstehe die angebotenen Lösungen nicht ganz). Ich habe gesehen, dass Leute die Verwendung von sys.exc_info() erwähnen, jedoch weiß ich nicht, wo oder wie ich es verwenden soll.

Alle Hilfe wird sehr geschätzt!

EDIT: Der Code für die Thread-Klasse ist unten:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise
154
Phanto

Das Problem ist, dass thread_obj.start() sofort zurückkehrt. Der von Ihnen erstellte untergeordnete Thread wird in seinem eigenen Kontext mit seinem eigenen Stapel ausgeführt. Jede Ausnahme, die dort auftritt, befindet sich im Kontext des untergeordneten Threads und befindet sich in einem eigenen Stapel. Ich kann mir jetzt vorstellen, diese Informationen an den übergeordneten Thread zu übermitteln, indem Sie eine Art Nachrichtenübergabe verwenden.

Versuchen Sie dies für Größe:

import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occured here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __== '__main__':
    main()
95
Santa

Das Modul concurrent.futures macht es einfach, in separaten Threads (oder Prozessen) zu arbeiten und die daraus resultierenden Ausnahmen zu behandeln:

import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()

concurrent.futures ist in Python 3.2 enthalten und für frühere Versionen als das zurückportierte futures-Modul verfügbar.

28
Jon-Eric

Obwohl es nicht möglich ist, eine in einem anderen Thread geworfene Ausnahme direkt abzufangen, ist hier ein Code, um ganz transparent etwas zu erhalten, das dieser Funktionalität sehr nahe kommt. Ihr untergeordneter Thread muss die Klasse ExThread anstelle von threading.Thread unterteilen, und der übergeordnete Thread muss die Methode child_thread.join_with_exception() anstelle von child_thread.join() aufrufen, wenn er darauf wartet, dass der Thread seinen Job beendet.

Technische Details zu dieser Implementierung: Wenn der untergeordnete Thread eine Ausnahme auslöst, wird er über eine Queue an das übergeordnete Thread übergeben und erneut im übergeordneten Thread geworfen. Beachten Sie, dass bei diesem Ansatz nicht viel zu warten ist.

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __== '__main__':
    main()
27
Mateusz Kobos

Wenn eine Ausnahme in einem Thread auftritt, ist es am besten, sie im Aufrufer-Thread während join erneut anzuheben. Informationen zu der gerade behandelten Ausnahme erhalten Sie mit der Funktion sys.exc_info(). Diese Informationen können einfach als Eigenschaft des Thread-Objekts gespeichert werden, bis join aufgerufen wird. Anschließend können sie erneut aufgerufen werden.

Beachten Sie, dass ein Queue.Queue (wie in anderen Antworten vorgeschlagen) in diesem einfachen Fall nicht erforderlich ist, wenn der Thread höchstens eine Ausnahme auslöst und direkt nach dem Auslösen einer Ausnahme ausläuft. Wir vermeiden Rennenbedingungen, indem wir einfach auf den Abschluss des Threads warten.

Erweitern Sie beispielsweise ExcThread (unten) und überschreiben Sie excRun (anstelle von run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

Die 3-Argument-Form für raise ist in Python 3 nicht mehr vorhanden. Ändern Sie daher die letzte Zeile in:

raise new_exc.with_traceback(self.exc[2])
15
Rok Strniša

Auf diese Frage gibt es viele wirklich komische Antworten. Bin ich zu sehr vereinfacht, weil mir dies für die meisten Dinge ausreichend erscheint.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

Wenn Sie sicher sind, dass Sie immer nur mit der einen oder der anderen Version von Python arbeiten, können Sie die run()-Methode auf die entstellte Version reduzieren (wenn Sie nur mit Python-Versionen 3 arbeiten). oder nur die saubere Version (wenn Sie nur mit Python-Versionen ab 3 laufen).

Verwendungsbeispiel:

def f(*args, **kwargs)
    print(args)
    print(kwargs)
    raise Exception('I suck')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

Und Sie werden sehen, dass die Ausnahme beim Beitritt im anderen Thread ausgelöst wird.

11
ArtOfWarfare

Dies war ein unangenehmes kleines Problem, und ich würde gerne meine Lösung einwerfen. Einige andere Lösungen, die ich gefunden habe (async.io zum Beispiel), sahen vielversprechend aus, präsentierten jedoch auch eine Art Blackbox. Die Warteschlangen-/Ereignisschleifenmethode verbindet Sie mit einer bestimmten Implementierung. Der Concurrent-Futures-Quellcode besteht jedoch aus nur 1000 Zeilen und ist leicht zu verstehen . So konnte ich mein Problem leicht lösen: Ad-hoc-Worker-Threads ohne viel Setup erstellen und Ausnahmen im Haupt-Thread abfangen können.

Meine Lösung verwendet die API für gleichzeitige Futures und die Threading-API. Damit können Sie einen Worker erstellen, der Ihnen sowohl den Thread als auch die Zukunft gibt. Auf diese Weise können Sie dem Thread beitreten, um auf das Ergebnis zu warten:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

... oder Sie können den Arbeiter einfach einen Rückruf senden lassen, wenn Sie fertig sind:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

... oder Sie können eine Schleife machen, bis das Ereignis abgeschlossen ist:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Hier ist der Code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

... und die Testfunktion:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
4
Calvin Froedge

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

Die folgende Lösung:

  • kehrt sofort zum Hauptthread zurück, wenn eine Ausnahme aufgerufen wird
  • erfordert keine zusätzlichen benutzerdefinierten Klassen, da Folgendes nicht benötigt wird:
    • eine explizite Queue
    • um eine Ausnahme zu Ihrem Arbeitsthread hinzuzufügen 

Quelle:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Mögliche Ausgabe:

0
0
1
1
2
2
0
Exception()
1
2
None

Es ist leider nicht möglich, Futures zu töten, um die anderen zu kündigen, da einer ausfällt:

Wenn du so etwas machst:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

dann fängt die Variable with und wartet, bis der zweite Thread beendet ist, bevor Sie fortfahren. Folgendes verhält sich ähnlich:

for future in concurrent.futures.as_completed(futures):
    future.result()

da future.result() die Ausnahme erneut auslöst, falls eine aufgetreten ist.

Wenn Sie den gesamten Python-Prozess beenden möchten, kommen Sie möglicherweise mit os._exit(0) aus. Dies bedeutet jedoch wahrscheinlich, dass Sie einen Refactor benötigen.

Getestet auf Python 3.6.7, Ubuntu 18.04.

Als Noobie für Threading habe ich lange gebraucht, um zu verstehen, wie der Code von Mateusz Kobos (oben) implementiert wird. Hier ist eine klarere Version, um zu verstehen, wie man sie benutzt. 

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1) 
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them. 
# Perhaps Mateusz can clarify?        
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))

if __== '__main__':
    # The code lives in MyThread in this example. So creating the MyThread 
    # object set the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although" 
           " the countdown continues in its new thread. So you code "
           "can tinue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to be replace "Exception" with specific error (untested)
    except Exception, e: 
        print
        print "Exceptioon was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" + 
             str(thread_name) + 
             "' [" + str(e) + "]")
        raise Exception(e) # Or custom class of exception, such as MyException
2
RightmireM

Ähnlich wie bei RickardSjogrens ohne Queue, sys etc., aber auch ohne einige Zuhörer, die Signale signalisieren: Führen Sie direkt einen Exception-Handler aus, der einem Except-Block entspricht.

#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occured exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: Tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback

Nur self._callback und der Exceptionsblock in run () ergänzen das normale threading.Thread.

1
Chickenmarkus

Die Verwendung nackter Ausnahmen ist keine gute Praxis, da Sie normalerweise mehr fangen, als Sie erwarten.

Ich würde vorschlagen, die except so zu ändern, dass NUR die Ausnahme erfasst wird, die Sie behandeln möchten. Ich denke nicht, dass das Erhöhen die gewünschte Wirkung hat, denn wenn Sie TheThread in der äußeren try instanziieren, wird eine Zuweisung nie ausgelöst, wenn eine Ausnahme ausgelöst wird.

Stattdessen möchten Sie vielleicht einfach darauf aufmerksam machen und weitermachen, z.

def run(self):
    try:
       shul.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
       print err

Wenn diese Ausnahme dann abgefangen wird, können Sie sie dort behandeln. Wenn die äußere Variable try eine Ausnahme von TheThread feststellt, wissen Sie, dass es sich nicht um die bereits behandelte handelt, und hilft Ihnen, Ihren Prozessablauf zu isolieren.

0
jathanism

Eine Methode, die ich sehr mag, basiert auf dem Beobachtermuster . Ich definiere eine Signalklasse, die mein Thread verwendet, um Ausnahmen für Zuhörer auszugeben. Es kann auch verwendet werden, um Werte aus Threads zurückzugeben. Beispiel:

import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __== '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'

Ich habe nicht genug Erfahrung in der Arbeit mit Threads, um zu behaupten, dass dies eine absolut sichere Methode ist. Aber es hat für mich funktioniert und ich mag die Flexibilität.

0
RickardSjogren

Ich weiß, dass ich zu der Party etwas spät komme, aber ich hatte ein sehr ähnliches Problem, aber es beinhaltete die Verwendung von tkinter als GUI, und der Mainloop machte es unmöglich, eine der .join () - Lösungen zu verwenden. Deshalb habe ich die in der EDIT angegebene Lösung der ursprünglichen Frage angepasst, aber es allgemeiner gemacht, um es für andere verständlicher zu machen.

Hier ist die neue Thread-Klasse in Aktion:

import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __== "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()

Natürlich können Sie die Ausnahme auch auf andere Weise von der Protokollierung abwickeln lassen, z. B. Ausdrucken oder Ausgabe auf der Konsole.

Auf diese Weise können Sie die ExceptionThread-Klasse genau wie die Thread-Klasse verwenden, ohne dass spezielle Änderungen erforderlich sind. 

0
Firo

Wrap-Thread mit Ausnahmespeicher.

import threading
import sys
class ExcThread(threading.Thread):

    def __init__(self, target, args = None):
        self.args = args if args else []
        self.target = target
        self.exc = None
        threading.Thread.__init__(self)

    def run(self):
        try:
            self.target(*self.args)
            raise Exception('An error occured here.')
        except Exception:
            self.exc=sys.exc_info()

def main():
    def hello(name):
        print(!"Hello, {name}!")
    thread_obj = ExcThread(target=hello, args=("Jack"))
    thread_obj.start()

    thread_obj.join()
    exc = thread_obj.exc
    if exc:
        exc_type, exc_obj, exc_trace = exc
        print(exc_type, ':',exc_obj, ":", exc_trace)

main()
0
ahuigo

Eine einfache Möglichkeit, die Ausnahme von Thread zu erkennen und zur aufrufenden Methode zurückzukehren, besteht darin, das Wörterbuch oder eine Liste an die worker-Methode zu übergeben.

Beispiel (Wörterbuch an die Workermethode übergeben): 

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
0
rado stoyanov