Fabric Change the Game: Real-Time Analytics
As we delve into this comprehensive series on Fabric, one of the pivotal facets we explore is Real-Time Analytics. Real-Time Analytics is a formidable tool, diminishing complexity and streamlining data integration. Imagine scenarios where a continuous stream of data unfolds, capturing messages posted on a micro-blogging site or chronicling environmental measurements relayed by an internet-connected weather sensor. The essence of streaming data analytics lies in its ability to illuminate change over time.
Microsoft Fabric allows you to build real-time streaming analytics with eventstream or Spark streaming. You can find more information about Spark streaming here: Streaming data into lakehouse – Microsoft Fabric | Microsoft Learn, and here for eventstream: Create and manage an eventstream in Microsoft Fabric – Microsoft Fabric | Microsoft Learn
This post is based on a demo built by https://www.linkedin.com/in/anshuldsharma/ and published here: anshulsharmas/fabric-iss-demo: Monitoring International Space Station with Microsoft Fabric (github.com), shared with me by my colleague Luca Ferrari.
The GitHub author built a demo tracking the International Space Station (ISS) using the Open Notify API — API Documentation (open-notify.org) — which gives the current position of the ISS. Using Logic Apps, he pulls this information, sends it to eventstream and from there to a Kusto database, where he queries it.
Using the same idea, the plan here is to show how this data can be fetched from a Spark notebook inside Fabric and then sent to eventstream.
You will also need to reference this doc: Stream real-time events from a custom app to a Microsoft Fabric KQL database – Microsoft Fabric | Microsoft Learn
Step by Step
Inside of the Real-time Analytics options in Fabric
1. Real-Time Analytics – Create your KQL database and eventstream as mentioned in the doc above and as Fig 1 – KQL and Fig 2 – Eventstream show:
Inside of the eventstream in Fabric
2. Eventstream – Open the eventstream and define your source as a custom app, as Fig 3 – Custom app shows:
3. Copy the primary key into a notepad and keep it there; we will use it in the Python script to connect to the eventstream and send the events, as can be seen in Fig 4 – Key:
Inside of the Notebook in Fabric
4. Notebook – The Python script will need some libraries to be pip-installed in the lakehouse notebook. More information on how to use notebooks in Fabric is here: https://learn.microsoft.com/en-us/fabric/data-engineering/lakehouse-notebook-explore:
pip install azure-eventhub
pip install apscheduler
More information about the libraries follows:
APScheduler · PyPI – This one will be used to send the events at a regular interval. You do not need it if you configure a different way to keep sending the events; I am using it for the purpose of this demo. As the library describes itself: “a Python library that lets you schedule your Python code to be executed later, either just once or periodically. You can add new jobs or remove old ones on the fly as you please.”
azure-eventhub · PyPI – Azure Event Hubs is a highly scalable publish-subscribe service that can ingest millions of events per second and stream them to multiple consumers. This lets you process and analyze the massive amounts of data produced by your connected devices and applications.
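To make the scheduling idea concrete before wiring in Event Hubs, the interval trigger used later in this post can be pictured as a plain-Python loop. This is only an illustrative stand-in (the run_every helper is mine, not part of APScheduler):

```python
import time

def run_every(job, seconds, max_runs=None):
    # Minimal stand-in for APScheduler's 'interval' trigger: call `job`
    # repeatedly, `seconds` apart, until interrupted (max_runs caps the
    # loop so this sketch can terminate on its own).
    runs = 0
    try:
        while max_runs is None or runs < max_runs:
            job()
            runs += 1
            time.sleep(seconds)
    except KeyboardInterrupt:
        pass
    return runs

# Print a heartbeat three times, 0.1 seconds apart
calls = run_every(lambda: print("tick"), seconds=0.1, max_runs=3)
```

APScheduler adds job management on top of this basic idea, which is why the demo uses it instead of a hand-rolled loop.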
5. Let’s see how we can call the API. Follow the example with the requests library:
import requests
response = requests.get("http://api.open-notify.org/iss-now.json")
# Print the status code of the response.
print(response.status_code)
print(response.content)
Results are shown in Fig 5 – Request Results:
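For reference, at the time of writing the API returns a small JSON document with the coordinates nested under iss_position and encoded as strings. A sketch of pulling them out (the sample payload below is illustrative, with made-up values; always check the live response):

```python
import json

# Illustrative payload in the shape the API returned at the time of writing
# (the coordinate values here are made up):
sample = json.loads(
    '{"message": "success", "timestamp": 1700000000,'
    ' "iss_position": {"latitude": "-12.4711", "longitude": "103.5213"}}'
)

# The coordinates arrive as strings, so cast them before doing math
latitude = float(sample["iss_position"]["latitude"])
longitude = float(sample["iss_position"]["longitude"])
print(latitude, longitude)
```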
6. Add the request inside of a function – def send_iss_data – and use APScheduler to execute and send events every 5 seconds:
scheduler.add_job(send_iss_data, 'interval', seconds=5)
Inside the Python script below, replace the endpoint configuration with the one you extracted from the eventstream in step 3 and copied into a notepad: “Endpoint=sb://…./;SharedAccessKeyName=key_3….;SharedAccessKey=E…”
##Libraries
import requests
from azure.eventhub import EventHubProducerClient, EventData
import time
from apscheduler.schedulers.blocking import BlockingScheduler
import json

##Function
def send_iss_data():
    try:
        # Get the latest ISS position
        response = requests.get("http://api.open-notify.org/iss-now.json")
        if response.status_code == 200:
            # Convert the ISS data to a JSON string
            iss_data = response.json()
            iss_data_json = json.dumps(iss_data)
            # Replace the connection string with the one copied from the eventstream
            eventhub_connection_str = (
                "Endpoint=sb://..../;SharedAccessKeyName=key_3....;SharedAccessKey=E....;EntityPath=...."
            )
            producer = EventHubProducerClient.from_connection_string(eventhub_connection_str)
            # Create a batch, add the event data, and send the batch to the event hub
            with producer:
                event_data_batch = producer.create_batch()
                event_data_batch.add(EventData(body=iss_data_json))
                producer.send_batch(event_data_batch)
            print("Data sent to Azure Event Hub successfully.")
        ##Inform about the failure in case it is not possible to retrieve the data
        else:
            print(f"Failed to retrieve ISS data. Status code: {response.status_code}")
    except Exception as e:
        print(f"Error: {str(e)}")

###################Executing the Function###########################
# Set up a scheduler to run every 5 seconds
scheduler = BlockingScheduler()
scheduler.add_job(send_iss_data, 'interval', seconds=5)
try:
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    print("Scheduler stopped.")
Just hit the run button, and the events should start to be sent to the eventstream, as you can see in Fig 6 – Spark Events.
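One note on the script: it creates a new EventHubProducerClient on every run, which is fine for a demo but wasteful for longer runs. A sketch of the create-once, reuse pattern, with the client stubbed out so it runs anywhere (StubProducer is a hypothetical stand-in, not a real class):

```python
class StubProducer:
    # Hypothetical stand-in for EventHubProducerClient so the pattern is
    # runnable without a live eventstream; in Fabric you would create the
    # real client once with from_connection_string and reuse it.
    def __init__(self):
        self.sent = []

    def send(self, body):
        self.sent.append(body)

producer = StubProducer()  # created once, outside the scheduled function

def send_iss_data(get_payload):
    # The scheduled job reuses the long-lived producer instead of
    # building and tearing down a client on every tick.
    producer.send(get_payload())

for _ in range(3):  # the scheduler would drive this every 5 seconds
    send_iss_data(lambda: '{"iss_position": {}}')

sent_count = len(producer.sent)
print(sent_count)
```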
Now let’s get back to the eventstream.
Back to the eventstream in Fabric
7. Eventstream – Add the destination of the data: KQL, for us to complete the example proposed by Anshul Sharma – Fig 7 – KQL
8. Select Direct Ingestion and add the KQL database we created in Step 1. Fig 8 – Adding:
9. The format sent is JSON, so create a new table with JSON format; do not forget the datatypes for that table and the JSON nested level of two. Fig 9 – Json:
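The nested level of two matters because the query in step 11 references flattened column names like iss_position_latitude. A sketch of how a level-two JSON document maps onto those columns (the flatten helper is mine, purely to illustrate the mapping; Fabric does this for you during ingestion):

```python
def flatten(doc, parent="", sep="_"):
    # Join nested keys with '_', so {"iss_position": {"latitude": ...}}
    # becomes {"iss_position_latitude": ...}, matching the table columns.
    out = {}
    for key, value in doc.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            out.update(flatten(value, name, sep))
        else:
            out[name] = value
    return out

payload = {"message": "success", "timestamp": 1700000000,
           "iss_position": {"latitude": "-12.47", "longitude": "103.52"}}
columns = flatten(payload)
print(sorted(columns))
```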
10. I also added the lakehouse as a destination for the ingestion, as an example. If you also added the lakehouse, you should have something similar to this at this point. Feel free to add the lakehouse or not. Fig 10 – eventstream:
Query the KQL Database
11. KQL – Now the cool part! Open your KQL queryset and point it to the KQL database created in Step 1. You can query it using the example taken from the GitHub repo mentioned above, as follows. The KQL queryset is one of the Real-Time Analytics options, as you can see in Fig 11 – KQuerySet, and results can be seen in Fig 12 – Kqlquery:
// ISS complete orbit
TABLENAME
| project iss_position_latitude, iss_position_longitude, timestamp
| render scatterchart with ( kind=map )
Back to the eventstream in Fabric
12. Eventstream – Now let’s use the eventstream to send a notification every time the longitude is between 20 and 30, or maybe when the ISS is in a range close to where you live; your choice. Back in the eventstream, create a new destination for Reflex.
Add a new destination: Reflex – Fig 13 reflex
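The detection condition itself is simple; expressed in plain Python it would look like this (in_alert_range is a hypothetical helper, only to spell out the rule Reflex will evaluate):

```python
def in_alert_range(longitude, low=20.0, high=30.0):
    # The trigger in this demo fires when the ISS longitude falls in
    # [20, 30]; adjust low/high to a range near where you live.
    return low <= longitude <= high

print(in_alert_range(25.3))   # inside the range
print(in_alert_range(103.5))  # outside the range
```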
Open your workspace and list the objects that are inside of it. Look for the Reflex item you just created – Fig 14 Workspace_Reflex:
Inside of Reflex / Data Activator in Fabric (Preview)
13. Reflex – Create a new object to configure the trigger of some actions in design mode – Fig 15 newObject:
14. Longitude will be the option chosen, since the goal is to be notified every time the longitude is in a certain range – Fig 16 Longitude:
15. The next step is the action configuration – Fig 17 Action. Configure the range, or if you want a different action, use the filter to define it:
16. Once the action is saved, you can start the trigger and also send a test to check the results, as in the example in Fig 18 – Example:
Here are some references for Data Activator:
https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-assign-data-objects
https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-create-triggers-design-mode
https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-detection-conditions
https://learn.microsoft.com/en-us/fabric/data-activator/data-activator-limitations
Summary: Using the example provided by Anshul Sharma, Senior Program Manager at Microsoft, available on GitHub (anshulsharmas/fabric-iss-demo: Monitoring International Space Station with Microsoft Fabric (github.com)), we implemented a comprehensive solution within Fabric. Using the Open Notify API for International Space Station (ISS) position data, we employed a Python script to gather this information. The data was then transmitted to the eventstream and forwarded to a Kusto Query Language (KQL) database. By querying this database, we generated a map illustrating the ISS’s current location. Last but not least, we also configured notifications using Data Activator.