wake-up-neo.com

So erstellen Sie dynamische Workflows in Airflow

Problem

Gibt es in Airflow eine Möglichkeit, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B. * bis zum Abschluss von Aufgabe A unbekannt ist? Ich habe mir Subdags angesehen, aber es sieht so aus, als könnte es nur mit statischen Aufgaben funktionieren, die bei der Erstellung von Dag festgelegt werden müssen.

Würden Dag-Auslöser funktionieren? Und wenn ja, geben Sie bitte ein Beispiel an.

Ich habe ein Problem, bei dem es nicht möglich ist, die Anzahl von Aufgaben B zu kennen, die zur Berechnung von Aufgabe C erforderlich sind, bis Aufgabe A abgeschlossen ist. Jede Aufgabe B. * dauert mehrere Stunden zur Berechnung und kann nicht kombiniert werden.

              |---> Task B.1 --|
              |---> Task B.2 --|
 Task A ------|---> Task B.3 --|-----> Task C
              |       ....     |
              |---> Task B.N --|

Idee Nr. 1

Ich mag diese Lösung nicht, weil ich einen blockierenden ExternalTaskSensor erstellen muss und alle Task B. * zwischen 2 und 24 Stunden dauern wird. Daher halte ich dies nicht für eine praktikable Lösung. Sicher gibt es einen einfacheren Weg? Oder war Airflow nicht dafür ausgelegt?

Dag 1
Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C

Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator)
               |-- Task B.1 --|
               |-- Task B.2 --|
Task Dummy A --|-- Task B.3 --|-----> Task Dummy B
               |     ....     |
               |-- Task B.N --|

Edit 1:

Bis jetzt hat diese Frage noch keine großartige Antwort . Ich wurde von mehreren Leuten kontaktiert, die nach einer Lösung suchen. 

41
costrouc

So habe ich es mit einer ähnlichen Anfrage ohne Subdags gemacht:

Erstellen Sie zuerst eine Methode, die die gewünschten Werte zurückgibt

def values_function():
     return values

Erstellen Sie als Nächstes eine Methode, die die Jobs dynamisch generiert:

def group(number, **kwargs):
        #load the values if needed in the command you plan to execute
        dyn_value = "{{ task_instance.xcom_pull(task_ids='Push_func') }}"
        return BashOperator(
                task_id='JOB_NAME_{}'.format(number),
                bash_command='script.sh {} {}'.format(dyn_value, number),
                dag=dag)

Und dann kombinieren Sie sie:

Push_func = PythonOperator(
        task_id='Push_func',
        provide_context=True,
        python_callable=values_function,
        dag=dag)

complete = DummyOperator(
        task_id='All_jobs_completed',
        dag=dag)

for i in values_function():
        Push_func >> group(i) >> complete
15
Oleg Yamin

Ich habe einen Weg gefunden, Workflows basierend auf den Ergebnissen der vorherigen Aufgaben zu erstellen.
Grundsätzlich möchten Sie zwei Sub-Tags mit den folgenden Angaben haben: 

  1. Xcom Schieben Sie eine Liste (oder was auch immer Sie zum späteren Erstellen des dynamischen Workflows benötigen) in das Subdag, das zuerst ausgeführt wird (siehe test1.py def return_list()). 
  2. Übergeben Sie das main-Tag-Objekt als Parameter an Ihren zweiten Sub-Tag
  3. Wenn Sie jetzt das Hauptobjekt dag haben, können Sie es verwenden, um eine Liste seiner Taskinstanzen abzurufen. Aus dieser Liste von Aufgabeninstanzen können Sie eine Aufgabe des aktuellen Laufs herausfiltern, indem Sie parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1]) verwenden. Hier könnten wahrscheinlich weitere Filter hinzugefügt werden.
  4. Mit dieser Task-Instanz können Sie xcom pull verwenden, um den gewünschten Wert abzurufen, indem Sie die dag_id als eine der ersten untergeordneten Tags angeben: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Verwenden Sie die Liste/den Wert, um Ihre Aufgaben dynamisch zu erstellen

