
Adding Slack Notifications to a Luigi Pipeline in Python

In a previous article, I described how to build a data pipeline in Python using Luigi, a workflow manager written in Python and open-sourced by Spotify. I also had the opportunity to give a short talk about Luigi at the local PyData London meetup (see slides).

One of the nice features of Luigi is the possibility of receiving e-mail notifications on error. While this is useful, it covers errors only, so you don’t know whether the pipeline has completed successfully unless you check manually. As I wanted to receive notifications on Slack, including in case of success, I started looking around for options.

I ended up developing my own solution: https://github.com/bonzanini/luigi-slack. This blog post is a brief overview of how to use this Python package with your Luigi pipeline.

Getting started with luigi-slack

From your organisation’s Slack page (e.g. yourname.slack.com) you can add a Bot integration. The setup is very quick, and you’ll receive a token that you’ll need to use to interact with the Slack API.

You can get the bleeding-edge version of luigi-slack from the GitHub link above, but beware that this is a work in progress. A somewhat stable version is available from the cheese shop (PyPI):

pip install luigi-slack

The key points of this package are:

  • Support for Python 3
  • Easy-to-use interface

Regarding the first point, the discussion on choosing Python 2 vs Python 3 is still never-ending and I’m not going there in this post. For a greenfield project, I prefer to use Python 3 rather than a version with a sunset date already decided. The support for Python 2 in luigi-slack is best-effort (and of course pull requests are always welcome).

In terms of easy-to-use interface, I borrowed the nice idea of using a context manager from luigi-monitor, because it makes it easy to integrate the library with an existing pipeline.

For example, given the basic code to run a Luigi pipeline which ends with the task YourTaskClass:

import luigi

if __name__ == '__main__':
    luigi.run(main_task_cls=YourTaskClass)

All we need in order to have Slack notifications is to refactor as follows:

import luigi
from luigi_slack import SlackBot, notify

if __name__ == '__main__':
    slacker = SlackBot(token='my-token',
                       channels=['mychannel', 'anotherchannel'])
    with notify(slacker):
        luigi.run(main_task_cls=YourTaskClass)
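
To make the context manager idea more concrete, here is a minimal sketch of the general pattern: run a block of code and send one message when it finishes. This is not how luigi-slack is implemented (the package tracks Luigi task events, which this sketch does not attempt to reproduce). The names RecordingBot and notify_sketch are hypothetical, and the bot only stores messages in a list instead of calling the Slack API.

```python
from contextlib import contextmanager


class RecordingBot:
    """Hypothetical stand-in for a Slack client: keeps messages in memory."""

    def __init__(self):
        self.messages = []

    def send(self, text):
        self.messages.append(text)


@contextmanager
def notify_sketch(bot):
    """Run the wrapped block, then report once whether it raised."""
    try:
        yield bot
    except Exception as exc:
        bot.send("Pipeline failed: {!r}".format(exc))
        raise  # re-raise so the caller still sees the original error
    else:
        bot.send("Pipeline completed successfully")


if __name__ == '__main__':
    bot = RecordingBot()
    with notify_sketch(bot):
        pass  # the pipeline would run here
    print(bot.messages)
```

The appeal of this design is that the existing entry point stays untouched: the only change is the with statement wrapped around it.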

Configuration Options for luigi-slack

The SlackBot takes a number of arguments. Besides the token, which allows you to connect to your organisation’s Slack, all the other parameters are optional:

  • channels (default empty list) is the list of channel names that you want to push the notifications to. For the channel name, you don’t need the initial # symbol. You can also deliver the notifications to a single account, by using the @username syntax
  • events (default [FAILURE]) is the list of event types, as defined in luigi_slack, that you want to track
  • max_events (default 5) is the max number of events of a given type that you want to report. With more than max_events events of the same type, a “please check logs” message is reported instead
  • username (default “Luigi-slack Bot”) is the screen name of your bot
  • task_representation (default str) is the function used to represent the task in the notification (see explanation below)
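
The max_events rule described above can be illustrated with a few lines of plain Python. This is only a sketch of the behaviour as described in the list, not the package's actual code; the function name summarise_events and the exact message wording are assumptions made for the example.

```python
def summarise_events(event_type, task_names, max_events=5):
    """Return the lines to report for one event type.

    Hypothetical helper: lists every task while there are at most
    max_events of them, otherwise falls back to a single generic line.
    """
    if len(task_names) > max_events:
        return ["{}: {} events, please check logs".format(
            event_type, len(task_names))]
    return ["{}: {}".format(event_type, name) for name in task_names]


if __name__ == '__main__':
    # Two failures are listed one by one
    print(summarise_events("FAILURE", ["TaskA()", "TaskB()"]))
    # Six failures exceed the default limit of 5, so only a summary is shown
    print(summarise_events("FAILURE", ["TaskA()"] * 6))
```

Capping the output this way keeps the channel readable when a single upstream problem makes dozens of tasks fail at once.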

In Luigi, representing a task as a string will print the task_id attribute of a luigi.Task, which includes the class name as well as all the parameters. In other words, it looks like:

MyTask(param1="some_value", param2="other_value", your_secret_param="your_secret_value", …)

With a huge number of parameters that make the notification less readable, or with sensitive parameters that you don’t want to send around in the Slack chat room, it makes sense to display the task in a more conservative way. An example of custom string representation could be:

def custom_task_representation(task):
    return "{}(...)".format(task.__class__.__name__)

Once we pass the function as the task_representation argument of the SlackBot, the task will appear in the notifications as:

MyTask(...)

Keep in mind that an instance of a Luigi task is identified by the class name AND the value of its parameters, which is why the task_id includes them all. In other words, with a more compact representation like the one proposed in the above snippet, you won’t be able to distinguish between tasks with the same class name, but different param values. You’ll need to customise the function based on your needs.
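
As a middle ground between the full task_id and the bare class name, one option is to keep the parameters that tell tasks apart and mask only the sensitive ones. The sketch below assumes the task exposes its parameters as a dict in a param_kwargs attribute, which is my understanding of how luigi.Task stores them; a small stand-in class is used here so the example runs without Luigi. The function name and the set of hidden parameter names are hypothetical.

```python
def redacted_task_representation(task, hidden=frozenset({"your_secret_param"})):
    """Represent a task with its parameters, masking the hidden ones.

    Assumes `task.param_kwargs` is a dict of parameter names to values.
    """
    params = getattr(task, "param_kwargs", {})
    shown = ", ".join(
        "{}={}".format(name, "***" if name in hidden else repr(value))
        for name, value in sorted(params.items())
    )
    return "{}({})".format(task.__class__.__name__, shown)


class MyTask:
    """Stand-in for a Luigi task, used only to demonstrate the function."""

    def __init__(self, **params):
        self.param_kwargs = params


if __name__ == '__main__':
    task = MyTask(param1="some_value", your_secret_param="your_secret_value")
    print(redacted_task_representation(task))
```

With this approach two tasks of the same class remain distinguishable as long as they differ in at least one non-hidden parameter.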

Summary

I’m developing a Python package to add Slack notification support to a Luigi pipeline, with a simple interface, a few optional configuration parameters, and minimal requirements in terms of refactoring.

The code is available at https://github.com/bonzanini/luigi-slack, and you can install the Python package with:

pip install luigi-slack

As this is a work in progress, it’s not widely tested, and the interface could change. Comments and pull requests are welcome.

One thought on “Adding Slack Notifications to a Luigi Pipeline in Python”

  1. Hi Marco,
    This was a very useful article, thank you!
    Do you have a simple explanation for setting up email notifications? I haven’t been able to set it up by following the documented instructions
