wake-up-neo.com

Wie benutze ich sync.Cond richtig?

Ich habe Probleme, herauszufinden, wie sync.Cond richtig verwendet wird. Soweit ich das beurteilen kann, gibt es eine Race-Bedingung zwischen dem Sperren des Locker und dem Aufrufen der Wait-Methode der Bedingung. In diesem Beispiel wird eine künstliche Verzögerung zwischen den beiden Linien in der Hauptcoroutine hinzugefügt, um die Race-Bedingung zu simulieren:

package main

import (
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        time.Sleep(1 * time.Second)
        c.Broadcast()
    }()
    m.Lock()
    time.Sleep(2 * time.Second)
    c.Wait()
}

[ Run on the Go Playground ]

Dies verursacht eine sofortige Panik:

schwerwiegender Fehler: Alle Goroutines sind eingeschlafen - Deadlock! 

 Goroutine 1 [Halbacquire]: 
 /usr/local/go/src/runtime/sema.go:241 + 0x2e0 
 sync. (* Cond) .Warten (0x10330200, 0x0) 
 /usr/local/go/src/sync/cond.go:63 + 0xe0 
 main.main () 
 /tmp/sandbox301865429/main.go:17 + 0x1a0

Was mache ich falsch? Wie vermeide ich diesen scheinbaren Rennzustand? Gibt es ein besseres Synchronisationskonstrukt, das ich verwenden sollte?


Edit: Ich weiß, ich hätte das Problem, das ich hier lösen möchte, besser erklären sollen. Ich habe eine lange laufende Goroutine, die eine große Datei und eine Reihe anderer Goroutinen herunterlädt, die Zugriff auf die HTTP-Header benötigen, wenn sie verfügbar sind. Dieses Problem ist schwieriger als es sich anhört.

Ich kann keine Kanäle verwenden, da nur eine Goroutine den Wert erhalten würde. Und einige der anderen Goroutinen würden versuchen, die Header abzurufen, lange nachdem sie bereits verfügbar sind.

Die Downloader-Goroutine kann die HTTP-Header einfach in einer Variablen speichern und einen Mutex verwenden, um den Zugriff auf sie zu sichern. Dies ist jedoch keine Möglichkeit für die anderen Goroutinen, auf ihre Verfügbarkeit zu "warten".

Ich hatte gedacht, dass sowohl ein sync.Mutex als auch sync.Cond zusammen dieses Ziel erreichen könnten, aber es scheint, dass dies nicht möglich ist.

13
Nathan Osman

Ich habe endlich einen Weg gefunden, dies zu tun, und sync.Cond ist überhaupt nicht betroffen - nur der Mutex.

type Task struct {
    m       sync.Mutex
    headers http.Header
}

func NewTask() *Task {
    t := &Task{}
    t.m.Lock()
    go func() {
        defer t.m.Unlock()
        // ...do stuff...
    }()
    return t
}

func (t *Task) WaitFor() http.Header {
    t.m.Lock()
    defer t.m.Unlock()
    return t.headers
}

Wie funktioniert das?

Der Mutex ist zu Beginn der Task gesperrt, um sicherzustellen, dass alles, was WaitFor() aufruft, blockiert wird. Sobald die Header verfügbar sind und der Mutex von der Goroutine freigegeben wurde, wird jeder Aufruf von WaitFor() nacheinander ausgeführt. Alle zukünftigen Aufrufe (auch nach dem Ende der Goroutine) haben kein Problem, den Mutex zu sperren, da er immer entsperrt bleibt.

1
Nathan Osman

OP beantwortete seine eigene, beantwortete jedoch nicht die ursprüngliche Frage direkt, ich werde posten, wie sync.Cond richtig verwendet wird.

Sie brauchen sync.Cond nicht wirklich, wenn Sie für jedes Schreiben und Lesen eine Goroutine haben - ein einzelner sync.Mutex würde ausreichen, um zwischen ihnen zu kommunizieren. sync.Cond könnte in Situationen nützlich sein, in denen mehrere Leser darauf warten, dass die freigegebenen Ressourcen verfügbar sind.

var sharedRsc = make(map[string]interface{})
func main() {
    var wg sync.WaitGroup
    wg.Add(2)
    m := sync.Mutex{}
    c := sync.NewCond(&m)
    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc1"])
        c.L.Unlock()
        wg.Done()
    }()

    go func() {
        // this go routine wait for changes to the sharedRsc
        c.L.Lock()
        for len(sharedRsc) == 0 {
            c.Wait()
        }
        fmt.Println(sharedRsc["rsc2"])
        c.L.Unlock()
        wg.Done()
    }()

    // this one writes changes to sharedRsc
    c.L.Lock()
    sharedRsc["rsc1"] = "foo"
    sharedRsc["rsc2"] = "bar"
    c.Broadcast()
    c.L.Unlock()
    wg.Wait()
}