Jetzt habe ich dies in meiner lokalen Luftstrominstallation getestet und es funktioniert einwandfrei. Ich weiß nicht, ob der xcom-Pull-Part Probleme hat, wenn mehr als eine Instanz von dag gleichzeitig ausgeführt wird, aber dann würden Sie wahrscheinlich entweder einen eindeutigen Schlüssel oder etwas ähnliches verwenden, um das xcom eindeutig zu identifizieren Wert, den Sie wollen . Man könnte den 3. Schritt wahrscheinlich so optimieren, dass er zu 100% sicher ist, eine bestimmte Aufgabe des aktuellen Hauptdag zu erhalten, aber für meine Verwendung ist dies gut genug xcom_pull.

Außerdem säubere ich die Xcoms für den ersten Subdag vor jeder Ausführung, nur um sicherzustellen, dass ich nicht versehentlich einen falschen Wert erhalte.

Ich bin ziemlich schlecht in der Erklärung, deshalb hoffe ich, dass der folgende Code alles klar machen wird:

test1.py

from airflow.models import DAG
import logging
from airflow.operators.python_operator import PythonOperator
from airflow.operators.postgres_operator import PostgresOperator

log = logging.getLogger(__name__)


def test1(parent_dag_name, start_date, schedule_interval):
    dag = DAG(
        '%s.test1' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date,
    )

    def return_list():
        return ['test1', 'test2']

    list_extract_folder = PythonOperator(
        task_id='list',
        dag=dag,
        python_callable=return_list
    )

    clean_xcoms = PostgresOperator(
        task_id='clean_xcoms',
        postgres_conn_id='airflow_db',
        sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
        dag=dag)

    clean_xcoms >> list_extract_folder

    return dag

test2.py

from airflow.models import DAG, settings
import logging
from airflow.operators.dummy_operator import DummyOperator

log = logging.getLogger(__name__)


def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None):
    dag = DAG(
        '%s.test2' % parent_dag_name,
        schedule_interval=schedule_interval,
        start_date=start_date
    )

    if len(parent_dag.get_active_runs()) > 0:
        test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull(
            dag_id='%s.%s' % (parent_dag_name, 'test1'),
            task_ids='list')
        if test_list:
            for i in test_list:
                test = DummyOperator(
                    task_id=i,
                    dag=dag
                )

    return dag

und der Hauptworkflow:

test.py

from datetime import datetime
from airflow import DAG
from airflow.operators.subdag_operator import SubDagOperator
from subdags.test1 import test1
from subdags.test2 import test2

DAG_NAME = 'test-dag'

dag = DAG(DAG_NAME,
          description='Test workflow',
          catchup=False,
          schedule_interval='0 0 * * *',
          start_date=datetime(2018, 8, 24))

test1 = SubDagOperator(
    subdag=test1(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval),
    task_id='test1',
    dag=dag
)

test2 = SubDagOperator(
    subdag=test2(DAG_NAME,
                 dag.start_date,
                 dag.schedule_interval,
                 parent_dag=dag),
    task_id='test2',
    dag=dag
)

test1 >> test2
4

Ja, das ist möglich. Ich habe eine Beispiel-DAG erstellt, die dies demonstriert.

import airflow
from airflow.operators.python_operator import PythonOperator
import os
from airflow.models import Variable
import logging
from airflow import configuration as conf
from airflow.models import DagBag, TaskInstance
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator

main_dag_id = 'DynamicWorkflow2'

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'provide_context': True
}

dag = DAG(
    main_dag_id,
    schedule_interval="@once",
    default_args=args)


def start(*args, **kwargs):

    value = Variable.get("DynamicWorkflow_Group1")
    logging.info("Current DynamicWorkflow_Group1 value is " + str(value))


def resetTasksStatus(task_id, execution_date):
    logging.info("Resetting: " + task_id + " " + execution_date)

    dag_folder = conf.get('core', 'DAGS_FOLDER')
    dagbag = DagBag(dag_folder)
    check_dag = dagbag.dags[main_dag_id]
    session = settings.Session()

    my_task = check_dag.get_task(task_id)
    ti = TaskInstance(my_task, execution_date)
    state = ti.current_state()
    logging.info("Current state of " + task_id + " is " + str(state))
    ti.set_state(None, session)
    state = ti.current_state()
    logging.info("Updated state of " + task_id + " is " + str(state))


