Zuerst ein wenig context : Ich lerne gerade etwas über Threading in C++ 11 und zu diesem Zweck versuche ich, eine kleine actor
-Klasse zu erstellen, im Wesentlichen (ich habe die Ausnahmebehandlung und die Propagierung verlassen out) wie so:
class actor {
private: std::atomic<bool> stop;
private: std::condition_variable interrupt;
private: std::thread actor_thread;
private: message_queue incoming_msgs;
public: actor()
: stop(false),
actor_thread([&]{ run_actor(); })
{}
public: virtual ~actor() {
// if the actor is destroyed, we must ensure the thread dies too
stop = true;
// to this end, we have to interrupt the actor thread which is most probably
// waiting on the incoming_msgs queue:
interrupt.notify_all();
actor_thread.join();
}
private: virtual void run_actor() {
try {
while(!stop)
// wait for new message and process it
// but interrupt the waiting process if interrupt is signaled:
process(incoming_msgs.wait_and_pop(interrupt));
}
catch(interrupted_exception) {
// ...
}
};
private: virtual void process(const message&) = 0;
// ...
};
Jeder Akteur läuft in seinem eigenen actor_thread
, wartet auf eine neue eingehende Nachricht auf incoming_msgs
und verarbeitet sie, wenn eine Nachricht eintrifft.
Der actor_thread
wird zusammen mit der actor
erstellt und muss zusammen mit ihm sterben, weshalb ich einen Interrupt-Mechanismus in der message_queue::wait_and_pop(std::condition_variable interrupt)
brauche.
Im Wesentlichen fordere ich, dass wait_and_pop
blockiert, bis entweder A) eine neue message
ankommt oder B) bis die interrupt
ausgelöst wird, wobei - idealerweise - ein interrupted_exception
geworfen werden soll.
Das Eintreffen einer neuen Nachricht im message_queue
wird derzeit auch durch einen std::condition_variable new_msg_notification
modelliert:
// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
// How to interrupt the following, when interrupt fires??
new_msg_notification.wait(lock,[&]{
return !queue.empty();
});
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
Um die lange Geschichte kurz zu machen, die Frage ist dies: Wie unterbreche ich das Warten auf eine neue Nachricht in new_msg_notification.wait(...)
, wenn die interrupt
ausgelöst wird (ohne ein Timeout einzuführen)?
Alternativ kann die Frage gelesen werden als: Wie warte ich, bis einer von zwei std::condition_variable
s signalisiert wird?
Ein naiver Ansatz scheint es zu sein, std::condition_variable
für den Interrupt überhaupt nicht zu verwenden und stattdessen einfach ein atomares Flag std::atomic<bool> interrupted
zu verwenden und dann mit einem sehr kurzen Timeout auf new_msg_notification
zu warten, bis entweder eine neue Nachricht eingegangen ist oder bis true==interrupted
. Ich möchte jedoch sehr gerne das Warten aufgeben.
Aus den Kommentaren und der Antwort von Pilcrow sind grundsätzlich zwei Ansätze möglich.
Für diejenigen, die daran interessiert sind, geht meine Implementierung folgendermaßen vor. Die Bedingungsvariable in meinem Fall ist tatsächlich eine semaphore
(weil ich sie mehr mag und weil mir die Übung gefallen hat). Ich habe dieses Semaphor mit einer zugehörigen interrupt
ausgestattet, die über semaphore::get_interrupt()
aus dem Semaphor bezogen werden kann. Wenn nun ein Thread in semaphore::wait()
blockiert, hat ein anderer Thread die Möglichkeit, semaphore::interrupt::trigger()
für den Interrupt des Semaphors aufzurufen, wodurch der erste Thread einen interrupt_exception
aufhebt und propagiert.
struct
interrupt_exception {};
class
semaphore {
public: class interrupt;
private: mutable std::mutex mutex;
// must be declared after our mutex due to construction order!
private: interrupt* informed_by;
private: std::atomic<long> counter;
private: std::condition_variable cond;
public:
semaphore();
public:
~semaphore() throw();
public: void
wait();
public: interrupt&
get_interrupt() const { return *informed_by; }
public: void
post() {
std::lock_guard<std::mutex> lock(mutex);
counter++;
cond.notify_one(); // never throws
}
public: unsigned long
load () const {
return counter.load();
}
};
class
semaphore::interrupt {
private: semaphore *forward_posts_to;
private: std::atomic<bool> triggered;
public:
interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
assert(forward_posts_to);
std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
forward_posts_to->informed_by = this;
}
public: void
trigger() {
assert(forward_posts_to);
std::lock_guard<std::mutex>(forward_posts_to->mutex);
triggered = true;
forward_posts_to->cond.notify_one(); // never throws
}
public: bool
is_triggered () const throw() {
return triggered.load();
}
public: void
reset () throw() {
return triggered.store(false);
}
};
semaphore::semaphore() : counter(0L), informed_by(new interrupt(this)) {}
// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw() {
delete informed_by;
}
void
semaphore::wait() {
std::unique_lock<std::mutex> lock(mutex);
if(0L==counter) {
cond.wait(lock,[&]{
if(informed_by->is_triggered())
throw interrupt_exception();
return counter>0;
});
}
counter--;
}
Mit dieser semaphore
sieht meine Implementierung der Nachrichtenwarteschlange jetzt so aus (mithilfe des Semaphors anstelle von std::condition_variable
könnte ich den std::mutex
entfernen:
class
message_queue {
private: std::queue<message> queue;
private: semaphore new_msg_notification;
public: void
Push(message&& msg) {
queue.Push(std::move(msg));
new_msg_notification.post();
}
public: const message
wait_and_pop() {
new_msg_notification.wait();
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
public: semaphore::interrupt&
get_interrupt() const { return new_msg_notification.get_interrupt(); }
};
Mein actor
kann jetzt seinen Thread mit sehr geringer Latenz in seinem Thread unterbrechen. Die Implementierung sieht derzeit so aus:
class
actor {
private: message_queue
incoming_msgs;
/// must be declared after incoming_msgs due to construction order!
private: semaphore::interrupt&
interrupt;
private: std::thread
my_thread;
private: std::exception_ptr
exception;
public:
actor()
: interrupt(incoming_msgs.get_interrupt()), my_thread(
[&]{
try {
run_actor();
}
catch(...) {
exception = std::current_exception();
}
})
{}
private: virtual void
run_actor() {
while(!interrupt.is_triggered())
process(incoming_msgs.wait_and_pop());
};
private: virtual void
process(const message&) = 0;
public: void
notify(message&& msg_in) {
incoming_msgs.Push(std::forward<message>(msg_in));
}
public: virtual
~actor() throw (interrupt_exception) {
interrupt.trigger();
my_thread.join();
if(exception)
std::rethrow_exception(exception);
}
};
Du fragst,
Was ist der beste Weg, um auf mehrere Zustandsvariablen in C++ 11 zu warten?
Sie können und müssen nicht umgestalten. Ein Thread kann jeweils nur auf eine Bedingungsvariable (und den zugehörigen Mutex) warten. In dieser Hinsicht sind die Windows-Funktionen für die Synchronisierung etwas reicher als die der "POSIX-Style" -Familie von Synchronisationsprimitiven.
Der typische Ansatz bei Thread-sicheren Warteschlangen besteht darin, ein spezielles "all done!" eine Nachricht oder eine Warteschlange "Unterbrechbar" (oder "Herunterfahren"). Im letzteren Fall schützt die interne Bedingungsvariable der Warteschlange dann ein komplexes Prädikat: Entweder ist ein Element verfügbar oder die Warteschlange wurde beschädigt.
In einem Kommentar beobachtest du das
a notify_all () hat keine Wirkung, wenn niemand wartet
Das stimmt, aber wahrscheinlich nicht relevant. wait()
ing für eine Bedingungsvariable bedeutet auch, ein Vergleichselement zu prüfen und es vor für eine Benachrichtigung zu blockieren. Ein Arbeitsthread, der damit beschäftigt ist, ein Warteschlangenelement zu bearbeiten, das eine notify_all()
"verfehlt", wird bei der nächsten Überprüfung der Warteschlangenbedingung feststellen, dass das Prädikat (ein neues Element ist verfügbar oder die Warteschlange ist fertig) geändert wurde.
Kürzlich habe ich dieses Problem mit Hilfe einer einzelnen Bedingungsvariablen und einer separaten booleschen Variablen für jeden Produzenten/Arbeiter gelöst. Das Prädikat innerhalb der Wait-Funktion im Consumer-Thread kann nach diesen Flags suchen und entscheiden, welcher Produzent/Arbeiter die Bedingung erfüllt hat.
Vielleicht kann das funktionieren:
befreien Sie sich von Interrupt.
message wait_and_pop(std::condition_variable& interrupt) {
std::unique_lock<std::mutex> lock(mutex);
{
new_msg_notification.wait(lock,[&]{
return !queue.empty() || stop;
});
if( !stop )
{
auto msg(std::move(queue.front()));
queue.pop();
return msg;
}
else
{
return NULL; //or some 'terminate' message
}
}
Ersetzen Sie im Destruktor interrupt.notify_all()
durch new_msg_notification.notify_all()