wake-up-neo.com

wie füge ich eine Coroutine zu einer laufenden Asyncio-Schleife hinzu?

Wie kann man einer laufenden Asyncio-Schleife eine neue Coroutine hinzufügen? Ie. eine, die bereits eine Reihe von Coroutinen ausführt.

Ich denke, als Workaround könnte man warten, bis vorhandene Coroutinen abgeschlossen sind und dann eine neue Schleife (mit der zusätzlichen Coroutine) initialisieren. Aber gibt es einen besseren Weg?

18
Petri

Sie können create_task verwenden, um neue Coroutinen zu planen:

import asyncio

async def cor1():
    ...

async def cor2():
    ...

async def main(loop):
    await asyncio.sleep(0)
    t1 = loop.create_task(cor1())
    await cor2()
    await t1

loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()
13
Jashandeep Sohi

Um einer bereits laufenden Ereignisschleife eine Funktion hinzuzufügen, können Sie Folgendes verwenden:

asyncio.ensure_future(my_coro())

In meinem Fall habe ich Multithreading (threading) neben asyncio verwendet und wollte der bereits ausgeführten Ereignisschleife eine Aufgabe hinzufügen. Stellen Sie für alle anderen Benutzer in derselben Situation sicher, dass Sie die Ereignisschleife explizit angeben (da sie in einem Thread nicht vorhanden ist). d.h.

Im globalen Bereich:

event_loop = asyncio.get_event_loop()

Dann später in Ihrem Thread:

asyncio.ensure_future(my_coro(), loop=event_loop)
13
Dotl

Ihre Frage ist sehr nah an "Wie füge ich dem laufenden Programm einen Funktionsaufruf hinzu?"

Wann genau müssen Sie der Ereignisschleife neue Coroutine hinzufügen?

Sehen wir uns einige Beispiele an. Hier ein Programm, das die Ereignisschleife parallel mit zwei Coroutinen startet:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro1(),
        coro1()
    ) # here we have two coroutines running parallely

if __== "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Ausgabe:

coro1 finished with output 1
coro1 finished with output 2
[Finished in 2.2s]

Möglicherweise müssen Sie einige Coroutinen hinzufügen, die die Ergebnisse von coro1 übernehmen und verwenden, sobald sie fertig sind. In diesem Fall erstellen Sie einfach Coroutine, die auf coro1 warten, und verwenden Sie den Rückgabewert:

import asyncio
from random import randint


async def coro1():
    res = randint(0,3)
    await asyncio.sleep(res)
    print('coro1 finished with output {}'.format(res))
    return res

async def coro2():
    res = await coro1()
    res = res * res
    await asyncio.sleep(res)
    print('coro2 finished with output {}'.format(res))
    return res

async def main():
    await asyncio.gather(
        coro2(),
        coro2()
    ) # here we have two coroutines running parallely

if __== "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Ausgabe:

coro1 finished with output 1
coro2 finished with output 1
coro1 finished with output 3
coro2 finished with output 9
[Finished in 12.2s]

Stellen Sie sich Coroutinen als reguläre Funktionen mit spezifischer Syntax vor. Sie können eine Reihe von Funktionen starten, die parallel ausgeführt werden sollen (durch asyncio.gather). Sie können die nächste Funktion starten, nachdem Sie sie fertiggestellt haben. Sie können neue Funktionen erstellen, die andere aufrufen.

8

Keine der Antworten scheint die Frage genau zu beantworten. Sie können einer laufenden Ereignisschleife Aufgaben hinzufügen, indem Sie eine "übergeordnete" Aufgabe für Sie erledigen lassen. Ich bin mir nicht sicher, auf welche Art und Weise Pythonic am sichersten ist, dass das Elternteil nicht endet, bis alle Kinder fertig sind (vorausgesetzt, das ist das gewünschte Verhalten), aber das funktioniert.

import asyncio
import random


async def add_event(n):
    print('starting ' + str(n))
    await asyncio.sleep(n)
    print('ending ' + str(n))
    return n


async def main(loop):

    added_tasks = []

    delays = [x for x in range(5)]

    # shuffle to simulate unknown run times
    random.shuffle(delays)

    for n in delays:
        print('adding ' + str(n))
        task = loop.create_task(add_event(n))
        added_tasks.append(task)
        await asyncio.sleep(0)

    print('done adding tasks')

    # make a list of tasks that (maybe) haven't completed
    running_tasks = added_tasks[::]

    # wait until we see that all tasks have completed
    while running_tasks:
        running_tasks = [x for x in running_tasks if not x.done()]
        await asyncio.sleep(0)

    print('done running tasks')

    # extract the results from the tasks and return them
    results = [x.result() for x in added_tasks]
    return results


loop = asyncio.get_event_loop()
results = loop.run_until_complete(main(loop))
loop.close()
print(results)
0
Adam