def bridge1(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 2

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group2 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group2 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group2")
    logging.info("Current DynamicWorkflow_Group2 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.Apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('secondGroup_' + str(i), str(kwargs['execution_date']))


def bridge2(*args, **kwargs):

    # You can set this value dynamically e.g., from a database or a calculation
    dynamicValue = 3

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    logging.info("Setting the Airflow Variable DynamicWorkflow_Group3 to " + str(dynamicValue))
    os.system('airflow variables --set DynamicWorkflow_Group3 ' + str(dynamicValue))

    variableValue = Variable.get("DynamicWorkflow_Group3")
    logging.info("Current DynamicWorkflow_Group3 value is " + str(variableValue))

    # Below code prevents this bug: https://issues.Apache.org/jira/browse/AIRFLOW-1460
    for i in range(dynamicValue):
        resetTasksStatus('thirdGroup_' + str(i), str(kwargs['execution_date']))


def end(*args, **kwargs):
    logging.info("Ending")


def doSomeWork(name, index, *args, **kwargs):
    # Do whatever work you need to do
    # Here I will just create a new file
    os.system('touch /home/ec2-user/airflow/' + str(name) + str(index) + '.txt')


starting_task = PythonOperator(
    task_id='start',
    dag=dag,
    provide_context=True,
    python_callable=start,
    op_args=[])

# Used to connect the stream in the event that the range is zero
bridge1_task = PythonOperator(
    task_id='bridge1',
    dag=dag,
    provide_context=True,
    python_callable=bridge1,
    op_args=[])

DynamicWorkflow_Group1 = Variable.get("DynamicWorkflow_Group1")
logging.info("The current DynamicWorkflow_Group1 value is " + str(DynamicWorkflow_Group1))

for index in range(int(DynamicWorkflow_Group1)):
    dynamicTask = PythonOperator(
        task_id='firstGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['firstGroup', index])

    starting_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge1_task)

# Used to connect the stream in the event that the range is zero
bridge2_task = PythonOperator(
    task_id='bridge2',
    dag=dag,
    provide_context=True,
    python_callable=bridge2,
    op_args=[])

DynamicWorkflow_Group2 = Variable.get("DynamicWorkflow_Group2")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group2))

for index in range(int(DynamicWorkflow_Group2)):
    dynamicTask = PythonOperator(
        task_id='secondGroup_' + str(index),
        dag=dag,
        provide_context=True,
        python_callable=doSomeWork,
        op_args=['secondGroup', index])

    bridge1_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(bridge2_task)

ending_task = PythonOperator(
    task_id='end',
    dag=dag,
    provide_context=True,
    python_callable=end,
    op_args=[])

DynamicWorkflow_Group3 = Variable.get("DynamicWorkflow_Group3")
logging.info("The current DynamicWorkflow value is " + str(DynamicWorkflow_Group3))

for index in range(int(DynamicWorkflow_Group3)):

    # You can make this logic anything you'd like
    # I chose to use the PythonOperator for all tasks
    # except the last task will use the BashOperator
    if index < (int(DynamicWorkflow_Group3) - 1):
        dynamicTask = PythonOperator(
            task_id='thirdGroup_' + str(index),
            dag=dag,
            provide_context=True,
            python_callable=doSomeWork,
            op_args=['thirdGroup', index])
    else:
        dynamicTask = BashOperator(
            task_id='thirdGroup_' + str(index),
            bash_command='touch /home/ec2-user/airflow/thirdGroup_' + str(index) + '.txt',
            dag=dag)

    bridge2_task.set_downstream(dynamicTask)
    dynamicTask.set_downstream(ending_task)

# If you do not connect these then in the event that your range is ever zero you will have a disconnection between your stream
# and your tasks will run simultaneously instead of in your desired stream order.
starting_task.set_downstream(bridge1_task)
bridge1_task.set_downstream(bridge2_task)
bridge2_task.set_downstream(ending_task)

