wake-up-neo.com

Wie kann man Sellerie mit Asyncio kombinieren?

Wie kann ich einen Wrapper erstellen, der Sellerieaufgaben wie asyncio.Task aussehen lässt? Oder gibt es eine bessere Möglichkeit, Sellerie mit asyncio zu integrieren?

@asksol, der Schöpfer von Sellerie, sagte dies: :

Es ist durchaus üblich, Celery als verteilte Schicht über asynchronen E/A-Frameworks zu verwenden (Top-Tipp: Das Weiterleiten von CPU-gebundenen Aufgaben an einen Prefork-Mitarbeiter bedeutet, dass die Ereignisschleife nicht blockiert wird).

Ich konnte jedoch keine speziellen Codebeispiele für asyncio-Framework finden.

19
max

Dies ist ab der Version 5.0 von Celery möglich, wie auf der offiziellen Website angegeben:

http://docs.celeryproject.org/de/4.0/whatsnew-4.0.html#preface

  1. Die nächste Hauptversion von Celery wird nur Python 3.5 unterstützen, wenn wir die neue Asyncio-Bibliothek nutzen möchten.
  2. Durch den Wegfall der Unterstützung für Python 2 können wir große Mengen an Kompatibilitätscode entfernen. Durch die Verwendung von Python 3.5 können wir die Vorteile von Typisierung, Async/Waitit, Asyncio und ähnlichen Konzepten nutzen, für die ältere Versionen keine Alternative bieten.

Die oben genannten wurden aus dem vorherigen Link zitiert.

Am besten warten Sie also, bis version 5.0 verteilt ist!

In der Zwischenzeit fröhliche Kodierung :)

12
John Moutafis

Sie können jeden blockierenden Aufruf mithilfe von run_in_executor in eine Task einbinden, wie in documentation beschrieben. Ich habe im Beispiel auch ein benutzerdefiniertes timeout hinzugefügt:

def run_async_task(
    target,
    *args,
    timeout = 60,
    **keywords
) -> Future:
    loop = asyncio.get_event_loop()
    return asyncio.wait_for(
        loop.run_in_executor(
            executor,
            functools.partial(target, *args, **keywords)
        ),
        timeout=timeout,
        loop=loop
    )
loop = asyncio.get_event_loop()
async_result = loop.run_until_complete(
    run_async_task, your_task.delay, some_arg, some_karg="" 
)
result = loop.run_until_complete(
    run_async_task, async_result.result 
)
2
danius

Die sauberste Methode, die ich gefunden habe, ist, die async-Funktion in asgiref.sync.async_to_sync (von asgiref ) einzuhüllen:

from asgiref.sync import async_to_sync
from celery.task import periodic_task


async def return_hello():
    await sleep(1)
    return 'hello'


@periodic_task(
    run_every=2,
    name='return_hello',
)
def task_return_hello():
    async_to_sync(return_hello)()

Ich habe dieses Beispiel aus einem Blog-Post entnommen habe ich geschrieben.

1
Franey

Dieser einfache Weg hat für mich gut funktioniert:

import asyncio
from celery import Celery

app = Celery('tasks')

async def async_function(param1, param2):
    # more async stuff...
    pass

@app.task(name='tasks.task_name', queue='queue_name')
def task_name(param1, param2):
    asyncio.run(async_function(param1, param2))
0
juanra