ML Model Scoring in Fabric Eventhouse via Update Policy
In this blog post, we describe how to train an ML model in a Fabric Spark notebook, save it in Fabric’s MLflow model registry, and use it to score new streaming data in real time, directly in a Fabric Eventhouse, via an update policy. This is a typical workflow for monitoring cloud resources or IoT devices for performance, resource consumption, anomalies and more. The workflow follows these steps:
- The data source (e.g. Fabric Eventstream) ingests streaming data to a table in the Eventhouse.
- By enabling OneLake availability on that table, this data can be consumed by all Fabric’s analytical engines.
- Specifically, this data is used to train an ML model in a Spark notebook.
- The ML model is saved in Fabric’s MLflow model registry with various parameters (e.g. training data info, accuracy metrics) for lifecycle management: monitoring its accuracy, triggering retraining upon accuracy drift, and so on (see Train and register machine learning models).
- In addition, the model is serialized and pushed to the Eventhouse using the Kqlmagic extension, so it is available for scoring.
- New streaming data that is ingested into the Eventhouse is automatically scored in real time, directly in the Eventhouse, by an update policy.
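At the heart of this flow is a small piece of Python that runs inside the Eventhouse python() plugin: it takes the latest serialized model from a models table, decodes it from base64, unpickles it, and calls predict() on the batch of newly ingested rows. The snippet below is a minimal, self-contained sketch of that round trip using a toy scikit-learn model and illustrative column names; the actual model, feature columns, and KQL wrapper are built step by step later in this post.

import base64
import pickle

import pandas as pd
from sklearn.linear_model import LinearRegression

# Offline side (Spark notebook): train a toy model and serialize it to a base64 string
X_train = pd.DataFrame({"x": [1.0, 2.0, 3.0]})
y_train = [2.0, 4.0, 6.0]
model = LinearRegression().fit(X_train, y_train)
smodel = base64.b64encode(pickle.dumps(model)).decode("ascii")  # this string is what gets stored in the Eventhouse

# Scoring side (python() plugin): decode the string back into a model and score a new batch
new_batch = pd.DataFrame({"x": [4.0, 5.0]})
clf = pickle.loads(base64.b64decode(smodel))
new_batch["pred"] = clf.predict(new_batch[["x"]])
print(new_batch)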
Solution components
This solution relies on the following components:
- Eventstream: This is the source of streaming events that are ingested into the Eventhouse.
- Eventhouse: The data is initially ingested into an Eventhouse, which is a real-time data processing engine that can handle high-throughput data streams.
- OneLake: Data from the Eventhouse is exposed in OneLake, a shared persistent storage layer that provides a unified view of the data.
- Spark Notebook: used for offline training of the ML model on historical data, registering the trained model in Fabric’s MLflow model registry, and pushing a serialized copy to the Eventhouse using the Kqlmagic extension.
- KQL queryset: used for automatic real time scoring of incoming data by the trained ML model via an Update Policy that calls the python() plugin of the Eventhouse.
Prerequisites
- A workspace with a Microsoft Fabric-enabled capacity
- Role of Admin, Contributor, or Member in the workspace.
Workflow setup
1. Create an Eventhouse named MSIT_DEMO_EH (or choose your own name).
2. Enable the Python plugin:

3. Enable OneLake availability for the MSIT_DEMO_EH database:

4. Copy the Eventhouse query URI. We will paste it into the Spark notebook to ingest the trained model into the Eventhouse:

5. Create an Eventstream named ‘Weather_ES’ with the real-time weather source (one of the demo sources available in Fabric), configured for Seattle weather. Set its destination to the ‘weather_seattle_stream’ table in your Eventhouse.
6. Create a KQL queryset named ‘ML-Scoring-via-Update-Policy’ and paste the following KQL:
// weather_seattle_stream is a streaming dataset that is updated every 1m
// from an Event Stream of weather service (one of the demo services
// available in Fabric)
// weather_seattle is updated by an update policy to extract a few fields from
// the multiple dynamic columns of weather_seattle_stream
// Verify weather_seattle_stream contains some data
weather_seattle_stream
| summarize count(), min(ingestion_time()), max(ingestion_time()), min(dateTime), max(dateTime)
// Create an update policy to extract relevant columns to a new table
// weather_seattle
.create-or-alter function with(folder='Demo') extract_weather_cols()
{
weather_seattle_stream
| project stream_ingestion_time=ingestion_time(), weather_time=dateTime,
temp=toreal(temperature.value), humidity=relativeHumidity
}
// Create the target table
.set weather_seattle <|
extract_weather_cols
| take 0
// We are going to train the temperature forecasting model using a Fabric
// Spark notebook. To access the data for training we use OneLake mirroring.
// By default, mirroring has a 3h latency (for small ingestions), so we update
// the mirroring policy to 5m latency (the minimum that can be set).
// To alter the policy we first need to disable it
.alter-merge table weather_seattle policy mirroring dataformat=parquet with (IsEnabled=false)
.alter-merge table weather_seattle policy mirroring dataformat=parquet with (IsEnabled=true, Backfill=true, TargetLatencyInMinutes=5)
.show table weather_seattle policy mirroring
// Activate the update policy
.alter table weather_seattle policy update
```
[
{
"IsEnabled": true,
"Source": "weather_seattle_stream",
"Query": "extract_weather_cols",
"IsTransactional": false,
"PropagateIngestionProperties": false
}
]
```
.show table weather_seattle policy update
// Wait a few minutes and check that weather_seattle is being ingested
weather_seattle
| summarize count(), min(stream_ingestion_time), max(stream_ingestion_time), min(weather_time), max(weather_time)
weather_seattle
| take 10
// To access the full table and/or use python() in the update policy for
// weather_seattle_regression_forecast (see below) we must disable streaming
// ingestion on ALL its sources. In this case we need to disable it on
// both weather_seattle_stream and weather_seattle
.alter table weather_seattle_stream policy streamingingestion disable
.alter table weather_seattle policy streamingingestion disable
// Prepare a table to store the ML models that will be trained in the Spark
// notebook and serialized into this table
.create table ML_Models(name:string, timestamp:datetime, model:string)
// Alter the encoding policy to support models of up to 24MB; base64 encoding
// inflates the size by about 4/3, hence the 32MB (big object) limit
.alter column ML_Models.model policy encoding type='BigObject32'
7. Run these queries and commands sequentially.
8. Select the Eventhouse and copy the OneLake path to ‘weather_seattle’. We will paste it in the Spark notebook to train our model:


9. From the Workspace screen, create a notebook named Fabric-Weather-Model-Training.ipynb

10. Copy the following code into the notebook (split it into cells as you like):
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
import mlflow
import mlflow.sklearn
# Spark needs an ABFSS URI to securely connect to OneLake storage, so define a function to convert the OneLake URI to ABFSS URI.
def convert_onelake_to_abfss(onelake_uri):
    if not onelake_uri.startswith('https://'):
        raise ValueError("Invalid OneLake URI. It should start with 'https://'.")
    uri_without_scheme = onelake_uri[8:]
    parts = uri_without_scheme.split('/')
    if len(parts) < 3:
        raise ValueError("Invalid OneLake URI format.")
    account_name = parts[0].split('.')[0]
    container_name = parts[1]
    path = '/'.join(parts[2:])
    abfss_uri = f"abfss://{container_name}@{parts[0]}/{path}"
    return abfss_uri
# Use the Onelake URI of the table in Eventhouse and convert it to ABFSS URI
onelake_uri = "Replace with your OneLake table URI"
abfss_uri = convert_onelake_to_abfss(onelake_uri)
print(abfss_uri)
# Load the table and convert it to a pandas dataframe
dfs = spark.read.format('delta').load(abfss_uri)
df = dfs.toPandas()
print(df.shape)
df[-3:]
# The sampling rate of the weather service is ~1m, so we try to predict the temperature a few minutes into the future based on the last 3h
n_prediction = 3
n_lookback = 180
offset = -n_prediction
# Prepare the dataframe for the model, add a few simple features
df1 = df.sort_values(by='stream_ingestion_time', ascending=True)
df1 = df1.iloc[-(n_lookback+n_prediction):][['stream_ingestion_time', 'temp', 'humidity', 'weather_time']]
df1['temp_diff_1'] = df1['temp'].diff(1)
df1['temp_diff_10'] = df1['temp'].diff(10)
df1['temp_diff_20'] = df1['temp'].diff(20)
df1['label_ingestion_time'] = df1['stream_ingestion_time'].shift(offset)
df1['label_weather_time'] = df1['weather_time'].shift(offset)
df1['label_temp'] = df1['temp'].shift(offset)
df1 = df1[df1[['label_temp', 'temp_diff_20']].notna().all(axis=1)]
print(df1.shape)
# print(df1)
# Split data to train & test
X = df1[['temp', 'humidity', 'temp_diff_1', 'temp_diff_10', 'temp_diff_20']]
y = df1['label_temp']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
print("X: ", X.shape)
print("Y: ", y.shape)
print("X_train: ", X_train.shape)
print("X_test: ", X_test.shape)
print("y_train: ", y_train.shape)
print("y_test: ", y_test.shape)
# Train the model
model = LinearRegression()
model.fit(X_train, y_train)
# Predict & evaluate the accuracy
y_pred = model.predict(X_test)
rmse = mean_squared_error(y_test, y_pred) ** 0.5  # mean_squared_error returns MSE; take the square root to get RMSE
print(f"RMSE: {rmse:.2f}")
# Log the model and a few parameters to the Fabric MLflow model registry
min_times = df1[['stream_ingestion_time', 'weather_time']].min()
max_times = df1[['stream_ingestion_time', 'weather_time']].max()
min_ingestion_time = f'{min_times.stream_ingestion_time}'
max_ingestion_time = f'{max_times.stream_ingestion_time}'
min_weather_time = f'{min_times.weather_time}'
max_weather_time = f'{max_times.weather_time}'
model_name = "Predict_temperature"
params = {
"model_type": "LinearRegression",
"n_prediction": n_prediction,
"n_lookback": n_lookback,
"min_ingestion_time": min_ingestion_time,
"max_ingestion_time": max_ingestion_time,
"min_weather_time": min_weather_time_time,
"max_weather_time": max_weather_time_time
}
if mlflow.active_run():
    mlflow.end_run()
with mlflow.start_run():
    mlflow.set_tag("Training Info", "80% training data")
    mlflow.log_params(params)
    mlflow.log_metric("rmse", rmse)
    model_info = mlflow.sklearn.log_model(
        model,
        artifact_path="weather",
        registered_model_name=model_name,
    )
# Test loading the model using pyfunc flavor and predict
loaded_model = mlflow.pyfunc.load_model(model_info.model_uri)
y_pred1 = loaded_model.predict(X_test)
rmse1 = mean_squared_error(y_test, y_pred1) ** 0.5
print(f"RMSE: {rmse1:.2f}")
# Serialize the model to base64 string to be stored in KQL DB models table
import pickle
import base64
bmodel = pickle.dumps(loaded_model)
smodel = base64.b64encode(bmodel).decode('ascii')  # decode to a plain string so it fits the string column of ML_Models
# Test loading the model from serialized string and predict
bmodel1 = base64.b64decode(smodel)
loaded_model1 = pickle.loads(bmodel1)
y_pred2 = loaded_model1.predict(X_test)
rmse2 = mean_squared_error(y_test, y_pred2) ** 0.5
print(f"RMSE: {rmse2:.2f}")
# Export the serialized model to the ML models table in the Eventhouse.
# Best practice is to export the model name, training time and model string
import datetime
models_tbl = 'ML_Models'
now = datetime.datetime.now()
dfm = pd.DataFrame({'name':[model_name], 'timestamp':[now], 'model':[smodel]})
print(dfm)
# Use KqlMagic extension to store it in the Eventhouse (alternatively we could
# use the Spark connector)
%reload_ext Kqlmagic
# Paste your Eventhouse query URI (from step #4)
%kql kusto://code;cluster='https://trd-m1ys9v9h1yb500xrgx.z5.kusto.fabric.microsoft.com';database='ADI_MSIT_EH_DEMO'
# Create a KQL query for storing the serialized model in the ML models table
store_model_query = '''
.set-or-append {0} <|
let tbl = dfm;
tbl
'''.format(models_tbl)
print(store_model_query)
# Store it in the Eventhouse
%kql -query store_model_query
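Optionally, you can add one more cell to sanity-check that the serialized model actually landed in the ML_Models table before moving on (a minimal check, assuming the Kqlmagic connection established above is still active):

%kql ML_Models | where name == 'Predict_temperature' | summarize models=count(), latest=max(timestamp)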
11. Run the notebook and verify that all cells completed without errors.
12. In the queryset ‘ML-Scoring-via-Update-Policy’, paste the following KQL:
// Forecast using a linear regression model from Scikit-learn
// This model was trained to forecast the temperature in 3 minutes based on the
// current temperature & humidity, and the temperature differences from 1m, 10m
// and 20m ago
// predict_mlflow_b64_fl() - Predict using a trained MLflow model extracted as a
// base64 string from the ML models table
//
.create-or-alter function with (folder = "Packages\\ML", docstring = "Predict using a trained MLflow model extracted as base64 string from ML models table")
predict_mlflow_b64_fl(samples:(*), models_tbl:(name:string, timestamp:datetime, model:string), model_name:string, features_cols:dynamic, pred_col:string)
{
let model_str = toscalar(models_tbl | where name == model_name | top 1 by timestamp desc | project model);
let kwargs = bag_pack('smodel', model_str, 'features_cols', features_cols, 'pred_col', pred_col);
let code = ```if 1:
    import pickle
    import base64
    smodel = kargs["smodel"]
    features_cols = kargs["features_cols"]
    pred_col = kargs["pred_col"]
    bmodel = base64.b64decode(smodel)
    clf1 = pickle.loads(bmodel)
    df1 = df[features_cols]
    predictions = clf1.predict(df1)
    result = df
    result[pred_col] = pd.DataFrame(predictions, columns=[pred_col])
```;
samples
| evaluate python(typeof(*), code, kwargs)
}
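//
// Optional: before wiring the scoring function into an update policy, you can test it
// directly on a few recent rows of weather_seattle. This assumes the model has already
// been pushed to the ML_Models table by the Spark notebook; the feature calculation
// below mirrors the training code
//
weather_seattle
| top 30 by stream_ingestion_time
| order by stream_ingestion_time asc
| extend temp_diff_1=temp-prev(temp, 1), temp_diff_10=temp-prev(temp, 10), temp_diff_20=temp-prev(temp, 20)
| where isnotnull(temp_diff_20)
| extend pred_temp=real(null)
| invoke predict_mlflow_b64_fl(ML_Models, 'Predict_temperature', pack_array('temp', 'humidity', 'temp_diff_1', 'temp_diff_10', 'temp_diff_20'), 'pred_temp')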
//
// Create and set the update policy to ingest from weather_seattle to
// weather_seattle_regression_forecast
//
.create-or-alter function with (folder = "Packages\\UpdatePolicy", docstring = "Forecast temperature in 3 minutes using scikit-learn linear regression model")
update_regression_forecast_temp()
{
let recent_extents=toscalar(weather_seattle | summarize make_list(extent_id()));
let weather_seattle_full=database('ADI_MSIT_EH_DEMO').table('weather_seattle'); // reference via database().table() to access the full table (the update policy query itself sees only newly ingested records); replace with your database name
//
let dt=1m;
let horizon=3m;
let horizon_bins=toint(horizon/dt); // not needed for scoring new real-time data, but used in this demo to attach the labeled (actual) future temperature
let n_lookback=20; // number of past samples needed to calculate the model features; these must match the training process that was done in Spark
let min_max_times_num = weather_seattle | summarize num=count(), min(stream_ingestion_time), max(stream_ingestion_time);
let min_time=toscalar(min_max_times_num | project min_stream_ingestion_time);
let max_time=toscalar(min_max_times_num | project max_stream_ingestion_time);
let new_samples=toscalar(min_max_times_num | project num);
let n_samples = new_samples + n_lookback;
let etime=max_time;
//
weather_seattle_full
| top n_samples by stream_ingestion_time
| order by stream_ingestion_time asc
| project stream_ingestion_time, temp, humidity, weather_time
| extend temp_diff_1=temp-prev(temp, 1), temp_diff_10=temp-prev(temp, 10), temp_diff_20=temp-prev(temp, 20)
| extend label_ingestion_time=next(stream_ingestion_time, horizon_bins), label_weather_time=next(weather_time, horizon_bins), label_temp=next(temp, horizon_bins)
| where isnotnull(temp_diff_20) // and isnotnull(label_temp)
| extend pred_temp=real(null)
| invoke predict_mlflow_b64_fl(ML_Models, 'Predict_temperature', pack_array('temp', 'humidity', 'temp_diff_1', 'temp_diff_10', 'temp_diff_20'), 'pred_temp')
| extend eid=recent_extents, etime // for debugging only
}
// Create the target table
.set weather_seattle_regression_forecast <|
update_regression_forecast_temp
| take 0
weather_seattle_regression_forecast
| getschema
.alter table weather_seattle_regression_forecast policy update
```
[
{
"IsEnabled": true,
"Source": "weather_seattle",
"Query": "update_regression_forecast_temp",
"IsTransactional": false,
"PropagateIngestionProperties": false
}
]
```
.show table weather_seattle_regression_forecast policy update
// Wait a few minutes and check that the target table is being populated:
weather_seattle_regression_forecast
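//
// Optional sanity check: once rows with a known (labeled) future temperature have
// accumulated in the forecast table, you can roughly estimate the model's online
// accuracy directly in KQL
weather_seattle_regression_forecast
| where isnotnull(label_temp) and isnotnull(pred_temp)
| summarize n=count(), rmse=sqrt(avg(pow(pred_temp - label_temp, 2)))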
13. Run these queries and commands sequentially.
Summary
This blog post demonstrates how to set up a workflow that processes streaming events for training an ML model in a Spark notebook, and then uses that model to score new streaming data directly in the Eventhouse, near the data. By leveraging the Eventhouse python() plugin and the update policy mechanism, we can score new streaming events in real time without the need to set up any external orchestration.