Apache Spark is a cluster computing framework, currently one of the most actively developed in the open-source Big Data arena. It aims at being a general engine for large-scale data processing, supporting a number of platforms for cluster management (e.g. YARN or Mesos as well as Spark native) and a variety of distributed storage systems (e.g. HDFS or Amazon S3).
More interestingly, at least from a developer’s perspective, it supports a number of programming languages. Since the latest version 1.4 (June 2015), Spark supports R and Python 3 (to complement the previously available support for Java, Scala and Python 2).
This article is a brief introduction on how to use Spark on Python 3.
After downloading a binary version of Spark 1.4, we can extract it in a custom folder, e.g. ~/apps/spark, which we’ll call $SPARK_HOME:
This folder contains several Spark commands (in $SPARK_HOME/bin) as well as examples of code (in $SPARK_HOME/examples/src/main/YOUR-LANGUAGE).
We can run Spark with Python in two ways: using the interactive shell, or submitting a standalone application.
Let’s start with the interactive shell, by running this command:
You will get several messages on the screen while the shell is loading, and at the end you should see the Spark banner:
Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /__ / .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Python version 2.7.5 (default, Mar 9 2014 22:15:05) SparkContext available as sc, HiveContext available as sqlContext. >>>
The >>> prompt is the usual Python prompt, as effectively we are using a Python interactive shell.
SparkContext and HiveContext are Spark concepts that we’ll briefly explain below. The interactive shell is telling us that these two contexts have been initialised and are available as sc and sqlContext in this session. The shell is also telling us that we’re using Python 2.7!?
But I want to use Python 3!
Long story short, Python 2 is still the default option in Spark, which you can see if you open the pyspark script with an editor (it’s a shell script). You can simply override this behaviour by setting an environment variable:
Once you re-run the interactive shell, the Spark banner should be updated to reflect your version of Python 3.
Some Basic Spark Concepts
Two Spark concepts have already been mentioned above:
- SparkContext: it’s an object that represents a connection to a computing cluster – an application will access Spark through this context
- HiveContext: it’s an instance of the Spark SQL engine, that integrates data stored in Hive (not used in this article)
Another core concept in Spark is the Resilient Distributed Dataset (RDD), an immutable distributed collection of objects. Each RDD is split into partitions, which might be processed on different nodes of a cluster.
RDDs can be loaded from external sources, e.g. from text files, or can be transformed into new RDDs.
There are two types of operation that can be performed over a RDD:
- a transformation will leave the original RDD intact and create a new one (RDD are immutable); an example of transformation is the use of a filter
- an action will compute a result based on the RDD, e.g. counting the number of lines in a RDD
Running an Example Application
Running the interactive shell can be useful for interactive analysis, but sometimes you need to launch a batch job, i.e. you need to submit a stand-alone application.
Consider the following code and save it as line_count.py:
from pyspark import SparkContext import sys if __name__ == '__main__': fname = sys.argv search1 = sys.argv.lower() search2 = sys.argv.lower() sc = SparkContext("local", appName="Line Count") data = sc.textFile(fname) # Transformations filtered_data1 = data.filter(lambda s: search1 in s.lower()) filtered_data2 = data.filter(lambda s: search2 in s.lower()) # Actions num1 = filtered_data1.count() num2 = filtered_data2.count() print('Lines with "%s": %i, lines with "%s": %i' % (search1, num1, search2, num2))
The application will take three parameters from the command line (via sys.argv), namely a file name and two search terms.
The sc variable contains the SparkContext, initialised as local context (we’re not using a cluster).
The data variable is the RDD, loaded from an external resource (the aforementioned file).
What follows the data import is a series of transformations and actions. The basic idea is to simply count how many lines contain the given search terms.
For this example, I’m using a data-set of tweets downloaded for a previous article, stored in data.json one tweet per line.
I can now launch (submit) the application with:
$SPARK_HOME/bin/spark-submit line_count.py data.json \#ita \#eng
Within a lot of debugging information, the application will print out the final count:
Lines with "#ita": 1339, lines with "#eng": 2278
Notice the use of the backslash from the command-line, because we need to escape the # symbol: effectively the search terms are #ita and #eng.
Also notice that we don’t have information about repeated occurrences of the search terms, nor about partial matches (e.g. “#eng” will also match “#england”, etc.): this example just showcases the use of transformations and actions.
Spark now supports Python 3 :)