Cvičenie 10 - Učenie modelov pomocou knižnice MLlib II.¶
Cieľom cvičenia je naučiť sa pracovať s knižnicou MLlib pre vytváranie a validovanie modelov strojového učenia.
Náplňou tohoto cvičenia je ukázať si použitie zhlukovacích modelov a MLlib Pipelines.
Na úvod rovnako ako v predoškých cvičeniach nakonfigurujeme prostredie a Spark aplikáciu.
!pip install findspark
!pip install pyspark
# vytvorenie spark aplikacie
import findspark
findspark.init()
import pyspark
# do skriptu si naimportujeme typ SparkSession z modulu ‘pyspark.sql‘
from pyspark.sql import SparkSession
# vytvoríme objekt ‘spark‘ a ako parameter ‘appName‘ nastavíme názov aplikácie (v distribuovanom prostredí môže naraz bežať
# viacero aplikácií, ktoré je potrebné pomenovať aby sme ich vedeli rozlíšiť)
spark = SparkSession.builder.appName("mllib_example_2").getOrCreate()
sc = spark.sparkContext
# ďalej už môžeme používať objekt rozhrania ‘spark‘ na vytvorenie a spracovanie dátových rámcov
Zhlukovanie¶
Prvý jednoduchý príklad demonštruje úlohu zhlukovania dát pomocou algoritmu K-Means.
Postup spracovania dát je takmer rovnaký ako v prípade klasifikácie. Použijeme rovnako dáta Iris, ale v tomto prípade použijeme iba 4 predikujúce atribúty (robíme popisné modelovanie, cieľový atribút nepotrebujeme) a transformujeme do dataset vektorovej podoby. Rovnako nie je nutné deliť množinu na trénovaciu a testovaciu.
from pyspark.sql import Row
raw_data = sc.textFile("iris.csv")
csv_data = raw_data.map(lambda x: x.split(","))
csv_data = csv_data.map(lambda line: [line[0], line[1], line[2], line[3]])
df_data = csv_data.map(lambda line: Row(
petal_length = float(line[0]),
petal_width = float(line[1]),
sepal_length = float(line[2]),
sepal_width = float(line[3])))
df = spark.createDataFrame(df_data)
from pyspark.ml.feature import VectorAssembler
vector_data = VectorAssembler(inputCols=["petal_length", "petal_width", "sepal_length", "sepal_width"], outputCol="features").transform(df)
Zhlukovacie modely vytvárame potom rovnako ako tie klasifikačné. Ako parameter špecifikujeme vektor príznakov a množinu hyperparametrov modelu. V tomto prípade budeme robiť zhlukovanie pomocou algoritmu K-Means do 3 zhlukov.
from pyspark.ml.clustering import KMeans
kmeans = KMeans(featuresCol="features", k=3, seed=1234)
kmeans_model = kmeans.fit(vector_data)
Ak chceme pre príklady z odvodiť ich príslušnosť k zhluku, použijeme na to (podobne ako pri prediktívnych úlohách) operáciu transform(). Ked sa pozrieme na takto vytvorený dátový rámec, tak ten obsahuje navyše stĺpec prediction, ktorý obsahuje ID zhluku, do ktorého daný príklad patrí.
# funkcia ‘transform’ pri zhlukovaní pridá do dátového rámca indexy zhlukov do ktorých boli zaradené testované príklady
clusters = kmeans_model.transform(vector_data)
clusters.show()
Pri k-means sú zhluky reprezentované vektormi reprezentantov zhlukov - centroidov, ktoré vieme získať z modelu pomocou operácie clusterCenters(). Nasledujúci príkaz vypíše centroid pre prvý zhluk (index 0).
print(kmeans_model.clusterCenters()[0])
ML Pipelines¶
Spark obsahuje aj ML Pipelines - API vyššej úrovne, ktoré umožňuje vytvárať "pipelines" - pracovné toky analytických úloh. Tieto pracovné toky pozostávajú z jednotlivých etáp ("stages"), ktoré predstavujú jednotlivé kroky celého analytického workflowu.
V nasledujúcej ukážke si ukážeme ako vytvoriť jednoduchý workflow analytickej úlohy, ktorá pozostáva z niekoľkých krokov predspracovania, modelovania a vyhodnotenia modelu.
# importujeme potrebné typy
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
# definujeme schému načítavaných dát
schema = StructType([ \
StructField("petal_length", DoubleType(),True), \
StructField("petal_width", DoubleType(),True), \
StructField("sepal_length", DoubleType(),True), \
StructField("sepal_width", DoubleType(), True), \
StructField("label", StringType(), True), \
])
# rovnako použijeme dáta iris z minulého cvičenia
# tentoraz použijeme iba dataframe API a načítame CSV priamo do dátového rámca
# v prípade, že máme csv súbor s hlavičkou, použijeme header=True
df = spark.read.csv('iris.csv', schema=schema, header=False)
df.head()
Pomocou dataframe API sa môžeme pohodlne pozrieť na rôzne zoskupenia a jednoducho spočítať popisné charakteristiky pre jednotlivé atribúty.
df.select('petal_width', 'petal_length').describe().show()
df.groupBy('label').count().show()
Rozdelíme si najprv vstupné dáta na trénovaciu a testovaciu množinu
train_data, test_data = df.randomSplit([0.7, 0.3], seed=123)
train_data.show()
Vytvoríme teraz analytický Pipeline - ten bude pozostávať z niekoľkých krokov (stages):
- zaindexujeme cieľový atribút
- vektorizujeme predikujúce atribúty
- normalizujeme predikujúce atribúty
- naučíme na dátach klasifikátor Logistickej Regresie
Potom tento Pipeline natrénujeme a otestujeme model na testovacích dátach.
# importujeme potrebné typy
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
# definujeme stage 1 : transformácia cieľového atribútu pomocou StringIndexeru
stage_1 = StringIndexer(inputCol= 'label', outputCol= 'label_index')
# definujeme stage 3 : transformujeme dáta na vektory pomocou Vector Assembleru
stage_2 = VectorAssembler(inputCols=['petal_length', 'petal_width', 'sepal_length', 'sepal_width'],
outputCol='features')
# definujeme stage 2 : transformácia predikujúcich atribútov pomocou StandardScaleru
stage_3 = StandardScaler(inputCol = 'features', outputCol = 'features_scaled')
# definujeme stage 4 : naučíme stromový klasifikátor
stage_4 = LogisticRegression(featuresCol='features_scaled', labelCol='label_index')
# nadefinujeme pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])
# naučíme pipeline na dátovom rámci
pipeline_model = pipeline.fit(train_data)
df_updated = pipeline_model.transform(test_data)
df_updated.show()
Pomocou Pipelines môžeme jednoducho vytvárať aj workflowy, v ktorých použijeme pokročilé metódy hľadania hyperparametrov modelu, ako je napr. krížová validácia.
Ukážka nižšie demonštruje optimalizáciu hyperparametrov modelu rozhodovacieho stromu na Iris dátach pomocou Pipelines.
# importujeme typy potrebné pre modelovanie, krížovú validáciu a vyhodnotenie
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
# definujeme stage 1 : transformácia cieľového atribútu pomocou StringIndexeru
stage_1 = StringIndexer(inputCol= 'label', outputCol= 'label_index')
# definujeme stage 3 : transformujeme dáta na vektory pomocou Vector Assembleru
stage_2 = VectorAssembler(inputCols=['petal_length', 'petal_width', 'sepal_length', 'sepal_width'],
outputCol='features')
# definujeme stage 2 : transformácia predikujúcich atribútov pomocou StandardScaleru
stage_3 = StandardScaler(inputCol = 'features', outputCol = 'features_scaled')
# definujeme stage 4 : naučíme stromový klasifikátor
stage_4 = DecisionTreeClassifier(featuresCol= 'features_scaled', labelCol='label_index')
# definujeme celú analytickú pipeline
pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4])
# pre hľadanie hyperparametrov modelu krížovou validáciou nastavíme parametre a hodnoty, ktoré budeme skúmať
# pre strom to bude impurity a jeho maximálna hĺbka, špecifikujeme rozsah parametrov
paramGrid = ParamGridBuilder().addGrid(stage_4.impurity, ['entropy', 'gini']).addGrid(stage_4.maxDepth, [2, 3, 4]).build()
# nastavíme krížovú validáciu - estimátor bude pipeline
# pozor na nastavenie hodnoty cieľového atribútu v evaluátore (implicitne použije label stĺpec) a nastavenie metriky evaluátora
crossVal = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=MulticlassClassificationEvaluator(labelCol='label_index', metricName='f1'), numFolds=10)
# Naučíme CV model na trénovacích dátach
cvModel = crossVal.fit(train_data)
Priemerné dosiahnuté metriky počas krížovej validácie si môžeme pozrieť pomocou avgMetrics. V tomto prípade máme definovný evaluátor, ktorý používa metriku úspešnosti (f1), takže priemerná metrika bude predstavovať túto hodnotu.
cvModel.avgMetrics
Ak chceme prehľad modelov, ktoré sme počas krížovej validácie vyhodnocovali a kombinácie ich parametrov, môžeme na to použiť getEstimatorParamMaps(). Výstupom bude mapa všetkých parametrov použitých pri trénovaní modelov.
cvModel.getEstimatorParamMaps()
Pri riešení úloh nás najčastejšie bude zaujímať, ktorá kombinácia parametrov bola najvhodnejšia vzhľadom na metriku špecifikovanú v evaluátore a ktorá je ideálna kombinácia hyperparametrov modelu.
Najlepší z modelov môžeme získať z krížovej validácie pomocou bestModel. Ak používame Pipelines, v ktorých máme viacero krokov a validovali sme celú pipeline, potrebujeme špecifikovať aj krok danej pipeline. V tomto prípade sa pozrieme na v poradí štvrtý krok, kde sme validovali klasifikačný model.
# najlepší model z definovaného kroku pipeline si môžeme uložiť a môžeme ho používať ako štandardné MLlib modely
bestTreeModel = cvModel.bestModel.stages[3]
Ak nás zaujímajú hyperparametre najlepšieho modelu, môžeme ich získať pomocou funkcií pre definované typy parametrov pre daný model. V tomto prípade sme hľadali najlepšiu kombináciu parametrov maxDepth a impurity, ktorým zodpovedajú funkcie getMaxDepth() a getImpurity().
print(bestTreeModel.getMaxDepth())
print(bestTreeModel.getImpurity())
Zvolený model potom môžeme evaluovať na testovacích dátach. Keďže sme použili Pipeline, potrebujeme aj testovacie dáta aplikovať na tú najlepšiu pipeline.
# do bestModel si uložíme najlepšiu Pipeline (s najlepším klasifikátorom)
bestModel = cvModel.bestModel
# a aplikujeme na ňu testovacie dáta
predictions = bestModel.transform(test_data)
V predictions potom budú uložené (okrem pôvodného a trasnformovaného datasetu) aj predikcie.
predictions.show()
A z nich potom už môžeme počítať rôzne metriky pre vyhodnotenie kvality klasifikácie. Oproti prístupu z minulého cvičenia si teraz ukážeme ako spočítame zvolené metriky pomocou MLlib funkcií.
# importujeme triedu pre daný typ klasifikácie
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# a spočítame metriky
evaluator_a = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="accuracy")
evaluator_f1 = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="f1")
evaluator_r = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedRecall")
evaluator_p = MulticlassClassificationEvaluator(labelCol="label_index", predictionCol="prediction", metricName="weightedPrecision")
print("accuracy :{}".format(evaluator_a.evaluate(predictions)))
print("f1 :{}".format(evaluator_f1.evaluate(predictions)))
print("weighted recall :{}".format(evaluator_r.evaluate(predictions)))
print("weighted precision :{}".format(evaluator_p.evaluate(predictions)))
Ak by sme sa chceli dostať ku confusion matrix, je nutné poznamenať, že niektoré MLlib metriky (ako napr. confusion matrix) sú implementované v RDD API (nie v dataframe API). Preto je potrebné vektory predikcií a skutočných hodnôt cieľového atribútu transformovať do RDD a tak spočítať.
from pyspark.mllib.evaluation import MulticlassMetrics
#important: need to cast to float type, and order by prediction, else it won't work
#preds_and_labels = predictions.select(['predictions','d']).withColumn('label_index', F.col('d').cast(FloatType())).orderBy('prediction')
# z dátového rámca predikcií vyberieme len stĺpce predikcie a zaindexovaného cieľového
preds_and_labels = predictions.select(['prediction','label_index'])
# použijeme MulticlassMetrics - ale z RDD MLlib API
metrics = MulticlassMetrics(preds_and_labels.rdd.map(tuple))
# Spočítame confusion matrix
print(metrics.confusionMatrix().toArray())
Vo vypočítanej confusion matrix sú hodnoty cieľového atribútu usporiadané od najnižšej hodnoty po najvyššiu. Ak chceme zisťiť, ktorá zodpovedá ktorej pôvodnej, nezaindexovanej hodnote, musíme ich transformovať naspať. Môžeme to urobiť pomocou transformácie IndexToString().
from pyspark.ml.feature import IndexToString
index_to_string = IndexToString(inputCol="label_index", outputCol="categoryValue")
index_to_string.transform(preds_and_labels).drop("prediction").distinct().show()
Úloha 10.1¶
Transformujte si úlohu KDD Cup na binárnu klasifikáciu. Ako triedu 0.0 označte normálnu komunikáciu a ako triedu 1.0 všetky útoky. Kategorické atribúty zakódujte vhodným spôsobom. Na takto transformovaných dátach naučte model logistickej regresie a pomocou 10-násobnej krížovej validácie nájdite najlepšiu kombináciu hyperparametrov (napr. maxIter, regParam a pod. - pozrite dokumentaciu). Vyhodnoďte jeho presnosť pomocou metrík pre binárnu klasifikáciu. Realizujte túto úlohu pomocou ML Pipelines.