Azure Stream Analytics

Michal Molka
4 min readNov 12, 2021

Today's topic is Azure Stream Analytics: what it is and how it works. Long story short, ASA is a service that analyzes streams of events in near real time and routes the results to output sinks. A quick demo follows below.

First, we need to create an ingestion service: an Event Hub that provides data to the Stream Analytics service. We also need some data to work with; in this case we can use a Python script that generates events and sends them to the Event Hub every 10 seconds.

Code:

import time
import uuid
import datetime
import random
import json

from azure.eventhub import EventHubProducerClient, EventData

# Generate a couple of fake device IDs.
devices = []
for x in range(0, 2):
    devices.append(str(uuid.uuid4()))

# The connection string and Event Hub name are placeholders - fill in your own values.
producer = EventHubProducerClient.from_connection_string(
    conn_str="Endpoint={}/;SharedAccessKeyName={};SharedAccessKey={}",
    eventhub_name="{}",
)

countries = [
    "Germany",
    "Russia",
    "Libia",
    "RPA",
    "Peru",
    "Canada",
    "France",
    "Vietnam",
    "Taiwan",
    "Australia",
]

# Send one batch of readings (one event per device) every 10 seconds, 200 times.
for y in range(0, 200):
    event_data_batch = producer.create_batch()

    for dev in devices:
        reading = {
            "id": dev,
            "country": random.choice(countries),
            "timestamp": str(datetime.datetime.utcnow()),
            "value01": random.random(),
            "value02": random.randint(70, 100),
            "value03": random.randint(70, 100),
        }

        s = json.dumps(reading)
        print(s)
        event_data_batch.add(EventData(s))

    producer.send_batch(event_data_batch)
    time.sleep(10)

producer.close()

Events:

Here is an example of the created Event Hub.

As we can see, the events were sent to the Event Hub.

Now it is time for Stream Analytics.

After a Stream Analytics job is created, we can focus on three areas: an input, an output, and a query.

In this case, we have two inputs: an Event Hub and a SQL Database. The Event Hub ingests the events generated by the script. The SQL Database contains a reference table that is joined to the data from the Event Hub.
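
The reference table itself is not shown in this post, so here is a minimal, hypothetical sketch of what it could look like; the table name is made up, and the Country and continent columns are inferred from the query used later in this post:

-- Hypothetical reference table joined to the incoming events (name and types are assumptions).
CREATE TABLE dbo.CountryReference
(
    Country   NVARCHAR(100) NOT NULL,  -- matches the "country" field sent by the Python script
    continent NVARCHAR(100) NOT NULL   -- used later for filtering and grouping
);

-- A few illustrative rows.
INSERT INTO dbo.CountryReference (Country, continent)
VALUES ('Germany', 'Europe'), ('France', 'Europe'), ('Peru', 'South America');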

We have two outputs: a SQL Database and a Blob Storage, where the data is stored.

A Query is the place where the magic happens. On the left side you can select any of the streams (inputs and outputs); its data is previewed in the bottom section.

Your input and output streams are composed together with SQL. The structure is simple: the FROM clause (with optional joins) reads data from an input, and the INTO clause writes data to an output.

In the case above, we are using all of the inputs and outputs.
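
The exact query from the screenshot is not reproduced in the text, so here is a minimal sketch of how the pieces could be wired together, assuming the input and output names used later in this post ([eightfive-stream-input], [eightfive-country-reference], [eightfive-stream-analitycs-output]) and a hypothetical Blob Storage output named [eightfive-blob-output]:

-- Route the raw events, enriched with the reference table, into the SQL Database output.
SELECT
    inp.id,
    inp.country,
    inp.timestamp,
    inp.value01,
    inp.value02,
    inp.value03,
    ref.continent
INTO [eightfive-stream-analitycs-output]
FROM [eightfive-stream-input] inp
LEFT JOIN [eightfive-country-reference] ref
    ON ref.Country = inp.country

-- Store the raw stream as JSON files in the Blob Storage output.
SELECT *
INTO [eightfive-blob-output]
FROM [eightfive-stream-input]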

The screenshots show the saved events inside these two destinations.

Here is data stored as JSON files in the Data Lake:

And the SQL Database.

If you want to store events aggregated over a time window, you can use one of the built-in windowing functions.

We have five windowing functions (a sketch of how each one is applied follows the list):

  • Tumbling window
  • Hopping window
  • Sliding window
  • Session window
  • Snapshot window
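
As an illustrative sketch (the window sizes are arbitrary and the input/output names are the ones used later in this post), this is roughly how each of them is applied in a GROUP BY clause:

-- A minimal aggregation skeleton; swap the GROUP BY window function to change the behaviour.
SELECT
    COUNT(*) AS [EventCount],
    System.Timestamp() AS [WindowTime]
INTO [eightfive-stream-analitycs-output]
FROM [eightfive-stream-input] inp TIMESTAMP BY [inp].[timestamp]
GROUP BY TumblingWindow(second, 30)        -- fixed, non-overlapping 30-second windows
-- GROUP BY HoppingWindow(second, 30, 10)  -- 30-second windows that start every 10 seconds
-- GROUP BY SlidingWindow(second, 30)      -- emits whenever the set of events in the last 30 seconds changes
-- GROUP BY SessionWindow(second, 5, 30)   -- groups events arriving within 5 seconds of each other, up to 30 seconds
-- GROUP BY System.Timestamp()             -- snapshot window: groups events with exactly the same timestamp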

The best explanation of each of them is provided by Microsoft:

Introduction to Azure Stream Analytics windowing functions | Microsoft Docs

I will show one example: a hopping window.

We can rework the previous query so that it is more concise and implements a hopping window.

WITH ingesting_source
AS
(
    SELECT
        [ref].[continent] AS [Continent],
        SUM([inp].[Value02]) AS [SumValue],
        COUNT(*) AS [EventCount],
        System.Timestamp() AS [WindowTime]
    FROM [eightfive-stream-input] inp TIMESTAMP BY [inp].[timestamp]
    LEFT JOIN [eightfive-country-reference] ref
        ON ref.Country = inp.country
    WHERE ref.continent IN ('Europe')
    GROUP BY [ref].[Continent], HoppingWindow(second, 20, 10)
)
SELECT *
INTO [eightfive-stream-analitycs-output]
FROM ingesting_source inp

In the example above, we calculate the SUM() of the Value02 field and the number of events. Every 10 seconds, the events from the last 20 seconds are aggregated, so consecutive windows overlap: for example, a window ending at 12:00:30 covers 12:00:10 to 12:00:30, and the next one, ending at 12:00:40, covers 12:00:20 to 12:00:40. The results are grouped by the [Continent] column, and the last column, System.Timestamp(), returns the end of each time window.

As you can see, events are aggregated:
