Microsoft Fabric Updates Blog

Microsoft Fabric changing the game: Exporting data and building the Lakehouse

Fabric changed the game for analytics with more integration and enhancements for the analytics platform. As defined by the learn doc, What is Microsoft Fabric: “Microsoft Fabric is an all-in-one analytics solution for enterprises that covers everything from data movement to data science, Real-Time Analytics, and business intelligence. It offers a comprehensive suite of services, including data lake, data engineering, and data integration, all in one place.”

I plan to develop a post series covering Fabric from an end-to-end perspective. As part of this series, I will incorporate some scripts that were initially developed for the Synapse experience on Fabric. The purpose of reusing these scripts is to demonstrate how easily you can implement the same logic you already have in your Synapse environment in various scenarios.

Scenario: Export Data from Azure SQL Database into OneLake. Data will land in a raw/bronze zone. Data must be exported in parallel and saved in parquet format.

I am reusing part of those 2 blogs scripts and adapting to the feature existing on Fabric.

Check out End-to-end tutorials in Microsoft Fabric. These tutorials guide you through a scenario that covers the entire process, from data acquisition to data consumption in Fabric. Assuming you already have at least one workspace created, let’s explore how to adapt notebook scripts originally developed for Synapse and seamlessly utilize them in the Fabric environment.

Usually in Synapse, we recommend that customers use thread pool which is an abstract of the executor class and it enables them to reuse the same session across multiple notebooks in parallel. However, when working with Fabric, you already have a configuration in place that facilitates parallel execution of notebooks by reusing the same session and fairly distributing executors. Hence, you do not need the thread pool anymore, which is very cool!

You just need to go Workspace > Workspace Settings and check that High Concurrency is set to On:

A screenshot of a computer

Description automatically generated with medium confidence

Fig 1- High Concurrency

I tested the difference in execution with and without the setting. The difference in performance is significant, with the notebooks taking twice as long to execute when this setting is turned off. Considering the notable impact it has on execution time, it is strongly recommended to keep this setting enabled, as it is set to the default configuration.

Back to our scenario solution. Next, let us create a Lakehouse and add a new notebook to run the code and fulfill the requirements. In my Lakehouse example, I name it SQLDB_Synapse.

A screenshot of a computer

Description automatically generated

Fig 2 – LH

From the Lakehouse create a new notebook by selecting New notebook.

A screenshot of a computer

Description automatically generated with medium confidence

Fig 3-NK

Here you have the Lakehouse and notebook code. But…. What happens if you still want to reuse the Synapse thread pool scripts in Fabric? Can you? Yes, you can. I advise you to test and if your tests show any advantage. Why not?

To meet the expectations of my scenario, I am leveraging the logic presented in the article Essential tips for exporting and cleaning data with Spark. In this case, the notebooks are executed in parallel, and the data is exported from an Azure SQL Database. However, instead of exporting to a Storage Account, I am utilizing OneLake as the destination for the data. And again, you can keep the thread pools or remove them since the notebooks in Fabric uses the High Concurrency Configuration.

Follow the Python code:

from concurrent.futures import ThreadPoolExecutor

timeout = 3600 # 3600 seconds = 1 hour

notebooks = [

{"path": "notebook1", "params": {"param1": "value1"}},

{"path": "notebook2", "params": {"param2": "value2"}},

{"path": "notebook3", "params": {"param3": "value3"}},

]

with ThreadPoolExecutor() as ec:

for notebook in notebooks:

ec.submit(mssparkutils.notebook.run, notebook["path"], timeout, notebook["params"])

Notebook interactive code

#set variable to be used to connect the database

database = "AdventureWorks2017"

table = parameterString

user = "USERHERE"

password = "PASSWORDHERE"

jdbcDF = spark.read \

.format("jdbc") \

.option("url", f"jdbc:sqlserver://SERVERNAME.database.windows.net:1433; database=AdventureWorks2017") \

.option("dbtable", table) \

.option("user", user) \

.option("password", password).load()

jdbcDF.write.mode("overwrite").parquet("/SQLDB_intrc/Tables/" + parameterString )

However, while rethinking this example I made some slight changes to my script, and I added here a try\exception to get the error in case of one my notebook executions fail.

Here are the changes – without the thread pool:

notebooks = [

{"path": "notebook1", "params": {"param1": "value1"}},

{"path": "notebook2", "params": {"param2": "value2"}},

{"path": "notebook3", "params": {"param3": "value3"}},

]

for notebook in notebooks:

try:

mssparkutils.notebook.run(notebook["path"], timeout, notebook["params"])

except Exception as e:

print(f"Exception occurred in notebook '{notebook['path']}': {e}")

OR if you prefer with a thread pool, follow the example. Here, I defined the try\except inside of a function first and then called when using the thread pool executor:

def func_notebook_Error_handle(notebook):

try:

mssparkutils.notebook.run(notebook["path"], timeout, notebook["params"])

except Exception as e:

error_message = f"Exception occurred in notebook '{notebook['path']}': {e}\n"

# Create a ThreadPoolExecutor

with ThreadPoolExecutor() as executor:

# Submit notebook executions to the executor

notebook_tasks = [executor.submit(func_notebook_Error_handle, notebook) for notebook in notebooks]

That is the result – my adventureworks was exported from Azure SQL Database to the OneLake in a parquet file format as Fig 5 – OneLake, shows.