Bevor Sie die DAG ausführen, erstellen Sie diese drei Luftstromvariablen

airflow variables --set DynamicWorkflow_Group1 1

airflow variables --set DynamicWorkflow_Group2 0

airflow variables --set DynamicWorkflow_Group3 0

Sie werden sehen, dass die DAG davon ausgeht

enter image description here

Dazu nachdem es gelaufen ist

enter image description here

Weitere Informationen zu dieser DAG finden Sie in meinem Artikel zum Erstellen von Dynamic Workflows On Airflow .

3

OA: "Gibt es eine Möglichkeit in Airflow, einen Workflow so zu erstellen, dass die Anzahl der Aufgaben B. * bis zum Abschluss von Aufgabe A unbekannt ist?"

Kurze Antwort ist nein. Airflow baut den DAG-Flow auf, bevor er gestartet wird.

Das heißt, wir kamen zu einer einfachen Schlussfolgerung, das heißt, wir haben kein solches Bedürfnis ... Wenn Sie einige Arbeiten parallelisieren möchten, sollten Sie die verfügbaren Ressourcen und nicht die Anzahl der zu verarbeitenden Elemente auswerten.

Wir haben es so gemacht: Wir generieren dynamisch eine feste Anzahl von Aufgaben, sagen wir 10, die den Job aufteilen. Wenn wir zum Beispiel 100 Dateien bearbeiten müssen, verarbeitet jede Aufgabe 10 von ihnen. Ich werde den Code später heute veröffentlichen.

Update

Hier ist der Code, sorry für die Verzögerung.

from datetime import datetime, timedelta

import airflow
from airflow.operators.dummy_operator import DummyOperator

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 1, 8),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': timedelta(seconds=5)
}

dag = airflow.DAG(
    'parallel_tasks_v1',
    schedule_interval="@daily",
    catchup=False,
    default_args=args)

# You can read this from variables
parallel_tasks_total_number = 10

start_task = DummyOperator(
    task_id='start_task',
    dag=dag
)


# Creates the tasks dynamically.
# Each one will elaborate one chunk of data.
def create_dynamic_task(current_task_number):
    return DummyOperator(
        provide_context=True,
        task_id='parallel_task_' + str(current_task_number),
        python_callable=parallelTask,
        # your task will take as input the total number and the current number to elaborate a chunk of total elements
        op_args=[current_task_number, int(parallel_tasks_total_number)],
        dag=dag)


end = DummyOperator(
    task_id='end',
    dag=dag)

for page in range(int(parallel_tasks_total_number)):
    created_task = create_dynamic_task(page)
    start_task >> created_task
    created_task >> end

Code-Erklärung:

Hier haben wir eine einzelne Startaufgabe und eine einzige Endaufgabe (beide Dummy).

Dann erstellen wir aus der Startaufgabe mit der for-Schleife 10 Aufgaben mit demselben aufrufbaren Python. Die Aufgaben werden in der Funktion create_dynamic_task angelegt.

Zu jedem aufrufbaren Python übergeben wir als Argumente die Gesamtzahl der parallelen Aufgaben und den aktuellen Aufgabenindex. 

Angenommen, Sie haben 1000 Elemente, die Sie ausarbeiten müssen: Die erste Aufgabe wird als Eingabe erhalten, die den ersten Teil aus 10 Teilen ausarbeiten soll. Es wird die 1000 Gegenstände in 10 Stücke teilen und die erste ausarbeiten.

1
Ena

Ich habe diese Medium Post gefunden, die dieser Frage sehr ähnlich ist. Es ist jedoch voller Tippfehler und funktioniert nicht, wenn ich es implementiert habe.

Meine Antwort auf das Obige lautet wie folgt: 

