wake-up-neo.com

Was ist der beste Weg, um auf mehrere Zustandsvariablen in C++ 11 zu warten?

Zuerst ein wenig context : Ich lerne gerade etwas über Threading in C++ 11 und zu diesem Zweck versuche ich, eine kleine actor-Klasse zu erstellen, im Wesentlichen (ich habe die Ausnahmebehandlung und die Propagierung verlassen out) wie so:

class actor {
    private: std::atomic<bool> stop;
    private: std::condition_variable interrupt;
    private: std::thread actor_thread;
    private: message_queue incoming_msgs;

    public: actor() 
    : stop(false), 
      actor_thread([&]{ run_actor(); })
    {}

    public: virtual ~actor() {
        // if the actor is destroyed, we must ensure the thread dies too
        stop = true;
        // to this end, we have to interrupt the actor thread which is most probably
        // waiting on the incoming_msgs queue:
        interrupt.notify_all();
        actor_thread.join();
    }

    private: virtual void run_actor() {
        try {
            while(!stop)
                // wait for new message and process it
                // but interrupt the waiting process if interrupt is signaled:
                process(incoming_msgs.wait_and_pop(interrupt));
        } 
        catch(interrupted_exception) {
            // ...
        }
    };

    private: virtual void process(const message&) = 0;
    // ...
};

Jeder Akteur läuft in seinem eigenen actor_thread, wartet auf eine neue eingehende Nachricht auf incoming_msgs und verarbeitet sie, wenn eine Nachricht eintrifft. 

Der actor_thread wird zusammen mit der actor erstellt und muss zusammen mit ihm sterben, weshalb ich einen Interrupt-Mechanismus in der message_queue::wait_and_pop(std::condition_variable interrupt) brauche. 

Im Wesentlichen fordere ich, dass wait_and_pop blockiert, bis entweder A) eine neue message ankommt oder B) bis die interrupt ausgelöst wird, wobei - idealerweise - ein interrupted_exception geworfen werden soll.

Das Eintreffen einer neuen Nachricht im message_queue wird derzeit auch durch einen std::condition_variable new_msg_notification modelliert:

// ...
// in class message_queue:
message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);

    // How to interrupt the following, when interrupt fires??
    new_msg_notification.wait(lock,[&]{
        return !queue.empty();
    });
    auto msg(std::move(queue.front()));
    queue.pop();
    return msg;
}

Um die lange Geschichte kurz zu machen, die Frage ist dies: Wie unterbreche ich das Warten auf eine neue Nachricht in new_msg_notification.wait(...), wenn die interrupt ausgelöst wird (ohne ein Timeout einzuführen)?

Alternativ kann die Frage gelesen werden als: Wie warte ich, bis einer von zwei std::condition_variables signalisiert wird? 

Ein naiver Ansatz scheint es zu sein, std::condition_variable für den Interrupt überhaupt nicht zu verwenden und stattdessen einfach ein atomares Flag std::atomic<bool> interrupted zu verwenden und dann mit einem sehr kurzen Timeout auf new_msg_notification zu warten, bis entweder eine neue Nachricht eingegangen ist oder bis true==interrupted. Ich möchte jedoch sehr gerne das Warten aufgeben.


BEARBEITEN:

Aus den Kommentaren und der Antwort von Pilcrow sind grundsätzlich zwei Ansätze möglich.

  1. Erzeuge eine spezielle "Terminate" -Meldung, wie von Alan, Mukunda und Pilcrow vorgeschlagen. Ich habe mich gegen diese Option entschieden, weil ich keine Ahnung von der Größe der Warteschlange zu dem Zeitpunkt habe, an dem der Schauspieler beendet werden soll. Es kann sehr gut sein (wie es meistens der Fall ist, wenn ich möchte, dass etwas schnell beendet wird), dass in der Warteschlange noch Tausende von Nachrichten verarbeitet werden müssen, und es scheint inakzeptabel zu sein, auf ihre Verarbeitung zu warten, bis schließlich die Beendigungsnachricht ihre Nachricht erhält Wende.
  2. Implementieren Sie eine benutzerdefinierte Version einer Bedingungsvariablen, die von einem anderen Thread unterbrochen werden kann, indem Sie die Benachrichtigung an die Bedingungsvariable weiterleiten, auf die der erste Thread wartet. Ich habe mich für diesen Ansatz entschieden.

Für diejenigen, die daran interessiert sind, geht meine Implementierung folgendermaßen vor. Die Bedingungsvariable in meinem Fall ist tatsächlich eine semaphore (weil ich sie mehr mag und weil mir die Übung gefallen hat). Ich habe dieses Semaphor mit einer zugehörigen interrupt ausgestattet, die über semaphore::get_interrupt() aus dem Semaphor bezogen werden kann. Wenn nun ein Thread in semaphore::wait() blockiert, hat ein anderer Thread die Möglichkeit, semaphore::interrupt::trigger() für den Interrupt des Semaphors aufzurufen, wodurch der erste Thread einen interrupt_exception aufhebt und propagiert.

struct
interrupt_exception {};

class
semaphore {
    public: class interrupt;
    private: mutable std::mutex mutex;

    // must be declared after our mutex due to construction order!
    private: interrupt* informed_by;
    private: std::atomic<long> counter;
    private: std::condition_variable cond;

    public: 
    semaphore();

    public: 
    ~semaphore() throw();

    public: void 
    wait();

    public: interrupt&
    get_interrupt() const { return *informed_by; }

    public: void
    post() {
        std::lock_guard<std::mutex> lock(mutex);
        counter++;
        cond.notify_one(); // never throws
    }

    public: unsigned long
    load () const {
        return counter.load();
    }
};

