Microsoft Fabric Updates Blog

Fabric Data Pipelines – Advanced Scheduling Techniques (Part 1)

Sean Mirabile | Sr. Program Manager | Microsoft Fabric CAT

Special thanks to all of the technical reviewers: Kevin Lee, Bret Myers, Sundar Easwaran, and Mallikarjun Appani

Content

Introduction

Welcome to the blog series on Advanced Scheduling techniques in Microsoft Fabric Data Pipelines. We are going to cover a few different scheduling scenarios and the data pipeline techniques for solving them. This guide is intended to be short-lived as many of the scenarios will be achieved using native features in the future. However, if you need to handle one or more of these scenarios, this guide aims to help guide you to a temporary working alternative.

The three scenarios we will cover in this series:

  • ADLS Gen2 Event Trigger (on create) – executing a data pipeline when a new file is created in a specific ADLS location
  • Specific Day – executing a data pipeline on a specific day of the month (first of the month, last of the month, specific date, etc…)
  • Multiple Data Pipelines for a single Schedule

Part 1: ADLS Gen2 Event Triggers (on create)

In ADF and Synapse, Storage Event Triggers will execute specified pipelines when there is an event in an Azure Data Lake Storage Gen2 account, such as the creation or deletion of a file.

When a file drops in the location, the Last Modified field is updated is the event that we want to capture. Using the Get Metadata Advanced Filter by Last Modified option, we can limit the returning Child Items to only files created or modified to a specified range. Please note that we would want to point to the lowest level folder where files will land. If we point the Get Metadata activity to a folder with another folder, it will not pick up the files landing in that child-folder.

To replicate the ADLS Gen2 (storage) Event Trigger logic, we can create a Data Pipeline that is tied to some Schedule. When this Schedule invokes the Data Pipeline below, the Data Pipeline will calculate a datetime value equal to the last time it ran. Then the Data Pipeline will use this datetime to retrieve a list of files that were created and/or last modified since that datetime. We can then iterate over this list of files and route each to some ETL/ELT process, such as Copying from ADLS Gen2 into the Fabric Lakehouse. So, let’s look into the components of this Data Pipeline and walk through how we can create it.

A screenshot of a computer

Description automatically generated

Data Pipeline Settings

Parameters

To make this Data Pipeline a bit more robust, we are going to leverage a few parameters.

Parameter Name

Data Type

Is Required

p_int_schedule_frequency

Int

Yes

p_string_file_system

String

Yes

p_string_directory

String

No

p_int_schedule_frequency: This required parameter is going to match the frequency (in minutes) that is set within the Schedule. This value is used to calculate the Last Modified Start Time (UTC) field within the Get Metadata activity.

A screenshot of a schedule

Description automatically generated

p_string_file_system: Required parameter used to dynamically set the Azure Data Lake Storage Gen2 (ADLS) Container that we want to trigger on new files at.

p_string_directory: This is an optional parameter specifying the path to the folder where you want to trigger on new files at. By keeping this options, we will default to an empty string, allowing us to use the ADLS Container as our source to trigger on.

A screenshot of a computer

Description automatically generated

Variables

We will leverage a few variables within this pipeline. Two are optional and used only for monitoring purposes.

Variable Name

Data Type

v_array_files

Array

v_array_final_file_list

Array

v_string_modified_datetime_start

String

v_array_files is used within the For Each activity and set to @item().name within an Append Activity. This allows us to generate an array of file names that we are ‘processing’. This mainly represents that you can iterate over the list of newly created or modified files that the Get Metadata activity returned and apply some action to it.

A screenshot of a computer

Description automatically generated

A screenshot of a computer

Description automatically generated

Currently, we cannot inspect the entirety of an array that was hydrated by an Append Activity, so to help monitor and obtain a list of file names, we can set the variable, v_array_final_file_list to @variables(‘v_array_files’) after the ForEach Loop Activity, allowing us to see this list when monitoring the pipeline run.

A screenshot of a computer

Description automatically generated

A number and numbers on a white background

Description automatically generated

v_string_modified_datetime_start is instantiated at the very beginning of the pipeline run within a Set Variable activity, allowing us to capture a specific historical datetime value and used within the Get Metadata activity.

@string(

addminutes(utcnow(),

mul(pipeline().parameters.p_int_schedule_frequency,-1)

)

)

A screenshot of a computer

Description automatically generated

A screenshot of a computer

Description automatically generated

This expression allows us to calculate a datetime in the past. We are multiplying p_int_schedule_frequency by negative 1 which allows the addminutes() function to subtract minutes which isn’t an available function on its own.

The Get Metadata Activity

A screenshot of a computer

Description automatically generated

The Get Metadata Activity is going to be the primary mechanism for us to replicate an event trigger. The “Field List” property allows for the argument, “Child items” which will return an array of files and folders when the “File path” property is a folder or container. We will also use the “Last modified” argument. This argument can be omitted but is useful when troubleshooting as it allows us to verify the last modified time for each returned child item.

The “File path” property has three sections, File system, Directory, and File name.

  1. File system is a required field and is set to the parameter: p_string_file_system
  2. Directory is the directory path to the target folder where we would want to trigger on new files being created. This value should be set to

@coalesce(pipeline().parameters.p_string_directory,”)

Coalesce allows us to specify an empty string if the parameter is empty.

  1. File name is left empty, allowing us access to the Field list argument, Child items.

