Data preprocessing in the Apache Spark environment¶
The goal of this exercise is to learn how to preprocess data into a suitable representation so that it can be used as training or test data for model learning.
!pip install findspark
!pip install pyspark
Data frames¶
Data frames are a distributed collection of data organized into named columns. In Spark, data frames can be created from various sources, e.g. from files, external databases, or by transforming existing RDD collections. The basic interface for working with data frames is the SparkSession object, which also enables queries using the SQL language. In the PySpark interpreter environment, the SparkSession object is available directly in the spark variable; in a script, it must be initialized explicitly with at least the application name.
# create a spark app
import findspark
findspark.init()
import pyspark
# import the SparkSession type from the 'pyspark.sql' module into the script
from pyspark.sql import SparkSession
# create the 'spark' object and set the application name via the 'appName' parameter (in a distributed environment several
# applications can run simultaneously, so they need to be named in order to distinguish them)
spark = SparkSession.builder.appName("example28").getOrCreate()
sc = spark.sparkContext
# we can now use the 'spark' interface object to create and process data frames
In the following example, we apply operations to a data frame created from a real KDD Cup dataset. The following commands preprocess the network communication data between devices from the previous exercise.
# we will download the data from the last exercise from the Internet and save it in the working directory
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
# 'raw_data' is an RDD collection of rows (strings) read from a text file
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
# data is represented in data frames by rows as objects of type 'Row'
from pyspark.sql import Row
# we read the text file line by line and divide each line into individual values (strings separated by a comma)
csv_data = raw_data.map(lambda line: line.split(","))
# for each line (list of separated strings) we create an object of type 'Row', passing the data as named parameters, where
# each parameter name corresponds to the name of the data attribute; for numeric attributes we convert the values from strings to decimal numbers
df_data = csv_data.map(lambda line: Row(
duration = float(line[0]),
protocol_type = line[1],
service = line[2],
flag = line[3],
src_bytes = float(line[4]),
dst_bytes = float(line[5]),
land = float(line[6]),
wrong_fragment = float(line[7]),
urgent = float(line[8]),
hot = float(line[9]),
num_failed_logins = float(line[10]),
logged_in = float(line[11]),
num_compromised = float(line[12]),
root_shell = float(line[13]),
su_attempted = float(line[14]),
num_root = float(line[15]),
num_file_creations = float(line[16]),
num_shells = float(line[17]),
num_access_files = float(line[18]),
num_outbound_cmds = float(line[19]),
is_host_login = float(line[20]),
is_guest_login = float(line[21]),
count = float(line[22]),
srv_count = float(line[23]),
serror_rate = float(line[24]),
srv_serror_rate = float(line[25]),
rerror_rate = float(line[26]),
srv_rerror_rate = float(line[27]),
same_srv_rate = float(line[28]),
diff_srv_rate = float(line[29]),
srv_diff_host_rate = float(line[30]),
dst_host_count = float(line[31]),
dst_host_srv_count = float(line[32]),
dst_host_same_srv_rate = float(line[33]),
dst_host_diff_srv_rate = float(line[34]),
dst_host_same_src_port_rate = float(line[35]),
dst_host_srv_diff_host_rate = float(line[36]),
dst_host_serror_rate = float(line[37]),
dst_host_srv_serror_rate = float(line[38]),
dst_host_rerror_rate = float(line[39]),
dst_host_srv_rerror_rate = float(line[40]),
attack_type = line[41]))
# we can then create a data frame from an RDD collection of 'Row' type objects
df = spark.createDataFrame(df_data)
# we can use various operations on the data frame object similar to working with RDD collections
# the 'take(n)' operation returns the first n records
df.take(5)
# the 'show()' operation prints the data as a text table; optionally, you can specify how many of the first rows to display
df.show(5)
# we can easily filter the rows of the data frame by a specified condition, e.g. we select the records with normal communication and display
# their number; we access individual data attributes similarly to maps, via their name in square brackets
normal_records = df.filter(df["attack_type"] == "normal.")
# the output of the operation is a new data frame to which we can apply other operations (we can chain the call of several operations)
# e.g. the 'count' method returns the total number of records in the data frame
normal_records.count()
# similarly, you can select only some attributes (columns) into a new frame, e.g. the following command selects only the
# attributes 'duration' and 'attack_type'
duration_and_type = df.select("duration", "attack_type")
# you can group the data by a specified attribute, e.g. the following command groups the data by attack type and calculates the
# number of records for each type; the result is a frame with two columns: 'attack_type' and 'count'
type_counts = df.groupBy("attack_type").count()
type_counts.show()
# you can sort the data with the 'orderBy' operation; the following command sorts the aggregated values first in descending order by count and then
# in ascending alphabetical order by type name
type_counts.orderBy(["count", "attack_type"], ascending=[0, 1]).show()
# you can use the 'describe' operation to calculate basic statistics for numeric attributes
stats_df = df.describe(["duration", "dst_bytes"])
# the 'dtypes' property contains a list of attribute names and their types (i.e. the data frame 'stats_df' consists of three
# attributes: 'summary', 'duration' and 'dst_bytes', and all are of type string)
stats_df.dtypes
# the 'collect' method converts all rows of the data frame into a list of 'Row' objects (use this method only for small data
# sets, e.g. for aggregated results)
stats_list = stats_df.collect()
# we can access objects of type 'Row' similarly to a map; the following loop prints output similar to that of the 'show' method
for row in stats_list:
print("{0:<20}\t{1:<20}\t{2:<20}".format(row["summary"], row["duration"], row["dst_bytes"]) )
# the 'stat.cov' and 'stat.corr' operations calculate the covariance and the correlation (Pearson's correlation coefficient), respectively, between two
# numeric attributes
print(df.stat.cov("src_bytes", "dst_bytes"))
print(df.stat.corr("src_bytes", "dst_bytes"))
# you can calculate a contingency table (cross-tabulation) for nominal attributes
df.stat.crosstab("attack_type", "protocol_type").show()
# you can query data frames directly using SQL
# first it is necessary to register the data frame as a temporary table under the chosen name
df.registerTempTable("attacks")
# you can evaluate an SQL command over the data using the 'SparkSession' object; the result of the query is a new data frame to which you can
# apply additional operations
selected_records = spark.sql('SELECT * FROM attacks WHERE duration > 0 AND attack_type = "normal."')
selected_records.show(5)
# you can register multiple tables at once, which you can join via SQL JOIN
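# a minimal sketch (illustrative, not part of the original exercise): we register the aggregated frame 'type_counts'
# as a second temporary table (createOrReplaceTempView is the newer equivalent of registerTempTable) and join it
# with 'attacks' on the attack type; the 'count' column is renamed to 'cnt' to avoid confusion with the SQL COUNT function
type_counts.withColumnRenamed("count", "cnt").createOrReplaceTempView("type_counts")
joined = spark.sql("SELECT a.attack_type, a.protocol_type, t.cnt "
                   "FROM attacks a JOIN type_counts t ON a.attack_type = t.attack_type")
joined.show(5)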
# you can save data frames e.g. to a Parquet file, which you can then efficiently reload
# the 'write' interface also supports other formats (e.g. CSV, JSON, or ORC)
# the following example saves the data frame 'selected_records' to the file 'selected_records.parquet' in the current working directory
# (since the data can be processed in parallel, a subdirectory named 'selected_records.parquet' is created in the working directory in which
# individual parts of the data frame will be stored in separate files)
selected_records.write.parquet("selected_records.parquet")
# you can reload the data using the 'SparkSession.read' interface
selected_records2 = spark.read.parquet("selected_records.parquet")
selected_records2.show(5)
You can find more information about data frames at https://spark.apache.org/docs/2.2.0/sql-programming-guide.html.
Task 4.1¶
Write a function that calculates descriptive statistics for any class specified as a parameter. That is, write a function label_summary(df, label) that computes the statistics for the given class (e.g. for the class normal. it would be called as normal_stats = label_summary(df, "normal.")).
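A minimal sketch of one possible solution (the set of summarized numeric attributes is only an illustrative choice):

def label_summary(df, label):
    # select the records of the given class and compute descriptive statistics of chosen numeric attributes
    return df.filter(df["attack_type"] == label).describe(["duration", "src_bytes", "dst_bytes"])

normal_stats = label_summary(df, "normal.")
normal_stats.show()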
Task 4.2¶
Study the documentation ( https://spark.apache.org/docs/2.2.0/ml-features.html ) for Bucketizer from MLlib, which is used to transform numeric (continuous) attributes into categorical (nominal) ones. It divides the numerical values of the selected attribute into intervals (buckets), to which it assigns categorical values. Choose any numeric attribute from the dataset used in the exercise and transform it into a nominal one using Bucketizer. Try to choose the interval boundaries wisely.
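A minimal sketch of how Bucketizer might be applied, assuming the 'duration' attribute and purely illustrative split points:

from pyspark.ml.feature import Bucketizer

# the split points define the interval (bucket) boundaries; -inf and +inf catch values outside the chosen ranges
splits = [-float("inf"), 0.0, 1.0, 100.0, float("inf")]
bucketizer = Bucketizer(splits=splits, inputCol="duration", outputCol="duration_bucket")
bucketed = bucketizer.transform(df)
bucketed.select("duration", "duration_bucket").show(5)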
Task 4.3¶
Study the documentation ( https://spark.apache.org/docs/2.2.0/ml-features.html ) for StandardScaler. StandardScaler is used to normalize the values of the selected numeric attribute. Choose any numeric attribute from the dataset from the exercise and normalize it using StandardScaler.
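A minimal sketch of the normalization, assuming the 'src_bytes' attribute; StandardScaler operates on vector columns, so the attribute is first wrapped into a one-element vector with VectorAssembler:

from pyspark.ml.feature import VectorAssembler, StandardScaler

# pack the numeric attribute into a vector column, as required by StandardScaler
assembler = VectorAssembler(inputCols=["src_bytes"], outputCol="src_bytes_vec")
assembled = assembler.transform(df)

# withMean centers the values, withStd scales them to unit standard deviation
scaler = StandardScaler(inputCol="src_bytes_vec", outputCol="src_bytes_scaled", withMean=True, withStd=True)
scaled = scaler.fit(assembled).transform(assembled)
scaled.select("src_bytes", "src_bytes_scaled").show(5)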
Task 4.4¶
Study the documentation ( https://spark.apache.org/docs/2.2.0/ml-features.html ) for OneHotEncoder and StringIndexer. Both are used to index categorical (nominal) attributes, i.e. to transform them into numerical values. Besides transforming individual attributes, indexing is necessary for training machine learning models (which must work with numeric vectors). Apply both methods to a selected categorical attribute from the dataset used in the exercise and compare the results of the two indexing approaches.
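A minimal sketch for the 'protocol_type' attribute (an illustrative choice; any categorical attribute can be used). Note that in recent Spark versions OneHotEncoder is an estimator that must be fitted first, whereas in Spark 2.x it was a plain transformer:

from pyspark.ml.feature import StringIndexer, OneHotEncoder

# StringIndexer assigns a numeric index to each distinct category (the most frequent category gets index 0.0)
indexer = StringIndexer(inputCol="protocol_type", outputCol="protocol_index")
indexed = indexer.fit(df).transform(df)

# OneHotEncoder maps each index to a sparse binary vector
encoder = OneHotEncoder(inputCols=["protocol_index"], outputCols=["protocol_vec"])
encoded = encoder.fit(indexed).transform(indexed)
encoded.select("protocol_type", "protocol_index", "protocol_vec").show(5)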