Microsoft Fabric Updates Blog

Data Pipeline Performance Improvements Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

Series Overview

Welcome to this short series where we’ll be discussing the technical methods used to improve Data Pipeline Copy activity performance through parallelization by logically partitioning any source.

Often, we see solutions leveraging a single Copy Activity to move large volumes of data. While this works great, you might face a scenario where you need to improve the performance by reducing the time it takes to move data into your Fabric Lakehouse.

To improve performance, instead of using a single Copy Activity to move a large volume of data, we can have multiple Copy Activities moving smaller volumes in parallel. It doesn’t matter if the source is a REST API, Blob Storage, or a Transactional Database. In many cases, we can logically partition the source data into buckets then copy each bucket over to the destination.

An important tip to remember is for you, as the engineer, to take the time and understand your source and destination. You should know the maximum number of concurrent connections and other factors, then design your solution to utilize them to your advantage.

In this series, I will extend upon my previous blog post on ingesting files into a Lakehouse from a REST API with pagination ft. AVEVA Data Hub. There, we covered how to move parquet files from a REST API to Microsoft Fabric Lakehouse in a semi-single threaded method given these parameters:

  • StartIndex – start datetime
  • EndIndex – end datetime
  • Interval – Granularity of data, if interval is set to 1 minute, if provided a Start and End index for a 1-hour span, we will get back 60 records
  • Pagecount – number of records per file (limited by the source)

In the current design, the pipeline will take a considerable amount of time using a single Copy Activity if we provide a StartIndex and EndIndex spanning multiple years with a small Interval and Pagecount.

We can improve performance by creating many sub-time ranges based on the StartIndex, EndIndex, and Interval, then for each sub-time range, we call a child pipeline containing a Copy Activity, allowing multiple Copy Activities to execute at the same time, all handling a subset of data. This method can also be extended to any scenario when provided with some boundary condition. To provide some context, I’ve used this on a large SQL Table that had a datetime column by taking the Min and Max date then creating sub-time ranges. Using a ForEach Activity (Sequential = False, Batch Count = 50) and iterating over each range, I was able to execute many copy activities in parallel, taking processing time down from 6.5 hours to under 8 minutes.

The first two parts of this series are designed to ease you into two technical processes while the third and final part will bring everything together. By the end of this series, we will cover all of the tips and tricks needed to achieve these performance gains.

Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

Scenario

We have a Data Pipeline that has an Interval parameter (string) being passed in. This Interval is used to determine a future date and comes into the pipeline as dd.hh:mm:ss (e.g. 01.12:05:02). For this example, we need to add 1 day, 12 hours, 5 minutes, and 2 seconds to some date value we have.

Within the Data Pipeline Expression Builder, we have access to

  • addToTime()
  • adddays()
  • addseconds()
  • addminutes()
  • getFutureDate()

Because our smallest increment is Seconds, we will be leveraging the addseconds() function.

The parameter Interval is of type String, allowing us to parse the value into segments. We want to isolate each segment (days, hours, minutes, seconds), convert to seconds, then aggregate out results that we can leverage within the addseconds() function.

Using the split() function, we can split the string into an array given some delimiter.

For 01.12:05:02, if we use split(‘01.12:05:02’, ‘:’), our result is going to be an array: [01.12, 05, 02]

Notice that the Day and Hour parts both fell into the same index because the delimiter differs.

To capture only the Day part, we can nest the original split function inside another split function and reference a specific index of the array returned:

split(split(pipeline().parameters.interval, ‘:’)[0], ‘.’)

Returning only the Day part: 01.

Another challenge is the leading 0 as we will need the values to be of type integers.

Using the function, startswith() we can nest this logic inside an if() condition function that will check if the value starts with 0. If it does, then we will leverage the substring() function to return only the second index (non-zero) of that string.

@if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 1

Now that we know how to parse the Interval into the respective parts, we want to take each part and multiple by the number of seconds within each.

1 Day = 86400 Seconds

1 Hour = 3600 Seconds

1 Minute = 60 Seconds

1 Second = 1 Second

We can use the mul() function to multiply the output to the number of seconds. Mul() requires numeric values as arguments, so we need to cast the result of our existing expression to an integer using the int() function.

@mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)),86400)

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 86400

Next, we need to aggregate all these parts into a single number. To do this we can use the add() function. This additive function can only accept two arguments, so again we will need to nest this function into itself.

add(dd,

add(hh,

add(mm,ss)

 )

)

 Bringing everything together, our final expression is:

@string(add(mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[0]

)),86400)

,add(mul(int(if(

startswith(

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

, '0')

, substring(

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

, 1

, 1

),

split(split(pipeline().parameters.interval, ':')[0], '.')[1]

)), 3600)

,add(mul(int(if(

startswith(

split(pipeline().parameters.interval, ':')[1]

, '0')

, substring(

split(pipeline().parameters.interval, ':')[1]

, 1

, 1

),

split(pipeline().parameters.interval, ':')[1]

)), 60)

,int(if(

startswith(

split(pipeline().parameters.interval, ':')[2]

, '0')

, substring(

split(pipeline().parameters.interval, ':')[2]

, 1

, 1

),

split(pipeline().parameters.interval, ':')[2]

))

))))

Given: pipeline().parameters.interval = 01.12:05:02

The Output: 129902

Example Run

In this example, we are converting the Interval parameter into seconds and using addSeconds() function to calculate a new date.

Set Variable Activity: Update StartIndex

@string(addSeconds(pipeline().parameters.StartIndex, int(variables(‘ParseInterval’))))

A screenshot of a computer

Description automatically generated

This concludes Part 1 of our series. In Part 2, we will go over how to create an Array of JSON objects. Then finally, in Part 3, we will tie everything together into a cohesive end-to-end solution.

As always, please feel free to leave questions and comments below. If there is a particular Data Pipeline scenario that you would like to see covered, please let me know in the comments. Thank you for reading!

منشورات المدونات ذات الصلة

Data Pipeline Performance Improvements Part 1: How to convert a time interval (dd.hh:mm:ss) into seconds

أكتوبر 29, 2024 بواسطة Leo Li

We’re excited to announce several powerful updates to the Virtual Network (VNET) Data Gateway, designed to further enhance performance and improve the overall user experience. These new features allow users to better manage increasing workloads, perform complex data transformations, and simplify log management. Expanded Cluster Size from 5 to 7 One of the key improvements … Continue reading “New Features and Enhancements for Virtual Network Data Gateway”

أكتوبر 28, 2024 بواسطة Estera Kot

We’re thrilled to announce that the Native Execution Engine is now available at no additional cost, unlocking next-level performance and efficiency for your workloads. What’s New?  The Native Execution Engine now supports Fabric Runtime 1.3, which includes Apache Spark 3.5 and Delta Lake 3.2. This upgrade enhances Microsoft Fabric’s Data Engineering and Data Science workflows, … Continue reading “Native Execution Engine available at no additional cost!”