Cvičenie 10 - Spracovanie prúdov dát pomocou Apache Spark Streaming¶
Cieľom jedenásteho cvičenia je naučiť sa pracovať s neohraničenými prúdmi dát.
Spark Streaming umožňuje v prostredí Spark spracovávať dátové prúdy. Spark prúdy diskretizuje do tzv. mikro-dávok, ktoré sú potom vnútorne reprezentované ako RDD množiny. Na tieto prúdové RDD množiny je potom možné aplikovať RDD operácie z predošlých cvičení. Okrem nich, Spark Streaming rozširuje množinu operácií o operácie vykonávané nad časovými oknami.
Prvý príklad je Streaming Wordcount - jednoduchá úloha pre spočítanie slov pomocou Spark Streaming API.
!pip install findspark
!pip install pyspark
# vytvorenie spark aplikacie
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Vytvorime objekt Spark kontext a vytvorime Streaming kontext. Ten (okrem parametra Spark kontextu) definuje aj parameter, ktory specifikuje velkost okna pre mikro-davky. V tomto pripade
# V tomto pripade StreamingContext(sc, 1) vytvori Streaming kontext zo Spark kontextu sc a s intervalom mikro-davok 1 sekunda
batchIntervalSeconds = 1
sc = SparkContext(appName = "NetworkWordCount")
ssc = StreamingContext(sc, batchIntervalSeconds)
# Teraz vytvorime DStream - diskretizovany stream - ktory nacitava data
# použijeme operaciu socketTextStream ktora bude ocakavat vstupne data na localhoste, na porte 9999
lines = ssc.socketTextStream("localhost", 9999)
# Tento DStream potom obsahuje textove data prijate z localhostu na porte 9999. Kazdy zaznam obsahuje
# riadok textu. Tie potom spracujeme operaciami podobne, ako pri skripte wordcount
# zo statickych dat
# Najprv teda odseparujeme v retazci slova oddelene medzerou a spocitame jednotlive vyskyty slov
wordCounts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x,y: x + y)
# Vypiseme ich na obrazovku pomocou pprint()
wordCounts.pprint()
Pre otestovanie skriptu si otvorte v Jupyter launcheri nové okno terminálu. V ňom si spustite Netcat (utilitu pre sieťové služby). Budeme ju používať pre generovanie textového prúdu dát odosielaných na špecifický sieťový port. Netcat spustite príkazom: nc -lk 9999
Potom spustite streamovaný dopyt. Operácia .start() ho spustí, operácia awaitTermination() so špecifikovaným parametrom (timeout) čaká definovaný čas, kým streamovaný dopyt ukončí (stop()). V tomto prípade bude dopyt fungovať 10 sekúnd.
Spustite dopyt, prepnite sa naspať do terminálu a do Netcatu píšte ľubovoľné slová/znaky (odosielajte enterom). Potom sa prepnite naspať do notebooku a skontrolujte výstup dopytu.
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
Práca so štruktúrovanými prúdmi¶
V nasledujúcej úlohe si ukážeme ako prúdové dáta transformovat do štruktúrovanej podoby a ako s nimi pracovať pomocou dátových rámcov a SQL operácií. Spark vo verzi 2.0.0 a vyššie podporuje tzv. Structured Streaming, čo je spôsob spracovania prúdov pomocou dátových rámcov a SQL operácií. Môžeme tak spracovávat prúdové štruktúrované dáta. Spark ich vnútorne spracováva ako tzv. "unbounded tables" - neohraničené tabuľky - do ktorej sú nepretržite pridávané pribúdajúce záznamy.
Programátor potom špecifikuje výpočet podobným spôsobom ako pri práci s tradičnými dátovými rámcami. Rozdiel je jedine v špecifikácii dopytov - programátor musí aj samotný dopyt reprezentovať ako kontinuálny dátový prúd, ktorého výpočet Spark spustí ako inkrementálny dopyt na neohraničenej vstupnej tabuľke. Dopyt potom vygeneruje výsledok. Pri každom aktualizovaní tabuľky novými dátami (napr. každú sekundu) sa potom dopyt na pribúdajúcich dátach v jednotlivých dávkach prepočítava. Nasledujúci príklad demonštruje predchádzajúcu úlohu transformovanú pomocou Spark Structured Streaming.
# Najprv nacitame potrebne kniznice
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# Pre pouzivanie Structured Streamingu budeme pouzivat objekt Spark Session
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
# Podobne ako v prvom priklade budeme nacitavat data z localhostu na porte 9999
# Rozdiel je v tom, ze tentokrat nebudeme vytvarat 'DStream' (RDD) ale datovy ramec
# Pomocou funkcie 'readStream' vieme specifikovat zdroj a typ prudu ('socket'), volitelne parametre ('host' a 'port'), funkcia load() prud nacita
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# Datovy ramec s nacitanymi datami transformujeme na iny datovy ramec - rozdelenim podla oddelovaca
# Funkcia explode vytvori novy zaznam pre kazdy element pre dany stlpec datoveho ramca - transformuje datovy ramec
# pozostavajuci z riadkov na datovy ramec slov
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Nasledne mozme pouzit operacie pre datove ramce a spocitat pocet vyskytov slov (zoskupenim podla slov a spocitanim)
wordCounts = words.groupBy("word").count()
# Pri Structured Streamingu je ale aj vysledok operacie strukturovany prud. Ten teda vytvorime a bude
# kontinualne vykonavat na danom prude specifikovany vypocet
query = wordCounts.writeStream.outputMode("complete").queryName("counts").format("memory").start()
query.awaitTermination(20)
query.stop()
spark.table("counts").show()
# Po vykonani tohoto kodu sa na pozadi spusti vypocet a bude sa ukladat do konzoly.
# V tomto pripade je nastaveny sposob vystupu "complete".
Pri Structured Streamingu je ale aj výsledok operácie štruktúrovaný dátový prúd. Ten teda vytvoríme a bude kontinuálne vykonávať na danom prúde špecifikovaný výpočet.
Spracovanie streamu senzorických dát¶
V nasledujúcom príklade budeme pracovať s dátami zo zdroja PubNub. Verejný prúd z tohto zdroja obsahuje senzorické dáta nazbierané v rámci IoT platformy. Pomocou PubNub konektoru sa pripojíme na verejný zdroj senzorických prúdových dát a každý senzorický záznam bude prijatý ako JSON objekt s nasledovnými atribútmi:
'timestamp', 'ambient_temperature', 'humidity', 'photosensor', 'sensor_uuid', 'radiation_level'
Najprv spustíme konektor, ktorý nám umožní dáta zo zdroja načítať. V launcheri prostredia Jupyter si otvorte si okno terminálu, vojdite do Vášho pracovného adresára a stiahnite skript Subscriber.py pomocou príkazu: wget http://people.tuke.sk/martin.sarnovsky/tsvd/files/Subscriber.py
Tento skript vám umožní periodicky sťahovať dáta z PubNub zdroja a začne ich zapisovať do adresára /stream vo Vašom pracovnom adresári. Pre každý záznam vytvorí skrip Subscriber.py jeden JSON súbor. Predtým ako ho buďete môcť spustiť, musíte nainštalovať potrebný Python balík pre PubNub. Nainštalujete ho v termináli pomocou príkazu: pip install pubnub==3.9.0
Subscriber pre príklady nižšie nie je potrebné mať spustený neustále. Stačí, aby ste pomocou neho stiahli nejaké dáta, Spark potom bude čítať tieto záznamy zaradom ako dátový prúd.
Nasledujúci skript obsahuje niekoľko úloh, ktoré stream takýchto objektov spracujú do dátového rámca pomocou Structured Streaming API a prezentuje niekoľko dopytov.
# Importujeme potrebne moduly
from pyspark import SparkContext, Row, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Vytvorime Spark Context a Spark Session objekty
#sc = SparkContext(appName="pubnub_dir_streaming_app")
#spark = SparkSession.builder.appName("StreamingPubNub").getOrCreate()
# Specifikujeme schemu JSON objektov, ktore budeme spracovavat. Pre atributy (okrem 'timestamp' atributu) typ String, kedze prijate JSON objekty obsahuju hodnoty velicin ako retazce
schema = StructType([
StructField("timestamp", TimestampType(), True),
StructField('ambient_temperature', StringType(), True),
StructField('humidity', StringType(), True),
StructField('photosensor', StringType(), True),
StructField('sensor_uuid', StringType(), True),
StructField('radiation_level', StringType(), True)
])
# Vytvorime Streaming Data Frame zo zdroja, ktory obsahuje pribudajuce JSON subory. Ako parameter pri vytvarani datoveho ramca definujeme schemu JSON objektov, ktore budeme spracovavat
# ako parameter nastavime atribut 'maxFilesPerTrigger', ktory specifikuje, kolko suborov sa bude naraz spracovavat a zdroj JSON streamovanych objektov
df = spark.readStream.schema(schema).option("maxFilesPerTrigger",1).json("stream/")
# V dalsom kroku transformujeme atributy datoveho ramca na pozadovane hodnty (hodnoty senzorickych velicin na z retazcov na numericke)
df = df.withColumn("ambient_temperature", df["ambient_temperature"].cast(FloatType()))
df = df.withColumn("humidity", df["humidity"].cast(FloatType()))
df = df.withColumn("photosensor", df["photosensor"].cast(FloatType()))
df = df.withColumn("radiation_level", df["radiation_level"].cast(FloatType()))
# Ak chceme skontrolovat streamovany datovy ramec, musime specifikovat streamovany dopyt, ktory ho bude kontinualne po monitorovat
# Vytvorime teda 'df_stream' z datoveho ramca, specifikujeme sposob vypisu do konzoly ("console") a spustime
df_stream = df.writeStream.format("memory").queryName("stream").start()
# Tato query bude aktivna 10 sekund
df_stream.awaitTermination(10)
df_stream.stop()
spark.table("stream").show()
Nižšie je špecifikovaných niekoľko dopytov do takto vytvoreného dátového rámca.
Prvý vyberie z dátového rámca DF tie senzory, ktorých nameraná hodnota parametra vlhkosti je > 10
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.select(df["sensor_uuid"]).where(df["humidity"] >= 10.0)
query_stream = query.writeStream.format("memory").queryName("humidity").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("humidity").show()
Druhý filtruje z dátového rámca DF senzory s nameranou hodnotou radiácie > 200
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.filter(df['radiation_level'] >= 200.0)
query_stream = query.writeStream.format("memory").queryName("radiation").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("radiation").show()
Tretí je agregačný dopyt - zoskupuje senzory v mikro-dávkach podľa ich ID a spočítava ich výskyty. Pre agregačné dopyty je možné špecifikovať aj 'outputMode' - spôsob ako bude prezentovať výsledok.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.groupBy("sensor_uuid").count()
query_stream = query.writeStream.outputMode("complete").format("memory").queryName("uuid_counts").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("uuid_counts").show()
Posledný dopyt je rovnaký ako predchádzajúci, avšak rozšírený o funkcie počítania s časovými oknami, kde vieme špecifikovať časové okno pre vyhodnocovanie daného dopytu.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.groupBy(df['sensor_uuid'], window(df['timestamp'], "5 seconds")).count()
query_stream = query.writeStream.format("memory").outputMode("complete").queryName("windows").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("windows").show()
Viac informácií o spracovávaní prúdov a operáciách nájdete na tejto stránke (v angličtine). Viac o Spark Structured Streaming-u nájdete tu (v angličtine).
Úloha 10.1¶
Naštudujte si dokumentáciu k operáciám s posuvnými oknami a modifikujte skript NetworkWordcount.py tak, aby spočítaval výskyty slov za posledných 20 sekúnd v intervaloch každých 5 sekúnd. Skript spustite a otestujte. Rovnako sa v prvom príklade pohrajte s veľkosťou mikro-dávok špecifikovaných v Streaming Contexte.
Úloha 10.2¶
Naštudujte si dokumentáciu s Structured Streaming a rovnakú modifikáciu skriptu realizujte aj pre Streaming Wordcount úlohu z druhého príkladu pomocou Structured Streaming-u.
Úloha 10.3¶
V treťom príklade si zvoľte numerický atribút, ktorý diskretizujte. Pomocou takto vytvoreného kategorického atribútu napíšte streamovaný dopyt, ktorý bude zoskupovať senozory podľa jeho hodnôt a počítať priemerné hodnoty ostatných numerických atribútov pre tieto zoskupenia.