Microsoft Fabric Updates Blog

Mastering Enterprise T-SQL ETL/ELT: A Guide with Data Warehouse and Fabric Pipelines

Developing ETL/ELT processes can become intricate when incorporating business logic, managing large datasets, and transferring substantial volumes of table data from the source to the target. This complexity is heightened in analytical tasks involving relational data, which may require a complete table reload or incremental updates. While traditionally managed with SQL variants (or any preferred relational database), the challenge arises in executing a robust, dynamic, and scalable ETL/ELT using T-SQL within Microsoft Fabric. The solution lies in utilizing Fabric Pipelines and Data Warehouse.

This article will concentrate on constructing the foundational elements of an enterprise-scale ETL/ELT framework using Fabric Pipelines and a Data Warehouse for performing our transformations in T-SQL. Additionally, we will examine a dynamic SQL script designed to incrementally process tables throughout your enterprise. Armed with the code examples and insights provided, you should feel equipped to begin developing your Data Warehouse with confidence. Furthermore, this metadata framework can be applied to execute workloads dynamically across various Fabric artifacts, even outside of a Data Warehouse context.

In order to handle the logic to incrementally update a table or fully reload a table in Fabric Data Warehouse we will need to create the following assets:

  • Metadata table in Fabric Data Warehouse
    • This will contain the configurations needed to load each table end to end
  • Metadata driven Fabric Pipelines
    • Parent and child pipeline templates that will orchestrate and execute the ETL/ELT end to end
  • Custom SQL logic for incremental processing
    • Dynamic SQL to perform the delete and insert based on criteria the user provides in the metadata table

*Important! This article uses Azure SQL DB as the source database. However, as a source, you can use databases like Fabric Lakehouses, Fabric Data Warehouses, Fabric Mirrored DBs, Azure SQL MI, On-Prem SQL, Synapse Dedicated Pools, MySQL, Oracle, and others. You will just need to adjust the query syntax/connections to match the desired source. This includes using the Lakehouse or other locations for the metadata table and utilize this metadata infrastructure without using a Data Warehouse in your workflow.

Scenario

There is a need to load SQL tables from an Azure SQL DB source on a daily frequency or multiple times a day. The requirements are to land the data first in OneLake within the Lakehouse as Delta Tables, and then finally load the tables into the Data Warehouse with the correct processing (incremental or full) while using a dynamic pipeline strategy to limit the number of objects used. 

Metadata Table

The first set up that is required in our dynamic ETL is going to be a metadata (sometimes called “config” or “control” table) table on the destination Fabric Data Warehouse. This table contains all of the information that is needed to pass into the Fabric pipelines to determine the source query, table information, which stored procedure to execute for each table, processing metadata, staging metadata, and other metadata critical to performing the ETL. An example of a metadata table design and sample are below. 

Metadata table definition (including schema ‘meta’ creation)

/*** Create schema and metadata table in Fabric Data Warehouse ***/
CREATE SCHEMA meta;
GO
CREATE TABLE [meta].[Fabric_Metadata](
	[TargetSchema] [varchar](25) NULL,
	[TargetTable] [varchar](100) NULL,
	[SourceSchema] [varchar](25) NULL,
	[SourceTable] [varchar](100) NULL,
	[StagingSchema] [varchar](25) NULL,
	[StagingTable] [varchar](100) NULL,
	[StagingDatabase] varchar(100) NULL,
	[LoadType] [varchar](25) NULL,
	[LoadIndicator] [varchar](25) NULL,
	[ETLStrategy] [varchar] (50) NULL,
	[StoredProcedureName] VARCHAR(500) NULL,
	[ColumnKey] [varchar](500) NULL,
	[NumberOfPrimaryKeyColumns] smallint null,
	[WaterfallColumn] [varchar](100) NULL,
	[TableColumns] [varchar](1000) NULL
) 
GO
/*** Populate metadata table in Fabric Data Warehouse with Adventure Works metadata. SalesLT sample data in the Azure SQL DB ***/

INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'Customer', N'SalesLT', N'Customer', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspCustomer_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'ProductModel', N'SalesLT', N'ProductModel', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspProductModel_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'ProductDescription', N'SalesLT', N'ProductDescription', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspProductDescription_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'Product', N'SalesLT', N'Product', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspProduct_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'ProductCategory', N'SalesLT', N'ProductCategory', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspProductCategory_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'ProductModelProductDescription', N'SalesLT', N'ProductModelProductDescription', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspProductModelProductDescription_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'Address', N'SalesLT', N'Address', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspAddress_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'dim', N'CustomerAddress', N'SalesLT', N'CustomerAddress', NULL, NULL, NULL, N'Full', N'Daily', N'Stored Procedure', N'[dim].[uspCustomerAddress_FullReload]', NULL, NULL, NULL, N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'fact', N'SalesOrderDetail', N'SalesLT', N'SalesOrderDetail', N'dbo', N'STAGE_SalesOrderDetail', N'BronzeLakehouse', N'Incremental', N'Daily', N'Stored Procedure', N'[etl].[uspIncremental_DynamicInsertUpdate]', N'rowguid', 1, N'ModifiedDate', N'*')
INSERT [meta].[Fabric_Metadata] ([TargetSchema], [TargetTable], [SourceSchema], [SourceTable], [StagingSchema], [StagingTable], [StagingDatabase], [LoadType], [LoadIndicator], [ETLStrategy], [StoredProcedureName], [ColumnKey],[NumberOfPrimaryKeyColumns], [WaterfallColumn], [TableColumns]) VALUES (N'fact', N'SalesOrderHeader', N'SalesLT', N'SalesOrderHeader', N'dbo', N'STAGE_SalesOrderHeader', N'BronzeLakehouse', N'Incremental', N'Daily', N'Stored Procedure', N'[etl].[uspIncremental_DynamicInsertUpdate]', N'SalesOrderID, rowguid, SalesOrderNumber', 3, N'ModifiedDate', N'*')

Sample output of metadata table

The ETL will be facilitated entirely from this metadata table. Any tables that are not included within this table, would not be executed in our ETL pipelines. Any new tables or work that are needed to be added, simply insert them into the metadata table and they will be available when the pipeline is triggered, without needing to alter the Fabric Pipelines. The metadata table allows you dictate where, what, and how of your ETL from one central location. This is a simple metadata table example, but you can make this as robust as you desire by adding in test flags, different load frequency indicators, and many others.

*Important! The examples in this article depict dimension tables as full reloads and fact tables as incremental reloads. However, it’s possible to encounter various scenarios where your business logic or needs do not align with this specific scenario. Such as, dimension tables are incrementally loaded, fact tables being full reload, or some other type of hybrid scenario. In such cases, it is absolutely supported. You will just update the metadata table with the necessary columns/values then adjust the accompanying logic in the pipelines and SQL scripts to reflect your specific ETL needs.

Metadata Driven Pipelines

Now that the metadata table is constructed, time to build the dynamic Fabric pipelines to orchestrate and execute the ETL.

Here are the Fabric Pipeline objects needed to execute the ETL for ‘N’ number of tables. These will be shown in steps below. It is important the note the power of the dynamic metadata driven pipelines, they are able to execute/facilitate an enterprise level ETL with only 3 pipelines and 2-3 connections.

*Note: Currently you are not able to parameterize your connections like in Azure Data Factory, so you will need to create a connection for each source, and Fabric object. Fabric objects in the same workspace are available by default with the correct workspace permissions. This feature is on the public roadmap, and you can refer to this documentation for the latest status of all Fabric Data Factory updates: What’s new and planned for Data Factory in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

  • Connections
    • Source Azure SQL DB
    • Fabric Objects
      • Created by default for the Data Warehouse and other artifacts with the proper permissions.
  • Pipelines
    • Main Orchestration Pipeline
    • Full Reload Processing Pipeline
    • Incremental Processing Pipeline

Connection Creation

First, we need to create our connection to our Azure SQL DB source. We will not need to create connections for the Fabric artifacts within our workspace.

You will navigate to the ‘gear’ icon on the Fabric homepage for the settings page. Then under ‘Resources and extensions’, select ‘Manage connections and gateways’.

Then select the ‘New’ icon

You will need to select your appropriate gateway for your connection/security standards. Either On-Prem Data Gateway, Managed VNET Gateway, or Cloud Connection. For our scenario, we will keep it simple with the Cloud Connection.

Then, type in your server connection information including your authentication method. We will name our connection the same name as our server name for simplicity. You can use whichever naming standards make sense for your organization, as long as the name is meaningful (not something like ‘connection1’)

Pipeline and ETL Creation

With the metadata table, linked services, and datasets created, it is time to build out the metadata driven pipelines. The walkthrough below is split up into the 3 different pipelines, the main orchestration (parent) pipeline, full reload processing pipeline (child), and incremental load processing pipeline (child).

The overall path of the data will be Source System -> Lakehouse -> Data Warehouse.

Main Orchestration Pipeline:

