on_failure_callback not being triggered on -9 error #40550

@izu-x

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.1

What happened?

I have a DAG in which a task exits with return code -9:

[2024-07-01, 16:06:05 CEST] {{xxxxClient.py:198}} INFO - Execute URL: https://......
[2024-07-01, 16:07:34 CEST] {{local_task_job_runner.py:234}} INFO - Task exited with return code -9
[2024-07-01, 16:07:34 CEST] {{taskinstance.py:3280}} INFO - 0 downstream tasks scheduled from follow-on schedule check

On the DAG I have defined an on_failure_callback function in default_args, but it is not triggered. It is only triggered when a task raises an exception, for example from a function that deliberately throws one. So the SMTP settings are correct and work; the on_failure_callback is simply not triggered when the -9 error happens.
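For context, a negative return code reported for a child process normally means it was terminated by that signal number, so -9 corresponds to SIGKILL. The sketch below is a standalone illustration of that convention, not Airflow code: it starts a child Python process that tries to run a hypothetical `on_failure` hook through both `atexit` and a `finally` block, kills it with SIGKILL, and checks whether the hook ran. It assumes a POSIX system; `CHILD`, `on_failure`, and `run_and_kill` are names made up for this example.

```python
import os
import signal
import subprocess
import sys
import tempfile

# Child script: tries to record a failure through every in-process hook it has.
CHILD = r"""
import atexit, sys, time

marker = sys.argv[1]

def on_failure():
    # Hypothetical stand-in for a failure callback: leaves a marker file.
    with open(marker, "w") as fh:
        fh.write("callback ran")

atexit.register(on_failure)
try:
    print("ready", flush=True)
    time.sleep(60)
finally:
    on_failure()
"""


def run_and_kill():
    """Start the child, SIGKILL it, and return (return code, hook ran?)."""
    with tempfile.TemporaryDirectory() as tmp:
        marker = os.path.join(tmp, "marker.txt")
        proc = subprocess.Popen(
            [sys.executable, "-c", CHILD, marker],
            stdout=subprocess.PIPE,
        )
        proc.stdout.readline()  # wait until the child is inside the try block
        proc.send_signal(signal.SIGKILL)  # cannot be caught or ignored
        returncode = proc.wait()
        proc.stdout.close()
        return returncode, os.path.exists(marker)


if __name__ == "__main__":
    print(run_and_kill())
```

On Linux this returns a return code of -9 and reports that the hook never ran, because a process cannot catch or handle SIGKILL. If the task process in this report was killed the same way (an assumption), any callback that has to run inside that process would not get the chance to run.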

Here is part of my DAG as an example:

import ast
import logging
import os
import traceback
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.email import EmailOperator
from airflow.operators.python import PythonOperator


# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 3,
    "retry_delay": timedelta(seconds=5),
    "mwaa_secret_name": "xxxxxxxxxxxxxxxxxx",
    "redshift_secret_name": "xxxxxxxxxxxxx",
    "s3_bucket": "xxxxxxxxxxxxxxx",
    "region_name": "xxxxxxxxxxxxxxxxxxxxx",
    "on_failure_callback": "failure_callback",
}

error_email_recipients = ["xxxxxxxxxx@xxxxxxxxxxx.com"]

credentials = ast.literal_eval(
    str(Variable.get("ses-smtp-ca", default_var="undefined"))
)

os.environ["AIRFLOW__SMTP__SMTP_USER"] = credentials["user"]
os.environ["AIRFLOW__SMTP__SMTP_PASSWORD"] = credentials["password"]
os.environ["AIRFLOW__EMAIL__EMAIL_BACKEND"] = credentials["email_backend"]
os.environ["AIRFLOW__SMTP__SMTP_HOST"] = credentials["smtp_host"]
os.environ["AIRFLOW__SMTP__SMTP_PORT"] = credentials["smtp_port"]
os.environ["AIRFLOW__SMTP__SMTP_MAIL_FROM"] = credentials["mail_from"]
os.environ["AIRFLOW__SMTP__SMTP_SSL"] = credentials["smtp_ssl"]
os.environ["AIRFLOW__SMTP__SMTP_STARTTLS"] = credentials["smtp_starttls"]


# Failure callback function
def failure_callback(context):
    print("#### failure_callback is being called")
    exception = context.get("exception")
    print(f"Exception: {exception}")
    exception_str = "".join(
        traceback.format_exception(None, exception, exception.__traceback__)
    )
    print(f"Exception String: {exception_str}")

    email_operator = EmailOperator(
        task_id="send_email_on_failure",
        to=error_email_recipients,
        subject=f"DAG {context['task_instance'].dag_id} - Task {context['task_instance'].task_id} Failed",
        html_content=f"""
        <h3>DAG: {context['task_instance'].dag_id}</h3>
        <p>Task: {context['task_instance'].task_id} failed.</p>
        <p>Execution Time: {context['execution_date']}</p>
        <p>Exception: {exception_str}</p>
        <p>Log URL: <a href='{context['task_instance'].log_url}'>Click here to view log</a></p>
        """,
    )
    try:
        print("#### Sending email about the exception")
        email_operator.execute(context=context)
    except Exception as e:
        print(f"Failed to send email: {str(e)}")


with DAG(
    "TestDAG_NOEMAIL",
    default_args=default_args,
    schedule_interval="0 14 * * *",
    start_date=datetime(2024, 5, 3),
    catchup=False,
) as dag:

    update_xxx1_table_task = PythonOperator(
        task_id="update_xxx1_table",
        python_callable=xxxx.update_xxx1_table,
        on_failure_callback=failure_callback,
    )

    update_xxx2_table_task = PythonOperator(
        task_id="update_hana_SMC_Interactions_table",
        python_callable=xxxx.update_xxx1_table,
        on_failure_callback=failure_callback,
    )


(update_xxx1_table_task >> update_xxx2_table_task)

The task update_xxx1_table_task exits with the return code -9 shown above, and no email is sent.

If I define something like

def throw_error(*args, **kwargs):
    raise ValueError("Simulated task failure for testing.")

...
with DAG(
    "Test-Email",
    default_args=default_args,
    description="Test-Email",
    schedule_interval="0 14 * * *",
    start_date=datetime(2024, 5, 3),
    catchup=False,
) as dag:

    throw_error_task = PythonOperator(
        task_id="throw_error_task",
        python_callable=throw_error,
    )

This works fine and sends the email.
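One thing that may be worth ruling out: failure_callback reads `exception.__traceback__`, which raises AttributeError if `context.get("exception")` returns None or a plain string instead of an exception object. Whether that is what happens in the -9 case is an assumption and is not confirmed by the logs above. A minimal sketch of a more defensive formatter, using a hypothetical helper named `format_failure`:

```python
import traceback


def format_failure(exception):
    """Return a printable description for whatever the callback context holds.

    Handles three cases: no exception at all, a real exception object,
    and anything else (for example a plain string message).
    """
    if exception is None:
        return "No exception was provided in the callback context."
    if isinstance(exception, BaseException):
        return "".join(
            traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )
        )
    return str(exception)
```

Inside failure_callback this could replace the direct `traceback.format_exception(...)` call, for example `exception_str = format_failure(context.get("exception"))`, so that the callback does not fail before it reaches the email step.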

What you think should happen instead?

No response

How to reproduce

Define a DAG like my example above, add a task which causes a -9 (memory issue) error, and observe that the error email is not sent.
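A possible way to trigger the -9 without an actual memory problem is a callable that sends SIGKILL to its own process. This is only a sketch under the assumption that a SIGKILL on the task process is what produces the -9 here; `kill_self` and `exit_code_of` are hypothetical names, and the example runs outside Airflow on a POSIX system.

```python
import multiprocessing
import os
import signal


def kill_self(*args, **kwargs):
    """Hypothetical python_callable: ends the current process with SIGKILL."""
    os.kill(os.getpid(), signal.SIGKILL)


def exit_code_of(target):
    """Run target in a forked child process and return the child's exit code."""
    ctx = multiprocessing.get_context("fork")  # POSIX only
    proc = ctx.Process(target=target)
    proc.start()
    proc.join()
    return proc.exitcode


if __name__ == "__main__":
    print(exit_code_of(kill_self))
```

Passing `kill_self` as the python_callable of a PythonOperator in the DAG above should, under the same assumption, give a task that ends with return code -9 on demand.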

Operating System

AWS

Versions of Apache Airflow Providers

No response

Deployment

Amazon (AWS) MWAA

Deployment details

No response

Anything else?

Let me know if there are specific logs I can check to provide more information. I have checked all the Airflow logs I know of and could not find any error or anything related to the email sending or to the on_failure_callback.

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

area:core, kind:bug, needs-triage
