As a data scientist, your day-to-day emphasis is often more on the R&D side than on engineering. In the process of going from prototypes to production, though, some of the early quick-and-dirty decisions turn out to be sub-optimal and require a decent amount of effort to be re-engineered. This usually slows down innovation and, generally speaking, your project as a whole.
This post discusses some experience in building data pipelines, i.e. extraction, cleaning, integration and pre-processing of data: in general, all the steps necessary to prepare your data for a data-driven product. In particular, the focus is on data plumbing, and on how a workflow manager like Luigi can come to the rescue without getting in your way. With minimal effort, the transition from prototype to production can be much smoother.
You can find the code for the examples as a GitHub Gist.
Early Days of a Prototype
In the early days of a prototype, the data pipeline often looks like this:
$ python get_some_data.py
$ python clean_some_data.py
$ python join_other_data.py
$ python do_stuff_with_data.py
This is quite common when a data project is in its exploratory stage: you know you'll need some pre-processing, you think it's going to be a quick hack, so you don't bother with engineering best practices; then the number of scripts grows, and your data pipeline comes back to bite you.
This approach has the sole advantage of being quick and hacky. On the downside, it's tedious: every time you want to re-run the pipeline, you have to call the whole bunch of scripts in sequence by hand. Moreover, if you share the prototype with a colleague, there is even more room for misinterpretation ("why can't I do stuff with data?"… "did you clean it first?", etc.).
The obvious hacky solution seems to be: let’s put everything in one script. After some quick refactoring, the do_everything.py script can look like this:
if __name__ == '__main__':
    get_some_data()
    clean_some_data()
    join_other_data()
    do_stuff_with_data()
This is fairly simple to run:
$ python do_everything.py
(Note: you could also put everything in a bash script that calls the individual scripts in sequence, but the shortcomings would be more or less the same.)
Boilerplate Code
When moving towards a production-ready pipeline, there are a few more aspects to consider besides the run-everything code. In particular, error handling should be taken into account:
try:
    get_some_data()
except GetSomeDataError as e:
    # handle this
But if we chain all the individual tasks together, we end up with a Christmas tree of try/except:
try:
    get_some_data()
    try:
        clean_some_data()
        try:
            # you see where this is going...
        except EvenMoreErrors:
            # ...
    except CleanSomeDataError as e:
        # handle CleanSomeDataError
except GetSomeDataError as e:
    # handle GetSomeDataError
Another important aspect to consider is how to resume a pipeline. For example, if the first few tasks are completed, but then an error occurs half-way through, how do we re-run the pipeline without re-executing the initial successful steps?
# check if the task was already successful
if not i_got_the_data_already():
    # if not, run it
    try:
        get_some_data()
    except GetSomeDataError as e:
        # handle the error
Moving to Luigi
Luigi is a Python tool for workflow management. It was developed at Spotify to help build complex data pipelines of batch jobs. To install Luigi:
$ pip install luigi
Some of the useful features of Luigi include:
- Dependency management
- Checkpoints / Failure recovery
- CLI integration / parameterisation
- Dependency Graph visualisation
There are two core concepts to understand in order to apply Luigi to our own data pipeline: Tasks and Targets. A task is a unit of work, defined by extending the class luigi.Task and overriding some basic methods. The output of a task is a target, which can be a file on the local filesystem, a file on Amazon S3, a piece of data in a database, etc.
Dependencies are defined in terms of inputs and outputs, i.e. if TaskB depends on TaskA, it means that the output of TaskA will be the input of TaskB.
Let’s look at a couple of template tasks:
# Filename: run_luigi.py
import luigi

class PrintNumbers(luigi.Task):

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_10.txt")

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, 11):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):

    def requires(self):
        return [PrintNumbers()]

    def output(self):
        return luigi.LocalTarget("squares.txt")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))

if __name__ == '__main__':
    luigi.run()
This code showcases two tasks: PrintNumbers, which writes the numbers from 1 to 10 into a file called numbers_up_to_10.txt, one number per line, and SquaredNumbers, which reads that file and writes a list of number-square pairs into squares.txt, also one pair per line.
To run the tasks:
$ python run_luigi.py SquaredNumbers --local-scheduler
Luigi will take care of checking the dependencies between tasks, see that the input of SquaredNumbers is not there, so it will run the PrintNumbers task first, then carry on with the execution.
The first argument we’re passing to Luigi is the name of the last task in the pipeline we want to run. The second argument simply tells Luigi to use a local scheduler (more on this later).
You could also use the luigi command:
$ luigi -m run_luigi.py SquaredNumbers --local-scheduler
Anatomy of a Task
To create a Luigi task we simply need to create a class whose parent is luigi.Task, and override some methods. In particular:
- requires() should return the list of dependencies for the given task — in other words a list of tasks
- output() should return the target for the task (e.g. a LocalTarget, an S3Target, etc.)
- run() should contain the logic to execute
Luigi will check the return values of requires() and output() and build the dependency graph accordingly.
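For instance, requires() can return more than one task, and self.input() will then give back the corresponding targets in the same order. Here is a minimal sketch, where TaskA and TaskB are hypothetical upstream tasks defined along the lines shown above:
class CombineOutputs(luigi.Task):

    def requires(self):
        # both upstream tasks must be complete before this one runs
        return [TaskA(), TaskB()]

    def output(self):
        return luigi.LocalTarget("combined.txt")

    def run(self):
        # self.input() returns the targets of the required tasks, in the same order
        with self.input()[0].open() as fa, self.input()[1].open() as fb:
            with self.output().open('w') as fout:
                fout.write(fa.read())
                fout.write(fb.read())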
Passing Parameters
Hard-coding filenames and config values is generally speaking an anti-pattern. Once you’ve understood the structure and the dynamics of your task, you should look into parameterising all the configuration aspects so that you can dynamically call the same script with different arguments.
The class luigi.Parameter() is the place to look. Each Luigi task can have a number of parameters. Let's say, for example, that we want to modify the previous example to support a custom number. As the parameter we're using with the range() function is an integer, we can use luigi.IntParameter rather than the default parameter class. This is what the modified tasks can look like:
class PrintNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget("numbers_up_to_{}.txt".format(self.n))

    def run(self):
        with self.output().open('w') as f:
            for i in range(1, self.n+1):
                f.write("{}\n".format(i))

class SquaredNumbers(luigi.Task):
    n = luigi.IntParameter()

    def requires(self):
        return [PrintNumbers(n=self.n)]

    def output(self):
        return luigi.LocalTarget("squares_up_to_{}.txt".format(self.n))

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            for line in fin:
                n = int(line.strip())
                out = n * n
                fout.write("{}:{}\n".format(n, out))
To call the SquaredNumbers task up to, say, 20:
$ python run_luigi.py SquaredNumbers --local-scheduler --n 20
Parameters can also have default values, e.g.
n = luigi.IntParameter(default=10)
so that, if you don't specify the --n argument, it will default to 10.
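Note that parameters become attributes of the task instance, and tasks with different parameter values are distinct tasks with distinct outputs. A quick sketch, assuming the SquaredNumbers class and the default value shown above:
# parameters become instance attributes
task_default = SquaredNumbers()       # uses the default, n=10
task_custom = SquaredNumbers(n=20)    # a distinct task with its own output

print(task_default.n)                 # 10
print(task_custom.output().path)      # squares_up_to_20.txt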
Sample code as GitHub Gist
Local vs Global Scheduler
So far, we’ve used the --local-scheduler option to run Luigi tasks with a local scheduler. This is useful for development, but in a production environment we should make use of the centralised scheduler (see the docs on the scheduler).
This has a few advantages:
- avoid running two instances of the same task simultaneously
- nice web-based visualisation
You can run the Luigi scheduler daemon in the foreground with:
$ luigid
or in the background with:
$ luigid --background
It will default to port 8082, so you can point your browser to http://localhost:8082 to access the visualisation.
With the global Luigi scheduler running, we can re-run the code without the option for the local scheduler:
$ python run_luigi.py SquaredNumbers --n [BIG_NUMBER]
As the sample code will run in milliseconds, if you want to have a chance to switch to the browser and see the dependency graph while the tasks are still running, you should probably use a big number like 10,000,000 or more for the --n option.
This is a cropped screenshot of the dependency graph:

Summary
We have seen how to define data pipelines using Luigi, a workflow manager written in Python. Luigi provides a nice abstraction to define your data pipeline in terms of tasks and targets, and it will take care of the dependencies for you.
In terms of code re-use, and with the mindset of going from prototype to production, I've found it very helpful to define the business logic of the tasks in separate Python packages (i.e. with a setup.py file). In this way, from your Luigi script you can simply import your_package and call it from there, as sketched below.
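As a minimal sketch of this approach (your_package, its clean_some_data() function and the GetSomeData task are hypothetical placeholders), the Luigi task becomes a thin wrapper around the packaged logic:
import luigi
# hypothetical package holding the actual business logic,
# installed separately via its own setup.py
from your_package import clean_some_data

class CleanSomeData(luigi.Task):

    def requires(self):
        return [GetSomeData()]  # hypothetical upstream task

    def output(self):
        return luigi.LocalTarget("clean_data.csv")

    def run(self):
        with self.input()[0].open() as fin, self.output().open('w') as fout:
            # the task only orchestrates; the real work happens inside the package
            clean_some_data(fin, fout)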
A task can produce multiple files as output, but if that's your case, you should probably check whether the task can be broken down into smaller units (i.e. multiple tasks). Do all these outputs logically belong together? Are there dependencies between them? If you can't break the task down, I've found it simpler and more useful to define output() as a log file containing the names and timestamps of all the individual files created by the task itself. The log file name can be formatted as TaskName_timestamp_param1value_param2value_etc, as in the sketch below.
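A minimal sketch of this workaround, where produce_many_files() is a hypothetical helper that creates the individual files and returns their paths:
import datetime
import luigi

class MultiOutputTask(luigi.Task):
    date = luigi.DateParameter(default=datetime.date.today())
    param1 = luigi.Parameter()

    def output(self):
        # a single log file acts as the checkpoint for the whole task
        return luigi.LocalTarget(
            "MultiOutputTask_{}_{}.log".format(self.date.isoformat(), self.param1))

    def run(self):
        created_files = produce_many_files(self.param1)  # hypothetical helper
        with self.output().open('w') as log:
            for path in created_files:
                log.write("{}\t{}\n".format(
                    datetime.datetime.now().isoformat(), path))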
Using a workflow manager like Luigi is generally helpful: it handles dependencies, reduces the amount of boilerplate code needed for parameters and error checking, manages failure recovery and, overall, forces us to follow a clear pattern when developing the data pipeline.
It’s also important to consider its limitations:
- It was built for batch jobs, so it's probably not useful for near real-time processing
- It doesn't trigger the execution for you; you still need to run the data pipeline (e.g. via a cronjob)
@MarcoBonzanini