Mendhak / Code

MS Teams Operator for Apache Airflow

This Apache Airflow operator can send messages to specific MS Teams Channels. It can be especially useful if you use MS Teams for your chatops.

Airflow operator that can send messages to MS Teams

Common usages for this would be:

Usage

From your DAG, call the operator.

op1 = MSTeamsWebhookOperator(task_id='msteamtest',
    http_conn_id='msteams_webhook_url',
    message = "Hello from Airflow!",
    subtitle = "This is the **subtitle**",
    theme_color = "00FF00",
    button_text = "My button",
    button_url = "https://example.com",
    #proxy = "https://yourproxy.domain:3128/",
    dag=dag)

http_conn_id : Hook pointing at MS Teams Webhook
message : (Templated) the card’s headline.
subtitle : (Templated) the card’s subtitle
button_text : Text for action button at the bottom of the card
button_url : What URL the button sends the user to
theme_color : Color for the card’s top line, without the #

This sends a card to your channel:

MS Teams

There is a bit of prep work required in Teams as well as Airflow to enable this functionality.

Prepare MS Teams

Pick the channel you want messages sent to, click the > Connectors and search for Incoming Webhook.

Click Configure, give it a name and you will be given a webhook URL.

Webhook

Webhooks don’t usually have additional authentication; you should treat this URL as sensitive and keep it in a safe place.

Prepare Airflow

In Airflow, create a new Connection under Admin > Connections

http

Note the Host field starts directly with outlook.office.com and the Schema is where you specify https.

Copy hook and operator

Copy the MS Teams operator and Hook into your own Airflow project.

Import it into your DAG

from ms_teams_webhook_operator import MSTeamsWebhookOperator

You can now use the operator as shown above.

Notifying MS Teams on DAG failures

You can also use the operator to notify MS Teams whenever a DAG fails. This will create a card with a ‘View Log’ button that developers can click on and go directly to the log of the failing DAG operator. Very convenient.

http

To do this, create a method that receives the failure context, which calls MSTeamsWebhookOperator. Set this method in the on_failure_callback of the DAG.

def on_failure(context):

    dag_id = context['dag_run'].dag_id

    task_id = context['task_instance'].task_id
    context['task_instance'].xcom_push(key=dag_id, value=True)

    logs_url = "https://myairflow/admin/airflow/log?dag_id={}&task_id={}&execution_date={}".format(
         dag_id, task_id, context['ts'])

    teams_notification = MSTeamsWebhookOperator(
        task_id="msteams_notify_failure", trigger_rule="all_done",
        message="`{}` has failed on task: `{}`".format(dag_id, task_id),
        button_text="View log", button_url=logs_url,
        theme_color="FF0000", http_conn_id='msteams_webhook_url')
    teams_notification.execute(context)


default_args = {
    'owner' : 'airflow',
    'description' : 'a test dag',
    'start_date' : datetime(2019,8,8),
    'on_failure_callback': on_failure
}

Of course substitute the logs_url with the address of your own Airflow. For convenience you can move the method out into a common Python module that every DAG imports from.

Proxies

Some corporate environments make use of outbound proxies. If you are behind an outbound proxy for Internet access, there are two ways that you can specify your server.

The easiest way is to put the details in the Extra field when creating the HTTP Connection.

{"proxy":"http://my-proxy:3128"}

You can also pass the proxy argument to the MSTeamsWebhookOperator operator.

proxy = "https://my-proxy:3128/",

How it works

MS Teams allows creating actionable message cards which enable you to send formatted JSON with various things like column layout, text blocks, action buttons, headlines, subtitles and so on. There are plenty of examples on the Message Card Playground.

The incoming webhook connector is already bundled with MS Teams, and is the simplest means of communicating with a channel.

The main work happens in the hook which inherits from Airflow’s own HttpHook; in turn this is simply a Python script which takes the arguments and builds up the MessageCard before performing an HTTP POST.