This post has mainly been written for me, so that in the future I have a general reference guide for spark and so I don’t have to piece together the various bits I’ve found strewn across the internet in order to build an EMR instance that I like. Also I originally wanted to build this to test whether reticulate could be used to link in with Spark on an EMR (elastic map reduce) cluster. However, I believe that this is not possible now and as such had to pivot into utilising a jupyter notebook and rpy2 to allow me to utilise R within the session.
Part 1: A quick introduction to spark
What is spark?
Spark is a computing engine designed for large scale data computation, IE focused on cleaning and analyzing and not storage. It was developed at UC Berkeley and then donated to the Apache Software Federation, where it is maintained on an open source basis.
How does it work?
Spark works through a number of servers joined together in a cluster, with one server acting as the master node in order to control the cluster and the others acting as executor nodes to distribute the data across (known as partitions) and perform actions, as shown below (with the T shaped objects representing a two column dataframe).
These actions are evaluated lazily, meaning that operations are not carried out until an action is required. For example multiplying a column will not occur until a count of the output is required. This strategy allows spark to calculate and optimise a plan in which to run the operations across the cluster, increasing speed.
In addition to being performed lazily, transformations can either be narrow or wide, which is explained below and also shown on the below diagram.
Narrow transformation - A row in a partition will be in the same position in the same modified partition post transformation, for example multiplying a column of numbers by a constant which forms a new column.
Wide transformation - Data in each partition can become spread across multiple other partitions, for example sorting a partition may lead to 4 values from partition 1 being placed in new partitions 1, 2 3 and 4.
The latter of these two is known as a shuffle, as it moves around the partitions, and involves writing the data to disk, where as narrow transformations are performed in memory, leading to differences in speed.
How is a cluster operated?
The majority of times a cluster will be operated via what is referred to as structured API, which acts as a bridge between a more common programming language and the native spark language. Currently these are available for scala, java, python SQL and R, where code written in these languages is translated within a spark session. A summation of the process is shown below, which involves a user propositioning the cluster manager, on the master node, for resources, leading to the creation of a spark driver process which includes the user code and a spark session. Once the full plan is determined, via the spark session, it is communicated to the cluster manager, which then allocates resources to run each process. The results of these processes are then fed back to the master node.
Part 2: Setting up an AWS EMR
Step 1 - setting up the EMR cluster
The first step is finding your was to the emr homepage within AWS, which looks like below, where you can initiate a cluster from.
By clicking on create a cluster you’ll find yourself being able to select your cluster options, including the name and software etc, choose spark and pick your key pair and you’re good to launch. One additional thing you’ll want to do is allow access to your S3 buckets, so that you can pull in data to use. A good guide for this is at the following link
Once you’ve launched you can then set the security options, though only the first time you launch, these can be found on your cluster page post launch. Here you need to edit the inbound rules on both the master and slave nodes as open port 22 for your IP, via SSH, and a custom TCP for port 8888 which is open to all ips for jupyter notebooks.
Once you’ve waiting the 10 or so minutes for the cluster to gear up you can ssh in, which needs to be conducted with port forwarding to allow your machine to connect to the master node and access the jupyter notebook via the browser. The command for this is shown below.
ssh -i your-authentication-key-pem-file -L 8888:localhost:8888 your-machine's-Public-DNS
Once you’re in you, which may requiring entering a pass code you set earlier if you’re using putty, should see the friendly screen below!
Step 2 - Updating the EMR with required software
Now that we have our servers up and running and we’ve ssh’d into the main node it’s time to update our software. For me this involves changing our default python on pyspark to python 3, installing libraries and setting jupyter to drive the spark processes.
Now we need to install some libraries that you may need for analysis later on, including rpy2 which allows us to access R from within the python led jupyter notebook (which also requires an update to setuptools to install).
sudo pip-3.6 install jupyter sudo pip-3.6 install pandas sudo pip-3.6 install --upgrade setuptools sudo pip-3.6 install rpy2
Now we need to change the default version of python used by spark, which is pretty easy going. This requires the following lines of code, which alters the python path in the spark config and sets the driver to our jupyter notebook.
export PYSPARK_PYTHON=/usr/bin/python3 export PYSPARK_DRIVER_PYTHON=jupyter export PYSPARK_DRIVER_PYTHON_OPTS='notebook --no-browser --port=8888' source .bashrc
Step 3 - Launching a Jupyter notebook
Now the infrastructure is all set up its time to open the notebook and login. This is done by accessing pyspark as below and copying the provided token. Once this is done you can start a new notebook which will be connected to your EMR cluster, as such you can utilise it analyse large data sets that are stored within your S3 buckets, providing you have set up the access already.
Now that we’re in the notebook lets see if we can get R running. This involves loading the rpy2 package, running the command to allow cells to run R code and then testing this. The following is shown in the below code snippet and also in the attached image, which also shows the output of a base R plot. In order to do anything outside of base R you’ll need to install packages within the R console, which can be done by running the R() command in the terminal and running install.packages().
Well that’s all that will be covered in this post, but hopefully it’s enough to give you a bit of background into spark and also boot up your first EMR cluster with both python and R available!