Wenn Sie Aufgaben dynamisch erstellen, müssen Sie dies tun indem Sie über etwas iterieren, das nicht von einer vorgelagerten Aufgabe erstellt wurde oder unabhängig von dieser Aufgabe definiert werden kann.Ich habe gelernt, dass Sie keine Ausführungsdaten übergeben können Luftstromvariablen zu etwas außerhalb einer Vorlage (z. B. einer Aufgabe), wie viele andere bereits zuvor angegeben haben. Siehe auch diesen Beitrag

0
Mark Matthews

Das Jobdiagramm wird zur Laufzeit nicht generiert. Das Diagramm wird vielmehr erstellt, wenn es von Airflow aus Ihrem Dags-Ordner abgerufen wird. Daher ist es nicht wirklich möglich, jedes Mal ein anderes Diagramm für den Job zu erstellen. Sie können einen Job so konfigurieren, dass er basierend auf einer Abfrage zum Zeitpunkt Laden ein Diagramm erstellt. Dieses Diagramm bleibt danach für jeden Durchlauf gleich, was wahrscheinlich nicht sehr nützlich ist.

Mithilfe eines Verzweigungsoperators können Sie ein Diagramm entwerfen, das auf der Grundlage der Abfrageergebnisse bei jedem Durchlauf verschiedene Aufgaben ausführt.

Ich habe eine Reihe von Aufgaben vorkonfiguriert, die Abfrageergebnisse übernommen und auf die Aufgaben verteilt. Dies ist wahrscheinlich ohnehin besser, da Sie den Scheduler ohnehin nicht mit vielen gleichzeitigen Aufgaben überfluten möchten, wenn Ihre Abfrage viele Ergebnisse zurückgibt. Um noch sicherer zu sein, habe ich auch einen Pool verwendet, um sicherzustellen, dass meine Nebenläufigkeit bei einer unerwartet großen Abfrage nicht außer Kontrolle gerät.

"""
 - This is an idea for how to invoke multiple tasks based on the query results
"""
import logging
from datetime import datetime

from airflow import DAG
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.mysql_operator import MySqlOperator
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from include.run_celery_task import runCeleryTask

########################################################################

default_args = {
    'owner': 'airflow',
    'catchup': False,
    'depends_on_past': False,
    'start_date': datetime(2019, 7, 2, 19, 50, 00),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'max_active_runs': 1
}

dag = DAG('dynamic_tasks_example', default_args=default_args, schedule_interval=None)

totalBuckets = 5

get_orders_query = """
select 
    o.id,
    o.customer
from 
    orders o
where
    o.created_at >= current_timestamp at time zone 'UTC' - '2 days'::interval
    and
    o.is_test = false
    and
    o.is_processed = false
"""

###########################################################################################################

# Generate a set of tasks so we can parallelize the results
def createOrderProcessingTask(bucket_number):
    return PythonOperator( 
                           task_id=f'order_processing_task_{bucket_number}',
                           python_callable=runOrderProcessing,
                           pool='order_processing_pool',
                           op_kwargs={'task_bucket': f'order_processing_task_{bucket_number}'},
                           provide_context=True,
                           dag=dag
                          )


# Fetch the order arguments from xcom and doStuff() to them
def runOrderProcessing(task_bucket, **context):
    orderList = context['ti'].xcom_pull(task_ids='get_open_orders', key=task_bucket)

    if orderList is not None:
        for order in orderList:
            logging.info(f"Processing Order with Order ID {order[order_id]}, customer ID {order[customer_id]}")
            doStuff(**op_kwargs)


# Discover the orders we need to run and group them into buckets for processing
def getOpenOrders(**context):
    myDatabaseHook = PostgresHook(postgres_conn_id='my_database_conn_id')

    # initialize the task list buckets
    tasks = {}
    for task_number in range(0, totalBuckets):
        tasks[f'order_processing_task_{task_number}'] = []

    # populate the task list buckets
    # distribute them evenly across the set of buckets
    resultCounter = 0
    for record in myDatabaseHook.get_records(get_orders_query):

        resultCounter += 1
        bucket = (resultCounter % totalBuckets)

        tasks[f'order_processing_task_{bucket}'].append({'order_id': str(record[0]), 'customer_id': str(record[1])})

    # Push the order lists into xcom
    for task in tasks:
        if len(tasks[task]) > 0:
            logging.info(f'Task {task} has {len(tasks[task])} orders.')
            context['ti'].xcom_Push(key=task, value=tasks[task])
        else:
            # if we didn't have enough tasks for every bucket
            # don't bother running that task - remove it from the list
            logging.info(f"Task {task} doesn't have any orders.")
            del(tasks[task])

    return list(tasks.keys())