A screenshot of a computer

Description automatically generated with medium confidence

Fig 5 – OneLake

The idea of the OneLake as the doc mention is: “OneLake as a single, unified, logical data lake for the whole organization”.

Note: OneLake comes automatically with every Microsoft Fabric tenant with no infrastructure to manage.

So, on top of my OneLake, I can have the Lakehouse using my Spark Scripts or/and the Warehouse using T-SQL language. Moving a little bit forward let me add one more requirement to my scenario. Let’s suppose I need to build a table in my Lakehouse using the information I got from the Azure SQL Database. My requirements are:

  • Join the files from the table Purchasing.PurchaseOrderDetail with Production.Product using ProductID
  • Summarize the columns UnitPrice, OrderQty and StockedQty
  • Null values for the column Color from Production.Product should be handle as “No Values/Unknow”
  • Group by the following columns: purchase_order_detail.ModifiedDate

, purchase_order_detail.ProductID, product.ProductNumber, product.MakeFlag,     product.FinishedGoodsFlag, product.Color, product.SafetyStockLevel

Follow the diagram that I got from Azure SQL Database, as Fig 6- Diagram, shows:

A screenshot of a computer

Description automatically generated with medium confidence

Fig 6- Diagram

Ok. Easy piece requirements, for the solution let’s read the files into data frames using Python, join through the key column, group by as required, and handle the null values with the fillna method from pandas (pandas.DataFrame.fillna — pandas 2.0.2 documentation (pydata.org)). In the end, I will save it as a table in my Lakehouse.

from pyspark.sql import SparkSession

from pyspark.sql.functions import sum

import pandas

# Read the PurchaseOrderDetail and Product tables

purchase_order_detail = spark.read.load('Files/R/SQLDB_test/Tables/Purchasing.PurchaseOrderDetail/*.parquet', format='parquet')

product = spark.read.load('Files/R/SQLDB_test/Tables/Production.Product/*.parquet', format='parquet')

# Perform the join and aggregation

join_result = purchase_order_detail.join(product, "ProductID") \

.groupBy(purchase_order_detail.ModifiedDate, purchase_order_detail.ProductID, product.ProductNumber, product.MakeFlag,\

product.FinishedGoodsFlag, product.Color, product.SafetyStockLevel) \

.agg(

sum("UnitPrice").alias("UnitPrice"),

sum("OrderQty").alias("OrderQty"),

sum("StockedQty").alias("StockedQty")

)

# Fill NA values in the "Color" column

join_result = join_result.fillna("No Values/Unknow", subset=["Color"])

#show results

join_result.show()

#save as table

join_result.write.format("delta").mode("overwrite").saveAsTable("Product_Purchase_Consolidate")

Following results:

A screenshot of a computer

Description automatically generated with medium confidence

Fig 7- results

As you can see, I have my table created on the Lakehouse.

Another interesting new application that I can use to check the files existent at the OneLake in Fabric is the OneLake file explorer. As mentioned by the docs, “It is an application seamlessly integrates OneLake with Windows File Explorer. This application automatically syncs all OneLake items that you have access to in Windows File Explorer.” Using the OneLake file explorer, I can check how this table is stored in Fabric as well as the parquet files that were just exported from Azure SQL Database.

OneLake explorer shows the application integrated with my windows explorer:

A screenshot of a computer

Description automatically generated with medium confidence

Fig 8 – OneLake Explorer

Before concluding this post, I would like to share one intriguing aspect. By exploring the Warehouse functionality, I can create a new table using the Select Into operation pointing to the Lakehouse. This level of integration signifies the ability to perform cross-queries between the Lakehouse and Warehouse seamlessly. Here, I present a simple yet compelling example where I query the recently created table within the Lakehouse, and using the Select Into operation, I create a new table in my Warehouse with identical data.

A close-up of a computer screen

Description automatically generated with low confidence

Fig 9 – Warehouse

Follow the table created:

A screenshot of a computer

Description automatically generated with medium confidence

Fig 10 – Table

Using the OneLake explorer again, here is my Delta table inside of the Warehouse.

A screenshot of a computer

Description automatically generated with medium confidence

Fig 11 – Delta

Summary

The Fabric brings more integration to your analytics environment. If you want, you can reuse your Spark/Python scripts from Azure Synapse to the new Synapse experience in Fabric. Note that Microsoft Fabric is still in preview, so take your time to understand how it works and how it integrates as a solution end-to-end in the analytics world.

Entradas de blog relacionadas

Microsoft Fabric changing the game: Exporting data and building the Lakehouse

septiembre 5, 2024 por Brad Watts

As part of the consumption model for Eventhouse, we are enabling Standard Storage consumption. This change is scheduled to start rolling out the week of September 16th. This means that you will start seeing billable consumption of the OneLake Storage Data Stored meter from the Eventhouse and KQL Database artifacts. For details on all things … Continue reading “Announcement: Eventhouse Standard Storage Billing”

agosto 28, 2024 por Adi Eldar

Anomaly Detector, one of Azure AI services, enables you to monitor and detect anomalies in your time series data. This service is based on advanced algorithms, SR-CNN for univariate analysis and MTAD-GAT for multivariate analysis and is being retired by October 2026. In this blog post we will lay out a migration strategy to Microsoft Fabric, allowing … Continue reading “Advanced Time Series Anomaly Detector in Fabric”