The primary orchestration pipeline in this scenario is named “Main – Source to DW.” In production, this pipeline will be linked to a scheduled trigger that operates at your preferred frequency for running the ETL processes. It will oversee the execution of the entire ETL sequence. The pipeline’s design is laid out in the UI, where each activity is detailed.

There is one pipeline parameter called WaterfallLookBackDays. See below for detailed description about what this parameter means in this context.

  • The amount of days to incrementally process. Used only to find the changed rows in incremental data sources. Requires a reliable date stamp that corresponds to tracked inserts and updates.
    • Ex. rows that have been updated within the last 120 days (-120)
  • There are many ways to incrementally process, and this is just one used in the example. This article discusses using waterfall column/columns. You would just need to adjust the parameters, syntax, and dynamic script to fit your criteria.

There are 2 paths, one for an incremental reload (top flow) and one for a full reload (bottom flow). Both will be executed in parallel with this design. We will focus on the full reload first and cover the activities.

Full Reload Pattern/Queries:

Use a Script Activity to look up to extract the rows based on the SQL query criteria -> pass that result set to the for each loop and iterate over each table to perform the loading in the full load processing pipeline. 

*Note: It is recommended to use a Script Activity instead of a Lookup Activity due to the Lookup Activity having a 5,000-row limit where the Script Activity does not have a limit.

Script Activity- Full Reload:

Inside the Script Activity “Lookup Full Reload Tables”

The connection is set to the ‘Production-DataWarehouse’ which is the Fabric Data Warehouse where the metadata table lives. Also, this connection is already there for us since the Fabric artifacts in the workspace are already available.

The goal of this activity is to pull back all rows matching the criteria and the associated metadata. These columns will be all that is needed to plug and play all the full reload tables in the ETL.

At a high level, for the full reloads, we are grabbing

  1. Where am I pulling the data from
    • SourceSchema & SourceTable
  2. Where am I going with the data
    • TargetSchema & TargetTable
  3. What is the data going to be transformed/loaded into the Data Warehouse
    • StoredProcedureName
/*** Query the Data Warehouse's metadata table. This is case sensitive ***/
SELECT TargetSchema, TargetTable, SourceSchema, SourceTable, StoredProcedureName
FROM meta.Fabric_Metadata
WHERE LoadType= 'Full'

Sample Query Output

For Each Loop – Full Reload

The next step is to iterate through the output in the for each loop. Use the settings tab to define the items from the full reload lookup.

The ‘Items’ section is where you define which object to iterate over. The output from the previous script activity is used. The output is in a nested JSON array, so you will need to add some custom expression to reference the ‘resultSets’ array and the 1st item in the array.

Custom expression code

@activity('Lookup Full Reload Tables').output.resultSets[0].rows

Sample output from the script activity in JSON

Within the foreach loop, an execute pipeline activity will invoke the full load processing pipeline (child). The child pipeline requires certain parameters for execution (defined at the child pipeline), which will be the same parameters derived from the script activity in the parent pipeline.

Consequently, this pipeline will execute for each item (every table in our result set). It’s not necessary to create a separate pipeline for each child execution since you can include subsequent activities within the For Each Activity. However, encapsulating these executions in a pipeline can simplify monitoring and troubleshooting, in my view.

Full Reload – Processing Pipeline (child)

The processing pipeline called “Child – Full Reload LH to DW” is executed from the parent pipeline with the parameters being passed from parent to child. Because this pipeline is called within a for each loop, each table that is being iterated will have their own execution from this pipeline. The overall purpose and design of this pipeline is:

  1. Extract source data
  2. Load source data to the ‘BronzeLakehouse’
  3. Full reload of data from the ‘BronzeLakehouse’ to the Fabric Data Warehouse
These are pipeline parameters. There is no default value but you can place one if desired.

Copy Raw Data to Bronze Lakehouse

The metadata gathered from the pipeline parameters (originally from the script activity) is used to extract the source table data and load to a Delta table in the specified Fabric Lakehouse.

Within the Copy Activity, in the ‘Source’ is the Azure SQL DB connection that was created previously. As for the additional values, they will be pulled from the pipeline parameters.

*Note: You can parameterize the database to choose from here to pass in within the same server in the same way the other parameters or dynamic content are referenced. For simplicity of this walkthrough, it was not chosen.

Example of expression for pipeline parameter reference.

@pipeline().parameters.SourceSchema

In the ‘Destination’, the Fabric Lakehouse ‘BronzeLakehouse’ is chosen. This simulates the best practice of loading raw data into a data lake (in this specific scenario, OneLake via a Fabric Lakehouse).

