# Spark tutorial

### Install Spark

1. Install Java

```
sudo apt install default-jdk
java -version
```

2. Download Apache Spark from https://spark.apache.org/downloads.html, unpack the archive and move it to /opt/spark/

```
sudo mkdir /opt/spark/
tar -xzf spark-3.3.2-bin-hadoop3.tgz
sudo mv spark-3.3.2-bin-hadoop3/ /opt/spark/
```

3. Install PySpark, ideally the same version as the Spark download above

```
pip install pyspark==3.3.2
```

4. Set the Spark environment variables by appending the following two lines to ~/.bashrc:

```
vi ~/.bashrc
```

```
export SPARK_HOME=/opt/spark/spark-3.3.2-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```

```
source ~/.bashrc
```


### Spark & Jupyter notebook

To set up Spark in [Jupyter notebook](https://jupyter.org/), do the following:

1. Add the following lines to ~/.bashrc, depending on how you access the notebook:
 - local access
```
 export PYSPARK_DRIVER_PYTHON=jupyter
 export PYSPARK_DRIVER_PYTHON_OPTS="notebook"
``` 
 - remote access
```
 export PYSPARK_DRIVER_PYTHON=jupyter
 export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser --port= --ip='*'"
``` 
 - Windows Subsystem for Linux (WSL)
```
 export PYSPARK_DRIVER_PYTHON=jupyter
 export PYSPARK_DRIVER_PYTHON_OPTS="notebook --no-browser"
``` 

Don't forget to run `source ~/.bashrc` at the end.

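2. Run `pyspark` from the terminal; with the variables above set, it launches Jupyter notebook instead of the interactive PySpark shell: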
```
pyspark
```

Remote access to the Jupyter notebook requires an SSH tunnel. On Windows you can set one up with [PuTTY](https://www.putty.org/); on Linux, the following command can be used:

 ssh -N -L localhost::localhost: 

Finally, open the notebook in your browser:

 http://localhost:

In [1]:
import random
import re

## PySpark Python API 

PySpark can be used from standalone Python scripts by creating a `SparkContext`. You can set configuration properties by passing a `SparkConf` object to `SparkContext`.

Documentation: [pyspark package](https://spark.apache.org/docs/latest/api/python/pyspark.html)

In [2]:
from pyspark import SparkContext, SparkConf

In [3]:
# only one SparkContext can be active at a time, so stop any existing one just in case
sc = SparkContext.getOrCreate()
sc.stop()

In [4]:
# create a configuration object (no properties are set here, so defaults apply)
conf = SparkConf()

In [5]:
# create a Spark context
sc = SparkContext(conf=conf)

## RDD - Resilient Distributed Datasets

resilient:
- (of a person or animal) able to withstand or recover quickly from difficult conditions
- (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed

Spark is RDD-centric!
- RDDs are immutable
- RDDs are computed lazily
- RDDs can be cached
- RDDs know who their parents are (their lineage), so lost partitions can be recomputed
- RDDs that contain only tuples of two elements are “pair RDDs”
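The laziness, immutability and lineage bullets can be illustrated with a toy, single-process class. `ToyRDD` below is hypothetical and only mimics the idea: a transformation returns a new object that remembers its parent and how to compute its elements, and nothing runs until an action such as `collect()` is called. Real RDDs are partitioned across a cluster and are not implemented like this.

```python
class ToyRDD:
    """Hypothetical, single-process stand-in for an RDD (not part of PySpark)."""

    def __init__(self, compute, parent=None):
        self._compute = compute  # zero-argument function that yields the elements
        self.parent = parent     # lineage: the ToyRDD this one was derived from

    @classmethod
    def from_list(cls, items):
        items = list(items)
        return cls(lambda: iter(items))

    def map(self, func):
        # transformation: returns a new object, computes nothing yet
        return ToyRDD(lambda: (func(x) for x in self._compute()), parent=self)

    def filter(self, pred):
        # transformation: also lazy, also keeps a pointer to its parent
        return ToyRDD(lambda: (x for x in self._compute() if pred(x)), parent=self)

    def collect(self):
        # action: only now is the whole chain of parents evaluated
        return list(self._compute())


seen = []  # records every element the map function actually touches

def double(x):
    seen.append(x)
    return 2 * x

base = ToyRDD.from_list([1, 2, 3, 4])
result = base.map(double).filter(lambda x: x > 4)

print(seen)              # []  (nothing has been computed yet)
print(result.collect())  # [6, 8]
print(seen)              # [1, 2, 3, 4]
```

`base` is never modified; `map()` and `filter()` build new objects on top of it, and `result.parent.parent` leads back to `base`, which is the information Spark uses to recompute data.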

## RDD Actions

**RDD** - Resilient Distributed Datasets

Some useful actions:
- `take(n)` – return the first n elements of the RDD as a Python list.
- `collect()` – return all elements of the RDD as a Python list. Use with caution: every element is copied to the driver, which can run out of memory on large data.
- `count()` – return the number of elements in the RDD as an int.
- `saveAsTextFile('path/to/dir')` – save the RDD as text files (one per partition) in a directory. The directory is created if it doesn't exist; the call fails if it already exists.
- `foreach(func)` – run `func` on every element of the RDD for its side effects; no results are returned.
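To see why `take(n)` is cheap while `collect()` needs care, picture an RDD as a list of partitions. The sketch below is a local, hypothetical model: plain lists stand in for partitions that would live on different executors, and the helper functions only model the data flow, not PySpark's internals.

```python
from itertools import chain, islice

# Hypothetical local stand-in: an "RDD" as a list of partitions (plain lists)
partitions = [
    ["Apple,Amy", "Butter,Bob"],
    ["Cheese,Chucky", "Dinkel,Dieter"],
    ["Egg,Edward"],
]

def take(parts, n):
    # read partitions in order and stop as soon as n elements have been seen
    return list(islice(chain.from_iterable(parts), n))

def collect(parts):
    # ship every element of every partition back to the driver
    return list(chain.from_iterable(parts))

def count(parts):
    # each partition counts locally; only the small per-partition counts are summed
    return sum(len(part) for part in parts)

print(take(partitions, 3))  # ['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']
print(count(partitions))    # 5
```

`take(partitions, 3)` never reads the last partition, whereas `collect()` always materializes everything in one place.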

#### Demo files

```
file1.txt:
 Apple,Amy
 Butter,Bob
 Cheese,Chucky
 Dinkel,Dieter
 Egg,Edward
 Oxtail,Oscar
 Anchovie,Alex
 Avocado,Adam
 Apple,Alex
 Apple,Adam
 Dinkel,Dieter
 Doughboy,Pilsbury
 McDonald,Ronald

file2.txt:
 Wendy,
 Doughboy,Pillsbury
 McDonald,Ronald
 Cheese,Chucky
```

In [6]:
# input files
file1 = 'file1.txt'
file2 = 'file2.txt'

In [7]:
# load data
data1 = sc.textFile(file1)
data2 = sc.textFile(file2)

In [8]:
data1.collect()

 

['Apple,Amy',
 'Butter,Bob',
 'Cheese,Chucky',
 'Dinkel,Dieter',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam',
 'Dinkel,Dieter',
 'Doughboy,Pilsbury',
 'McDonald,Ronald']

In [9]:
print("file1: %d lines" % data1.count())

file1: 13 lines


 

In [10]:
data1.take(3)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']

In [11]:
data2.collect()

['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']

In [12]:
print("file2: %d lines" % data2.count())

file2: 4 lines


In [13]:
data2.take(3)

['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald']

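Note: `foreach()` runs on the executors, so the output of the following cell appears in the console of the Jupyter notebook server (in local mode) rather than in the notebook itself, and the order of the elements is not guaranteed.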

In [14]:
# print runs on the executors: in local mode the lines appear in the notebook server's console, in no guaranteed order
data2.foreach(print)

Cheese,Chucky
Wendy,
Doughboy,Pillsbury
McDonald,Ronald


## RDD Operations

### map()
Return a new RDD by applying a function to each element of this RDD.
- apply an operation to every element of an RDD
- return a new RDD that contains the results

In [15]:
data = sc.textFile(file1)

In [16]:
data

file1.txt MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0

In [17]:
data.take(3)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']

In [18]:
data.map(lambda line: line.split(',')).take(3)

[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

### flatMap()
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
- apply an operation to every element of an RDD
- return a new RDD that contains the results, flattened by one level (each returned sequence is unpacked into separate elements)

In [19]:
data = sc.textFile(file1)

In [20]:
data.take(4)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter']

In [21]:
data.flatMap(lambda line: line.split(',')).take(7)

['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']
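The difference between the last two cells can be modeled with plain Python lists. This is only a local sketch of the semantics, assuming the first four lines of `file1.txt`; no Spark is involved.

```python
# Local model of the map() and flatMap() cells using plain Python lists
lines = ["Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter"]

# map(): exactly one output element per input element (here, a list per line)
mapped = [line.split(",") for line in lines]

# flatMap(): each input may produce zero or more outputs, flattened one level
flat_mapped = [word for line in lines for word in line.split(",")]

print(mapped[:3])       # [['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]
print(flat_mapped[:7])  # ['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']
```

`mapped` keeps one element per line (4 in total), while `flat_mapped` has one element per word (8 in total).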

### mapValues()
Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results

Only works with pair RDDs.

In [22]:
data = sc.textFile(file1)

In [23]:
data = data.map(lambda line: line.split(','))

In [24]:
data.take(3)

[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

In [25]:
data.collect()

[['Apple', 'Amy'],
 ['Butter', 'Bob'],
 ['Cheese', 'Chucky'],
 ['Dinkel', 'Dieter'],
 ['Egg', 'Edward'],
 ['Oxtail', 'Oscar'],
 ['Anchovie', 'Alex'],
 ['Avocado', 'Adam'],
 ['Apple', 'Alex'],
 ['Apple', 'Adam'],
 ['Dinkel', 'Dieter'],
 ['Doughboy', 'Pilsbury'],
 ['McDonald', 'Ronald']]

In [26]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [27]:
data.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [28]:
data.mapValues(lambda name: name.lower()).take(3)

[('Apple', 'amy'), ('Butter', 'bob'), ('Cheese', 'chucky')]
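The phrase "retains the original RDD's partitioning" follows from the keys staying put: if pairs were placed in partitions by hashing their keys, a function that only changes values cannot move a pair into the wrong partition, so Spark can keep the partitioner. A `map()` that may rewrite keys gives no such guarantee. The sketch below models this locally; `hash_partition`, `map_values` and `is_hash_partitioned` are hypothetical helpers, and Python's built-in `hash()` stands in for the hash function Spark actually uses.

```python
def hash_partition(pairs, num_partitions):
    # place each (key, value) pair in the partition chosen by hashing its key
    parts = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        parts[hash(key) % num_partitions].append((key, value))
    return parts

def map_values(parts, func):
    # keys are untouched, so every pair stays in the partition its key hashes to
    return [[(key, func(value)) for key, value in part] for part in parts]

def is_hash_partitioned(parts):
    # check that every key sits in the partition its hash points to
    n = len(parts)
    return all(hash(key) % n == i for i, part in enumerate(parts) for key, _ in part)

pairs = [("Apple", "Amy"), ("Butter", "Bob"), ("Cheese", "Chucky"), ("Apple", "Alex")]
parts = hash_partition(pairs, 3)
lowered = map_values(parts, str.lower)

print(is_hash_partitioned(lowered))  # True
```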

### flatMapValues()
Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.
- apply an operation to the value of every element of an RDD
- return a new RDD that contains the results after removing the outermost container

Only works with pair RDDs.

In [29]:
data = sc.textFile(file1)

In [30]:
data = data.map(lambda line: line.split(','))

In [31]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [32]:
data.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [33]:
data.flatMapValues(lambda name: name.lower()).take(9)

[('Apple', 'a'),
 ('Apple', 'm'),
 ('Apple', 'y'),
 ('Butter', 'b'),
 ('Butter', 'o'),
 ('Butter', 'b'),
 ('Cheese', 'c'),
 ('Cheese', 'h'),
 ('Cheese', 'u')]
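The output above is split into single letters because `name.lower()` returns a string, and a string is itself an iterable of characters, so `flatMapValues()` flattens it. To emit whole values, the function should return a list (or another iterable) of the values you want. The plain-Python model below, built around a hypothetical `flat_map_values` helper, shows both behaviours:

```python
def flat_map_values(pairs, func):
    # local model: one output pair per item yielded by func(value); keys unchanged
    return [(key, item) for key, value in pairs for item in func(value)]

pairs = [("Apple", "Amy"), ("Butter", "Bob")]

# returning a string flattens it into characters, as in the notebook output
chars = flat_map_values(pairs, lambda name: name.lower())

# returning a list keeps each value whole; here each name yields two variants
variants = flat_map_values(pairs, lambda name: [name.lower(), name.upper()])

print(chars[:3])  # [('Apple', 'a'), ('Apple', 'm'), ('Apple', 'y')]
print(variants)   # [('Apple', 'amy'), ('Apple', 'AMY'), ('Butter', 'bob'), ('Butter', 'BOB')]
```

In the notebook, the second case corresponds to something like `data.flatMapValues(lambda name: [name.lower(), name.upper()])`.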

### filter()
Return a new RDD containing only the elements that satisfy a predicate.
- return a new RDD that contains only the elements that pass a **filter operation**

In [34]:
data = sc.textFile(file1)

In [35]:
data.take(3)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']

In [36]:
data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)

['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar']

In [37]:
data.filter(lambda line: re.match(r'^[AEIOU]', line)).collect()

['Apple,Amy',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam']

In [38]:
data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)

['Apple,Amy', 'Cheese,Chucky', 'Doughboy,Pilsbury']

In [39]:
data.filter(lambda line: re.search(r'[x]$', line)).take(3)

['Anchovie,Alex', 'Apple,Alex']
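`filter()` keeps an element whenever the predicate returns a truthy value, which is why the cells above can return `re.match(...)` directly: a match object counts as true and `None` as false. Note also that `re.match` only matches at the start of the string while `re.search` scans the whole string, so a pattern like `r'[x]$'` needs `re.search` (or a leading `.+` with `re.match`, as in the `y` example). The same predicates applied to a plain Python list, as a local check:

```python
import re

lines = [
    "Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter", "Egg,Edward",
    "Oxtail,Oscar", "Anchovie,Alex", "Avocado,Adam", "Apple,Alex", "Apple,Adam",
    "Dinkel,Dieter", "Doughboy,Pilsbury", "McDonald,Ronald",
]

# keep lines whose first character is an upper-case vowel
vowels = [line for line in lines if re.match(r"^[AEIOU]", line)]

# re.search looks anywhere in the string, so it finds the trailing "x"
ends_in_x = [line for line in lines if re.search(r"[x]$", line)]

print(re.match(r"[x]$", "Anchovie,Alex"))  # None (re.match is anchored at the start)
print(vowels)     # ['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar', 'Anchovie,Alex', 'Avocado,Adam', 'Apple,Alex', 'Apple,Adam']
print(ends_in_x)  # ['Anchovie,Alex', 'Apple,Alex']
```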

### groupByKey()
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.
- group all values that share the same key
- return a new RDD of (key, iterable of values) pairs

Only works with pair RDDs.

In [40]:
data = sc.textFile(file1)

In [41]:
data = data.map(lambda line: line.split(','))

In [42]:
data.take(3)

[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]

In [43]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [44]:
data.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [45]:
data.groupByKey().take(1)

 

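[('Apple', <pyspark.resultiterable.ResultIterable object at 0x...>)]

The grouped values come back as an iterable object rather than a list, so the next cell joins them into a readable string.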

In [46]:
for key, names in data.groupByKey().take(1):
    print("%s: %s" % (key, ",".join(names)))

Apple: Amy,Alex,Adam
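A local model of what `groupByKey()` computes, using a hypothetical `group_by_key` helper over a plain list of pairs. Every value is kept, which is also why the PySpark documentation recommends `reduceByKey()` or `aggregateByKey()` instead when the grouping is only a step towards an aggregation: those combine values before shuffling them. On a cluster, the order of values inside each group is not guaranteed.

```python
from collections import defaultdict

pairs = [
    ("Apple", "Amy"), ("Butter", "Bob"), ("Apple", "Alex"),
    ("Apple", "Adam"), ("Dinkel", "Dieter"), ("Dinkel", "Dieter"),
]

def group_by_key(pairs):
    # collect every value that shares a key; nothing is combined, all values are kept
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return list(groups.items())

for key, values in group_by_key(pairs):
    print("%s: %s" % (key, ",".join(values)))
# Apple: Amy,Alex,Adam
# Butter: Bob
# Dinkel: Dieter,Dieter
```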


### reduceByKey()
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
- combine the elements of an RDD by key,
- repeatedly applying a reduce operation to pairs of values with the same key
- until only a single value remains for each key
- return the results in a new RDD

In [47]:
data = sc.textFile(file1)

In [48]:
data = data.map(lambda line: line.split(","))

In [49]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [50]:
data.take(3)

[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]

In [51]:
data.reduceByKey(lambda v1, v2: v1 + ":" + v2).take(6)

[('Apple', 'Amy:Alex:Adam'),
 ('Butter', 'Bob'),
 ('Dinkel', 'Dieter:Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('Cheese', 'Chucky'),
 ('Egg', 'Edward')]
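The "combiner" remark can be made concrete with a local model: each partition first reduces its own pairs, and only those partial results are merged per key. `reduce_by_key` below is a hypothetical helper, not PySpark code. It also shows why the function should be associative and commutative: joining strings with `':'` is associative but not commutative, so on a real cluster the order of names in results like `'Amy:Alex:Adam'` depends on how the data is partitioned and should not be relied on.

```python
def reduce_by_key(partitions, func):
    """Local model of reduceByKey: combine inside each partition, then merge."""
    # step 1: map-side combine, one small dict of partial results per partition
    partials = []
    for part in partitions:
        acc = {}
        for key, value in part:
            acc[key] = func(acc[key], value) if key in acc else value
        partials.append(acc)

    # step 2: merge the partial results for each key (the "reduce" side)
    merged = {}
    for acc in partials:
        for key, value in acc.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged

def join_names(v1, v2):
    return v1 + ":" + v2

partitions = [
    [("Apple", "Amy"), ("Dinkel", "Dieter")],
    [("Apple", "Alex"), ("Apple", "Adam"), ("Dinkel", "Dieter")],
]
print(reduce_by_key(partitions, join_names))
# {'Apple': 'Amy:Alex:Adam', 'Dinkel': 'Dieter:Dieter'}

# merging the same partitions in the other order changes the concatenation order
print(reduce_by_key(partitions[::-1], join_names)["Apple"])  # Alex:Adam:Amy
```

With a commutative function such as addition, both orders give the same answer, which is the property the word count later in this notebook relies on.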

### sortBy()
Sorts this RDD by the given keyfunc.
- sort an RDD according to a sorting function
- return the results in a new RDD

In [52]:
data = sc.textFile(file1)

In [53]:
data = data.map(lambda line: line.split(","))

In [54]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [55]:
data.collect()

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [56]:
# sort by the second letter of each name (pair[1] is the name, pair[1][1] its second character)
data.sortBy(lambda pair: pair[1][1]).take(10)

[('Egg', 'Edward'),
 ('Avocado', 'Adam'),
 ('Apple', 'Adam'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('Anchovie', 'Alex'),
 ('Apple', 'Alex'),
 ('Apple', 'Amy')]
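`sortBy()` behaves much like Python's `sorted()` with a `key` function, so the cell above can be checked locally. One caveat, stated as a hedge: Python's sort is stable, so tied elements keep their input order, but you should not rely on Spark keeping ties (such as the two `'Adam'` rows) in any particular order.

```python
pairs = [
    ("Apple", "Amy"), ("Butter", "Bob"), ("Cheese", "Chucky"), ("Dinkel", "Dieter"),
    ("Egg", "Edward"), ("Oxtail", "Oscar"), ("Anchovie", "Alex"), ("Avocado", "Adam"),
    ("Apple", "Alex"), ("Apple", "Adam"), ("Dinkel", "Dieter"), ("Doughboy", "Pilsbury"),
    ("McDonald", "Ronald"),
]

# sort by the second character of the value, like data.sortBy(lambda pair: pair[1][1])
by_second_letter = sorted(pairs, key=lambda pair: pair[1][1])

# descending order by the whole name, analogous to sortBy(..., ascending=False)
by_name_desc = sorted(pairs, key=lambda pair: pair[1], reverse=True)

print(by_second_letter[:3])  # [('Egg', 'Edward'), ('Avocado', 'Adam'), ('Apple', 'Adam')]
print(by_name_desc[0])       # ('McDonald', 'Ronald')
```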

### sortByKey()
Sorts this RDD, which is assumed to consist of (key, value) pairs.
- sort an RDD according to the natural ordering of the keys
- return the results in a new RDD

In [57]:
data = sc.textFile(file1)

In [58]:
data = data.map(lambda line: line.split(","))

In [59]:
data = data.map(lambda pair: (pair[0], pair[1]))

In [60]:
data.collect()

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [61]:
data.sortByKey().take(6)

[('Anchovie', 'Alex'),
 ('Apple', 'Amy'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Avocado', 'Adam'),
 ('Butter', 'Bob')]
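`sortByKey()` also accepts `ascending` and `keyfunc` arguments; `keyfunc` transforms each key before comparison, for example `data.sortByKey(ascending=False, keyfunc=lambda k: k.lower())`. The plain-Python sketch below mirrors that behaviour with a hypothetical `sort_by_key` helper and made-up sample keys, chosen to show that the natural ordering of strings puts upper-case letters before lower-case ones.

```python
def sort_by_key(pairs, ascending=True, keyfunc=lambda k: k):
    # order pairs by (a transformed version of) their key only; values are ignored
    return sorted(pairs, key=lambda pair: keyfunc(pair[0]), reverse=not ascending)

# hypothetical sample data with mixed-case keys
pairs = [("apple", 1), ("Banana", 2), ("cherry", 3), ("Apple", 4)]

print(sort_by_key(pairs))
# [('Apple', 4), ('Banana', 2), ('apple', 1), ('cherry', 3)]  (upper case sorts first)
print(sort_by_key(pairs, keyfunc=str.lower))
# [('apple', 1), ('Apple', 4), ('Banana', 2), ('cherry', 3)]
```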

### subtract()
Return each value in self that is not contained in other.
- return a new RDD that contains all the elements from the original RDD 
- that do not appear in a target RDD

In [62]:
data1 = sc.textFile(file1)

In [63]:
data1.collect()

['Apple,Amy',
 'Butter,Bob',
 'Cheese,Chucky',
 'Dinkel,Dieter',
 'Egg,Edward',
 'Oxtail,Oscar',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Apple,Alex',
 'Apple,Adam',
 'Dinkel,Dieter',
 'Doughboy,Pilsbury',
 'McDonald,Ronald']

In [64]:
data1.count()

13

In [65]:
data2 = sc.textFile(file2)

In [66]:
data2.collect()

['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']

In [67]:
data2.count()

4

In [68]:
data1.subtract(data2).collect()

['Egg,Edward',
 'Doughboy,Pilsbury',
 'Oxtail,Oscar',
 'Apple,Alex',
 'Apple,Amy',
 'Butter,Bob',
 'Anchovie,Alex',
 'Avocado,Adam',
 'Dinkel,Dieter',
 'Dinkel,Dieter',
 'Apple,Adam']

In [69]:
data1.subtract(data2).count()

11
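`subtract()` is not a set difference on the left-hand side: it removes every element that also appears in the other RDD but keeps duplicates among the rest. That is why both `'Dinkel,Dieter'` lines survive and the count is 11 (13 minus `'Cheese,Chucky'` and `'McDonald,Ronald'`); `'Doughboy,Pilsbury'` survives as well because `file2.txt` spells it `'Pillsbury'`. A local model with plain lists (unlike this list comprehension, Spark does not preserve the original order):

```python
file1_lines = [
    "Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter", "Egg,Edward",
    "Oxtail,Oscar", "Anchovie,Alex", "Avocado,Adam", "Apple,Alex", "Apple,Adam",
    "Dinkel,Dieter", "Doughboy,Pilsbury", "McDonald,Ronald",
]
file2_lines = ["Wendy,", "Doughboy,Pillsbury", "McDonald,Ronald", "Cheese,Chucky"]

# drop anything that occurs in file2, but keep repeated lines from file1
exclude = set(file2_lines)
remaining = [line for line in file1_lines if line not in exclude]

print(len(remaining))                    # 11
print(remaining.count("Dinkel,Dieter"))  # 2
```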

### join()
Return an RDD containing all pairs of elements with matching keys in self and other. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.
- return a new RDD that pairs each element of the original RDD
- with every element of the target RDD that has the same key (inner join)

In [70]:
data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [71]:
data1.collect()

[('Apple', 'Amy'),
 ('Butter', 'Bob'),
 ('Cheese', 'Chucky'),
 ('Dinkel', 'Dieter'),
 ('Egg', 'Edward'),
 ('Oxtail', 'Oscar'),
 ('Anchovie', 'Alex'),
 ('Avocado', 'Adam'),
 ('Apple', 'Alex'),
 ('Apple', 'Adam'),
 ('Dinkel', 'Dieter'),
 ('Doughboy', 'Pilsbury'),
 ('McDonald', 'Ronald')]

In [72]:
data1.count()

13

In [73]:
data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))

In [74]:
data2.collect()

[('Wendy', ''),
 ('Doughboy', 'Pillsbury'),
 ('McDonald', 'Ronald'),
 ('Cheese', 'Chucky')]

In [75]:
data2.count()

4

In [76]:
data1.join(data2).collect()

[('Doughboy', ('Pilsbury', 'Pillsbury')),
 ('McDonald', ('Ronald', 'Ronald')),
 ('Cheese', ('Chucky', 'Chucky'))]

In [77]:
data1.join(data2).count()

3

In [78]:
# full outer join: keep keys from both RDDs, filling the missing side with None
data1.fullOuterJoin(data2).take(2)

[('Dinkel', ('Dieter', None)), ('Dinkel', ('Dieter', None))]
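The two joins can be modeled locally with the pairs from both files. `inner_join` and `full_outer_join` are hypothetical helpers written as a naive nested loop for clarity; Spark instead shuffles both RDDs by key so that matching keys meet on the same executor. The model reproduces the counts above: 3 matches for the inner join, and 3 + 10 left-only + 1 right-only (`'Wendy'`) = 14 rows for the full outer join.

```python
left = [
    ("Apple", "Amy"), ("Butter", "Bob"), ("Cheese", "Chucky"), ("Dinkel", "Dieter"),
    ("Egg", "Edward"), ("Oxtail", "Oscar"), ("Anchovie", "Alex"), ("Avocado", "Adam"),
    ("Apple", "Alex"), ("Apple", "Adam"), ("Dinkel", "Dieter"), ("Doughboy", "Pilsbury"),
    ("McDonald", "Ronald"),
]
right = [("Wendy", ""), ("Doughboy", "Pillsbury"), ("McDonald", "Ronald"), ("Cheese", "Chucky")]

def inner_join(left, right):
    # every (k, v1) in left paired with every (k, v2) in right that shares the key
    return [(k, (v1, v2)) for k, v1 in left for k2, v2 in right if k == k2]

def full_outer_join(left, right):
    left_keys = {k for k, _ in left}
    right_keys = {k for k, _ in right}
    joined = inner_join(left, right)
    # left-only keys get None on the right side, right-only keys get None on the left
    joined += [(k, (v1, None)) for k, v1 in left if k not in right_keys]
    joined += [(k, (None, v2)) for k, v2 in right if k not in left_keys]
    return joined

inner = inner_join(left, right)
full = full_outer_join(left, right)

print(sorted(inner))
# [('Cheese', ('Chucky', 'Chucky')), ('Doughboy', ('Pilsbury', 'Pillsbury')), ('McDonald', ('Ronald', 'Ronald'))]
print(len(full))  # 14
```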

In [79]:
# stop Spark context
sc.stop()

## MapReduce demo

We will now count the occurrences of each word. Word count is the "Hello, world!" of Spark applications, and the map/reduce model is particularly well suited to this kind of task.

In [80]:
# create a Spark context
sc = SparkContext(conf=conf)

In [81]:
# read the target file into an RDD
lines = sc.textFile(file1)
lines.take(3)

['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']

The `flatMap()` operation first splits each line into a list of words and then makes each word a separate element of the new RDD.

In [82]:
# split the lines into individual words
words = lines.flatMap(lambda l: re.split(r'[^\w]+', l))
words.take(3)

['Apple', 'Amy', 'Butter']

The `map()` operation replaces each word with a tuple of that word and the number 1. The result, `pairs`, is a pair RDD in which each word is a key and every value is 1.

In [83]:
# replace each word with a tuple of that word and the number 1
pairs = words.map(lambda w: (w, 1))
pairs.take(3)

[('Apple', 1), ('Amy', 1), ('Butter', 1)]

The `reduceByKey()` operation adds up the values that share a key, leaving one total per word.

In [84]:
# group the elements of the RDD by key (word) and add up their values
counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)
counts.take(3)

[('Apple', 3), ('Amy', 1), ('Butter', 1)]

In [85]:
# sort the elements by values in descending order
counts.sortBy(lambda pair: pair[1], ascending=False).take(10)

[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]

#### Simplify chained transformations

The same pipeline can also be written as a single chain of transformations:

In [86]:
sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\w]+', l)) # words
 .map(lambda w: (w, 1)) # pairs
 .reduceByKey(lambda n1, n2: n1 + n2) # counts
 .sortBy(lambda pair: pair[1], ascending=False)) # sorted counts

In [87]:
sorted_counts.take(10)

[('Apple', 3),
 ('Dinkel', 2),
 ('Alex', 2),
 ('Dieter', 2),
 ('Adam', 2),
 ('Amy', 1),
 ('Butter', 1),
 ('Chucky', 1),
 ('Edward', 1),
 ('Doughboy', 1)]
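To sanity-check the Spark result, the same word count can be computed locally with `collections.Counter`, which plays the role of the `map()` + `reduceByKey()` steps. The `if w` filter is an addition not present in the Spark pipeline above: `re.split` returns an empty string when a line ends with a separator (as `'Wendy,'` in `file2.txt` does), and without the filter that empty string would be counted as a word.

```python
import re
from collections import Counter

lines = [
    "Apple,Amy", "Butter,Bob", "Cheese,Chucky", "Dinkel,Dieter", "Egg,Edward",
    "Oxtail,Oscar", "Anchovie,Alex", "Avocado,Adam", "Apple,Alex", "Apple,Adam",
    "Dinkel,Dieter", "Doughboy,Pilsbury", "McDonald,Ronald",
]

# flatMap step: split on runs of non-word characters, dropping empty strings
words = [w for line in lines for w in re.split(r"[^\w]+", line) if w]

# map + reduceByKey step: Counter adds up one occurrence per word
counts = Counter(words)

print(counts.most_common(1))                          # [('Apple', 3)]
print(counts["Dinkel"], counts["Alex"], counts["Amy"])  # 2 2 1
print(re.split(r"[^\w]+", "Wendy,"))                  # ['Wendy', '']
```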

In [88]:
# stop Spark context
sc.stop()