Data processing in the Apache Spark environment¶
!pip install findspark
!pip install pyspark # install the libraries needed for Apache Spark
# creating a spark application
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
sc = SparkContext("local", "test app")
Working with RDD - Resilient Distributed Dataset¶
The basic object of the Apache Spark programming interface is the so-called SparkContext. In the PySpark interpreter, the context is automatically set in the sc variable.
The basic data type in Spark is the RDD (Resilient Distributed Dataset), which represents a large collection of elements that can be processed in parallel on several processors or distributed across several computing nodes. The programmer works with an RDD as if it were a local variable, even though the individual operations on the data are executed in a distributed way on multiple computers.
You can create an RDD collection e.g. from a local array of elements using the 'parallelize' method.
data = ["spark", "rdd", "example", "sample", "example"]
rdd = sc.parallelize(data)
Two types of commands can be executed over an RDD:
- actions return the result of the processing (e.g. a computed number, etc.)
- transformations transform the original RDD collection into a new RDD collection; multiple transformations can be chained
Transformations are not performed immediately after the command is entered in the code. A chain of transformations must be terminated by one of the actions, and only when that final action is called are all the transformations successively applied to the data (so-called 'lazy execution').
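For illustration, a minimal sketch of lazy execution using the 'rdd' collection created above (the name 'upper_rdd' is just illustrative):
# the 'map' transformation is only recorded, no computation happens yet
upper_rdd = rdd.map(lambda x: x.upper())
# only calling an action triggers the evaluation of the whole chain of transformations
upper_rdd.count() # = 5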
The following commands list basic Spark actions¶
the 'count' action returns the number of elements in the collection
rdd.count() # = 5
if we want to get the elements of the RDD collection as an array of values, we can use the 'collect' action
rdd.collect() # = ["spark", "rdd", "example", "sample", "example"]
'first' returns only the first element of the RDD
rdd.first() # = "spark"
'take(n)' returns the first n elements of the RDD as an array of values
rdd.take(4) # = ["spark", "rdd", "example", "sample"]
an array of n randomly selected elements can be obtained with the 'takeSample' action; the first parameter indicates whether the selection is with repetition (True) or without (False), the second is the number of elements, and as the optional third parameter 'seed' we can pass the initialization of the random number generator (otherwise each call may return a different selection)
rdd.takeSample(True, 3)
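For example, fixing the 'seed' makes the selection reproducible between runs (a small sketch, the seed value 42 is arbitrary):
rdd.takeSample(False, 3, 42) # 3 elements without repetition, fixed seed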
using the 'reduce' action, you can gradually apply the specified function to all values of the RDD collection, reducing them to a single value; a simple function can be written directly as a lambda expression. The following call joins the values with the + operator, and since the values are strings, the result is a single string created by concatenating all the elements
print(rdd.reduce(lambda x, y: x + y))
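'reduce' can of course also be applied to numerical values; a minimal sketch summing a small collection of numbers:
numbers = sc.parallelize([1, 2, 3, 4, 5])
print(numbers.reduce(lambda x, y: x + y)) # = 15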
if we want to sequentially call the given function for each element of the RDD collection, we can use the 'foreach' action
# first we define a simple function that converts the string to uppercase and displays it on the screen
def print_upper(x):
    print(x.upper())
# we call the function print_upper for each element of the RDD collection
rdd.foreach(print_upper)
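Note that the function passed to 'foreach' runs in the worker processes, so on a cluster the printed output ends up in the executor logs rather than in the driver console. A minimal sketch that prints on the driver instead (suitable only for small collections):
# collect the (small) RDD to the driver and print the elements locally
for word in rdd.collect():
    print(word.upper())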
if the RDD collection contains key:value pairs, we can count the number of values for each key using the 'countByKey' action
# first we create an RDD with key:value pairs
kv_pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1)])
# and we calculate the number of values for each key, the result of the operation is a map
kv_pairs.countByKey().items()
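For the collection above, the result should look roughly as follows ('countByKey' returns a dictionary on the driver, so the order of the keys may differ):
# = dict_items([('a', 3), ('b', 5)])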
The following commands list basic Spark transformations¶
the 'map' transformation creates an RDD collection whose elements are obtained by applying the specified function to each element of the original RDD collection. The example below creates an RDD collection of pairs where the first component of each pair is the original string from the 'rdd' collection and the second component is its length
rdd2 = rdd.map(lambda x: (x, len(x)))
# elements of the transformed collection are obtained by the action 'collect'
rdd2.collect()
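For the 'rdd' collection created above, the result should be:
# = [('spark', 5), ('rdd', 3), ('example', 7), ('sample', 6), ('example', 7)]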
The 'flatMap' transformation works similarly to 'map', but each element of the original RDD collection can generate several elements of the transformed collection (the specified function should return an array of the generated values)
# compare: the following command generates an RDD collection with 3 elements, where each element is an array
sc.parallelize([1, 2, 3]).map(lambda x: [x, x, x]).collect()
# = [[1, 1, 1], [2, 2, 2], [3, 3, 3]]
# with a command that generates 3 (identical) transformed elements for each element of the original RDD collection, i.e. the resulting
# collection will have 3x3=9 elements
sc.parallelize([2, 3, 4]).flatMap(lambda x: [x, x, x]).collect()
# = [2, 2, 2, 3, 3, 3, 4, 4, 4]
the 'filter' transformation creates a new RDD collection by selecting only elements from the original collection that meet the condition of the specified function
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
filtered_rdd = rdd.filter(lambda x: x % 2 == 0) # we only select even numbers
filtered_rdd.collect()
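The result should be:
# = [2, 4, 6, 8, 10]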
the 'sample' transformation creates an RDD collection with a random selection of elements from the original collection; 'sample' has 3 arguments - the first specifies whether the selection is with or without repetition, the second the fraction of elements to select, and the last the initialization of the random number generator (seed)
rdd = sc.parallelize(range(1, 11)) # we generate a sequence of numbers from 1 to 10
sample_rdd = rdd.sample(True, 0.2) # randomly select approximately 20% of the elements with repetition
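'sample' is only a transformation, so an action is needed to see the result; the fraction is only an expected value, and the exact number of selected elements varies between runs:
sample_rdd.collect() # approximately 20% of the elements, the exact content varies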
you can perform set operations on RDD collections, such as union or intersection, using the 'union' or 'intersection' transformations
rdd1 = sc.parallelize(range(1, 15))
rdd2 = sc.parallelize(range(10, 21))
rdd1.union(rdd2).collect()
# = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
# ('union' does not remove duplicates, so the values 10 to 14 appear twice)
rdd1.intersection(rdd2).collect()
# = [10, 11, 12, 13, 14] (the order of the elements may vary)
if we have an RDD collection with key:value pairs, we can group the elements using the 'groupByKey' transformation; the result is an RDD collection of pairs (key, iterator of values). The iterator is a special Spark type that allows sequentially traversing all grouped elements; it can e.g. be converted to a Python list with the 'list' function or iterated in a loop
# first we generate an RDD collection of pairs of type (first letter of the string, string)
rdd = sc.parallelize(["spark", "rdd", "example", "sample", "example"]).map(lambda word: (word[0], word))
# group the words according to the key (first letter)
group_rdd = rdd.groupByKey()
# 'group_rdd' is a collection of pairs (first letter, iterator of words starting with that letter)
# if we want to convert the iterator, i.e. the second component of the pair, to a list, we have to apply the 'list' function to it,
# which we can do with the 'mapValues' transformation
group_list = group_rdd.mapValues(lambda x: list(x))
# the result is an RDD collection of pairs of type (first letter, list of words starting with that letter)
group_list.collect()
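For the words above, the result should look roughly as follows (the order of the groups and of the words within a group may vary):
# = [('s', ['spark', 'sample']), ('r', ['rdd']), ('e', ['example', 'example'])]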
the 'reduceByKey' transformation aggregates all values with the same key using the specified function; the elements of the resulting collection are pairs (key, aggregated value), e.g.:
kv_pairs = sc.parallelize([("a", 4), ("b", 2), ("a", 7), ("a", 4), ("b", 3)])
kv_pairs_count = kv_pairs.reduceByKey(lambda x, y: x + y) # contains elements ("a", 15), ("b", 5)
kv_pairs_count.collect()
if we want to sort the key:value RDD collection by key, we can use the 'sortByKey' transformation
kv_pairs.sortByKey().collect()
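For the 'kv_pairs' collection above, the result should contain all 'a' pairs before the 'b' pairs, e.g.:
# = [('a', 4), ('a', 7), ('a', 4), ('b', 2), ('b', 3)] (the order of pairs with the same key may differ)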
You can read more about operations on RDDs at https://spark.apache.org/docs/3.3.0/programming-guide.html#rdd-operations
In the following example, we apply RDD operations on data from a real KDD Cup dataset.¶
first we download the dataset file directly in Python from the Internet and save it in the working directory
import urllib.request
urllib.request.urlretrieve("https://peter.bednar.website.tuke.sk/tsvd/data/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
the data set contains records of network communication between devices; individual connections are characterized by a set of attributes (both nominal and numerical - see http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), and the last attribute is the type of communication ('normal.' denotes standard communication, the other values denote various types of attacks)
# we load the data as an RDD collection from the file and display the first 5 records
rawdata = sc.textFile("./kddcup.data_10_percent.gz")
rawdata.take(5)
# [u'0,tcp,http,SF,181,5450,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,
# 9,9,1.00,0.00,0.11,0.00,0.00,0.00,0.00,0.00,normal.',
# u'0,tcp,http,SF,239,486,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,0.00,
# 19,19,1.00,0.00,0.05,0.00,0.00,0.00,0.00,0.00,normal.',
# u'0,tcp,http,SF,235,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,8,8,0.00,0.00,0.00,0.00,1.00,0.00,
# 0.00,29,29,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
# u'0,tcp,http,SF,219,1337,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,
# 0.00,0.00,39,39,1.00,0.00,0.03,0.00,0.00,0.00,0.00,0.00,normal.',
# u'0,tcp,http,SF,217,2032,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0,6,6,0.00,0.00,0.00,0.00,1.00,
# 0.00,0.00,49,49,1.00,0.00,0.02,0.00,0.00,0.00,0.00,0.00,normal.']
# from the listing it can be seen that the data is initially represented as an RDD collection of strings read from the file line by line
# we can use 'count' to count the number of records
rawdata.count()
# we can use csv reader to split rows into values
import csv
rdd = sc.textFile("./kddcup.data_10_percent.gz")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
# if the file contains a header on the first line, we can remove it using the 'filter' transformation
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)
# the following commands count how many records have the target attribute with the value 'normal'
# first we select all lines that contain the string 'normal'
normal_records = rawdata.filter(lambda x: "normal" in x)
# and we will find out their count
print(normal_records.count())
# on the other hand, we can obtain an RDD that contains only the records about non-standard communication, e.g. as follows:
attack_raw_data = rawdata.subtract(normal_records)
# using the 'map' transformation, we split the rows into an array of values with a comma as a separator
csv_data = rawdata.map(lambda line: line.split(","))
# we can use 'map' to rearrange the data, e.g. we generate a key:value RDD collection where the key is the
# target attribute (attribute index 41) and the value is an array of the other attribute values (indexes 0 to 40)
def create_kv(line):
    elems = line.split(",") # split the row into an array of values
    tag = elems[41] # the tag is the target attribute
    return (tag, elems[0:41]) # we return the pair (tag, values)
# we apply the mapping function to the RDD collection
key_csv_data = rawdata.map(create_kv)
# with 'sample' we can randomly select a subset of the data; we select approximately 10% of the records without repetition
# (1234 is the seed of the random number generator)
rawdata_sample = rawdata.sample(False, 0.1, 1234)
sample_size = rawdata_sample.count()
total_size = rawdata.count()
# we will print the number of selected records and the total number of records
print("sample size is {0} of {1}".format(sample_size, total_size))
Task 3.1¶
In the PySpark environment, write commands that:
- For a selected array of numerical values, calculate the sum of the even numbers.
- For a selected array of key-value pairs, where the value is a number, calculate the sum of the squares of the values for each key.
Task 3.2¶
Write code in Python using Spark transformations and actions that counts the number of word occurrences in an input text file.
Task 3.3¶
For the data from the KDD Cup dataset from the example in the exercise, write code that prints, for the nominal attributes (attributes with indices 1, 2, 3), the number of their distinct values.
Task 3.4¶
For the data from the KDD Cup dataset from the exercise example, write code that calculates, for all records with the target attribute (index 41) equal to 'normal.', the average and the total connection duration (attribute index 0).