It is important that the highlighted values are selected.

  • Root folder = Tables
    • This refers to the managed tables section of the lakehouse. This is reserved ONLY for managed Delta Tables. Fabric Pipelines will automatically convert the data to this format.
    • This is critical to the solution, as the managed Delta Table (Tables section) has a SQL Endpoint (read-only) attached to it. Which allows us to perform cross database T-SQL queries from the Data Warehouse or other SQL engines. The Files section is not readable with SQL, only Spark can read/write.
  • Table Action = Overwrite
    • This is not as critical as the above point.
    • Essentially, this will overwrite the existing files or create new ones if they don’t exist.
    • The append value may be desired, as that option inserts data only and will not overwrite existing data.
    • For this scenario, it is a full reload of the raw data so overwrite is the correct choice.

*Note: This follows a medallion architecture strategy, which is out of scope to cover in this article, but refer to this documentation for further information. Implement medallion lakehouse architecture in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

The ‘Table name’ is chosen to be the ‘TargetTable’ value from the pipeline parameter (from the metadata table).

*Note: Currently Fabric Lakehouse tables do not support schemas. This feature is on the public roadmap which may change this connector to include a schema choice. If that is the case, then simply add in the ‘TargetSchema’ or whichever schema you would like from the metadata table just like in the ‘Source’ section of the Copy Activity. You can view to updates to the Lakehouse roadmaps here. What’s new and planned for Synapse Data Engineering in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

Once the copy activity has completed, you can view the data in the Lakehouse directly. Under the ‘Tables’ section. Note that there was no need to convert the data from SQL to Delta parquet, Fabric Pipelines did it automatically.

Now that the data is loaded into the Fabric Lakehouse, it will be extracted into the Data Warehouse. While there are numerous methods to transfer data within Fabric, our focus will be on utilizing T-SQL stored procedures.

Since the data resides in Delta Tables in the Lakehouse, we can access it using T-SQL. This data can be referenced from the Data Warehouse using a three-part naming syntax for cross-database queries: [database].[schema].[tableName]. Utilizing SSMS (SQL Server Management Studio) provides the best visualization when connected to the Data Warehouse. With the appropriate permissions, you can connect to any Lakehouse, Data Warehouse, and Database Mirror within your workspace directly from the Data Warehouse.

Here are my items in the Fabric workspace ‘FabricDemo’

  • BronzeLakehouse
  • SilverLakehouse
  • GoldLakehouse
  • Production-DataWarehouse
View of Fabric artifacts in the ‘FabricDemo’ workspace

Let’s view these artifacts in SSMS. You can find the connection string for the Data Warehouse under the ellipsis -> ‘Copy SQL connection string’:

After connecting in SSMS, you can view the out-of-the-box access to various artifacts on the SQL endpoint, including the Delta Tables that were created in the previous Copy Activity. These appear as separate databases, which is a crucial aspect of the solution.

Copy Tables from Bronze Lakehouse to Data Warehouse

In this step, an Execute Stored Procedure Activity targets the Data Warehouse. This stored procedure may encompass the full complexity of your ETL within the SQL script. It allows referencing Lakehouse tables, Mirrored DB tables, or Data Warehouse tables within the same query, along with any additional business logic required.

The stored procedure utilized is the one specified in the metadata table within the pipeline parameters. Parameters can be passed to the stored procedure from Fabric Pipelines, although not in this particular scenario.

An example of the stored procedure that is called. This is a simple Create Table As Select (CTAS) that references the BronzeLakehouse table as the source for the full reload. You can make your stored procedures as complex/robust as needed.

*’SELECT *’ is bad practice and is only used for simplicity. DBAs, please let it slide 🙂

CREATE PROCEDURE [dim].[uspAddress_FullReload] AS
BEGIN

	DROP TABLE IF EXISTS dim.Address;

	CREATE TABLE dim.Address
	AS
	SELECT * 
	FROM BronzeLakehouse.dbo.Address

END
GO

Incremental Load Pattern/Queries

The incremental processing load will be quite similar to the full reload processing method. The differences lie in the filtering of script activities, the inclusion of additional parameters, an extra stored procedure execution, and varied methods for executing the loading within the processing pipeline.

Script Activity- Full Reload:

Back in the parent pipeline, this script activity will be the exact same configuration as the full reload version, with the minor changes of ‘Incremental’ being hard coded as the WHERE clause ‘LoadType’ filter instead of ‘Full’, and adding the additional columns to the SELECT query.

