
Airflow "none_failed" Skipping When Upstream Skips

I have a workflow with two parallel branches (sentinel_run and sentinel_skip) that should run or be skipped based on a condition, and then join together (resolve). I need the join task to run as long as nothing upstream failed, so I set its trigger rule to "none_failed", but resolve is being skipped whenever an upstream task skips.
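For reference, here is a minimal sketch of the shape of DAG in question (the task names come from the question; the callables, schedule, and exact wiring are illustrative assumptions, written against 1.10-era imports):

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.utils.dates import days_ago

with DAG('sentinel_example', schedule_interval=None, start_date=days_ago(1)) as dag:
    # Each sentinel short-circuits its own branch based on a condition.
    sentinel_run = ShortCircuitOperator(
        task_id='sentinel_run',
        python_callable=lambda: True)   # illustrative condition
    sentinel_skip = ShortCircuitOperator(
        task_id='sentinel_skip',
        python_callable=lambda: False)  # illustrative condition

    run = DummyOperator(task_id='run')
    skip = DummyOperator(task_id='skip')

    # The join should run as long as nothing failed, even if a branch skipped.
    resolve = DummyOperator(task_id='resolve', trigger_rule='none_failed')

    sentinel_run >> run >> resolve
    sentinel_skip >> skip >> resolve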

Solution 1:

Documenting this because this issue has bitten me twice and now I've solved it twice.

Problem Analysis

When you turn the log level to DEBUG, you start to see what's going on:

[2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
[2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
[2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
[2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
[2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.

From this, you can see that the problem isn't that "none_failed" is handling the tasks incorrectly, but rather that the sentinel simulating the skip condition is marking all of its downstream dependencies as skipped directly. This is the behavior of the ShortCircuitOperator: it skips all of its downstream tasks, including tasks downstream of downstream tasks.
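For context, the skip logic in the 1.10-era ShortCircuitOperator looks approximately like this (paraphrased from memory of the Airflow source, not quoted verbatim). The key detail is that get_flat_relatives(upstream=False) returns every transitive downstream task, not just the immediate children:

def execute(self, context):
    condition = super(ShortCircuitOperator, self).execute(context)
    self.log.info("Condition result is %s", condition)

    if condition:
        self.log.info('Proceeding with downstream tasks...')
        return

    self.log.info('Skipping downstream tasks...')

    # All downstream relatives, recursively - this is why resolve gets skipped.
    downstream_tasks = context['task'].get_flat_relatives(upstream=False)
    self.log.debug("Downstream task_ids %s", downstream_tasks)

    if downstream_tasks:
        self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)

    self.log.info("Done.")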

Solution

The solution lies in recognizing that it is the behavior of the ShortCircuitOperator, not the trigger rule, that is causing the problem. Once we realize that, it's time to write an operator better suited to the task we're actually trying to accomplish.

I've included the operator I'm currently using; I'd welcome any input on a better way to mark only the immediate downstream tasks. I'm sure there's a better idiom for "skip just the next task and let the rest cascade according to their trigger rules", but I've already spent more time on this than I wanted, and I suspect the answer lies even deeper in the internals.

"""Sentinel Operator Plugin"""import datetime

from airflow import settings
from airflow.models import SkipMixin, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State


class SentinelOperator(PythonOperator, SkipMixin):
    """
    Allows a workflow to continue only if a condition is met. Otherwise, the
    workflow skips cascading downstream to the next time a viable task
    is identified.

    The SentinelOperator is derived from the PythonOperator. It evaluates a
    condition and stops the workflow if the condition is False. Immediate
    downstream tasks are skipped. If the condition is True, downstream tasks
    proceed as normal.

    The condition is determined by the result of `python_callable`.
    """defexecute(self, context):
        condition = super(SentinelOperator, self).execute(context)
        self.log.info("Condition result is %s", condition)

        if condition:
            self.log.info('Proceeding with downstream tasks...')
            return

        self.log.info('Skipping downstream tasks...')

        session = settings.Session()

        # Mark only the *immediate* downstream tasks as skipped; tasks further
        # downstream are left to resolve according to their own trigger rules.
        for task in context['task'].downstream_list:
            ti = TaskInstance(task, execution_date=context['ti'].execution_date)
            self.log.info('Skipping task: %s', ti.task_id)
            ti.state = State.SKIPPED
            ti.start_date = datetime.datetime.now()
            ti.end_date = datetime.datetime.now()
            session.merge(ti)

        session.commit()
        session.close()

        self.log.info("Done.")


class Plugin_SentinelOperator(AirflowPlugin):
    name = "sentinel_operator"
    operators = [SentinelOperator]
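For completeness, here is a sketch of how the operator slots into the DAG from the question, reusing the run, skip, and resolve tasks from the sketch above. It assumes the 1.10 plugin mechanism, which exposes plugin operators under airflow.operators.<plugin_name>; the conditions are illustrative:

from airflow.operators.sentinel_operator import SentinelOperator

sentinel_run = SentinelOperator(
    task_id='sentinel_run',
    python_callable=lambda: True,   # illustrative condition
    dag=dag)
sentinel_skip = SentinelOperator(
    task_id='sentinel_skip',
    python_callable=lambda: False,  # illustrative condition
    dag=dag)

# Only the immediate downstream task is skipped; resolve then follows
# its own "none_failed" trigger rule and runs as intended.
sentinel_run >> run >> resolve
sentinel_skip >> skip >> resolve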

With these modifications, the DAG then produces the intended results:

[Image: Correct DAG]

Solution 2:

This appears to be a bug in Airflow. If you'd like it fixed, add your voice to https://issues.apache.org/jira/browse/AIRFLOW-4453.
