Learning models using the MLlib library II¶
The goal of this exercise is to learn how to work with the MLlib library for creating and validating machine learning models.
In particular, it demonstrates the use of clustering models and MLlib Pipelines.
At the beginning, as in the previous notebooks, we will configure the environment and the Spark application.
!pip install findspark
!pip install pyspark
# create spark app
import findspark
findspark.init()
import pyspark
# import the SparkSession type from the 'pyspark.sql' module
from pyspark.sql import SparkSession
# create the 'spark' object and set the application name via the 'appName' parameter (in a distributed environment
# multiple applications can run simultaneously and need to be named so that we can distinguish them)
spark = SparkSession.builder.appName("mllib_example_2").getOrCreate()
sc = spark.sparkContext
# we can now use the 'spark' interface object to create and process data frames
Clustering¶
The first simple example demonstrates data clustering using the K-Means algorithm.
The data processing procedure is almost the same as in the classification task. We will again use the Iris data, but this time only the four predictor attributes (in descriptive modeling we do not need the target attribute), and we will transform them into a vector dataset. It is also not necessary to split the set into training and testing parts.
from pyspark.sql import Row
raw_data = sc.textFile("iris.csv")
csv_data = raw_data.map(lambda x: x.split(","))
csv_data = csv_data.map(lambda line: [line[0], line[1], line[2], line[3]])
df_data = csv_data.map(lambda line: Row(
petal_length = float(line[0]),
petal_width = float(line[1]),
sepal_length = float(line[2]),
sepal_width = float(line[3])))
df = spark.createDataFrame(df_data)
from pyspark.ml.feature import VectorAssembler
vector_data = VectorAssembler(inputCols=["petal_length", "petal_width", "sepal_length", "sepal_width"], outputCol="features").transform(df)
Clustering models are trained in the same fashion as classification models. As parameters, we specify the column containing the feature vectors and the hyperparameters of the model. In this case, we will cluster the data into 3 clusters using the K-Means algorithm.
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol="features", k=3, seed=1234)
kmeans_model = kmeans.fit(vector_data)
If we want to predict the cluster assignment for new/previously unseen samples, we use the transform() operation (similarly to predictive tasks). The resulting data frame also contains a prediction column with the ID of the cluster to which the given example belongs.
# during clustering, the 'transform' function adds to the data frame the indexes of the clusters to which the samples were assigned
clusters = kmeans_model.transform(vector_data)
clusters.show()
With K-Means, clusters are represented by the vectors of their representatives - centroids, which we can obtain from the model using the clusterCenters() operation. The following command prints the centroid of the first cluster (index 0).
print(kmeans_model.clusterCenters()[0])
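As an optional extension (not part of the original exercise), we can also print all centroids, check the cluster sizes, and - in Spark 2.3 and newer - compute the silhouette score using ClusteringEvaluator; a minimal sketch:
# print the centroid of every cluster
for i, center in enumerate(kmeans_model.clusterCenters()):
    print(i, center)
# number of examples assigned to each cluster
clusters.groupBy("prediction").count().show()
# silhouette score of the clustering (Spark 2.3+)
from pyspark.ml.evaluation import ClusteringEvaluator
silhouette = ClusteringEvaluator(featuresCol="features", predictionCol="prediction").evaluate(clusters)
print(silhouette)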
ML Pipelines¶
Spark also includes ML Pipelines - a higher-level API that allows you to create "pipelines", i.e. workflows of analytical tasks. These workflows consist of stages, each of which represents one step of the overall analytical process.
In the following example, we will show how to create a simple analytical workflow consisting of several preprocessing, modeling and model evaluation steps.
# import the types
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
# define the data schema
schema = StructType([
    StructField("petal_length", DoubleType(), True),
    StructField("petal_width", DoubleType(), True),
    StructField("sepal_length", DoubleType(), True),
    StructField("sepal_width", DoubleType(), True),
    StructField("label", StringType(), True),
])
# we will also use the iris data from the previous exercise
# this time we'll just use the dataframe API and load the CSV directly into the dataframe
# if we have a csv file with a header, we use header=True
df = spark.read.csv('iris.csv', schema=schema, header=False)
df.head()
Using the dataframe API, we can conveniently look at different groupings and easily calculate descriptive characteristics for individual attributes.
df.select('petal_width', 'petal_length').describe().show()
df.groupBy('label').count().show()
We first divide the input data into a training and a test set.
train_data, test_data = df.randomSplit([0.7, 0.3], seed=123)
train_data.show()
We will now create an analytical Pipeline consisting of several stages:
- index the target attribute
- vectorize predicting attributes
- normalize predicting attributes
- train the Logistic Regression classifier on given data
Then we fit this Pipeline on the training data and evaluate the resulting model on the test data.
# import necessary types
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# define stage 1: transform the target feature using StringIndexer
stage_1 = StringIndexer(inputCol='label', outputCol='label_index')
# define stage 2: assemble the predictor features into a single vector using VectorAssembler
stage_2 = VectorAssembler(inputCols=['petal_length', 'petal_width', 'sepal_length', 'sepal_width'],
                          outputCol='features')
# define stage 3: scale the predictor features using StandardScaler
stage_3 = StandardScaler(inputCol='features', outputCol='features_scaled')
# define stage 4: train the logistic regression classifier
stage_4 = LogisticRegression(featuresCol='features_scaled', labelCol='label_index')
# define the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])
# fit the pipeline on the training data and apply it to the test data
pipeline_model = pipeline.fit(train_data)
df_updated = pipeline_model.transform(test_data)
df_updated.show()
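The fitted PipelineModel keeps the trained stages in order, so we can also pull out the fitted logistic regression model (stage index 3) and inspect its parameters directly; a small sketch, assuming the pipeline defined above:
# the fitted stages of the PipelineModel are available via 'stages'
lr_model = pipeline_model.stages[3]
# coefficients and intercepts of the (multinomial) logistic regression model
print(lr_model.coefficientMatrix)
print(lr_model.interceptVector)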
Using Pipelines, we can easily create workflows that employ advanced methods for finding model hyperparameters, such as cross-validation.
The example below demonstrates optimizing the hyperparameters of a decision tree model on the Iris data using Pipelines.
# import the types
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# define stage 1: transform the target feature using StringIndexer
stage_1 = StringIndexer(inputCol='label', outputCol='label_index')
# define stage 2: assemble the predictor features into a single vector using VectorAssembler
stage_2 = VectorAssembler(inputCols=['petal_length', 'petal_width', 'sepal_length', 'sepal_width'],
                          outputCol='features')
# define stage 3: scale the predictor features using StandardScaler
stage_3 = StandardScaler(inputCol='features', outputCol='features_scaled')
# define stage 4: train the decision tree classifier
stage_4 = DecisionTreeClassifier(featuresCol='features_scaled', labelCol='label_index')
# define the pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])
# to find the optimal hyperparameter values of the model by cross-validation, we specify the parameters and the values to be examined
# for the tree these are the impurity measure and the maximum depth; we define the grid of their values
paramGrid = ParamGridBuilder().addGrid(stage_4.impurity, ['entropy', 'gini']).addGrid(stage_4.maxDepth, [2, 3, 4]).build()
# set up cross-validation - the estimator will be a pipeline
# pay attention to setting the target attribute column in the evaluator (by default it uses the 'label' column) and to setting the evaluation metric
crossVal = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator(labelCol='label_index', metricName='f1'), numFolds=10)
# train the CV model on the training data
cvModel = crossVal.fit(train_data)
We can view the average metrics achieved during cross-validation using avgMetrics. In this case, we defined an evaluator that uses the F1 metric, so each average value represents the mean F1 score of one parameter combination across the folds.
cvModel.avgMetrics
If we want an overview of the models evaluated during cross-validation and the combinations of their parameters, we can use getEstimatorParamMaps(). The output is a list of the parameter maps used in model training.
cvModel.getEstimatorParamMaps()
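To make the output easier to read, we can pair each evaluated parameter combination with its average cross-validation score; a small sketch:
# print each parameter combination together with its average F1 score from cross-validation
for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, "->", metric)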
When solving analytical tasks, we are usually most interested in which combination of hyperparameters performed best with respect to the metric specified in the evaluator.
We can get the best of the evaluated models from cross-validation using bestModel. If we validated an entire Pipeline consisting of several steps, we also need to specify which step of the pipeline we are interested in. In this case, we will look at the fourth step of the sequence (index 3), where the classification model was trained.
# we can extract the best model from the given pipeline step and use it like a standard MLlib model
bestTreeModel = cvModel.bestModel.stages[3]
If we are interested in the hyperparameters of the best model, we can obtain them using the getter functions of the corresponding model parameters. In this case, we searched for the best combination of the maxDepth and impurity parameters, which correspond to the functions getMaxDepth() and getImpurity().
print(bestTreeModel.getMaxDepth())
print(bestTreeModel.getImpurity())
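If we want to see the whole structure of the selected tree, not only its hyperparameters, the trained model also offers a textual dump (a small optional addition):
# textual representation of the trained decision tree
print(bestTreeModel.toDebugString)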
We can then evaluate the selected model on the test data. Since we used a Pipeline, we apply the whole best pipeline to the test data.
# store the best Pipeline in the bestModel (with the best performing classifier)
bestModel = cvModel.bestModel
# apply the test data
predictions = bestModel.transform(test_data)
The predicted values (in addition to the original and transformed columns) are stored in predictions.
predictions.show()
And from them we can then calculate various metrics for evaluating the quality of the classification. Compared to the approach from the previous exercise, we will now show how to calculate the selected metrics using MLlib functions.
# import the types for a particular classification task
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# calculate the metrics
evaluator_a = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
evaluator_r = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedRecall")
evaluator_p = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedPrecision")
print("accuracy :{}".format(evaluator_a.evaluate(predictions)))
print("f1 :{}".format(evaluator_f1.evaluate(predictions)))
print("weighted recall :{}".format(evaluator_r.evaluate(predictions)))
print("weighted precision :{}".format(evaluator_p.evaluate(predictions)))
If we would like to visualize the confusion matrix, note that some MLlib metrics (such as the confusion matrix) are implemented only in the RDD API (not in the dataframe API). Therefore, we need to transform the columns of predictions and actual target values into an RDD and calculate the metrics from it.
from pyspark.mllib.evaluation import MulticlassMetrics
# we select only the prediction and indexed target columns from the predictions data frame
# (both columns are already of double type, as required by the RDD-based metrics)
preds_and_labels = predictions.select(['prediction','label_index'])
# use the MulticlassMetrics - but from the RDD MLlib API
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# compute the confusion matrix
print(metrics.confusionMatrix().toArray())
In the calculated confusion matrix, the values of the target attribute are ordered by index from the lowest to the highest. If we want to find out which index corresponds to which original, non-indexed value, we have to transform the indexes back to the original labels. We can do this using the IndexToString() transformation.
from pyspark.ml.feature import IndexToString
index_to_string = IndexToString(inputCol="label_index", outputCol="categoryValue")
index_to_string.transform(preds_and_labels).drop("prediction").distinct().show()
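Alternatively, the mapping of indexes to the original class names can be read directly from the fitted StringIndexer, which is the first stage of the best pipeline; its labels are listed in index order, i.e. in the same order as the rows and columns of the confusion matrix (a small sketch):
# the fitted StringIndexer is the first stage of the best pipeline
string_indexer_model = cvModel.bestModel.stages[0]
# class names in index order 0, 1, 2, ... (in newer Spark versions also available as labelsArray)
print(string_indexer_model.labels)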
Task 6.1¶
Transform your KDD Cup task into a binary classification: mark normal communication as class 0.0 and all attacks as class 1.0. Encode the categorical attributes appropriately. Train a logistic regression model on the transformed data and find the best combination of hyperparameters (e.g. maxIter, regParam, etc. - see the documentation) using 10-fold cross-validation. Evaluate its accuracy using metrics for binary classification. Implement the task using ML Pipelines.
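A possible skeleton for the task (only a sketch, not a complete solution): the dataframe kdd_df, the column names protocol_type, service, flag, label, the numeric feature list and the value 'normal.' are assumptions that must be adapted to your KDD Cup data, and the multi-column StringIndexer/OneHotEncoder API used here requires Spark 3.x.
from pyspark.sql.functions import when, col
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# 1. binary target: 0.0 for normal communication, 1.0 for any attack ('normal.' is an assumed label value)
kdd_df = kdd_df.withColumn('binary_label', when(col('label') == 'normal.', 0.0).otherwise(1.0))
train_data, test_data = kdd_df.randomSplit([0.7, 0.3], seed=123)
# 2. encode the categorical attributes, assemble the features, define the classifier
numeric_cols = ['duration', 'src_bytes', 'dst_bytes']  # example numeric columns, adjust to your data
indexer = StringIndexer(inputCols=['protocol_type', 'service', 'flag'],
                        outputCols=['protocol_idx', 'service_idx', 'flag_idx'])
encoder = OneHotEncoder(inputCols=['protocol_idx', 'service_idx', 'flag_idx'],
                        outputCols=['protocol_vec', 'service_vec', 'flag_vec'])
assembler = VectorAssembler(inputCols=['protocol_vec', 'service_vec', 'flag_vec'] + numeric_cols,
                            outputCol='features')
lr = LogisticRegression(featuresCol='features', labelCol='binary_label')
pipeline = Pipeline(stages=[indexer, encoder, assembler, lr])
# 3. 10-fold cross-validation over selected hyperparameters
grid = ParamGridBuilder().addGrid(lr.maxIter, [10, 50]).addGrid(lr.regParam, [0.0, 0.01, 0.1]).build()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid,
                    evaluator=BinaryClassificationEvaluator(labelCol='binary_label'),
                    numFolds=10)
cv_binary_model = cv.fit(train_data)
# 4. evaluate on the test data (areaUnderROC is the default metric of BinaryClassificationEvaluator)
predictions = cv_binary_model.transform(test_data)
print(BinaryClassificationEvaluator(labelCol='binary_label').evaluate(predictions))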