Additional Columns & the parent pipeline parameter that will be used:

  • StagingTable
    • As this process is incremental, we cannot overwrite the existing production table. Therefore, there is a distinct table named with the prefix ‘STAGE_’ to differentiate it from the standard tables.
    • This table is intended for a complete reload from the source, after which it will be compared with the Data Warehouse table that holds the entire history.
    • In this solution, the staging table will live in the BronzeLakehouse.
  • ColumnKey
    • This is the unique key of the source table. It will usually be a primary key, composite key, or any combination of columns that identify a unique record on the table.
    • Can be one or many columns. The dynamic SQL script will handle both.
  • NumberOfPrimaryKeyColumns
    • An integer value of the number of key columns on the source table. This is needed in the second step of the dynamic ETL solution.
  • WaterfallLookBackDays
    • Pipeline Parameter
    • Only used in the incremental loading process
    • The amount of days to incrementally process. Used only to find the changed rows in incremental data sources. Requires a reliable date stamp that corresponds to tracked inserts and updates.
      • Ex. rows that have been updated within the last 120 days (-120)
    • There are many ways to incrementally process, and this is just one used in the example. This article discusses using waterfall column/columns. You would just need to adjust the parameters, syntax, and dynamic script to fit your criteria.
  • WaterfallColumn
    • The reliable date or column used to track changes.
    • Can vary from solution to solution just like the WaterfallLookBackDays parameter section.
    • Adjust/use as you see fit.
  • TableColumns
    • The columns on the source that you are looking to extract.
    • Sometimes in schema drift scenarios and in best practice, you should list out the columns that are used in the query. This can be beneficial for several scenarios.
    • In this scenario, ‘*’ is used for simplicity. Again, DBAs please let this slide.
/*** Query the Data Warehouse's metadata table. This is case sensitive ***/
SELECT 
	TargetSchema, 
	TargetTable, 
	SourceSchema, 
	SourceTable, 
	StagingTable, 
	ColumnKey, 
	NumberOfPrimaryKeyColumns, 
	WaterfallColumn, 
	TableColumns
FROM 
	meta.Fabric_Metadata
WHERE 
	LoadType= 'Incremental'

Sample of the query output. Take note of the ‘ColumnKey’ column for the next section.

Execute Stored Procedure – Prep Primary Key Temporary Lookup Table

In order for the dynamic incremental SQL script to function, there needs to be some prep for the column keys. The script (shown in the later steps) will dynamically build a WHERE clause based on the primary keys. To do this, you have to split the string of comma concatenated column names like the ones in the above screenshot.

The Data Warehouse currently does not support temporary tables, table variables, and other T-SQL surface area functionality. So instead of dynamically creating a lookup table unique to each table to house the split-out string values, I used a single procedure to put all of the incremental tables’ column keys in a “temp” table (one that is dropped and rebuilt every call). This is normally completed within the single dynamic SQL script with CTEs and temp tables. There is a link at the bottom of the article to view what that looks like in a fully supported solution with ADF and Azure SQL.

The code within the procedure is admittedly a little clunky, and that is because some functions, in addition to the unsupported features mentioned above, that would make this request a lot easier are not supported yet (I am talking about you STRING_SPLIT). So, I had to get creative with the T-SQL to accomplish this.

You can view the full current T-SQL limitations here: T-SQL surface area – Microsoft Fabric | Microsoft Learn

*Note: There are a lot of planned T-SQL feature support on the public release plan roadmap. So, this solution will become less clunky in time. You can view the roadmap for the Data Warehouse here: What’s new and planned for Synapse Data Warehouse in Microsoft Fabric – Microsoft Fabric | Microsoft Learn

Script for the stored procedure in the ‘etl’ schema etl.uspIncremental_PrimaryKeyList_Update

CREATE PROCEDURE etl.uspIncremental_PrimaryKeyList_Update AS
BEGIN

DECLARE @TargetTable VARCHAR (500);
DECLARE @NumberOfPrimaryKeyColumns INT;
DECLARE @PrimaryKeyList VARCHAR(256);
DECLARE @DropTemporaryTableScript NVARCHAR(4000);
DECLARE @CreateTemporaryTableScript NVARCHAR(4000);
DECLARE @InsertTemporaryTableScript NVARCHAR (4000);

SET @NumberOfPrimaryKeyColumns = (SELECT MAX(NumberOfPrimaryKeyColumns) FROM meta.Fabric_Metadata)
 
DROP TABLE IF EXISTS temp.PrimaryKeyList ;
CREATE TABLE temp.PrimaryKeyList (TableName VARCHAR(500) NULL, PrimaryKeyColumn VARCHAR(500) null, PrimaryKeyOrder SMALLINT NULL)


DECLARE @num INT
SET @num = 1

