Integrating Slack Alerts in Airflow
You just triggered your Airflow DAG that sends data to your clients. Confident that it will succeed (why wouldn't it? You wrote it; there is no way it can fail), you go to have coffee with your colleagues in the company kitchen, where the awesome coffee machine is waiting to serve you the most delicious coffee ☕. You discuss how you can make the company better (of course you don't talk about how awesome the new Avengers trailer is!!). Then you finally stroll back to your seat with a smirk, ready to see the green status on your DAG. But wait... what just happened? Your DAG failed. Of course it was not your fault: the letters "DAG" decided to change their order to "DGA" while you were having coffee. At the same time, your boss comes over and asks, "How's your work going? Was the data sent to all the clients?" ⚡⚡⚡ And you wish there had been some way to receive an alert on your phone while you were having coffee.
I could go on and on with this silly story, but the point is that you need alerting when your DAG fails so that you can act as early as possible. Airflow has a built-in capability to send alerts over email, but those tend to get lost in the pile of 1,000 other unread emails. It is simply easier to get alerts somewhere your entire team already keeps an eye on: Slack.
There are 2 ways in which you can integrate Slack with Airflow.
(1) Using Slack Legacy Tokens:
Legacy tokens are an old method of generating tokens for testing and development. Slack itself doesn't recommend using them, but it is the simplest method, so you can still use it; just bear in mind that it can be deprecated at any time.
Follow these steps: generate a legacy token for your workspace from Slack's legacy tokens page (https://api.slack.com/custom-integrations/legacy-tokens), copy it, and pass it as the token argument below:
```python
from airflow.operators.slack_operator import SlackAPIPostOperator

SlackAPIPostOperator(
    task_id='failure',
    token='YOUR_TOKEN',
    text='Hello World !',
    channel='SLACK_CHANNEL',
    # Replace with your Slack username
    username='airflow'
)
```
You can try this example in IPython or in a Jupyter notebook, as follows:
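A minimal sketch of such an interactive session (YOUR_TOKEN and the channel are placeholders you would replace with your own values):

```python
# In an IPython shell or Jupyter cell with Airflow 1.x installed:
from airflow.operators.slack_operator import SlackAPIPostOperator

send_test = SlackAPIPostOperator(
    task_id='send_test_message',  # any id works outside a DAG
    token='YOUR_TOKEN',           # your Slack legacy token
    text='Hello World !',
    channel='#general',           # channel to post to
    username='airflow')

# Calling execute() directly posts the message immediately,
# without a DAG or a scheduler involved.
send_test.execute()
```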
However, this is just an example of sending a message to Slack, not of alerting on task failures. Each task in Airflow has a parameter called on_failure_callback (of callable type) to which you pass a function to be called when the task fails.
Example:
```python
from airflow.operators.bash_operator import BashOperator
from airflow.operators.slack_operator import SlackAPIPostOperator


def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: Task Failed',
        username='airflow')
    return failed_alert.execute(context=context)


task_with_failed_slack_alerts = BashOperator(
    task_id='fail_task',
    bash_command='exit 1',
    # The callback automatically receives the task context when the task fails.
    on_failure_callback=slack_failed_task,
    dag=dag)
```
Now when you run the DAG with the above task, a failure will send you an alert as shown in the image below:
This is useful, but there are still two issues with the above code: (1) the Slack token is hard-coded in the source, which is a security risk, and (2) the alert text doesn't tell you which task or DAG actually failed.
The first issue can be resolved by storing the Slack token in Airflow Connections: create a connection (e.g. with Conn Id slack), put the channel name in the Login field and the token in the Password field, as follows:
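If you prefer doing this in code instead of via Admin → Connections in the web UI, a minimal sketch for Airflow 1.x (the channel and token values are placeholders) would be:

```python
from airflow import settings
from airflow.models import Connection

# conn_id matches the SLACK_CONN_ID used in the callback below.
conn = Connection(
    conn_id='slack',
    conn_type='http',
    login='#datalabs',       # Slack channel, read back via .login
    password='YOUR_TOKEN')   # Slack token, read back via .password

# Persist the connection in the Airflow metadata DB.
session = settings.Session()
session.add(conn)
session.commit()
```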
I also recommend running pip install apache-airflow[crypto] and setting a Fernet key, so that connection passwords are stored encrypted in the metadata DB.
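Generating a key is a one-liner with the cryptography package; the printed value goes under fernet_key in the [core] section of airflow.cfg:

```python
from cryptography.fernet import Fernet

# Prints a fresh Fernet key; once it is set in airflow.cfg,
# Airflow encrypts connection passwords at rest.
print(Fernet.generate_key().decode())
```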
Now let's update our function to fetch the token and channel name from the connection, and also improve the alert format:
```python
from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator

SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    """
    Sends a message to a Slack channel.
    If you want to send it to a "user" -> use "@user",
    if "public channel" -> use "#channel",
    if "private channel" -> use "channel".
    """
    slack_channel = BaseHook.get_connection(SLACK_CONN_ID).login
    slack_token = BaseHook.get_connection(SLACK_CONN_ID).password
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=slack_channel,
        token=slack_token,
        text="""
            :red_circle: Task Failed.
            *Task*: {task}
            *Dag*: {dag}
            *Execution Time*: {exec_date}
            *Log Url*: {log_url}
            """.format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failed_alert.execute(context=context)
```
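To wire this up, pass the function as a task's on_failure_callback, just like in the earlier example (the deliberately failing BashOperator here is only for demonstration):

```python
from airflow.operators.bash_operator import BashOperator

task_with_failed_slack_alerts = BashOperator(
    task_id='fail_task',
    bash_command='exit 1',  # always fails, to trigger the alert
    on_failure_callback=task_fail_slack_alert,
    dag=dag)
```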
A sample alert sent by this function looks as follows:
As you can see, it also gives you a Log URL so that you can go directly to the log of the failed task.
(2) Using Slack Webhooks:
Slack recommends using Incoming Webhooks to send data to it.
Follow the steps below: create a new Slack app at https://api.slack.com/apps, enable Incoming Webhooks on the app's features page, and click Add New Webhook to Workspace. You will see something similar to the image below:
So go ahead and pick a channel that the app will post to, and then click to Authorize your app. You'll be sent back to your app settings, and you should now see a new entry under the Webhook URLs for Your Workspace section, with a Webhook URL that'll look something like this: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
From here, store the webhook in the same slack connection: put https://hooks.slack.com/services in the Host field and the remainder of the webhook URL (e.g. /T00000000/B00000000/XXXX...) in the Password field, then use SlackWebhookOperator instead of SlackAPIPostOperator:
```python
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
        :red_circle: Task Failed.
        *Task*: {task}
        *Dag*: {dag}
        *Execution Time*: {exec_date}
        *Log Url*: {log_url}
        """.format(
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url,
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow')
    return failed_alert.execute(context=context)
```
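If you want to sanity-check the webhook before wiring up the callback, you can trigger the operator once by hand; a hypothetical smoke test, reusing the imports above and assuming the slack connection exists:

```python
test_alert = SlackWebhookOperator(
    task_id='slack_webhook_test',
    http_conn_id='slack',
    webhook_token=BaseHook.get_connection('slack').password,
    message='Webhook connection works :tada:',
    username='airflow')

# Execute directly, outside of any DAG; the context isn't used here.
test_alert.execute(context={})
```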
Bonus Tip
You can add on_failure_callback to default_args when defining the DAG, as below, so that you get an alert if any task in the DAG fails:
```python
import airflow
from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 0,
    'on_failure_callback': task_fail_slack_alert
}

dag = DAG(
    dag_id=DAG_NAME,                      # assumed to be defined in your DAG file
    default_args=default_args,
    schedule_interval=schedule_interval,  # assumed to be defined in your DAG file
)
```
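Every task attached to this DAG now inherits the callback through default_args, so a single (deliberately) failing task is enough to test the whole setup:

```python
from airflow.operators.bash_operator import BashOperator

# Inherits on_failure_callback from default_args; no need to pass it again.
t1 = BashOperator(
    task_id='any_task',
    bash_command='exit 1',  # fails on purpose, to trigger the Slack alert
    dag=dag)
```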
You can follow the same steps to integrate Slack with Google Cloud Composer.