###################################################################################################


# this just makes sure that there aren't any dangling xcom values in the database from a crashed dag
clean_xcoms = MySqlOperator(
    task_id='clean_xcoms',
    mysql_conn_id='airflow_db',
    sql="delete from xcom where dag_id='{{ dag.dag_id }}'",
    dag=dag)


# Ideally we'd use BranchPythonOperator() here instead of PythonOperator so that if our
# query returns fewer results than we have buckets, we don't try to run them all.
# Unfortunately I couldn't get BranchPythonOperator to take a list of results like the
# documentation says it should (Airflow 1.10.2). So we call all the bucket tasks for now.
get_orders_task = PythonOperator(
                                 task_id='get_orders',
                                 python_callable=getOpenOrders,
                                 provide_context=True,
                                 dag=dag
                                )
open_order_task.set_upstream(clean_xcoms)

# set up the parallel tasks -- these are configured at compile time, not at run time:
for bucketNumber in range(0, totalBuckets):
    taskBucket = createOrderProcessingTask(bucketNumber)
    taskBucket.set_upstream(get_orders_task)


###################################################################################################
0
rotten

Ich denke, ich habe unter https://github.com/mastak/airflow_multi_dagrun eine schönere Lösung gefunden, die das einfache Einreihen von DagRuns durch Auslösen mehrerer Dagruns ähnlich wie TriggerDagRuns verwendet. Die meisten Kredite gehen an https://github.com/mastak , obwohl ich einige Details [...] patchen musste, damit es mit dem neuesten Luftstrom funktioniert.

Die Lösung verwendet einen custom-Operator, der mehrere DagRuns auslöst:

from airflow import settings
from airflow.models import DagBag
from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.state import State
from airflow.utils import timezone


class TriggerMultiDagRunOperator(TriggerDagRunOperator):
    CREATED_DAGRUN_KEY = 'created_dagrun_key'

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None,
                 *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):

        context.update(self.op_kwargs)
        session = settings.Session()
        created_dr_ids = []
        for dro in self.python_callable(*self.op_args, **context):
            if not dro:
                break
            if not isinstance(dro, DagRunOrder):
                dro = DagRunOrder(payload=dro)

            now = timezone.utcnow()
            if dro.run_id is None:
                dro.run_id = 'trig__' + now.isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                execution_date=now,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True,
            )
            created_dr_ids.append(dr.id)
            self.log.info("Created DagRun %s, %s", dr, now)

        if created_dr_ids:
            session.commit()
            context['ti'].xcom_Push(self.CREATED_DAGRUN_KEY, created_dr_ids)
        else:
            self.log.info("No DagRun created")
        session.close()

Sie können dann mehrere Dagruns von der aufrufbaren Funktion in Ihrem PythonOperator übergeben, zum Beispiel:

from airflow.operators.dagrun_operator import DagRunOrder
from airflow.models import DAG
from airflow.operators import TriggerMultiDagRunOperator
from airflow.utils.dates import days_ago


def generate_dag_run(**kwargs):
    for i in range(10):
        order = DagRunOrder(payload={'my_variable': i})
        yield order

args = {
    'start_date': days_ago(1),
    'owner': 'airflow',
}

dag = DAG(
    dag_id='simple_trigger',
    max_active_runs=1,
    schedule_interval='@hourly',
    default_args=args,
)

gen_target_dag_run = TriggerMultiDagRunOperator(
    task_id='gen_target_dag_run',
    dag=dag,
    trigger_dag_id='common_target',
    python_callable=generate_dag_run
)

Ich habe eine Verzweigung mit dem Code unter https://github.com/flinz/airflow_multi_dagrun erstellt.

0
flinz