WHILE (@num <= @NumberOfPrimaryKeyColumns)
BEGIN
	INSERT INTO  temp.PrimaryKeyList (TableName, PrimaryKeyColumn, PrimaryKeyOrder) 
	SELECT 							
		TableName = TargetTable,
		PrimaryKeyColumn = REVERSE(PARSENAME(REPLACE(REVERSE(ColumnKey), ',', '.'), @num)),
		PrimaryKeyOrder = @num
	FROM
		meta.Fabric_Metadata
	WHERE 
		NumberOfPrimaryKeyColumns IS NOT NULL
		AND REVERSE(PARSENAME(REPLACE(REVERSE(ColumnKey), ',', '.'), @num)) IS NOT NULL
						 
  SET @num = @num + 1
END

END

Sample output from the “temp” table that gets created

For Each Loop – Incremental Reload

The next step is to iterate through the output in the for each loop. Use the settings tab to define the items from the incremental reload lookup. This will look almost identical to the full reload for each loop, with the exception it is using the output from the incremental script activity instead of the full load version.

Custom expression code

@activity('Lookup Incremental Reload Tables').output.resultSets[0].rows

Sample output from the script activity in JSON

Within the foreach loop, an execute pipeline activity will invoke the incremental load processing pipeline (child). The child pipeline requires certain parameters for execution (defined at the child pipeline), which will be the same parameters derived from the script activity in the parent pipeline and the one exception of the pipeline parameter ‘WaterfallLookBackDays’ being used in addition.

Incremental Reload – Processing Pipeline (child)

The processing pipeline called “Child – Incremental Load LH to DW” is executed from the parent pipeline with the parameters being passed from parent to child. Because this pipeline is called within a for each loop, each table that is being iterated will have their own execution from this pipeline. This is assuming that a full load/sync has already occurred. The overall purpose and design of this pipeline is:

  1. Extract ONLY subset of source data
    • This scenario uses the waterfall column and a defined period of time. But you can use whatever method that you use to identify changes. CDC, Change Tracking, Waterfall columns, Hash columns, etc. You would just need to adjust the logic/metadata.
  2. Load subset of “changed” source data to the ‘BronzeLakehouse’ in a staging table
  3. Incrementally reload of data from the ‘BronzeLakehouse’ to the Fabric Data Warehouse
    • This will be completed by a dynamic SQL stored procedure that is shared by all incremental tables.

Copy Raw Data to Bronze Lakehouse

The metadata gathered from the pipeline parameters (originally from the script activity in the parent pipeline) is used to extract the source table data and load to a Delta table in the specified Fabric Lakehouse. In the incremental side, it will be a custom query instead of a full table read. Only a subset of the changed data is desired.

Within the Copy Activity, in the ‘Source’ is the Azure SQL DB connection that was created previously. As for the additional values, they will be pulled from the pipeline parameters.

*Note: This example utilizes Azure SQL DB as the source; however, you can select any database (compatibility dependent) as the source, modify the identification process, and adjust the syntax and query accordingly.

Custom expression code

@concat(
    'SELECT * 
     FROM ', '[', pipeline().parameters.SourceSchema, '].[', pipeline().parameters.SourceTable, '] ',
     'WHERE ', 'CONVERT(DATE, ', pipeline().parameters.WaterfallColumn, ') >= DATEADD(DAY,', pipeline().parameters.WaterfallLookBackDays, ', GETDATE())'  )

Sample output of pipeline expression

In the ‘Destination’, the Fabric Lakehouse ‘BronzeLakehouse’ is chosen. This simulates the best practice of loading raw data into a data lake (in this specific scenario, OneLake via a Fabric Lakehouse). This will be similar to the Full Reload section, where only the ‘StagingTable’ parameter is used instead of the ‘TargetTable’ for the table name in the Lakehouse.

Be sure to follow the important instructions for the ‘Root folder’ & ‘Table action’ selections. If your incremental data is only inserts then you can certainly choose the ‘Append’ table action to keep a full history in the Lakehouse if desired.

We can visualize our new staging tables within the Fabric UI and SSMS.

Copy Incremental Tables from Bronze Lakehouse to Data Warehouse

In this step, an Execute Stored Procedure Activity targets the Data Warehouse. The stored procedure allows referencing Lakehouse tables, Mirrored DB tables, or Data Warehouse tables within the same query, along with any additional business logic required.

This step is what actually performs the incremental processing of the tables. It will delete the rows that are in the staging table (simulating that data has changed and needs to be either updated or inserted) and then insert the staging table rows into the production table. The script will be provided and explained below. 

