Data stream processing using Apache Spark Streaming¶
The goal of this exercise is to learn how to work with unbounded, continuous data streams.
Spark Streaming allows you to process data streams in the Spark environment. Spark discretizes a stream into so-called micro-batches, which are internally represented as RDDs. The RDD operations from the previous exercises can then be applied to these stream RDDs. In addition, Spark Streaming extends the set of operations with operations performed over time windows.
The first example is Streaming Wordcount - a simple task for counting words using the Spark Streaming API.
!pip install findspark
!pip install pyspark
# create a spark app
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Create a Spark context object and a Streaming context. In addition to the Spark context parameter, the Streaming context takes a parameter that specifies the micro-batch interval.
# In this case StreamingContext(sc, 1) creates a Streaming context from the Spark context sc with a micro-batch interval of 1 second
batchIntervalSeconds = 1
sc = SparkContext(appName = "NetworkWordCount")
ssc = StreamingContext(sc, batchIntervalSeconds)
# Now we will create a DStream - a discretized stream - which reads data
# we will use the socketTextStream operation, which will expect input data on localhost, on port 9999
lines = ssc.socketTextStream("localhost", 9999)
# The DStream contains text data received from localhost on port 9999. Each record contains
# a line of text. We will then process them with operations similar to the wordcount script
# for static data.
# First, we split each line into words on spaces and count the occurrences of each word
wordCounts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda x,y: x + y)
# print the output using pprint()
wordCounts.pprint()
To test the script, open a new terminal window in the Jupyter launcher and run Netcat (a simple networking utility) in it. We will use it to generate a stream of text data sent to a specific network port. Start Netcat with the command: nc -lk 9999
Then start the streaming computation. The start() operation starts it, awaitTermination() with the specified parameter (a timeout) waits the given time, and stop() then terminates the streaming computation. In this case it will run for 10 seconds.
Run the cell, switch to the terminal window and type any words/characters into Netcat (send them with Enter). Then switch back to the notebook and check the query output.
ssc.start()
ssc.awaitTermination(10)
ssc.stop()
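Note that a stopped StreamingContext cannot be restarted, and by default stop() also stops the underlying SparkContext. A small sketch (shown as comments, since the context above has already been stopped) of how the example could be rerun while keeping the SparkContext alive:
# stop only the streaming computation and keep the SparkContext running
# ssc.stop(stopSparkContext=False)
# a new run then requires a fresh StreamingContext created from the existing SparkContext
# ssc = StreamingContext(sc, batchIntervalSeconds)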
Working with structured streams¶
In the following task, we will show how to transform stream data into a structured form and how to work with it using data frames and SQL operations. Spark 2.0.0 and above supports so-called Structured Streaming, which is a way of processing streams using data frames and SQL operations, so we can process structured stream data. Internally, Spark treats such a stream as an "unbounded table" to which new records are continuously appended.
The programmer then specifies the computation in much the same way as when working with ordinary data frames. The difference is in how queries are specified - the query itself is also represented as a continuous data stream, which Spark runs as an incremental query on the unbounded input table. The query then generates the result. Every time the table is updated with new data (e.g. every second), the result is incrementally recomputed in individual batches. The following example shows the previous job rewritten using Spark Structured Streaming.
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
# To use Structured Streaming, we will use the Spark Session object
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()
# Similar to the first example, we will read data from localhost on port 9999
# The difference is that this time we will not create a 'DStream' (RDD) but a data frame
# Using 'readStream', we specify the type of the stream source ('socket') and its parameters ('host' and 'port'); the load() function then loads the stream into a data frame
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
# We transform the data frame with the loaded data into another data frame by splitting each line on the separator.
# The explode function creates a new record for each element of the given column - it transforms a data frame
# of text lines into a data frame of words
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# Next, we can use operations for data frames and count the number of occurrences of words (by grouping by words and counting)
wordCounts = words.groupBy("word").count()
# With Structured Streaming, the result of the operation is again a stream - a streaming query. So we create it and it will
# continuously perform the specified calculation on the given stream
query = wordCounts.writeStream.outputMode("complete").queryName("counts").format("memory").start()
query.awaitTermination(20)
query.stop()
spark.table("counts").show()
# After executing this code, the calculation starts in the background and the results are written to an in-memory table named "counts".
# In this case, the "complete" output mode is set.
With Structured Streaming, the result of the operation is also a stream - a streaming query that continuously performs the specified calculation on the input stream. The "complete" output mode means that the entire updated result table is written out after each update (the other supported output modes are "append" and "update").
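The StreamingQuery object returned by start() also exposes basic monitoring information. A minimal sketch (assuming the query object from the example above, ideally inspected before it is stopped):
# isActive indicates whether the streaming query is still running in the background
print(query.isActive)
# status describes what the query is currently doing
print(query.status)
# lastProgress contains metrics of the most recently processed micro-batch
print(query.lastProgress)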
Sensor data stream processing¶
In the following example, we will work with data from the PubNub service. The public stream from this source contains sensor data collected within an IoT platform. Using the PubNub connector, we will connect to a public source of sensor stream data, and each sensor record will be received as a JSON object with the following attributes:
'timestamp', 'ambient_temperature', 'humidity', 'photosensor', 'sensor_uuid', 'radiation_level'
First, we start the connector, which will allow us to load data from the source. In the Jupyter launcher, open a terminal window, go to your working directory and download the Subscriber.py script using the command: wget http://people.tuke.sk/martin.sarnovsky/tsvd/files/Subscriber.py
This script periodically downloads data from the PubNub source and writes it into the stream/ subdirectory of your working directory. For each record, the Subscriber.py script creates one JSON file. Before you can run it, you need to install the necessary Python package for PubNub. Install it in the terminal using the command: pip install pubnub==3.9.0
The Subscriber does not need to be running all the time for the examples below. You just use it to download some data; Spark will then read these records in sequence as a data stream.
The following script contains several tasks that process a stream of such objects into a data frame using the Structured Streaming API and presents several queries.
from pyspark import SparkContext
from pyspark.sql import Row, SQLContext
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
# Let's create Spark Context and Spark Session objects
#sc = SparkContext(appName="pubnub_dir_streaming_app")
#spark = SparkSession.builder.appName("StreamingPubNub").getOrCreate()
# We specify the JSON schema of the objects that we will process. All attributes (except 'timestamp') use the String type, since the received JSON objects contain the measured values as strings
schema = StructType([
StructField("timestamp", TimestampType(), True),
StructField('ambient_temperature', StringType(), True),
StructField('humidity', StringType(), True),
StructField('photosensor', StringType(), True),
StructField('sensor_uuid', StringType(), True),
StructField('radiation_level', StringType(), True)
])
# We create a streaming data frame from a directory source to which JSON files are continuously added. When creating the data frame, we define the schema of the JSON objects that we will process,
# set the 'maxFilesPerTrigger' option, which specifies how many files are processed per micro-batch, and point the JSON source at the streamed objects in the 'stream/' directory
df = spark.readStream.schema(schema).option("maxFilesPerTrigger",1).json("stream/")
# In the next step, we cast the attributes of the data frame to the required types (sensor values from strings to numeric types)
df = df.withColumn("ambient_temperature", df["ambient_temperature"].cast(FloatType()))
df = df.withColumn("humidity", df["humidity"].cast(FloatType()))
df = df.withColumn("photosensor", df["photosensor"].cast(FloatType()))
df = df.withColumn("radiation_level", df["radiation_level"].cast(FloatType()))
# If we want to inspect a streaming data frame, we must define a streaming query that continuously evaluates it.
# So we create 'df_stream' from the data frame, write the results to an in-memory table named "stream" (the "memory" sink) and start it
df_stream = df.writeStream.format("memory").queryName("stream").start()
# This query will be active for 10 seconds
df_stream.awaitTermination(10)
df_stream.stop()
spark.table("stream").show()
Several queries against the data frame created this way are shown below.
The first selects from the df data frame those sensors whose measured humidity value is >= 10.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.select(df["sensor_uuid"]).where(df["humidity"] >= 10.0)
query_stream = query.writeStream.format("memory").queryName("humidity").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("humidity").show()
The second filters from the df data frame the sensors with a measured radiation level >= 200.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.filter(df['radiation_level'] >= 200.0)
query_stream = query.writeStream.format("memory").queryName("radiation").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("radiation").show()
The third is an aggregation query - it groups the sensors in the micro-batches by their ID and counts their occurrences. For aggregation queries, it is also possible to specify 'outputMode' - the way the result is presented.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.groupBy("sensor_uuid").count()
query_stream = query.writeStream.outputMode("complete").format("memory").queryName("uuid_counts").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("uuid_counts").show()
The last query is the same as the previous one, but extended with time-window operations, where we can specify a time window over which the query is evaluated.
df_stream = df.writeStream.format("memory").queryName("stream").start()
query = df.groupBy(df['sensor_uuid'], window(df['timestamp'], "5 seconds")).count()
query_stream = query.writeStream.format("memory").outputMode("complete").queryName("windows").start()
df_stream.awaitTermination(10)
df_stream.stop()
query_stream.stop()
spark.table("windows").show()
More information about stream processing and the available operations can be found in the Spark Streaming Programming Guide. More about Spark Structured Streaming can be found in the Structured Streaming Programming Guide.
Task 7.1¶
Consult the documentation for sliding window operations and modify the NetworkWordcount.py script to count the occurrences of words in the last 20 seconds, evaluated every 5 seconds (one possible starting point is sketched below). Run the script and test it. Also, in the first example, experiment with the micro-batch interval specified in the Streaming Context.
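A minimal sketch of one possible starting point, using the reduceByKeyAndWindow operation (the checkpoint directory name is an arbitrary choice; window operations with an inverse reduce function require checkpointing to be enabled):
# enable checkpointing, required by windowed stateful operations (directory name is arbitrary)
ssc.checkpoint("checkpoint")
pairs = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1))
# count the words received in the last 20 seconds, re-evaluated every 5 seconds
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x - y, 20, 5)
windowedWordCounts.pprint()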
Task 7.2¶
Study the Structured Streaming documentation and implement the same modification for the Streaming Wordcount task from the second example using Structured Streaming (a sketch of one possible approach is shown below).
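A minimal sketch of one possible approach (assuming the socket source from the second example; the includeTimestamp option adds the time at which each line was received, which can then be used for windowing):
from pyspark.sql.functions import explode, split, window
# read lines from the socket together with the timestamp of their arrival
lines = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).option("includeTimestamp", "true").load()
words = lines.select(explode(split(lines.value, " ")).alias("word"), lines.timestamp)
# count the words in 20-second windows sliding every 5 seconds
windowedCounts = words.groupBy(window(words.timestamp, "20 seconds", "5 seconds"), words.word).count()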
Task 7.3¶
In the third example, choose a numerical attribute and discretize it. Using the categorical attribute created this way, write a streaming query that groups the sensors by its values and calculates the average values of the other numeric attributes for these groups (a sketch of one possible approach is shown below).
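A minimal sketch of one possible approach (the choice of attribute, the bin boundary, and the label values are arbitrary illustrations):
from pyspark.sql.functions import when, avg
# discretize 'ambient_temperature' into two illustrative bins
binned = df.withColumn("temperature_level", when(df["ambient_temperature"] >= 20.0, "high").otherwise("low"))
# average the remaining numeric attributes for each bin (an aggregation, so the "complete" output mode is needed when streaming it)
averages = binned.groupBy("temperature_level").agg(avg("humidity"), avg("photosensor"), avg("radiation_level"))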