Airflow "none_failed" Skipping When Upstream Skips
Solution 1:
Documenting this because this issue has bitten me twice and now I've solved it twice.
Problem Analysis
When you turn the log level to DEBUG, you start to see what's going on:
[2019-10-09 18:30:05,472] {python_operator.py:114} INFO - Done. Returned value was: False
[2019-10-09 18:30:05,472] {python_operator.py:159} INFO - Condition result is False
[2019-10-09 18:30:05,472] {python_operator.py:165} INFO - Skipping downstream tasks...
[2019-10-09 18:30:05,472] {python_operator.py:168} DEBUG - Downstream task_ids [<Task(DummyOperator): f>, <Task(DummyOperator): g>, <Task(DummyOperator): d>, <Task(DummyOperator): resolve>, <Task(DummyOperator): e>]
[2019-10-09 18:30:05,492] {python_operator.py:173} INFO - Done.
From this, you can see that the problem isn't that "none_failed" is handling the tasks incorrectly, but rather that the sentinel simulating the skip condition is marking all downstream dependencies skipped directly. This is how the ShortCircuitOperator behaves: it skips all of its downstream tasks, including tasks that are downstream of other downstream tasks, so their trigger rules are never consulted.
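To make the difference concrete, here is a minimal pure-Python sketch (not using Airflow itself) of the DAG shape implied by the log above. The task names and edges are reconstructed from the debug output; the point is that the ShortCircuitOperator skips the *transitive closure* of downstream tasks, while the fix below skips only the immediate ones:

```python
# Hypothetical DAG structure reconstructed from the DEBUG log above.
downstream = {
    "sentinel": ["d", "e"],
    "d": ["f"],
    "e": ["g"],
    "f": ["resolve"],
    "g": ["resolve"],
}

def all_downstream(task):
    """Transitive closure of downstream tasks - what ShortCircuitOperator skips."""
    seen = set()
    stack = list(downstream.get(task, []))
    while stack:
        t = stack.pop()
        if t not in seen:
            seen.add(t)
            stack.extend(downstream.get(t, []))
    return seen

# ShortCircuitOperator marks every transitive downstream task SKIPPED,
# so a "none_failed" task like "resolve" never gets a chance to run:
print(sorted(all_downstream("sentinel")))  # ['d', 'e', 'f', 'g', 'resolve']

# Skipping only the immediate downstreams leaves "resolve" to its trigger rule:
print(sorted(downstream["sentinel"]))      # ['d', 'e']
```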
Solution
The solution to this problem lies in recognizing that it's the behavior of the ShortCircuitOperator, not the TriggerRule, which is causing the problem. Once we realize that, it's time to set about writing an operator better suited to the task we're actually trying to accomplish.
I've included the operator I'm currently using; I'd welcome any input on a better way to handle modifying only the immediate downstream tasks. I'm sure there's a better idiom for "skip just the next one and let the rest cascade according to their trigger rules", but I've already spent more time than I wanted on this, and I suspect the answer lies even deeper in the internals.
"""Sentinel Operator Plugin"""import datetime
from airflow import settings
from airflow.models import SkipMixin, TaskInstance
from airflow.operators.python_operator import PythonOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.state import State
classSentinelOperator(PythonOperator, SkipMixin):
"""
Allows a workflow to continue only if a condition is met. Otherwise, the
workflow skips cascading downstream to the next time a viable task
is identified.
The SentinelOperator is derived from the PythonOperator. It evaluates a
condition and stops the workflow if the condition is False. Immediate
downstream tasks are skipped. If the condition is True, downstream tasks
proceed as normal.
The condition is determined by the result of `python_callable`.
"""defexecute(self, context):
condition = super(SentinelOperator, self).execute(context)
self.log.info("Condition result is %s", condition)
if condition:
self.log.info('Proceeding with downstream tasks...')
return
self.log.info('Skipping downstream tasks...')
session = settings.Session()
for task in context['task'].downstream_list:
ti = TaskInstance(task, execution_date=context['ti'].execution_date)
self.log.info('Skipping task: %s', ti.task_id)
ti.state = State.SKIPPED
ti.start_date = datetime.datetime.now()
ti.end_date = datetime.datetime.now()
session.merge(ti)
session.commit()
session.close()
self.log.info("Done.")
classPlugin_SentinelOperator(AirflowPlugin):
name = "sentinel_operator"
operators = [SentinelOperator]
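For reference, here is a hypothetical DAG wiring the operator into the structure from the logs above. The task names are illustrative, and the import path assumes the plugin is installed under the `sentinel_operator` plugin name (Airflow 1.10's plugin mechanism exposes plugin operators under `airflow.operators.<plugin_name>`):

```python
import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.sentinel_operator import SentinelOperator  # via the plugin above
from airflow.utils.trigger_rule import TriggerRule

with DAG("sentinel_example",
         start_date=datetime.datetime(2019, 10, 1),
         schedule_interval=None) as dag:
    # Returning False skips only d and e, not everything downstream of them.
    sentinel = SentinelOperator(task_id="sentinel",
                                python_callable=lambda: False)
    d = DummyOperator(task_id="d")
    e = DummyOperator(task_id="e")
    # "resolve" runs as long as no upstream failed, even if d and e skipped.
    resolve = DummyOperator(task_id="resolve",
                            trigger_rule=TriggerRule.NONE_FAILED)

    sentinel >> [d, e]
    [d, e] >> resolve
```

This is a sketch, not a drop-in file; adjust the import to however you deploy the plugin.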
With these modifications, the DAG produces the intended results: the sentinel's immediate downstreams are skipped, and the "none_failed" task still runs.
Solution 2:
This appears to be a bug in Airflow. If you'd like it fixed, add your voice to https://issues.apache.org/jira/browse/AIRFLOW-4453.