*Note: Should you wish to apply distinct procedures to your incremental tables similar to those used for full reloads, this is entirely feasible. The same procedures would be defined in the metadata table for the incremental tables as with the full reloads. You could even parameterize your stored procedure parameters as well and pass them from the metadata table. The purpose of this is to demonstrate an example of straightforward scalability and repeatable intellectual property.

CREATE PROCEDURE etl.uspIncremental_DynamicInsertUpdate @TargetTable VARCHAR(50) AS 
/**************************************************************************************************************
This script is not intended as best practice or a supported Microsoft solution. It is only an example on one method to dynamically
execute an incremental ETL with 1 stored procedure. 
CREATED BY: Marc Bushong, Sr. Cloud Solution Architect @Microsoft
**************************************************************************************************************/
BEGIN


--DECLARE @TargetTable VARCHAR (50) -- Uncomment for manual runs inside of the stored procedure. 
DECLARE @StagingDatabase VARCHAR(50)
DECLARE @StagingSchema VARCHAR (50)
DECLARE @StagingTable VARCHAR (500)
DECLARE @WhereClause VARCHAR(MAX) 
DECLARE @TargetSchema VARCHAR (50)
DECLARE @FullStagingTableName VARCHAR (500)
DECLARE @FullTargetTableName VARCHAR (500)
DECLARE @TargetTableColumnList NVARCHAR(MAX)
DECLARE @DeleteStatementSQL NVARCHAR (MAX)
DECLARE @InsertStatementSQL NVARCHAR (MAX)
DECLARE @StatisticsUpdateSQL NVARCHAR (MAX)

--SET @TargetTable = 'SalesOrderHeader' -- Uncomment for manual runs inside of the stored procedure. 
SET @TargetSchema = (SELECT TargetSchema FROM meta.Fabric_Metadata WHERE TargetTable = @TargetTable)
SET @StagingDatabase = (SELECT StagingDatabase FROM meta.Fabric_Metadata WHERE TargetTable = @TargetTable)
SET @StagingSchema = (SELECT StagingSchema FROM meta.Fabric_Metadata WHERE TargetTable = @TargetTable)
SET @StagingTable = (SELECT StagingTable FROM meta.Fabric_Metadata WHERE TargetTable = @TargetTable)
SET @FullStagingTableName = CONCAT(@StagingDatabase, '.', @StagingSchema, '.', @StagingTable)
SET @FullTargetTableName = CONCAT(@TargetSchema, '.', @TargetTable)
SET @TargetTableColumnList = (	
								SELECT 
									ColumnList = STRING_AGG('[' + col.NAME + ']', ',' )
								FROM
									sys.tables tab
										LEFT JOIN 
									sys.schemas sch
										ON tab.schema_id = sch.schema_id
										LEFT JOIN 
									sys.columns col
										ON tab.object_id = col.object_id
								WHERE 
									sch.name = @TargetSchema
									AND tab.name = @TargetTable
									--AND col.is_identity = 0 --Uncomment when identity columns are supported in the Data Warehouse. Not needed for now.
							)
 ;

 /******* Section for single primary key OR Keys that do not need to be concated to be uniquely identified *********************/
        SELECT
			@WhereClause =  
							STRING_AGG(CASE 
                                            WHEN E.PrimaryKeyColumn IS NOT NULL THEN CONCAT( Beg.PrimaryKeyColumn,' IN (SELECT ', Beg.PrimaryKeyColumn, ' FROM ', @FullStagingTableName, ') AND')
                                            ELSE CONCAT( Beg.PrimaryKeyColumn,' IN (SELECT ', Beg.PrimaryKeyColumn, ' FROM ', @FullStagingTableName, ')' )
                                        END, ' ')

        FROM 
            temp.PrimaryKeyList Beg
                LEFT JOIN
            temp.PrimaryKeyList E
                ON Beg.TableName = E.TableName
				AND Beg.PrimaryKeyOrder = E.PrimaryKeyOrder + 1 
		WHERE
			Beg.TableName = @TargetTable
                ;

/********Build the delete and insert SQL statements. Strings to be executed. ********************************************************/
 
SELECT
    @DeleteStatementSQL = CONCAT('DELETE FROM ', @FullTargetTableName, ' WHERE ', @WhereClause) ;
 
SELECT 
    @InsertStatementSQL = CONCAT('INSERT INTO ', @FullTargetTableName, ' (', @TargetTableColumnList, ') ', ' SELECT ', @TargetTableColumnList, ' FROM ', @FullStagingTableName)
 

--PRINT @DeleteStatementSQL
--PRINT @InsertStatementSQL
 
EXECUTE sp_executesql @DeleteStatementSQL ; 

