Learning models using the MLlib library¶
The goal of the exercise is to learn how to work with the MLlib library for creating and validating machine learning models.
MLlib is a distributed library of algorithms that supports learning and validating predictive and descriptive models on large-scale data. MLlib provides algorithms for learning classification and prediction models, algorithms for clustering, generating association rules and frequent itemsets, and collaborative filtering models for recommender systems. In addition to the learning algorithms, the library also provides functions and objects for validating models and for exporting models to PMML (Predictive Model Markup Language).
!pip install findspark
!pip install pyspark
# creating a spark application
import findspark
findspark.init()
import pyspark
# import the SparkSession type from the 'pyspark.sql' module into the script
from pyspark.sql import SparkSession
# create the 'spark' object and set the name of the application via the 'appName' parameter (in a distributed environment several
# applications can run simultaneously, so they need to be named so that we can distinguish between them)
spark = SparkSession.builder.appName("mllib_example").getOrCreate()
sc = spark.sparkContext
# we can now use the 'spark' interface object to create and process data frames
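As a quick check that the session has been created (an optional step, not required by the exercise), we can print the version of the running Spark installation:
# print the Spark version of the active session
print(spark.version)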
In the first sample task, we will work with the Iris dataset. Iris is a simple dataset for classification into three classes; the examples describe three species of plants (irises) by the dimensions of their flowers (four numerical input attributes).
# we will import the necessary types
from pyspark.sql import Row
import urllib.request
# we will download the data from the Internet and save it in the working directory
urllib.request.urlretrieve("http://people.tuke.sk/martin.sarnovsky/tsvd/files/iris.csv", "iris.csv")
# we load the data and remap it to objects of type 'Row'
raw_data = sc.textFile("iris.csv")
csv_data = raw_data.map(lambda x: x.split(","))
Using the take() operation, we can look at the first few records in the dataset.
csv_data.take(5)
In the following example, we will learn a linear SVM (Support Vector Machine) model. This model can only be used for classification into two classes, so for the Iris dataset we will train a classifier that distinguishes the species iris-versicolor from the other species.
So first we will have to transform the target attribute to binary.
# as class 1 we mark examples of the iris-versicolor species and as class 0 we mark all other examples
csv_data = csv_data.map(lambda line: [line[0], line[1], line[2], line[3],
1.0 if line[4] == "iris-versicolor" else 0.0])
Using the take() operation, we can look again at what a few examples from the transformed RDD look like.
csv_data.take(5)
We then create a data frame from the loaded RDD using the createDataFrame() operation. Before the conversion, we map each record to an object of type 'Row' using a lambda expression in the map() method, where we name and type the individual attributes.
df_data = csv_data.map(lambda line: Row(
petal_length = float(line[0]),
petal_width = float(line[1]),
sepal_length = float(line[2]),
sepal_width = float(line[3]),
label = line[4]))
df = spark.createDataFrame(df_data)
We can then apply all operations for Spark data frames to the df data frame. Using head() we can list the first few records.
df.head(5)
We can also easily inspect, for example, the distribution of the values of the target attribute.
df.groupBy('label').count().show()
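In a similar way we can obtain basic summary statistics of the numeric input attributes, e.g. using the standard describe() operation (an optional check, not required by the exercise):
# basic statistics (count, mean, standard deviation, min, max) of the numeric input columns
df.describe("petal_length", "petal_width", "sepal_length", "sepal_width").show()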
For model learning, it is necessary to transform the data into vectors. All input (predictor) attributes have to be combined into a single numerical vector using the 'VectorAssembler' object. This creates a 'features' column in the data frame, which represents the vector of predictor attribute values.
# we import VectorAssembler
from pyspark.ml.feature import VectorAssembler
# the following command combines all input attributes into a numeric vector, which it stores in a new column 'features'
vector_data = VectorAssembler(inputCols=["petal_length", "petal_width", "sepal_length", "sepal_width"],
outputCol="features").transform(df)
vector_data.head()
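To verify the structure of the transformed data frame (an optional check), we can list its columns, including the new 'features' vector column:
# list the schema of the data frame with the added 'features' column
vector_data.printSchema()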
Now that we have the data in vector form, we can divide the input dataset into a training and a test set. We perform the split using the randomSplit() operation, to which we pass the split ratio as a parameter.
# we divide the data into a training (70%) and a test (30%) set by random selection
training_data, test_data = vector_data.randomSplit([0.7, 0.3], seed=123)
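We can check the sizes of the resulting sets (an optional sanity check) by counting their rows:
# number of examples in the training and test sets
print("Training set size: {0}".format(training_data.count()))
print("Test set size: {0}".format(test_data.count()))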
In 'training_data' and 'test_data' we have stored the training and test sets. Now we can continue with training the classification model. In the following example, we will train a linear Support Vector Classifier. Model training is similar to, for example, the scikit-learn library. First, we create an object of the given model, where we specify the parameters of the model (input data columns, target attribute, hyperparameters of the model). We then fit the model to the training data using the fit() operation. We can use the trained model to classify the test data using the transform() operation.
# we import the necessary libraries
from pyspark.ml.classification import LinearSVC
# first we create the 'LinearSVC' object and set the parameters of the algorithm
svm_classifier = LinearSVC(
featuresCol="features", # parameter features - a data column containing a vector of input attributes
labelCol="label") # parameter labelCol - data column containing the target attribute (class indexes)
# we train the model with the 'fit' function, to which we pass the training data
svm_model = svm_classifier.fit(training_data)
# we classify the test data using the 'transform' function
predictions = svm_model.transform(test_data)
Look at the predictions data frame. Note that after the classification of the test set, the original test data is preserved in this data frame and new columns have been added that contain the predicted class and the raw prediction scores. The predicted class is stored in the 'prediction' column. We can then count the number of misclassified examples by comparing where 'prediction' is not equal to the target attribute 'label'.
predictions.head()
test_error = predictions.filter(predictions["prediction"] != predictions["label"]).count() / float(test_data.count())
print("Testing error: {0:.4f}".format(test_error))
More information about the algorithms and their settings can be found in the documentation at https://spark.apache.org/docs/latest/api/python/reference/pyspark.mllib.html.
In the next example, we apply the classification models to the KDD Cup data from the previous exercise.
If you don't have the data in your work directory from past exercises, uncomment the line to download it.
# we will import the necessary modules
from pyspark.sql import Row
import urllib.request
# we download the data and load it as lists of strings
# urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
csv_data = raw_data.map(lambda x: x.split(","))
We will create a data frame from selected attributes - for the sake of clarity, in this task we will keep only 5 selected predictor attributes (plus the target attribute). We will create the data frame from them using the createDataFrame() operation.
df_data = csv_data.map(lambda line: Row(
duration= float(line[0]),
protocol_type = line[1],
src_bytes = float(line[4]),
dst_bytes = float(line[5]),
land = float(line[6]),
attack_type = line[41]))
df = spark.createDataFrame(df_data)
# print the first record
df.first()
As we can see, the data frame contains categorical variables in addition to the numeric ones. These must be transformed in a suitable way before the data itself is vectorized. Let's look at their values.
df.groupBy('protocol_type').count().show()
df.groupBy('attack_type').count().show(40)
To transform categorical attributes into numeric ones, we can use several functions from MLlib (you should have tried them in the tasks from the previous exercises). For attributes with no natural ordering, it is convenient to use OneHotEncoder, which transforms a categorical attribute into a set of binary attributes corresponding to its values, or StringIndexer, which implements simple indexing of categorical variables by assigning a numerical index to each value. Care should be taken with attributes where such a transformation can introduce an unwanted ordering.
from pyspark.ml.feature import StringIndexer
# first we create an index of values by calling the 'fit' function
attack_type_index = StringIndexer(inputCol="attack_type", outputCol="attack_type_index").fit(df)
# after applying the transformation, a new numeric attribute 'attack_type_index' is added to the data frame
df = attack_type_index.transform(df)
df.head()
# a list of nominal values ordered by the assigned indices can be obtained from the index object via the 'labels' attribute
# e.g. the following command will list the number of classes, i.e. number of values of target attribute 'attack_type'
print("Number of classes: {0}".format(len(attack_type_index.labels)))
To use OneHotEncoder, however, we must first convert the attribute to a numeric one with StringIndexer and only then encode it with OneHotEncoder.
from pyspark.ml.feature import OneHotEncoder
protocol_type_index = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index").fit(df)
df = protocol_type_index.transform(df)
encoder = OneHotEncoder(inputCol="protocol_type_index", outputCol="protocol_encoded").fit(df)
df = encoder.transform(df)
We will check the result of the operation - we will see that, in addition to protocol_type_index (which is just the numerically indexed protocol_type attribute), the data frame also contains protocol_encoded, which is the binary-encoded attribute in the form of a sparse vector.
df.head()
# we can remove the original nominal columns (i.e. the data frame will then contain only numeric columns)
# df = df.drop("protocol_type")
# df = df.drop("attack_type")
# df.show()
We will convert the transformed data into vector form, again using VectorAssembler. As parameters, we must correctly specify the input attributes, i.e. the numeric ones and the transformed ones.
from pyspark.ml.feature import VectorAssembler
# the following command combines all input attributes into a numeric vector, which it stores in a new column 'features'
vector_data = VectorAssembler(inputCols=["dst_bytes", "duration", "land", "src_bytes", "protocol_encoded"],
outputCol="features").transform(df)
vector_data.head()
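The individual preprocessing steps (indexing, encoding, vector assembly) can also be chained into a single Pipeline object, which is convenient when the same preprocessing has to be applied repeatedly, e.g. to new data. The following is a minimal sketch of the same preprocessing expressed as a pipeline; the stage objects are created again here only for illustration, and the pipeline is applied to a data frame containing just the original columns:
from pyspark.ml import Pipeline
# a data frame with only the original (untransformed) columns
raw_df = df.select("duration", "protocol_type", "src_bytes", "dst_bytes", "land", "attack_type")
# the pipeline chains the indexing, encoding and assembling stages into a single estimator
preprocessing = Pipeline(stages=[
    StringIndexer(inputCol="attack_type", outputCol="attack_type_index"),
    StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index"),
    OneHotEncoder(inputCol="protocol_type_index", outputCol="protocol_encoded"),
    VectorAssembler(inputCols=["dst_bytes", "duration", "land", "src_bytes", "protocol_encoded"],
                    outputCol="features")])
# 'fit' learns all the stages, 'transform' applies them to the data
pipeline_model = preprocessing.fit(raw_df)
pipeline_model.transform(raw_df).head()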
Now we divide the data into a training (80%) and a test (20%) set by random selection.
training_data, testing_data = vector_data.randomSplit([0.8, 0.2], seed=1234)
And now we will learn a classification model - in this case, a decision tree.
As in the previous example, we set the predictor attributes and the target attribute. This time we will also set the hyperparameters of the model - the maximum depth and the splitting criterion.
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
tree_classifier = DecisionTreeClassifier(
featuresCol="features", # data column containing vector of input attributes
labelCol="attack_type_index", # data column containing target attribute (class indexes)
impurity="entropy", # the information gain criterion is used to select the attributes when splitting
maxDepth=5) # limit the maximum depth of the generated tree
# we create a classification model by calling the 'fit' function on the training data
tree_model = tree_classifier.fit(training_data)
# we can save the created model to a file using the 'save' function
tree_model.save("decision_tree_1.model")
# you can reload the saved model from a file with the function 'DecisionTreeClassificationModel.load'
tree_model = DecisionTreeClassificationModel.load("decision_tree_1.model")
We can visualize the trained model, or extract classification rules from it and print them on the screen. We will use the toDebugString attribute for this.
print(tree_model.toDebugString)
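Besides the textual representation, the trained model also exposes, for example, the relative importance of the individual input attributes via the featureImportances attribute (a short illustrative check):
# a sparse vector of relative attribute importances, in the order given to VectorAssembler
print(tree_model.featureImportances)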
If we want to evaluate the model, we need to evaluate it on the test set. We will do this in the same way as in the previous example, by applying transform() to the test data to obtain the predictions.
We can then use these to calculate the error (or other metrics).
# we classify the test data using the 'transform' function
# after classification, new columns are added to the data frame that contain the predicted class and probabilities
predictions = tree_model.transform(testing_data)
# the predicted class is stored in the 'prediction' column, we count the number of misclassified examples
# where 'prediction' is not equal to target attribute 'attack_type_index'
test_error = predictions.filter(predictions["prediction"] != predictions["attack_type_index"]).count() / float(testing_data.count())
print("Testing error: {0:.4f}".format(test_error))
Task 5.1¶
Transform your KDD Cup task into a binary classification. Mark normal communication as class 0.0 and all attacks as class 1.0. Train a logistic regression model on the transformed data and evaluate its accuracy.
Task 5.2¶
Study the documentation for the OneVsRest object and create an LR (Logistic Regression) multi-class classifier, which you can test on the KDD Cup data.
Task 5.3¶
Train a decision tree model on the Iris data. Use the predictions.stat.crosstab("prediction", "label").collect() operation on the data frame to compute the contingency table between the predicted class and the actual class. From the contingency table, calculate the accuracy for each class.