Hidden within the Advanced options of the Settings tab, we will find the “Filter by last modified” property. This allows us the ability to specify a window of time for when an object was modified last, and luckily for us, this property is set at creation of the object. So, by specifying a Start time (UTC), we can tell the Get Metadata Activity to only return Child items that were created some specified time ago.

Use the expression builder to set the value of this property to: @variables(‘v_string_modified_datetime_start’)

For more information on the Get Metadata activity, please visit: Get Metadata activity – Microsoft Fabric | Microsoft Learn

YouTube Video

Follow along with this video!

 

Complete Data Pipeline JSON

{

    "name": "Advanced_Data_Pipeline_Scheduleing_EventTrigger",

    "objectId": "",

    "properties": {

        "activities": [

            {

                "name": "Get New Files",

                "type": "GetMetadata",

                "dependsOn": [

                    {

                        "activity": "v_string_modified_datetime_start",

                        "dependencyConditions": [

                            "Succeeded"

                        ]

                    }

                ],

                "policy": {

                    "timeout": "0.12:00:00",

                    "retry": 0,

                    "retryIntervalInSeconds": 30,

                    "secureOutput": false,

                    "secureInput": false

                },

                "typeProperties": {

                    "fieldList": [

                        "childItems",

                        "lastModified"

                    ],

                    "datasetSettings": {

                        "annotations": [],

                        "type": "Binary",

                        "typeProperties": {

                            "location": {

                                "type": "AzureBlobFSLocation",

                                "folderPath": {

                                    "value": "@coalesce(pipeline().parameters.p_string_directory,'')",

                                    "type": "Expression"

                                },

                                "fileSystem": {

                                    "value": "@pipeline().parameters.p_string_file_system",

                                    "type": "Expression"

                                }

                            }

                        },

                        "externalReferences": {

                            "connection": ""

                        }

                    },

                    "storeSettings": {

                        "type": "AzureBlobFSReadSettings",

                        "recursive": true,

                        "modifiedDatetimeStart": {

                            "value": "@variables('v_string_modified_datetime_start')",

                            "type": "Expression"

                        },

                        "enablePartitionDiscovery": false

                    },

                    "formatSettings": {

                        "type": "BinaryReadSettings"

                    }

                }

            },

            {

                "name": "Set v_array_final_file_list",

                "type": "SetVariable",

                "dependsOn": [

                    {

                        "activity": "For Each File",

                        "dependencyConditions": [

                            "Succeeded"

                        ]

                    }

                ],

                "policy": {

                    "secureOutput": false,

                    "secureInput": false

                },

                "typeProperties": {

                    "variableName": "v_array_final_file_list",

                    "value": {

                        "value": "@variables('v_array_files')",

                        "type": "Expression"

                    }

                }

            },

            {

                "name": "For Each File",

                "type": "ForEach",

                "dependsOn": [

                    {

                        "activity": "Get New Files",

                        "dependencyConditions": [

                            "Succeeded"

                        ]

                    }

                ],

                "typeProperties": {

                    "items": {

                        "value": "@activity('Get New Files').output.childItems",

                        "type": "Expression"

                    },

                    "batchCount": 50,

                    "activities": [

                        {

                            "name": "Hydrate v_array_files",

                            "type": "AppendVariable",

                            "dependsOn": [],

                            "typeProperties": {

                                "variableName": "v_array_files",

                                "value": {

                                    "value": "@item().name",

                                    "type": "Expression"

                                }

                            }

                        }

                    ]

                }

            },

            {

                "name": "v_string_modified_datetime_start",

                "type": "SetVariable",

                "dependsOn": [],

                "policy": {

                    "secureOutput": false,

                    "secureInput": false

                },

                "typeProperties": {

                    "variableName": "v_string_modified_datetime_start",

                    "value": {

                        "value": "@string(addminutes(utcnow(),mul(pipeline().parameters.p_int_schedule_frequency,-1)))",

                        "type": "Expression"

                    }

                }

            }

        ],

        "parameters": {

            "p_int_schedule_frequency": {

                "type": "int",

                "defaultValue": 3

            },

            "p_string_file_system": {

                "type": "string",

                "defaultValue": "landingzone"

            },

            "p_string_directory": {

                "type": "string"

            }

        },

        "variables": {

            "v_array_files": {

                "type": "Array"

            },

            "v_array_final_file_list": {

                "type": "Array"

            },

            "v_string_modified_datetime_start": {

                "type": "String"

            }

        },

        "annotations": [],

        "lastModifiedByObjectId": "",

        "lastPublishTime": ""

    }

}

Have any questions or feedback? Leave a comment below!

Related blog posts

Fabric Data Pipelines – Advanced Scheduling Techniques (Part 1)

September 10, 2024 by Jianlei Shen

Fabric Lakehouse supports the creation of custom schemas. Schemas allow users to group tables together for better data discovery, access control, and more. This is now a Preview feature in Fabric. Learn more here. We are excited to announce the latest enhancement in Fabric Data Factory that Lakehouse connector in data pipeline now supports schema. … Continue reading “Introducing the New Feature in Lakehouse Connector in Fabric Data Factory: Schema Support for Reading and Writing Data”

September 9, 2024 by Jianlei Shen

We are thrilled to announce the release of a highly anticipated feature in Fabric Data Factory: storage integration support for the Snowflake connector in data pipeline. This new capability enhances the security of your data pipelines, enabling seamless and secure integration between Snowflake and external cloud storage providers. What is Storage Integration? Storage integration in … Continue reading “Announcing New Storage Integration Support in Snowflake Connector for Fabric Data Factory”