EXECUTE sp_executesql @InsertStatementSQL ;

END

See examples of the different steps of the script below for the table ‘SalesOrderHeader’

Metadata table results for the staging table, target table, and the primary keys for the target table ‘SalesOrderHeader’. You will notice that this table has multiple primary keys to provide the unique record for the data. This script will handle multiple primary keys or single primary keys in a method shown later. 

First, the variables are built. One important variable is @TargetTableColumnList which compiles a comma separated list of the target table columns from the system tables. You will not need to maintain the columns in the target table since the script will compile a list from the system tables and, when identity columns are supported in the Data Warehouse, exclude identity columns since these are not updated or inserted. If that is needed, then logic can be added to turn the identity insert on and off in the script. 

The next step is to build the WHERE clause of our delete statement. This is done by using the column keys and splitting them out into different predicates. Executing the code down to the @WhereClause creation will produce this output. This step is using the “temp” table that we executed in the beginning of the pipeline.

Output of @WhereClause

@WhereClause = SalesOrderNumber IN (SELECT  SalesOrderNumber FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader) AND  rowguid IN (SELECT  rowguid FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader) AND SalesOrderID IN (SELECT SalesOrderID FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader)

Next, the delete and insert statements are created using the dynamic SQL in the script and previous steps. Here are the outputs from our example. 

Delete statement

DELETE FROM fact.SalesOrderHeader WHERE  SalesOrderNumber IN (SELECT  SalesOrderNumber FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader) AND  rowguid IN (SELECT  rowguid FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader) AND SalesOrderID IN (SELECT SalesOrderID FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader)

Insert statement

INSERT INTO fact.SalesOrderHeader ([SalesOrderID],[RevisionNumber],[OrderDate],[DueDate],[ShipDate],[Status],[OnlineOrderFlag],[SalesOrderNumber],[PurchaseOrderNumber],[AccountNumber],[CustomerID],[ShipToAddressID],[BillToAddressID],[ShipMethod],[CreditCardApprovalCode],[SubTotal],[TaxAmt],[Freight],[TotalDue],[Comment],[rowguid],[ModifiedDate])  SELECT [SalesOrderID],[RevisionNumber],[OrderDate],[DueDate],[ShipDate],[Status],[OnlineOrderFlag],[SalesOrderNumber],[PurchaseOrderNumber],[AccountNumber],[CustomerID],[ShipToAddressID],[BillToAddressID],[ShipMethod],[CreditCardApprovalCode],[SubTotal],[TaxAmt],[Freight],[TotalDue],[Comment],[rowguid],[ModifiedDate] FROM BronzeLakehouse.dbo.STAGE_SalesOrderHeader

Finally, those statements are passed into sp_executesql to be executed. 

Notes on Future Design

As mentioned throughout the article, there are features that are planned or may come in the future that simplify, improve the scalability, and enhance the overall design of these solutions. I will try to update this article with those features accordingly. Below I have provided some additional documentation to keep track of these potential feature updates.

Summary

Using the provided template and scripts, you can construct a dynamic, metadata-driven ETL process within a T-SQL infrastructure in Fabric at an enterprise scale. This can be achieved with as few as three pipelines to manage any number of tables in the Data Warehouse. The approach is highly adaptable and scalable, allowing for customization to meet specific needs. Complex requirements or change tracking logic, beyond waterfall columns or composite keys, can also be integrated into this process. Furthermore, this metadata framework can be applied to execute workloads dynamically across various Fabric artifacts, even outside of a Data Warehouse context.

Povezane objave v spletnem dnevniku

Mastering Enterprise T-SQL ETL/ELT: A Guide with Data Warehouse and Fabric Pipelines

junij 24, 2024 avtor: Justin Barry

When we talk about Microsoft Fabric workspace collaboration, a common scenario is developers and their teams using a shared workspace environment, which means they have access to “live items”. A change made directly within a workspace would override and affect all other developers or users utilizing that workspace. This is where git becomes increasingly important … Continue reading “Microsoft Fabric Lifecycle Management: Getting started with development in isolation using a Private Workspace”

junij 18, 2024 avtor: RK Iyer

✎ Co-Author – Abhishek Narain Overview Building an effective Lakehouse starts with establishing a robust ingestion layer. Ingestion refers to the process of collecting, importing, and processing raw data from various sources into the data lake. Data ingestion is fundamental to the success of a data lake as it enables the consolidation, exploration, and processing … Continue reading “Demystifying Data Ingestion in Fabric: Fundamental Components for Ingesting Data into a Fabric Lakehouse using Fabric Data Pipelines”