Building Data Pipelines with Python and Luigi

For a data scientist, the emphasis of the day-to-day job is often more on R&D than on engineering. In going from prototype to production, though, some of the early quick-and-dirty decisions turn out to be sub-optimal and take a decent amount of effort to re-engineer. This usually slows down innovation and, generally speaking, your project as a whole.

This post discusses some experience in building data pipelines (extraction, cleaning, integration and pre-processing of data): in general, all the steps needed to prepare your data for a data-driven product. In particular, the focus is on data plumbing, and how a workflow manager like Luigi can come to the rescue without getting in your way. With minimal effort, the transition from prototype to production can be smoother.

You can find the code for the examples as GitHub Gist.

Early Days of a Prototype

In the early days of a prototype, the data pipeline often looks like this:

$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py

This is quite common when a data project is in its exploratory stage: you know you’ll need some pre-processing, you think it’s going to be a quick hack, so you don’t bother with engineering best practices. Then the number of scripts grows, and your data pipeline comes back to bite you.

The only advantage of this approach is that it’s quick and hacky. On the downside, it’s tedious: every time you want to re-run the pipeline, you need to manually call the whole bunch of scripts in sequence. Moreover, if you’re sharing this prototype with a colleague, there is even more room for misinterpretation (“why can’t I do stuff with data?”… “did you clean it first?”, etc.)

The obvious hacky solution seems to be: let’s put everything in one script. After some quick refactoring, the do_everything.py script can look like this:

if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()

This is fairly simple to run:

$ python do_everything.py

(Note: you could also put everything in a bash script that calls the individual scripts in sequence, but the shortcomings will be more or less the same.)

Boilerplate Code

When moving towards a production-ready pipeline, there are a few more aspects to consider besides the run-everything code. In particular, error handling should be taken into account:

 
try:
    get_some_data()
except GetSomeDataError as e:
    # handle this

But if we chain all the individual tasks together, we end up with a Christmas tree of try/except:

try:
    get_some_data()
    try:
        clean_some_data()
        try:
            # you see where this is going...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        # handle CleanSomeDataError
except GetSomeDataError as e:
    # handle GetSomeDataError

Another important aspect to consider is how to resume a pipeline. For example, if the first few tasks are completed, but then an error occurs half-way through, how do we re-run the pipeline without re-executing the initial successful steps?

# check if the task was already successful
if not i_got_the_data_already():
    # if not, run it
    try:
        get_some_data()
    except GetSomeDataError as e:
        # handle the error
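
To make the cost of this boilerplate concrete, here is a minimal hand-rolled sketch of the "skip what's already done" logic, assuming each step writes exactly one output file. The helpers run_step and run_pipeline are hypothetical names for this illustration, and the two step functions are stand-ins for the real scripts:

```python
import os
import tempfile


def run_step(step, output_path):
    """Run `step` only if its output file is missing; return True if it ran."""
    if os.path.exists(output_path):
        return False  # checkpoint found: skip the step
    # Write to a temporary file first, so a crash never leaves a partial output
    tmp_path = output_path + ".tmp"
    with open(tmp_path, "w") as f:
        step(f)
    os.replace(tmp_path, output_path)
    return True


def run_pipeline(steps):
    """Run (step, output_path) pairs in order; return the names of the steps that ran."""
    executed = []
    for step, output_path in steps:
        try:
            if run_step(step, output_path):
                executed.append(step.__name__)
        except Exception as e:
            # One flat handler instead of nested try/except blocks
            print("{} failed: {}".format(step.__name__, e))
            break
    return executed


# Hypothetical steps, standing in for the real get_some_data() and clean_some_data()
def get_some_data(f):
    f.write("raw\n")


def clean_some_data(f):
    f.write("clean\n")


workdir = tempfile.mkdtemp()
steps = [
    (get_some_data, os.path.join(workdir, "raw.txt")),
    (clean_some_data, os.path.join(workdir, "clean.txt")),
]
first_run = run_pipeline(steps)   # both steps run
second_run = run_pipeline(steps)  # nothing runs: both outputs already exist
```

This works, but it is exactly the kind of plumbing you end up rewriting for every project, and it knows nothing about dependencies between steps beyond their order in the list.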

Moving to Luigi

Luigi is a Python tool for workflow management. It was developed at Spotify to help build complex data pipelines of batch jobs. To install Luigi:

$ pip install luigi

Some of the useful features of Luigi include:

  • Dependency management
  • Checkpoints / Failure recovery
  • CLI integration / parameterisation
  • Dependency Graph visualisation

There are two core concepts to understand how we can apply Luigi to our own data pipeline: Tasks and Targets. A task is a unit of work, designed by extending the class luigi.Task and overriding some basic methods. The output of a task is a target, which can be a file on the local filesystem, a file on Amazon’s S3, some piece of data in a database etc.

Dependencies are defined in terms of inputs and outputs, i.e. if TaskB depends on TaskA, it means that the output of TaskA will be the input of TaskB.
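
As a mental model only (this is a toy sketch, not Luigi's actual scheduler code), the idea can be pictured as walking the dependency graph backwards from the task you ask for, skipping anything whose output is already there. The resolve function and the task names below are made up for this illustration:

```python
def resolve(task, requires, is_done, order=None):
    """Return the tasks that need to run, dependencies first.

    `requires` maps a task name to the names of the tasks it depends on;
    `is_done` tells whether a task's output already exists.
    """
    if order is None:
        order = []
    # A finished (or already scheduled) task is skipped, and so is
    # everything upstream of it
    if is_done(task) or task in order:
        return order
    for dep in requires.get(task, []):
        resolve(dep, requires, is_done, order)
    order.append(task)
    return order


requires = {"TaskB": ["TaskA"]}  # TaskB consumes the output of TaskA

nothing_done = resolve("TaskB", requires, lambda t: False)
a_done = resolve("TaskB", requires, lambda t: t == "TaskA")
b_done = resolve("TaskB", requires, lambda t: t == "TaskB")
```

With nothing done, both tasks are scheduled with TaskA first; with TaskA's output in place only TaskB runs; and if TaskB's own output exists, nothing runs at all.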

Let’s look at a couple of template tasks:

# Filename: run_luigi.py
import luigi

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
                
if __name__ == '__main__':
    luigi.run()

This code showcases two tasks: PrintNumbers, which writes the numbers from 1 to 10 into a file called numbers_up_to_10.txt, one number per line, and SquaredNumbers, which reads that file and outputs a list of number-square pairs into squares.txt, also one pair per line.

To run the tasks:

$ python run_luigi.py SquaredNumbers --local-scheduler

Luigi takes care of checking the dependencies between tasks: it sees that the input of SquaredNumbers is not there, so it runs the PrintNumbers task first and then carries on with the execution.

The first argument we’re passing to Luigi is the name of the last task in the pipeline we want to run. The second argument simply tells Luigi to use a local scheduler (more on this later).

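You could also use the luigi command, passing the module name (without the .py extension) and making sure the module is importable, e.g. by adding the current directory to PYTHONPATH:

$ PYTHONPATH='.' luigi --module run_luigi SquaredNumbers --local-scheduler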

Anatomy of a Task

To create a Luigi task we simply need to create a class whose parent is luigi.Task, and override some methods. In particular:

  • requires() should return the list of dependencies for the given task — in other words a list of tasks
  • output() should return the target for the task (e.g. a LocalTarget, an S3Target, etc.)
  • run() should contain the logic to execute

Luigi will check the return values of requires() and output() and build the dependency graph accordingly.

Passing Parameters

Hard-coding filenames and config values is generally speaking an anti-pattern. Once you’ve understood the structure and the dynamics of your task, you should look into parameterising all the configuration aspects so that you can dynamically call the same script with different arguments.

The class luigi.Parameter is the place to look. Each Luigi task can have a number of parameters. Let’s say, for example, that we want to modify the previous example to support a custom number. As the value we’re passing to range() is an integer, we can use luigi.IntParameter rather than the default parameter class. This is what the modified tasks can look like:

class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return [PrintNumbers(n=self.n)]

    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

To call the SquaredNumbers task up to, say, 20:

$ python run_luigi.py SquaredNumbers --local-scheduler --n 20

Parameters can also have default values, e.g.

n = luigi.IntParameter(default=10)

so in this way, if you don’t specify the --n argument, it will default to 10.

Sample code as GitHub Gist

Local vs Global Scheduler

So far, we’ve used the --local-scheduler option to run Luigi tasks with a local scheduler. This is useful for development, but in a production environment we should make use of the centralised scheduler (see the docs on the scheduler).

This has a few advantages:

  • avoid running two instances of the same task simultaneously
  • nice web-based visualisation

You can run the Luigi scheduler daemon in the foreground with:

$ luigid

or in the background with:

$ luigid --background

It will default to port 8082, so you can point your browser to http://localhost:8082 to access the visualisation.

With the global Luigi scheduler running, we can re-run the code without the option for the local scheduler:

$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]

As the sample code will run in milliseconds, if you want to have a chance to switch to the browser and see the dependency graph while the tasks are still running, you should probably use a big number like 10,000,000 or more for the --n option.

This is a cropped screenshot of the dependency graph:

[Figure: cropped screenshot of the dependency graph]

Summary

We have described the definition of data pipelines using Luigi, a workflow manager written in Python. Luigi provides a nice abstraction to define your data pipeline in terms of tasks and targets, and it will take care of the dependencies for you.

In terms of code re-use, and with the mindset of going from prototype to production, I’ve found it very helpful to define the business logic of the tasks in separate Python packages (i.e. with a setup.py file). This way, from your Luigi script you can simply import your_package and call it from there.

A task can produce multiple files as output, but if that’s your case, you should probably check whether the task can be broken down into smaller units (i.e. multiple tasks). Do all these outputs logically belong together? Do you have dependencies between them? If you can’t break the task down, I’ve found it simpler and more useful just to define the output() as a log file with the names and the timestamps of all the individual files created by the task itself. The log file name can be formatted as TaskName_timestamp_param1value_param2value_etc.

Using a workflow manager like Luigi is in general helpful because it handles dependencies, it reduces the amount of boilerplate code that is necessary for parameters and error checking, it manages failure recovery and overall it forces us to follow a clear pattern when developing the data pipeline.

It’s also important to consider its limitations:

  • It was built for batch jobs, so it’s probably not useful for near real-time processing
  • It doesn’t trigger the execution for you: you still need to run the data pipeline (e.g. via a cronjob)

@MarcoBonzanini

21 thoughts on “Building Data Pipelines with Python and Luigi”

  1. Hello

    Thank you for good tutorial.

    I have two comments.

    1, Syntax error
    I found syntax error on run_luigi.py.
    Could you review this fix?

    line 23:
    before
    return luigi.LocalTarget(“squares.txt”))

    after
    return luigi.LocalTarget(“squares.txt”)

    2, local-scheduler option

    I couldn’t view the dependency graph after executing the following command.

    python run_luigi.py SquaredNumbers --local-scheduler --n [BIG_NUMBER]

    Do I need the “--local-scheduler” option?
    When I removed this option, I could see the dependency graph.

    Thanks.

    1. Thank you, I’ve applied the fixes. About the dependency graph, you can visualise it only if you use the global scheduler (luigid), so there’s no need for the --local-scheduler option

  2. I run luigi --address 192.168.100.201 --port 8082
    but there is nothing in my browser page.

    PS: my luigi is installed in linux without having GUI.
    I am using putty to connect to, and type luigid to start the server.

    1. Hi Hussain, with some abstraction and a mechanism to declare the dependencies dynamically (probably via a Luigi parameter), I can’t see why not

  3. Hello,
    nice article. Just one question. If I want to automate a building pipeline made by several tasks (for example: WAR deploy + JBOSS restart + properties configuration), may I use Luigi?

    1. Hi Riccardo,
      Luigi is agnostic from this point of view, you can use it to run any external command. There is extensive support for third-parties, especially databases and big data framework. I’m not aware of jboss-specific extensions but it’s fairly simple to customise. Whether it’s the best tool for the job or not… it depends on your application. If you have a sequence of tasks with dependencies (e.g. the output of a task feeds the next task) it’s probably a good option to consider.
      Cheers,
      Marco

  4. Is it possible to use a task recursively with Luigi? I have a render task that renders a video file, and it uses the output of the same render task, which also outputs a video file. I have searched the web without finding any tutorial about recursive pipelines with Luigi. Thanks.

    1. Hi
      the same task definition can be reused with different arguments. Luigi looks at the combination of class name + attribute values to see if a task has been run, so in short this should be potentially possible

      Cheers
      Marco

  5. Very concise and useful tutorial for beginners. One thing to note: run the global Luigi scheduler in a separate terminal!

  6. Nice post, have you used Luigi or a similar framework to log events of data movement from external sources to ADLS (using azure-data-lake-store Python libraries) ?

    I’m thinking on building our own framework (SSISDB-like database) to keep track of our data movement to ADLS, however as you may know, this could be something risky,

    Basically my automated python scripts will be logging events at the same time, so, I’ll be using ADLUploader() and it will log the status, rowcount and other useful attributes.

    Any thoughts?

    1. Hi Saul,
      I haven’t used azure per se, but I don’t see any particular issue. Luigi simply orchestrates the tasks and I’ve normally used Python’s standard logging mechanism as well as external tools like datadog.

      Cheers
      Marco

  7. Dear Marco,

    thank you for the nice tutorial.

    I experimented with run_luigi.py: copied it on the disk, then gave:
    python run_luigi.py SquaredNumbers --local-scheduler
    then
    rm numbers_up_to_10.txt
    then again:
    python run_luigi.py SquaredNumbers --local-scheduler
    and got:
    Scheduled 1 tasks of which:
    * 1 present dependencies were encountered:
    - 1 SquaredNumbers()
    Did not run any tasks
    This progress looks :) because there were no failed tasks or missing external dependencies
    Is that the correct behavior? I would have expected that removing numbers_up_to_10.txt would trigger PrintNumbers again.

    1. Hi Andrea,
      that’s the correct behaviour, because the tool starts from the end of the dependency graph (SquaredNumbers in this case) and if it finds the expected output (the target of SquaredNumbers) it will assume that everything is fine and it doesn’t re-run the task. In other words, you’ll need to remove all the *.txt output files for a full re-run of the pipeline

      Cheers
      Marco