class
semaphore::interrupt {
    private: semaphore *forward_posts_to;
    private: std::atomic<bool> triggered;

    public:
    interrupt(semaphore *forward_posts_to) : triggered(false), forward_posts_to(forward_posts_to) {
        assert(forward_posts_to);
        std::lock_guard<std::mutex> lock(forward_posts_to->mutex);
        forward_posts_to->informed_by = this;
    }

    public: void
    trigger() {
        assert(forward_posts_to);
        std::lock_guard<std::mutex>(forward_posts_to->mutex);

        triggered = true;
        forward_posts_to->cond.notify_one(); // never throws
    }

    public: bool
    is_triggered () const throw() {
        return triggered.load();
    }

    public: void
    reset () throw() {
        return triggered.store(false);
    }
};

semaphore::semaphore()  : counter(0L), informed_by(new interrupt(this)) {}

// must be declared here because otherwise semaphore::interrupt is an incomplete type
semaphore::~semaphore() throw()  {
    delete informed_by;
}

void
semaphore::wait() {
    std::unique_lock<std::mutex> lock(mutex);
    if(0L==counter) {
        cond.wait(lock,[&]{
            if(informed_by->is_triggered())
                throw interrupt_exception();
            return counter>0;
        });
    }
    counter--;
}

Mit dieser semaphore sieht meine Implementierung der Nachrichtenwarteschlange jetzt so aus (mithilfe des Semaphors anstelle von std::condition_variable könnte ich den std::mutex entfernen:

class
message_queue {    
    private: std::queue<message> queue;
    private: semaphore new_msg_notification;

    public: void
    Push(message&& msg) {
        queue.Push(std::move(msg));
        new_msg_notification.post();
    }

    public: const message
    wait_and_pop() {
        new_msg_notification.wait();
        auto msg(std::move(queue.front()));
        queue.pop();
        return msg;
    }

    public: semaphore::interrupt&
    get_interrupt() const { return new_msg_notification.get_interrupt(); }
};

Mein actor kann jetzt seinen Thread mit sehr geringer Latenz in seinem Thread unterbrechen. Die Implementierung sieht derzeit so aus:

class
actor {
    private: message_queue
    incoming_msgs;

    /// must be declared after incoming_msgs due to construction order!
    private: semaphore::interrupt&
    interrupt;

    private: std::thread
    my_thread;

    private: std::exception_ptr
    exception;

    public:
    actor()
    : interrupt(incoming_msgs.get_interrupt()), my_thread(
        [&]{
            try {
                run_actor();
            }
            catch(...) {
                exception = std::current_exception();
            }
        })
    {}

    private: virtual void
    run_actor() {
        while(!interrupt.is_triggered())
            process(incoming_msgs.wait_and_pop());
    };

    private: virtual void
    process(const message&) = 0;

    public: void
    notify(message&& msg_in) {
        incoming_msgs.Push(std::forward<message>(msg_in));
    }

    public: virtual
    ~actor() throw (interrupt_exception) {
        interrupt.trigger();
        my_thread.join();
        if(exception)
            std::rethrow_exception(exception);
    }
};
25
Julian

Du fragst,

Was ist der beste Weg, um auf mehrere Zustandsvariablen in C++ 11 zu warten?

Sie können und müssen nicht umgestalten. Ein Thread kann jeweils nur auf eine Bedingungsvariable (und den zugehörigen Mutex) warten. In dieser Hinsicht sind die Windows-Funktionen für die Synchronisierung etwas reicher als die der "POSIX-Style" -Familie von Synchronisationsprimitiven.

Der typische Ansatz bei Thread-sicheren Warteschlangen besteht darin, ein spezielles "all done!" eine Nachricht oder eine Warteschlange "Unterbrechbar" (oder "Herunterfahren"). Im letzteren Fall schützt die interne Bedingungsvariable der Warteschlange dann ein komplexes Prädikat: Entweder ist ein Element verfügbar oder die Warteschlange wurde beschädigt.

In einem Kommentar beobachtest du das

a notify_all () hat keine Wirkung, wenn niemand wartet

Das stimmt, aber wahrscheinlich nicht relevant. wait()ing für eine Bedingungsvariable bedeutet auch, ein Vergleichselement zu prüfen und es vor für eine Benachrichtigung zu blockieren. Ein Arbeitsthread, der damit beschäftigt ist, ein Warteschlangenelement zu bearbeiten, das eine notify_all() "verfehlt", wird bei der nächsten Überprüfung der Warteschlangenbedingung feststellen, dass das Prädikat (ein neues Element ist verfügbar oder die Warteschlange ist fertig) geändert wurde.

13
pilcrow

Kürzlich habe ich dieses Problem mit Hilfe einer einzelnen Bedingungsvariablen und einer separaten booleschen Variablen für jeden Produzenten/Arbeiter gelöst. Das Prädikat innerhalb der Wait-Funktion im Consumer-Thread kann nach diesen Flags suchen und entscheiden, welcher Produzent/Arbeiter die Bedingung erfüllt hat. 

3
Aditya Kumar

Vielleicht kann das funktionieren:

befreien Sie sich von Interrupt.

 message wait_and_pop(std::condition_variable& interrupt) {
    std::unique_lock<std::mutex> lock(mutex);
    {
        new_msg_notification.wait(lock,[&]{
            return !queue.empty() || stop;
        });

        if( !stop )
        {
            auto msg(std::move(queue.front()));
            queue.pop();
            return msg;
        }
        else
        {
            return NULL; //or some 'terminate' message
        }
}

Ersetzen Sie im Destruktor interrupt.notify_all() durch new_msg_notification.notify_all()

0
Davidb