Templating does not work in Airflow's on_failure_callback

Issue: Templates in operators used in on_failure_callback are not rendered.

def report_failure(context):
    send_email = EmailOperator(
        task_id="email_failed",
        to=emailreceipients,
        subject="{{execution_date}}",
        html_content=get_email_body() # Which returns "Body: {{execution_date}}"
    )

    # One solution I tried fails on: AttributeError: 'EmailOperator' object has no attribute 'render_template_fields'
    # Which is weird as this method is present on EmailOperator's base: BaseOperator
    # send_email.render_template_fields(context)

    send_email.execute(context)

default_args = {
    'on_failure_callback': report_failure
}

The email that gets sent contains the literal string {{execution_date}} instead of the rendered value.

In this simple case I could format these templated strings in place using .format(**context), but I reuse this email-sending code in other places where the context is not available and templating works fine.

The bigger goal is to have this "error handler" fired anytime any steps of the DAG (or its subdags) fail.

After multiple tries with different approaches, I found the following solutions working:

Solution A: Don't use on_failure_callback; instead create tasks with trigger_rule='all_failed' / 'one_failed'

This is what Jacob proposed on Airflow's Slack, and it seems to work fine. It's conceptually the easiest, IMHO.

status_failed = SimpleHttpOperator(
    trigger_rule='all_failed', # See https://airflow.apache.org/docs/stable/concepts.html
    task_id='updateStatus',
    ...
)
email_failed = EmailOperator(
    trigger_rule='all_failed',
    task_id='emailFailed',
    ...
)

start_task >> do_the_thing >> status_success >> email_success
# Handling errors in case any job fails
email_success >> email_failed
email_success >> status_failed

Example: A successful run sets the whole DAG's status to "success" and the error handlers to "skipped".

Solution B: Use on_failure_callback and render templates manually

As Ash explained, the reason templates don't work there is that templates are rendered not by execute() but beforehand. I found the following solution working:

def report_failure(context):
    send_email = EmailOperator(
        task_id="email_failed",
        start_date=datetime(2015, 12, 1), # Any date in the past; if you don't set it, you will get an error
        to=emailreceipients,
        subject="{{execution_date}}",
        html_content=get_email_body() # Which returns "Body: {{execution_date}}"
    )

    # Set DAG, otherwise we will get errors
    send_email.dag = context['dag']

    # Manually render templates
    # send_email.render_template_fields(context) # Working in Airflow 1.10.6
    # send_email.html_content = send_email.render_template('', send_email.html_content, context) # Working in Airflow 1.10.4
    # Looking at the codebase, the following seems to work in both versions
    send_email.html_content = send_email.get_template_env().from_string(send_email.html_content).render(**context)

    send_email.execute(context)

The issue is that these internal rendering functions seem to change often between Airflow versions, so this approach might be hard to keep up to date.
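Since those internal APIs drift between releases, one defensive option is to feature-detect at runtime. Below is a minimal, pure-Python sketch of that idea: render_operator_templates is a hypothetical helper name, and the _FakeEnv/DemoOperator classes are stand-ins (so the sketch runs without Airflow installed) for the real EmailOperator and its Jinja environment.

```python
def render_operator_templates(op, context):
    """Render an operator's html_content, tolerating Airflow API drift.

    Prefers render_template_fields() (present in 1.10.6+); otherwise falls
    back to rendering html_content via the operator's template environment.
    """
    if hasattr(op, "render_template_fields"):
        op.render_template_fields(context)
    else:
        env = op.get_template_env()
        op.html_content = env.from_string(op.html_content).render(**context)


# --- demo stand-ins so the sketch runs without Airflow installed ---
class _FakeTemplate:
    def __init__(self, source):
        self.source = source

    def render(self, **context):
        # Crude substitution standing in for real Jinja rendering.
        return self.source.replace("{{execution_date}}",
                                   str(context["execution_date"]))


class _FakeEnv:
    def from_string(self, source):
        return _FakeTemplate(source)


class DemoOperator:  # has no render_template_fields -> exercises the fallback
    html_content = "Body: {{execution_date}}"

    def get_template_env(self):
        return _FakeEnv()


op = DemoOperator()
render_operator_templates(op, {"execution_date": "2020-05-21"})
print(op.html_content)  # Body: 2020-05-21
```

With a real EmailOperator both branches delegate to Airflow's own Jinja machinery, so behavior should match whichever version is installed.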

Solution C: format message using context variable

As Jacob suggested above.

It does not solve my particular case, where the code generating the email body is shared with operators not using context. Nevertheless, it might be helpful for simpler cases, so I'm posting it here.
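For completeness, a minimal sketch of Solution C, using plain str.format(**context) as mentioned earlier. The demo_context dict here is hand-built for illustration; in a real callback, Airflow supplies a much richer context dict.

```python
def get_email_body_from_context(context):
    # Same message as the Jinja version, but built with plain
    # str.format(**context) -- no rendering machinery required.
    return "Body: {execution_date}".format(**context)


# Illustration only; in a real callback Airflow passes the context dict.
demo_context = {"execution_date": "2020-05-21T05:00:00+00:00"}
print(get_email_body_from_context(demo_context))  # Body: 2020-05-21T05:00:00+00:00
```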


The way you're doing this might work, but you definitely need double curly braces for jinja templating: {execution_date} -> {{ execution_date }}

You should also be able to use the context argument to get the execution date:

def report_failure(context):
    send_email = EmailOperator(
        task_id="email_failed",
        to=emailreceipients,
        subject=context['execution_date'],
        html_content="Body: {execution_date}".format(execution_date=context['execution_date'])
    )

You also might find this useful: https://airflow.apache.org/docs/stable/macros.html


The reason templates don't work there is that templates are rendered not by execute() but beforehand.

What you need to do to make this work is this:

def report_failure(context):
    send_email = EmailOperator(
        task_id="email_failed",
        to=emailreceipients,
        subject="{{execution_date}}",
        html_content=get_email_body() # Which returns "Body: {{execution_date}}"
    )

    send_email.dag = context['dag']
    send_email.start_date = send_email.dag.start_date
    send_email.render_template_fields(context, jinja_env=context['dag'].get_template_env())
    send_email.execute(context)      

You can configure the email that is being sent by setting a subject_template and/or a html_content_template in the email section of your airflow.cfg:

[email]
email_backend = airflow.utils.email.send_email_smtp
subject_template = /path/to/my_subject_template_file
html_content_template = /path/to/my_html_content_template_file

Need help rendering the Jinja-templated email ID in on_failure_callback.

It works fine with Variable.get('email_edw_alert'), but I don't want to use the Variable method, to avoid hitting the DB.

Below is the DAG file:

import datetime
import os
from functools import partial
from datetime import timedelta
from airflow.models import DAG,Variable
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator
from alerts.email_operator import dag_failure_email


def get_db_dag(
    *,
    dag_id,
    start_date,
    schedule_interval,
    max_taskrun,
    max_dagrun,
    proc_nm,
    load_sql
):

    default_args = {
        'owner': 'airflow',
        'start_date': start_date,
        'provide_context': True,
        'execution_timeout': timedelta(minutes=max_taskrun),
        'retries': 0,
        'retry_delay': timedelta(minutes=3),
        'retry_exponential_backoff': True,
        'email_on_retry': False,
    }


    dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule_interval,
        dagrun_timeout=timedelta(hours=max_dagrun),
        template_searchpath=tmpl_search_path,
        default_args=default_args,
        max_active_runs=1,
        catchup='{{var.value.dag_catchup}}',
        on_failure_callback=partial(dag_failure_email, config={'email_address': '{{var.value.email_edw_alert}}'}),
    )


    load_table = SnowflakeOperator(
        task_id='load_table',
        sql=load_sql,
        snowflake_conn_id=CONN_ID,
        autocommit=True,
        dag=dag,
    )

    load_table

    return dag

# ======== DAG DEFINITIONS #

edw_table_A = get_db_dag(
    dag_id='edw_table_A',
    start_date=datetime.datetime(2020, 5, 21),
    schedule_interval='0 5 * * *',
    max_taskrun=3,  # Minutes
    max_dagrun=1,  # Hours
    load_sql='recon/extract.sql',
)

Below is the Python code:

import logging
from airflow.utils.email import send_email
from airflow.models import Variable

logger = logging.getLogger(__name__)

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

def dag_failure_email(context, config=None):

    config = {} if config is None else config
    task_id = context.get('task_instance').task_id
    dag_id = context.get("dag").dag_id
    execution_time = context.get("execution_date").strftime(TIME_FORMAT)
    reason = context.get("reason")

    alerting_email_address = config.get('email_address')

    dag_failure_html_body = f"""<html>
    <header><title>The following DAG has failed!</title></header>
    <body>
    <b>DAG Name</b>: {dag_id}<br/>
    <b>Task Id</b>: {task_id}<br/>
    <b>Execution Time (UTC)</b>: {execution_time}<br/>
    <b>Reason for Failure</b>: {reason}<br/>
    </body>
    </html>
    """

    try:
        if reason != 'dagrun_timeout':
            send_email(
                to=alerting_email_address,
                subject=f"Airflow alert: <DagInstance: {dag_id} - {execution_time} [failed]",
                html_content=dag_failure_html_body,
            )
    except Exception as e:
        logger.error(
            f'Error in sending email to address {alerting_email_address}: {e}',
            exc_info=True,
        )

I have also tried another way; the version below is not working either:

try:
    if reason != 'dagrun_timeout':
        send_email = EmailOperator(
            to=alerting_email_address,
            task_id='email_task',
            subject=f"Airflow alert: <DagInstance: {dag_id} - {execution_time} [failed]",
            params={'content1': 'random'},
            html_content=dag_failure_html_body,
        )
        send_email.dag = context['dag']
        # send_email.to = send_email.get_template_env().from_string(send_email.to).render(**context)
        send_email.to = send_email.render_template(alerting_email_address, send_email.to, context)
        send_email.execute(context)
except Exception as e:
    logger.error(
        f'Error in sending email to address {alerting_email_address}: {e}',
        exc_info=True,
    )


Comments
  • send_email.render_template_fields(context) did not work because I was browsing the 1.10.6 source, whereas my Docker image ran 1.10.4.
  • My mistake while giving this example. I am using double braces and it doesn't work. I can't / don't want to use the context argument directly, because the email body comes from a utility function, as I mentioned. I updated the example to be clearer.
  • I'm not totally certain on_failure_callback is meant for triggering more operators, which is possibly why templating doesn't work. I suggest working around this problem by using .format and context, or by adding this EmailOperator to your DAG with trigger_rule='one_failed'.
  • I received the error Operator <Task(EmailOperator): email_failed> has not been assigned to a DAG yet. After adding send_email.dag = context['dag'] I got DAG does not have start_date set (or similar). I'm trying different methods..
  • I also needed to add start_date=datetime(2015, 12, 1) to the EmailOperator.