{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Spark tutorial\n", "\n", "### Install Spark\n", "\n", "1. Install Java\n", "\n", "```\n", "sudo apt install default-jdk\n", "java -version\n", "```\n", "\n", "2. Download Apache Spark and unpack it\n", "https://spark.apache.org/downloads.html\n", "\n", "```\n", "sudo mkdir /opt/spark/\n", "sudo mv spark-3.3.2-bin-hadoop3/ /opt/spark/\n", "```\n", "\n", "3. Install PySpark\n", "\n", "```\n", "pip install pyspark\n", "```\n", "\n", "4. Set Spark environment (append the following two lines to ~/.bashrc)\n", "\n", "```\n", "vi ~/.bashrc\n", "```\n", "\n", "```\n", "export SPARK_HOME=/opt/spark/spark-3.1.1-bin-hadoop2.7\n", "export PATH=$SPARK_HOME/bin:$PATH\n", "```\n", "\n", "```\n", "source ~/.bashrc\n", "```\n", "\n", "\n", "### Spark & Jupyter notebook\n", "\n", "To set up Spark in [Jupyter notebook](https://jupyter.org/), do the following:\n", "\n", "1. add the following lines into ~/.bashrc\n", " - local access\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook\"\n", "``` \n", " - remote access\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook --no-browser --port= --ip='*'\"\n", "``` \n", " - Windows subsystem for Linux\n", "```\n", " export PYSPARK_DRIVER_PYTHON=jupyter\n", " export PYSPARK_DRIVER_PYTHON_OPTS=\"notebook --no-browser\"\n", "``` \n", "\n", "Don't forget to run ```source ~/.bashrc``` at the end.\n", "\n", "2. run from terminal:\n", "```\n", "pyspark\n", "```\n", "\n", "Note that remote access to jupyter notebook requires a tunnel. On Windows machines, you can use [Putty](https://www.putty.org/) to set it up. In Linux environments, the following command can be used:\n", "\n", " ssh -N -L localhost::localhost: \n", "\n", "Finally, you can run the notebook in your browser:\n", "\n", " http://localhost:" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "import random\n", "import re" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## PySpark Python API \n", "\n", "PySpark can be used from standalone Python scripts by creating a `SparkContext`. 
You can set configuration properties by passing a `SparkConf` object to `SparkContext`.\n", "\n", "Documentation: [pyspark package](https://spark.apache.org/docs/latest/api/python/pyspark.html)" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [], "source": [ "from pyspark import SparkContext, SparkConf" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "# cannot run multiple SparkContexts at once (so stop one just in case)\n", "sc = SparkContext.getOrCreate()\n", "sc.stop()" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "# spark conf\n", "conf = SparkConf()" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "# create a Spark context\n", "sc = SparkContext(conf=conf)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD - Resilient Distributed Datasets\n", "\n", "resilient:\n", "- (of a person or animal) able to withstand or recover quickly from difficult conditions\n", "- (of a substance or object) able to recoil or spring back into shape after bending, stretching, or being compressed\n", "\n", "Spark is RDD-centric!\n", "- RDDs are immutable\n", "- RDDs are computed lazily\n", "- RDDs can be cached\n", "- RDDs know who their parents are\n", "- RDDs that contain only tuples of two elements are “pair RDDs”" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Actions\n", "\n", "**RDD** - Resilient Distributed Datasets\n", "\n", "Some useful actions:\n", "- take(n) – return the first n elements in the RDD as an array.\n", "- collect() – return all elements of the RDD as an array. Use with caution.\n", "- count() – return the number of elements in the RDD as an int.\n", "- saveAsTextFile(‘path/to/dir’) – save the RDD to files in a directory. Will create the directory if it doesn’t exist and will fail if it does.\n", "- foreach(func) – execute the function against every element in the RDD, but don’t keep any results." 
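, "\n", "\n", "Of these actions, only `saveAsTextFile()` is not demonstrated below, so here is a minimal sketch (it assumes the `sc` created above; the output path `out_dir` is an assumption):\n", "\n", "```python\n", "nums = sc.parallelize(range(10), 2)\n", "# writes one part-NNNNN file per partition under out_dir/; fails if out_dir already exists\n", "nums.saveAsTextFile('out_dir')\n", "```"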
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Demo files\n", "\n", "```\n", "file1.txt:\n", " Apple,Amy\n", " Butter,Bob\n", " Cheese,Chucky\n", " Dinkel,Dieter\n", " Egg,Edward\n", " Oxtail,Oscar\n", " Anchovie,Alex\n", " Avocado,Adam\n", " Apple,Alex\n", " Apple,Adam\n", " Dinkel,Dieter\n", " Doughboy,Pilsbury\n", " McDonald,Ronald\n", "\n", "file2.txt:\n", " Wendy,\n", " Doughboy,Pillsbury\n", " McDonald,Ronald\n", " Cheese,Chucky\n", "```" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [], "source": [ "# input files\n", "file1 = 'file1.txt'\n", "file2 = 'file2.txt'" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "# load data\n", "data1 = sc.textFile(file1)\n", "data2 = sc.textFile(file2)" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/plain": [ "['Apple,Amy',\n", " 'Butter,Bob',\n", " 'Cheese,Chucky',\n", " 'Dinkel,Dieter',\n", " 'Egg,Edward',\n", " 'Oxtail,Oscar',\n", " 'Anchovie,Alex',\n", " 'Avocado,Adam',\n", " 'Apple,Alex',\n", " 'Apple,Adam',\n", " 'Dinkel,Dieter',\n", " 'Doughboy,Pilsbury',\n", " 'McDonald,Ronald']" ] }, "execution_count": 8, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "file1: 13 lines\n" ] }, { "name": "stderr", "output_type": "stream", "text": [ " \r" ] } ], "source": [ "print(\"file1: %d lines\" % data1.count())" ] }, { "cell_type": "code", "execution_count": 10, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']" ] }, "execution_count": 10, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.take(3)" ] }, { "cell_type": "code", "execution_count": 11, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']" ] }, "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "file2: 4 lines\n" ] } ], "source": [ "print(\"file2: %d lines\" % data2.count())" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "scrolled": true }, "outputs": [ { "data": { "text/plain": [ "['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald']" ] }, "execution_count": 13, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Note: the following produces output on Jupyter notebook server!" 
] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ "Cheese,Chucky\n", "Wendy,\n", "Doughboy,Pillsbury\n", "McDonald,Ronald\n" ] } ], "source": [ "# prints each element in the Jupyter notebook output\n", "data2.foreach(print)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## RDD Operations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### map()\n", "Return a new RDD by applying a function to each element of this RDD.\n", "- apply an operation to every element of an RDD\n", "- return a new RDD that contains the results" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "file1.txt MapPartitionsRDD[10] at textFile at NativeMethodAccessorImpl.java:0" ] }, "execution_count": 16, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']" ] }, "execution_count": 17, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 18, "metadata": { "scrolled": false }, "outputs": [ { "data": { "text/plain": [ "[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]" ] }, "execution_count": 18, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.map(lambda line: line.split(',')).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### flatMap()\n", "Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after dropping the outermost container" ] }, { "cell_type": "code", "execution_count": 19, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 20, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky', 'Dinkel,Dieter']" ] }, "execution_count": 20, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(4)" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple', 'Amy', 'Butter', 'Bob', 'Cheese', 'Chucky', 'Dinkel']" ] }, "execution_count": 21, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.flatMap(lambda line: line.split(',')).take(7)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### mapValues()\n", "Pass each value in the key-value pair RDD through a map function without changing the keys; this also retains the original RDD's partitioning.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results\n", "\n", "Only works with pair RDDs." 
] }, { "cell_type": "code", "execution_count": 22, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 23, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": 24, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]" ] }, "execution_count": 24, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 25, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['Apple', 'Amy'],\n", " ['Butter', 'Bob'],\n", " ['Cheese', 'Chucky'],\n", " ['Dinkel', 'Dieter'],\n", " ['Egg', 'Edward'],\n", " ['Oxtail', 'Oscar'],\n", " ['Anchovie', 'Alex'],\n", " ['Avocado', 'Adam'],\n", " ['Apple', 'Alex'],\n", " ['Apple', 'Adam'],\n", " ['Dinkel', 'Dieter'],\n", " ['Doughboy', 'Pilsbury'],\n", " ['McDonald', 'Ronald']]" ] }, "execution_count": 25, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.collect()" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 27, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 28, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'amy'), ('Butter', 'bob'), ('Cheese', 'chucky')]" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.mapValues(lambda name: name.lower()).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### flatMapValues()\n", "Pass each value in the key-value pair RDD through a flatMap function without changing the keys; this also retains the original RDD's partitioning.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after removing the outermost container\n", "\n", "Only works with pair RDDs." 
] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 32, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]" ] }, "execution_count": 32, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 33, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'a'),\n", " ('Apple', 'm'),\n", " ('Apple', 'y'),\n", " ('Butter', 'b'),\n", " ('Butter', 'o'),\n", " ('Butter', 'b'),\n", " ('Cheese', 'c'),\n", " ('Cheese', 'h'),\n", " ('Cheese', 'u')]" ] }, "execution_count": 33, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.flatMapValues(lambda name: name.lower()).take(9)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### filter()\n", "Return a new RDD containing only the elements that satisfy a predicate.\n", "- return a new RDD that contains only the elements that pass a **filter operation**" ] }, { "cell_type": "code", "execution_count": 34, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 35, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']" ] }, "execution_count": 35, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 36, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Egg,Edward', 'Oxtail,Oscar']" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.filter(lambda line: re.match(r'^[AEIOU]', line)).take(3)" ] }, { "cell_type": "code", "execution_count": 37, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy',\n", " 'Egg,Edward',\n", " 'Oxtail,Oscar',\n", " 'Anchovie,Alex',\n", " 'Avocado,Adam',\n", " 'Apple,Alex',\n", " 'Apple,Adam']" ] }, "execution_count": 37, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.filter(lambda line: re.match(r'^[AEIOU]', line)).collect()" ] }, { "cell_type": "code", "execution_count": 38, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Cheese,Chucky', 'Doughboy,Pilsbury']" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.filter(lambda line: re.match(r'.+[y]$', line)).take(3)" ] }, { "cell_type": "code", "execution_count": 39, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Anchovie,Alex', 'Apple,Alex']" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.filter(lambda line: re.search(r'[x]$', line)).take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### groupByKey()\n", "Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.\n", "- apply an operation to the value of every element of an RDD\n", "- return a new RDD that contains the results after removing the outermost container\n", "\n", "Only works with pair RDDs." 
] }, { "cell_type": "code", "execution_count": 40, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 41, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(','))" ] }, { "cell_type": "code", "execution_count": 42, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[['Apple', 'Amy'], ['Butter', 'Bob'], ['Cheese', 'Chucky']]" ] }, "execution_count": 42, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 43, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 44, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 45, "metadata": {}, "outputs": [ { "name": "stderr", "output_type": "stream", "text": [ " \r" ] }, { "data": { "text/plain": [ "[('Apple', )]" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.groupByKey().take(1)" ] }, { "cell_type": "code", "execution_count": 46, "metadata": { "scrolled": false }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Apple: Amy,Alex,Adam\n" ] } ], "source": [ "for pair in data.groupByKey().take(1):\n", " print(\"%s: %s\" % (pair[0], \",\".join([n for n in pair[1]])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### reduceByKey()\n", "Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.\n", "- combine elements of an RDD by key and then \n", "- apply a reduce operation to pairs of keys\n", "- until only a single key remains.\n", "- return the result in a new RDD" ] }, { "cell_type": "code", "execution_count": 47, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 48, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": 49, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 50, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'), ('Butter', 'Bob'), ('Cheese', 'Chucky')]" ] }, "execution_count": 50, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.take(3)" ] }, { "cell_type": "code", "execution_count": 51, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy:Alex:Adam'),\n", " ('Butter', 'Bob'),\n", " ('Dinkel', 'Dieter:Dieter'),\n", " ('Doughboy', 'Pilsbury'),\n", " ('Cheese', 'Chucky'),\n", " ('Egg', 'Edward')]" ] }, "execution_count": 51, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.reduceByKey(lambda v1, v2: v1 + \":\" + v2).take(6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### sortBy()\n", "Sorts this RDD by the given keyfunc.\n", "- sort an RDD according to a sorting function\n", "- return the results in a new RDD" ] }, { "cell_type": "code", "execution_count": 52, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", 
"execution_count": 53, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": 54, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 55, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'),\n", " ('Butter', 'Bob'),\n", " ('Cheese', 'Chucky'),\n", " ('Dinkel', 'Dieter'),\n", " ('Egg', 'Edward'),\n", " ('Oxtail', 'Oscar'),\n", " ('Anchovie', 'Alex'),\n", " ('Avocado', 'Adam'),\n", " ('Apple', 'Alex'),\n", " ('Apple', 'Adam'),\n", " ('Dinkel', 'Dieter'),\n", " ('Doughboy', 'Pilsbury'),\n", " ('McDonald', 'Ronald')]" ] }, "execution_count": 55, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.collect()" ] }, { "cell_type": "code", "execution_count": 56, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Egg', 'Edward'),\n", " ('Avocado', 'Adam'),\n", " ('Apple', 'Adam'),\n", " ('Cheese', 'Chucky'),\n", " ('Dinkel', 'Dieter'),\n", " ('Dinkel', 'Dieter'),\n", " ('Doughboy', 'Pilsbury'),\n", " ('Anchovie', 'Alex'),\n", " ('Apple', 'Alex'),\n", " ('Apple', 'Amy')]" ] }, "execution_count": 56, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.sortBy(lambda pair: pair[1][1]).take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### sortByKey()\n", "Sorts this RDD, which is assumed to consist of (key, value) pairs.\n", "- sort an RDD according to the natural ordering of the keys\n", "- return the results in a new RDD" ] }, { "cell_type": "code", "execution_count": 57, "metadata": {}, "outputs": [], "source": [ "data = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 58, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda line: line.split(\",\"))" ] }, { "cell_type": "code", "execution_count": 59, "metadata": {}, "outputs": [], "source": [ "data = data.map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 60, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'),\n", " ('Butter', 'Bob'),\n", " ('Cheese', 'Chucky'),\n", " ('Dinkel', 'Dieter'),\n", " ('Egg', 'Edward'),\n", " ('Oxtail', 'Oscar'),\n", " ('Anchovie', 'Alex'),\n", " ('Avocado', 'Adam'),\n", " ('Apple', 'Alex'),\n", " ('Apple', 'Adam'),\n", " ('Dinkel', 'Dieter'),\n", " ('Doughboy', 'Pilsbury'),\n", " ('McDonald', 'Ronald')]" ] }, "execution_count": 60, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.collect()" ] }, { "cell_type": "code", "execution_count": 61, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Anchovie', 'Alex'),\n", " ('Apple', 'Amy'),\n", " ('Apple', 'Alex'),\n", " ('Apple', 'Adam'),\n", " ('Avocado', 'Adam'),\n", " ('Butter', 'Bob')]" ] }, "execution_count": 61, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data.sortByKey().take(6)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### subtract()\n", "Return each value in self that is not contained in other.\n", "- return a new RDD that contains all the elements from the original RDD \n", "- that do not appear in a target RDD" ] }, { "cell_type": "code", "execution_count": 62, "metadata": {}, "outputs": [], "source": [ "data1 = sc.textFile(file1)" ] }, { "cell_type": "code", "execution_count": 63, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy',\n", " 'Butter,Bob',\n", " 'Cheese,Chucky',\n", " 'Dinkel,Dieter',\n", " 'Egg,Edward',\n", " 
'Oxtail,Oscar',\n", " 'Anchovie,Alex',\n", " 'Avocado,Adam',\n", " 'Apple,Alex',\n", " 'Apple,Adam',\n", " 'Dinkel,Dieter',\n", " 'Doughboy,Pilsbury',\n", " 'McDonald,Ronald']" ] }, "execution_count": 63, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": 64, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "13" ] }, "execution_count": 64, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.count()" ] }, { "cell_type": "code", "execution_count": 65, "metadata": {}, "outputs": [], "source": [ "data2 = sc.textFile(file2)" ] }, { "cell_type": "code", "execution_count": 66, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Wendy,', 'Doughboy,Pillsbury', 'McDonald,Ronald', 'Cheese,Chucky']" ] }, "execution_count": 66, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": 67, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "4" ] }, "execution_count": 67, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.count()" ] }, { "cell_type": "code", "execution_count": 68, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Egg,Edward',\n", " 'Doughboy,Pilsbury',\n", " 'Oxtail,Oscar',\n", " 'Apple,Alex',\n", " 'Apple,Amy',\n", " 'Butter,Bob',\n", " 'Anchovie,Alex',\n", " 'Avocado,Adam',\n", " 'Dinkel,Dieter',\n", " 'Dinkel,Dieter',\n", " 'Apple,Adam']" ] }, "execution_count": 68, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.subtract(data2).collect()" ] }, { "cell_type": "code", "execution_count": 69, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "11" ] }, "execution_count": 69, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.subtract(data2).count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### join()\n", "Return an RDD containing all pairs of elements with matching keys in self and other. 
Each pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in self and (k, v2) is in other.\n", "- return a new RDD that contains all the elements from the original RDD\n", "- joined (inner join) with elements from the target RDD" ] }, { "cell_type": "code", "execution_count": 70, "metadata": {}, "outputs": [], "source": [ "data1 = sc.textFile(file1).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 71, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 'Amy'),\n", " ('Butter', 'Bob'),\n", " ('Cheese', 'Chucky'),\n", " ('Dinkel', 'Dieter'),\n", " ('Egg', 'Edward'),\n", " ('Oxtail', 'Oscar'),\n", " ('Anchovie', 'Alex'),\n", " ('Avocado', 'Adam'),\n", " ('Apple', 'Alex'),\n", " ('Apple', 'Adam'),\n", " ('Dinkel', 'Dieter'),\n", " ('Doughboy', 'Pilsbury'),\n", " ('McDonald', 'Ronald')]" ] }, "execution_count": 71, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.collect()" ] }, { "cell_type": "code", "execution_count": 72, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "13" ] }, "execution_count": 72, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.count()" ] }, { "cell_type": "code", "execution_count": 73, "metadata": {}, "outputs": [], "source": [ "data2 = sc.textFile(file2).map(lambda line: line.split(',')).map(lambda pair: (pair[0], pair[1]))" ] }, { "cell_type": "code", "execution_count": 74, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Wendy', ''),\n", " ('Doughboy', 'Pillsbury'),\n", " ('McDonald', 'Ronald'),\n", " ('Cheese', 'Chucky')]" ] }, "execution_count": 74, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.collect()" ] }, { "cell_type": "code", "execution_count": 75, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "4" ] }, "execution_count": 75, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data2.count()" ] }, { "cell_type": "code", "execution_count": 76, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Doughboy', ('Pilsbury', 'Pillsbury')),\n", " ('McDonald', ('Ronald', 'Ronald')),\n", " ('Cheese', ('Chucky', 'Chucky'))]" ] }, "execution_count": 76, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.join(data2).collect()" ] }, { "cell_type": "code", "execution_count": 77, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "3" ] }, "execution_count": 77, "metadata": {}, "output_type": "execute_result" } ], "source": [ "data1.join(data2).count()" ] }, { "cell_type": "code", "execution_count": 78, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Dinkel', ('Dieter', None)), ('Dinkel', ('Dieter', None))]" ] }, "execution_count": 78, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# full outer join: keep keys from either RDD; the missing side is None\n", "data1.fullOuterJoin(data2).take(2)" ] }, { "cell_type": "code", "execution_count": 79, "metadata": {}, "outputs": [], "source": [ "# stop Spark context\n", "sc.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## MapReduce demo\n", "\n", "We will now count the occurrences of each word. Word count is the typical \"Hello, world!\" app for Spark. The map/reduce model is particularly well suited to applications like counting words in a document."
] }, { "cell_type": "code", "execution_count": 80, "metadata": {}, "outputs": [], "source": [ "# create a Spark context\n", "sc = SparkContext(conf=conf)" ] }, { "cell_type": "code", "execution_count": 81, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple,Amy', 'Butter,Bob', 'Cheese,Chucky']" ] }, "execution_count": 81, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# read the target file into an RDD\n", "lines = sc.textFile(file1)\n", "lines.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `flatMap()` operation first converts each line into an array of words, and then makes\n", "each of the words an element in the new RDD." ] }, { "cell_type": "code", "execution_count": 82, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['Apple', 'Amy', 'Butter']" ] }, "execution_count": 82, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# split the lines into individual words\n", "words = lines.flatMap(lambda l: re.split(r'[^\\w]+', l))\n", "words.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `map()` operation replaces each word with a tuple of that word and the number 1. The\n", "pairs RDD is a pair RDD where the word is the key, and all of the values are the number 1." ] }, { "cell_type": "code", "execution_count": 83, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 1), ('Amy', 1), ('Butter', 1)]" ] }, "execution_count": 83, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# replace each word with a tuple of that word and the number 1\n", "pairs = words.map(lambda w: (w, 1))\n", "pairs.take(3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `reduceByKey()` operation keeps adding elements' values together until there are no\n", "more to add for each key (word)." 
] }, { "cell_type": "code", "execution_count": 84, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 3), ('Amy', 1), ('Butter', 1)]" ] }, "execution_count": 84, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# group the elements of the RDD by key (word) and add up their values\n", "counts = pairs.reduceByKey(lambda n1, n2: n1 + n2)\n", "counts.take(3)" ] }, { "cell_type": "code", "execution_count": 85, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 3),\n", " ('Dinkel', 2),\n", " ('Alex', 2),\n", " ('Dieter', 2),\n", " ('Adam', 2),\n", " ('Amy', 1),\n", " ('Butter', 1),\n", " ('Chucky', 1),\n", " ('Edward', 1),\n", " ('Doughboy', 1)]" ] }, "execution_count": 85, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# sort the elements by values in descending order\n", "counts.sortBy(lambda pair: pair[1], ascending=False).take(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Simplify chained transformations\n", "\n", "It is good to know that the code above can also be written in the following way:" ] }, { "cell_type": "code", "execution_count": 86, "metadata": {}, "outputs": [], "source": [ "sorted_counts = (lines.flatMap(lambda l: re.split(r'[^\\w]+', l)) # words\n", " .map(lambda w: (w, 1)) # pairs\n", " .reduceByKey(lambda n1, n2: n1 + n2) # counts\n", " .sortBy(lambda pair: pair[1], ascending=False)) # sorted counts" ] }, { "cell_type": "code", "execution_count": 87, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('Apple', 3),\n", " ('Dinkel', 2),\n", " ('Alex', 2),\n", " ('Dieter', 2),\n", " ('Adam', 2),\n", " ('Amy', 1),\n", " ('Butter', 1),\n", " ('Chucky', 1),\n", " ('Edward', 1),\n", " ('Doughboy', 1)]" ] }, "execution_count": 87, "metadata": {}, "output_type": "execute_result" } ], "source": [ "sorted_counts.take(10)" ] }, { "cell_type": "code", "execution_count": 88, "metadata": {}, "outputs": [], "source": [ "# stop Spark context\n", "sc.stop()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 2 }