Spielplatz

Allerdings ist die Verwendung von Kanälen immer noch die empfohlene Methode, um Daten weiterzuleiten, wenn die Situation dies zulässt.

Hinweis: sync.WaitGroup wird hier nur verwendet, um zu warten, bis die Goroutinen ihre Ausführung abgeschlossen haben.

7

Sie müssen sicherstellen, dass c.Broadcast after heißt, nachdem Sie c.Wait angerufen haben. Die korrekte Version Ihres Programms wäre:

package main

import (
    "fmt"
    "sync"
)

func main() {
    m := &sync.Mutex{}
    c := sync.NewCond(m)
    m.Lock()
    go func() {
        m.Lock() // Wait for c.Wait()
        c.Broadcast()
        m.Unlock()
    }()
    c.Wait() // Unlocks m
}

https://play.golang.org/p/O1r8v8yW6h

5
eric chiang

Sieht aus wie Sie c.Warten Sie auf Broadcast, was niemals mit Ihren Zeitintervallen passieren würde

time.Sleep(3 * time.Second) //Broadcast after any Wait for it
c.Broadcast()

ihr Snippet scheint zu funktionieren http://play.golang.org/p/OE8aP4i6gY . Oder fehlt mir etwas, das Sie versuchen zu erreichen?

2
Uvelichitel
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    m := sync.Mutex{}
    m.Lock() // main gouroutine is owner of lock
    c := sync.NewCond(&m)
    go func() {
        m.Lock() // obtain a lock
        defer m.Unlock()
        fmt.Println("3. goroutine is owner of lock")
        time.Sleep(2 * time.Second) // long computing - because you are the owner, you can change state variable(s)
        c.Broadcast()               // State has been changed, publish it to waiting goroutines
        fmt.Println("4. goroutine will release lock soon (deffered Unlock")
    }()
    fmt.Println("1. main goroutine is owner of lock")
    time.Sleep(1 * time.Second) // initialization
    fmt.Println("2. main goroutine is still lockek")
    c.Wait() // Wait temporarily release a mutex during wating and give opportunity to other goroutines to change the state.
    // Because you don't know, whether this is state, that you are waiting for, is usually called in loop.
    m.Unlock()
    fmt.Println("Done")
}

http://play.golang.org/p/fBBwoL7_pm

2
lofcek

Hier ist ein praktisches Beispiel mit zwei Go-Routinen. Sie beginnen nacheinander, aber der zweite wartet auf eine Bedingung, die vom ersten übertragen wird, bevor er fortfährt:

package main

import (
    "sync"
    "fmt"
    "time"
)

func main() {
    lock := sync.Mutex{}
    lock.Lock()

    cond := sync.NewCond(&lock)

    waitGroup := sync.WaitGroup{}
    waitGroup.Add(2)

    go func() {
        defer waitGroup.Done()

        fmt.Println("First go routine has started and waits for 1 second before broadcasting condition")

        time.Sleep(1 * time.Second)

        fmt.Println("First go routine broadcasts condition")

        cond.Broadcast()
    }()

    go func() {
        defer waitGroup.Done()

        fmt.Println("Second go routine has started and is waiting on condition")

        cond.Wait()

        fmt.Println("Second go routine unlocked by condition broadcast")
    }()

    fmt.Println("Main go routine starts waiting")

    waitGroup.Wait()

    fmt.Println("Main go routine ends")
}

Die Ausgabe kann geringfügig variieren, da die Second-Go-Routine vor der ersten und umgekehrt beginnen könnte:

Main go routine starts waiting
Second go routine has started and is waiting on condition
First go routine has started and waits for 1 second before broadcasting condition
First go routine broadcasts condition
Second go routine unlocked by condition broadcast
Main go routine ends

https://Gist.github.com/fracasula/21565ea1cf0c15726ca38736031edc70

2

Ja, Sie können einen Kanal verwenden, um den Header an mehrere Go-Routinen zu übergeben.

headerChan := make(chan http.Header)

go func() { // This routine can be started many times
    header := <-headerChan  // Wait for header
    // Do things with the header
}()

// Feed the header to all waiting go routines
for more := true; more; {
    select {
    case headerChan <- r.Header:
    default: more = false
    }
}
0
Jonas