Adding Slack Notifications to a Luigi Pipeline in Python

In a previous article, I described how to build a data pipeline in Python using Luigi, a workflow manager written in Python and open sourced by Spotify. I also had the opportunity to give a short talk about Luigi at the local PyData London meetup (see slides).

One of the nice features of Luigi is the possibility of receiving e-mail notifications on error. While this is a useful feature, it’s tailored to errors only, so unless you check manually you don’t actually know whether the Luigi pipeline has completed successfully. As I wanted to receive notifications on Slack, also in case of success, I started looking around for options.

I ended up developing my own solution: https://github.com/bonzanini/luigi-slack. This blog post is a brief overview of how to use this Python package with your Luigi pipeline.

Getting started with luigi-slack

From your organisation’s Slack page (e.g. yourname.slack.com) you can add a Bot integration. The setup is very quick, and you’ll receive a token that you’ll need to use to interact with the Slack API.

You can get the bleeding edge version of luigi-slack from the GitHub link above, but beware that this is a work in progress. A somewhat stable version is available from the cheese shop (PyPI):

pip install luigi-slack

The key points of this package are:

  • Support for Python 3
  • Easy-to-use interface

Regarding the first point, the discussion on choosing Python 2 vs Python 3 is never-ending and I’m not going there in this post. For a greenfield project, I prefer to use Python 3 rather than a version with a sunset date already decided. Support for Python 2 in luigi-slack is best-effort (and of course pull requests are always welcome).

In terms of easy-to-use interface, I borrowed the nice idea of using a context manager from luigi-monitor, because it makes it easy to integrate the library with an existing pipeline.

For example, given the basic code to run a Luigi pipeline which ends with the task YourTaskClass:

import luigi

if __name__ == '__main__':
    luigi.run(main_task_cls=YourTaskClass)

All we need in order to have Slack notifications is to refactor as follows:

import luigi
from luigi_slack import SlackBot, notify

if __name__ == '__main__':
    slacker = SlackBot(token='my-token',
                       channels=['mychannel', 'anotherchannel'])
    with notify(slacker):
        luigi.run(main_task_cls=YourTaskClass)

Configuration Options for luigi-slack

The SlackBot takes a number of arguments. Besides the token, which allows you to connect to your organisation’s Slack, all the other parameters are optional:

  • channels (default: empty list) is the list of channel names you want to push notifications to. You don’t need the initial # symbol in a channel name. You can also deliver notifications to a single account, using the @username syntax
  • events (default: [FAILURE]) is the list of event types, as defined in luigi_slack, that you want to track
  • max_events (default: 5) is the maximum number of events of a given type to report. With more than max_events events of the same type, a “please check logs” message is reported instead
  • username (default: “Luigi-slack Bot”) is the screen name of your bot
  • task_representation (default: str) is the function used to represent the task in the notification (see explanation below)
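
Putting these options together, here’s a sketch of a more customised SlackBot. The token and channel names are placeholders, I’m assuming the event constants are exposed in a luigi_slack.events module as the events option above suggests, and YourTaskClass is your final task as before:

import luigi
from luigi_slack import SlackBot, notify
from luigi_slack import events  # assumption: FAILURE, SUCCESS, etc. live here

if __name__ == '__main__':
    slacker = SlackBot(token='my-token',  # placeholder: use your bot token
                       channels=['mychannel', '@myuser'],
                       events=[events.FAILURE, events.SUCCESS],
                       max_events=10,
                       username='My Pipeline Bot')
    with notify(slacker):
        luigi.run(main_task_cls=YourTaskClass)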

In Luigi, the string representation of a task is its task_id attribute, which includes the class name as well as all the parameters. In other words, it looks like:

MyTask(param1="some_value", param2="other_value", your_secret_param="your_secret_value", ...)

With a huge number of parameters that make the notification less readable, or with sensitive parameters that you don’t want to send around in the Slack chat room, it makes sense to display the task in a more conservative way. An example of a custom string representation could be:

def custom_task_representation(task):
    return "{}(...)".format(task.__class__.__name__)

Once we pass the function as task_representation argument of the SlackBot, the task will appear in the notifications as:

MyTask(...)

Keep in mind that an instance of a Luigi task is identified by the class name AND the values of its parameters, which is why the task_id includes them all. In other words, with a more compact representation like the one proposed in the above snippet, you won’t be able to distinguish between tasks with the same class name but different parameter values. You’ll need to customise the function based on your needs.
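
If you want to keep some parameter context without leaking sensitive values, you can filter the parameters yourself. Here’s a sketch that relies on Luigi’s task.param_kwargs dictionary; the convention of flagging secrets with “secret” in the parameter name is hypothetical, so adapt it to your codebase:

def custom_task_representation(task):
    # param_kwargs maps parameter names to their values
    safe_params = ", ".join(
        "{}={}".format(name, value)
        for name, value in sorted(task.param_kwargs.items())
        if 'secret' not in name  # hypothetical naming convention
    )
    return "{}({})".format(task.__class__.__name__, safe_params)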

Summary

I’m developing a Python package that adds Slack notification support to a Luigi pipeline, with a simple interface, a few optional configuration parameters, and minimal requirements in terms of refactoring.

The code is available at https://github.com/bonzanini/luigi-slack, and you can install the Python package with:

pip install luigi-slack

As this is a work in progress, it’s not widely tested, and the interface could change. Comments and pull requests are welcome.

Building Data Pipelines with Python and Luigi

As a data scientist, your day-to-day emphasis is often more on the R&D side than on engineering. In the process of going from prototypes to production, though, some of the early quick-and-dirty decisions turn out to be sub-optimal and require a decent amount of effort to be re-engineered. This usually slows down innovation, and your project as a whole.

This post will discuss some experience in building data pipelines: extraction, cleaning, integration, pre-processing of data, and in general all the steps necessary to prepare your data for your data-driven product. In particular, the focus is on data plumbing, and on how a workflow manager like Luigi can come to the rescue without getting in your way. With minimal effort, the transition from prototype to production can be smoother.

You can find the code for the examples as a GitHub Gist.

Early Days of a Prototype

In the early days of a prototype, the data pipeline often looks like this:

$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

This is quite common when the data project is in its exploratory stage: you know you’ll need some pre-processing, you think it’s going to be a quick hack, so you don’t bother with engineering best practices. Then the number of scripts grows, and your data pipeline comes back to bite you.

The only advantage of this approach is that it’s quick and hacky. On the downside, it’s tedious: every time you want to re-run the pipeline, you need to call the scripts manually, in sequence. Moreover, if you’re sharing this prototype with a colleague, there is even more room for misinterpretation (“why can’t I do stuff with data?”… “did you clean it first?”, etc.)

The obvious hacky solution seems to be: let’s put everything in one script. After some quick refactoring, the do_everything.py script can look like this:

# Filename: do_everything.py
# (assuming the individual steps are importable functions)

if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()

This is fairly simple to run:

$ python do_everything.py

(Note: you could also put everything in a bash script which calls the individual scripts in sequence, but the shortcomings would be more or less the same, as the sketch below shows.)
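
For completeness, such a wrapper script could look like this sketch:

#!/bin/bash
# Quick-and-dirty wrapper: set -e stops at the first failing step,
# but there is still no resumability or dependency tracking
set -e
python get_some_data.py
python clean_some_data.py
python join_other_data.py
python do_stuff_with_data.py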

Boilerplate Code

When moving towards a production-ready pipeline, there are a few more aspects to consider besides the run-everything code. In particular, error handling should be taken into account:

try:
    get_some_data()
except GetSomeDataError as e:
    # handle this error
    pass

But if we chain all the individual tasks together, we end up with a Christmas tree of try/except:

try:
    get_some_data()
    try:
        clean_some_data()
        try:
            # you see where this is going...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        # handle CleanSomeDataError
except GetSomeDataError as e:
    # handle GetSomeDataError

Another important aspect to consider is how to resume a pipeline. For example, if the first few tasks are completed, but then an error occurs half-way through, how do we re-run the pipeline without re-executing the initial successful steps?

# check if the task was already successful
if not i_got_the_data_already():
    # if not, run it
    try:
        get_some_data()
    except GetSomeDataError as e:
        # handle the error

Moving to Luigi

Luigi is a Python tool for workflow management. It was developed at Spotify to help build complex pipelines of batch jobs. To install Luigi:

$ pip install luigi

Some of the useful features of Luigi include:

  • Dependency management
  • Checkpoints / Failure recovery
  • CLI integration / parameterisation
  • Dependency Graph visualisation

There are two core concepts to understand in order to apply Luigi to our own data pipeline: Tasks and Targets. A task is a unit of work, defined by extending the class luigi.Task and overriding some basic methods. The output of a task is a target, which can be a file on the local filesystem, a file on Amazon S3, a record in a database, etc.

Dependencies are defined in terms of inputs and outputs, i.e. if TaskB depends on TaskA, it means that the output of TaskA will be the input of TaskB.

Let’s look at a couple of template tasks:

# Filename: run_luigi.py
import luigi

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                
if __name__ == '__main__':
    luigi.run()

This code showcases two tasks: PrintNumbers, which writes the numbers from 1 to 10 into a file called numbers_up_to_10.txt, one number per line, and SquaredNumbers, which reads that file and writes number:square pairs into squares.txt, also one pair per line.

To run the tasks:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi will take care of checking the dependencies between tasks: it will see that the input of SquaredNumbers is not there yet, so it will run the PrintNumbers task first, then carry on with the execution.

The first argument we’re passing to Luigi is the name of the last task in the pipeline we want to run. The second argument simply tells Luigi to use a local scheduler (more on this later).

You could also use the luigi command:

$ luigi --module run_luigi SquaredNumbers --local-scheduler

Anatomy of a Task

To create a Luigi task we simply need to create a class whose parent is luigi.Task, and override some methods. In particular:

  • requires() should return the list of dependencies for the given task — in other words a list of tasks
  • output() should return the target for the task (e.g. a LocalTarget, a S3Target, etc.)
  • run() should contain the logic to execute

Luigi will check the return values of requires() and output() and build the dependency graph accordingly.
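
Note that requires() doesn’t have to return a list: Luigi also accepts a single task or a dict, and self.input() will mirror the same structure. As a sketch, here’s a hypothetical CubedNumbers task that could be added to run_luigi.py, re-using PrintNumbers from above:

class CubedNumbers(luigi.Task):

    def requires(self):
        # a dict works too; self.input() mirrors this structure
        return {'numbers': PrintNumbers()}

    def output(self):
        return luigi.LocalTarget("cubes.txt")

    def run(self):
        with self.input()['numbers'].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                fout.write("{}:{}\n".format(n, n ** 3))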

Passing Parameters

Hard-coding filenames and config values is, generally speaking, an anti-pattern. Once you’ve understood the structure and the dynamics of your task, you should look into parameterising all the configuration aspects, so that you can dynamically call the same script with different arguments.

The class luigi.Parameter() is the place to look. Each Luigi task can have a number of parameters. Let’s say, for example, that we want to modify the previous example to support a custom number. Since the parameter we’re using with the range() function is an integer, we can use luigi.IntParameter rather than the default parameter class. This is how the modified tasks could look:

class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return [PrintNumbers(n=self.n)]

    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

To call the SquaredNumbers task for numbers up to, say, 20:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

Parameters can also have default values, e.g.

n = luigi.IntParameter(default=10)

so if you don’t specify the --n argument, it will default to 10.
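
For example, with default=10 in place, these two invocations are equivalent:

$ python run_luigi.py SquaredNumbers --local-scheduler
$ python run_luigi.py SquaredNumbers --local-scheduler --n 10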

The sample code is available as a GitHub Gist.

Local vs Global Scheduler

So far, we’ve used the --local-scheduler option to run Luigi tasks with a local scheduler. This is useful for development, but in a production environment we should make use of the centralised scheduler (see the docs on the scheduler).

This has a few advantages:

  • avoid running two instances of the same task simultaneously
  • nice web-based visualisation

You can run the Luigi scheduler daemon in the foreground with:

$ luigid

or in the background with:

$ luigid --background

It will default to port 8082, so you can point your browser to http://localhost:8082 to access the visualisation.
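
If the scheduler daemon runs on a different machine, you can point your Luigi clients to it with a luigi.cfg file. A minimal sketch (the host is a placeholder, and you should double-check the exact keys in the configuration docs of your Luigi version):

[core]
default-scheduler-host=scheduler.example.com
default-scheduler-port=8082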

With the global Luigi scheduler running, we can re-run the code without the option for the local scheduler:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

As the sample code will run in milliseconds, if you want to have a chance to switch to the browser and see the dependency graph while the tasks are still running, you should probably use a big number like 10,000,000 or more for the --n option.

This is a cropped screenshot of the dependency graph:

[dependency graph screenshot]

Summary

We have described the definition of data pipelines using Luigi, a workflow manager written in Python. Luigi provides a nice abstraction to define your data pipeline in terms of tasks and targets, and it will take care of the dependencies for you.

In terms of code re-use, and with the mindset of going from prototype to production, I’ve found it very helpful to define the business logic of the tasks in separate Python packages (i.e. with a setup.py file). This way, from your Luigi script you can simply import your_package and call it from there.
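
As a sketch, the Luigi task then stays thin (your_package and its clean_data() function, as well as the upstream GetSomeData task, are hypothetical):

import luigi
import your_package  # hypothetical package holding the business logic

class CleanData(luigi.Task):

    def requires(self):
        return [GetSomeData()]  # hypothetical upstream task

    def output(self):
        return luigi.LocalTarget("clean_data.txt")

    def run(self):
        # the task is just glue: I/O handling here, logic in the package
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            your_package.clean_data(fin, fout)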

A task can produce multiple files as output, but if that’s your case, you should probably verify whether the task can be broken down into smaller units (i.e. multiple tasks). Do all these outputs logically belong together? Are there dependencies between them? If you can’t break the task down, I’ve found it simpler to define the output() as a log file with the names and the timestamps of all the individual files created by the task itself. The log file name can be formatted as TaskName_timestamp_param1value_param2value_etc.
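
A sketch of this log-as-output pattern (the ExportReports task and the export_all_reports() helper are hypothetical):

import datetime
import luigi

class ExportReports(luigi.Task):
    region = luigi.Parameter()

    def output(self):
        # one log target per (task, date, parameter) combination
        fname = "ExportReports_{}_{}.log".format(
            datetime.date.today().isoformat(), self.region)
        return luigi.LocalTarget(fname)

    def run(self):
        # hypothetical helper doing the real work, returning the paths it created
        created = export_all_reports(self.region)
        with self.output().open('w') as log:
            for path in created:
                log.write("{}\t{}\n".format(
                    datetime.datetime.now().isoformat(), path))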

Using a workflow manager like Luigi is helpful in general because it handles dependencies, it reduces the amount of boilerplate code needed for parameters and error checking, it manages failure recovery, and overall it forces us to follow a clear pattern when developing the data pipeline.

It’s also important to consider its limitations:

  • It was built for batch jobs, so it’s probably not useful for near real-time processing
  • It doesn’t trigger the execution for you; you still need to run the data pipeline (e.g. via a cronjob, as sketched below)
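
For example, a crontab entry to kick off the pipeline nightly could look like this sketch (the path and schedule are placeholders):

# run the pipeline every night at 2am
0 2 * * * cd /path/to/project && python run_luigi.py SquaredNumbers --n 1000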

@MarcoBonzanini