Microsoft Fabric Updates Blog

Data Pipelines Tutorial: Ingest files into a Lakehouse from a REST API with pagination ft. AVEVA Data Hub


Scenario overview

In this blog, we will act in the persona of an AVEVA customer who needs to retrieve operations data from AVEVA Data Hub into a Microsoft Fabric Lakehouse.

Note: While this scenario is using AVEVA Data Hub, the concepts translate to interacting with any REST API that uses pagination.

Considerations

  1. As an AVEVA customer, we can obtain a Client ID and Client Secret from AVEVA Data Hub. Our Data Pipeline will use these credentials to issue a request in order to obtain a Bearer Token that allows access to the AVEVA Data Hub where our target data lives.
  2. The REST API paginates the results, returning only a subset of the target data per request. Pagination is managed by a custom response header (Next-Page) that contains the URL for the next subset of data. Each of those URLs in turn returns another Next-Page header until all of the data has been retrieved, at which point the header is no longer included in the response.
  3. The Bearer Token can expire prior to retrieving all of the files.
  4. Based on the URL’s Relative Path, we must define the Lakehouse file path and name.
  5. We want to maximize the number of Copy activities running in parallel to improve performance.
  6. We want to make this Pipeline highly parameterized, allowing us to specify:
    • Credentials
    • Target Files
    • Date Range of the data we want
    • Interval determining how granular that data is
    • Page Count to indicate how many records each Next-Page should contain
  7. Our destination is the Files section of a Fabric Lakehouse. We want to control the folder path as well as the file name.

Design Pattern

Complete architecture of the design pattern for retrieving files from a REST API and landing them into Microsoft Fabric.

Pipeline Parameters and Variables

Parameters

Parameters allow the user to specify multiple conditions such as the AVEVA Data Hub data view, dataset time range, interval of records, file format returned, and number of records each request can return.

List of Data Pipeline Parameters.

Variables

Variables are used to manage properties that need to change throughout the execution of the Data Pipeline.

List of Data Pipeline Variables.

Obtain a Bearer Token

Creating Web v2 Connection

Before we can connect to the AVEVA Data Hub data view REST endpoint and retrieve data as a parquet file, we must obtain a Bearer Token to use within the Copy activity source.

Detailed view of the settings for the Web activity: Get Bearer Token.

Using Web activity, we can create a new Web v2 connection to the REST API endpoint.

The Web v2 Connection settings.

NOTE: In the future, we will be able to specify OAuth2.0 as an Authentication Method, which will allow us to securely store the Client ID and Client Secret obtained from AVEVA Data Hub.

Configuring the Web activity

  1. Relative URL: specify the relative path to the token endpoint
  2. Method: POST
  3. Body: pass in the Client ID, Client Secret, and Grant Type
    • NOTE: This must match the format you specify in your Headers
  4. Headers
    • Name: Content-type
    • Value: application/x-www-form-urlencoded

Zoomed-in view of the settings for the Web activity Get Bearer Token.
@concat('grant_type=',pipeline().parameters.grant_type,'&client_id=',pipeline().parameters.client_id,'&client_secret=',uriComponent(pipeline().parameters.client_secret))
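The Body expression above builds an application/x-www-form-urlencoded string. As a hedged illustration, the Python sketch below assembles the same string so the encoding can be checked outside of Fabric. The function name `build_token_body`, the `client_credentials` grant type, and the sample credentials are assumptions for illustration only, and the POST to the token endpoint is omitted.

```python
from urllib.parse import quote


def build_token_body(grant_type: str, client_id: str, client_secret: str) -> str:
    """Hypothetical helper mirroring the Web activity's Body expression.

    As in the pipeline expression, only the client secret is URI-encoded,
    because secrets can contain reserved characters such as '+', '/' or '='.
    """
    # quote(..., safe="") percent-encodes every reserved character,
    # which is similar in spirit to uriComponent() in the expression language.
    encoded_secret = quote(client_secret, safe="")
    return (
        "grant_type=" + grant_type
        + "&client_id=" + client_id
        + "&client_secret=" + encoded_secret
    )


if __name__ == "__main__":
    # Assumed sample values; real credentials come from AVEVA Data Hub.
    print(build_token_body("client_credentials", "my-client-id", "a+b/c="))
```

Without the encoding step, a '+' in the secret would typically be decoded as a space by the token endpoint, which is a common cause of authentication failures.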

Web activity: Output


The output of the Web activity contains several fields that we need later in the Data Pipeline. The most important is access_token, which will be used in the headers of the Copy activity and of subsequent Web activities. The expires_in field tells us how long, in seconds, the access_token remains valid. If we are doing a historical load with many pages, there is a risk that the token expires before the process finishes; we discuss below how to mitigate this risk. The token_type field tells us that we are using a Bearer Token.

Example Output from the Web activity Get Bearer Token.

Setting the Bearer Token from the Web activity to a variable for repeated use

We will be referring to this auth token throughout the pipeline, and for easy reusability, we want to capture this token and assign it to a pipeline variable.

Initializing the BearerToken variable.
@concat('Authorization: Bearer ',activity('Get Bearer Token').output.access_token)

NOTE: In the future, we will have the capability to specify Secure Input/Output of the Set Variable activity, hiding sensitive information.

NOTE: The Copy and Web activities treat headers differently, so we have to do some string manipulation to fit each activity. In this case, we remove “Authorization: ” for the Web activity, because we already provide that in the header's Name field.
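To make the two header shapes concrete, here is a small Python sketch of the string handling. The helper names are hypothetical; in the pipeline itself the trimming would be done with the expression language (for example with a `replace()` on the BearerToken variable), which is an assumption about how you might wire it rather than the exact setting shown in the screenshots.

```python
AUTH_PREFIX = "Authorization: "


def copy_activity_header(access_token: str) -> str:
    """Full header line, as stored in the BearerToken variable for the Copy activity."""
    return AUTH_PREFIX + "Bearer " + access_token


def web_activity_header_value(bearer_token_variable: str) -> str:
    """Header value for the Web activity, whose Name field already supplies 'Authorization'."""
    # Remove only the leading 'Authorization: ' so 'Bearer <token>' remains.
    if bearer_token_variable.startswith(AUTH_PREFIX):
        return bearer_token_variable[len(AUTH_PREFIX):]
    return bearer_token_variable


if __name__ == "__main__":
    header = copy_activity_header("abc123")
    print(header)                             # Authorization: Bearer abc123
    print(web_activity_header_value(header))  # Bearer abc123
```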

Web activity and Copy activity header settings.

Handling the Bearer Token’s Expiration Date

The Bearer Token expires after a certain amount of time. In this scenario, the Web activity that returns the Bearer Token includes an additional field, expires_in, which specifies how many seconds the token is valid. Our process might run longer than this, and we want to avoid losing access and causing the Copy activity to fail. To handle this, we create a variable that holds the future date when the token expires. During our data loading stage (Until loop), we compare this date with the current date and time to determine whether we need to generate a new token.

Initializing TokenExpires_in variable to determine future date when we should request a new token.

To get the future date, we can create a variable called TokenExpires_in and set its value to:

@string(ticks(addSeconds(utcnow(),sub(int(activity('Get Bearer Token').output.expires_in),300))))

Ticks: Return the ticks property value for a specified timestamp. A tick is a 100-nanosecond interval.

NOTE: We are allowing for a buffer of 300 seconds (5 minutes).

Within the Data Loading Stage (Until activity) of our Pipeline, we will start each iteration off with an If Condition activity to check if the current time is greater than or equal to the time our token will expire. If it is, then we will resubmit the web request to obtain a new token and update our BearerToken variable.

If Condition that checks at the start of each iteration whether the token should be refreshed.
@greaterOrEquals(ticks(utcnow()), int(variables('TokenExpires_in')))

NOTE: Variable TokenExpires_in is described above.
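The two expressions above (computing TokenExpires_in and checking it in the If Condition) can be sketched in Python to see the arithmetic. This is an illustration under the assumption that ticks follow the .NET convention of 100-nanosecond intervals since 0001-01-01T00:00:00; the helper names and the sample timestamps are hypothetical.

```python
from datetime import datetime, timedelta, timezone

# .NET-style ticks: 100-nanosecond intervals since 0001-01-01T00:00:00 (UTC here).
EPOCH = datetime(1, 1, 1, tzinfo=timezone.utc)
BUFFER_SECONDS = 300  # the same 5-minute safety margin used in the pipeline


def ticks(moment: datetime) -> int:
    """Number of 100 ns ticks between EPOCH and `moment`, using integer math only."""
    delta = moment - EPOCH
    return (delta.days * 86_400 + delta.seconds) * 10_000_000 + delta.microseconds * 10


def token_expires_in_variable(expires_in: int, now: datetime) -> str:
    """Value for TokenExpires_in: expiry time minus the buffer, as a ticks string."""
    return str(ticks(now + timedelta(seconds=expires_in - BUFFER_SECONDS)))


def should_refresh_token(token_expires_in: str, now: datetime) -> bool:
    """Equivalent of the If Condition: refresh once now >= the stored ticks value."""
    return ticks(now) >= int(token_expires_in)


if __name__ == "__main__":
    issued_at = datetime(2023, 6, 23, 5, 0, 0, tzinfo=timezone.utc)
    stored = token_expires_in_variable(3600, issued_at)
    print(should_refresh_token(stored, issued_at + timedelta(minutes=10)))  # False
    print(should_refresh_token(stored, issued_at + timedelta(minutes=55)))  # True
```

With a one-hour token (expires_in = 3600) and the 300-second buffer, the refresh triggers 55 minutes after the token was issued.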

Pagination

The AVEVA Data Hub REST API uses the concept of Pagination. This means that each web request will return only a subset of the payload along with a custom header, Next-Page, containing the endpoint to the next subset of data.

In the Microsoft Fabric preview, native pagination is supported by the Copy activity only when the source uses the Connection Type of REST. However, we cannot use the REST option here because it expects a JSON response, and in our scenario we are requesting a parquet file. Luckily, the other Connection Type available to us is HTTP, which allows a File Format to be specified.

With some creative pipeline design, we can mimic the native REST pagination by nesting our data movement logic inside an Until activity. The idea is to pass the current page into the Copy activity and then, after the Copy activity, use a Web activity to capture the Next-Page header. We have to use the Web activity because the Copy activity currently doesn't expose the response's custom headers. This process repeats until the Web activity no longer returns a Next-Page header, at which point we break out of the loop and the process completes.

Establishing the First Page’s Endpoint

Prior to interacting with the next page, we must start with the initial endpoint, or first page.

To do this, we are going to leverage the variable: next-page. As the name might imply, we will reuse this variable to hold the relative path for the next page, obtained downstream in the pipeline. However, the initial Copy activity will leverage the value defined from this Set Variable activity: Set First-Page.

Setting the next-page variable to the first page’s endpoint.
@concat('/api/v1/Tenants/',pipeline().parameters.Tenants,'/Namespaces/',pipeline().parameters.Namespaces,'/DataViews/',pipeline().parameters.DataViews,'/Data/Interpolated?startIndex=',pipeline().parameters.startIndex,'&endIndex=',pipeline().parameters.endIndex,'&interval=',pipeline().parameters.interval,'&form=',pipeline().parameters.form)
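For reference, the first-page expression is plain string concatenation. The Python sketch below builds the same relative path; the function name and the tenant and namespace values are hypothetical, while the data view, dates, interval, and form mirror the relative path example given later in this post. Like `concat()`, it performs no URL encoding.

```python
def first_page_path(tenant: str, namespace: str, data_view: str,
                    start_index: str, end_index: str,
                    interval: str, form: str) -> str:
    """Hypothetical mirror of the Set First-Page expression (no URL encoding)."""
    return (
        f"/api/v1/Tenants/{tenant}/Namespaces/{namespace}/DataViews/{data_view}"
        f"/Data/Interpolated?startIndex={start_index}&endIndex={end_index}"
        f"&interval={interval}&form={form}"
    )


if __name__ == "__main__":
    print(first_page_path("t1", "ns1", "Emissions",
                          "2021-06-23T05:00:00Z", "2023-06-23T05:00:00Z",
                          "00.00:01:00", "parquet"))
```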

Until activity’s Logic

The Until activity lets us keep submitting requests to the REST API until a condition is met: the response no longer includes the Next-Page custom header. Once this header no longer exists, we set the next-page variable to an empty string. As covered above, the next-page variable starts out holding the initial endpoint, so it is not empty on the first iteration. Together, these two facts let us base the Until activity's conditional expression on whether the next-page variable is empty.

The Until activity encapsulates the core functionality for pagination.

Peek inside the Until activity

As seen below, a lot of logic is nested within the Until activity.


In summary

  1. Refresh the Bearer Token if close to expiring (covered earlier)
  2. Invoke the DataLoader child pipeline (contains the Copy activity)
    • If this fails, set the next-page variable to an empty string, breaking out of the Until Loop activity
  3. Issue a web request to the same endpoint that we just copied data from in order to capture the Next-Page custom header
    • If this fails, set the next-page variable to an empty string, breaking out of the Until Loop activity
  4. Update the next-page variable with the new value
    • [Important] If this fails, that means there are no more pages left and we can set the next-page variable to an empty string, breaking out of the Until Loop activity. Failure here is expected because currently there isn’t a method to check if a field exists.
  5. Increment the page count and set to the temppagecount variable
  6. Set the pagecount variable to the value of temppagecount
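The steps listed above can be sketched as an ordinary loop in Python. This is an illustration, not the pipeline definition: `start_child_pipeline` and `get_response_headers` are hypothetical callbacks standing in for the Invoke Pipeline and Get Next Page activities, token refresh (step 1) is left out, and the Next-Page value is used as-is rather than trimmed to a relative path. Exceptions play the role of the On Failure dependencies.

```python
from typing import Callable, Dict


def run_until_loop(
    first_page: str,
    start_child_pipeline: Callable[[str, int], None],
    get_response_headers: Callable[[str], Dict[str, str]],
) -> int:
    """Sketch of the Until activity's per-iteration logic; returns the final pagecount."""
    next_page = first_page  # Set First-Page
    pagecount = 0           # initialized to 0 in the main pipeline
    while next_page != "":  # Until condition: stop once next-page is empty
        # Step 2: hand the page to the child pipeline (Wait on Completion disabled).
        try:
            start_child_pipeline(next_page, pagecount)
        except Exception:
            next_page = ""
            continue
        # Step 3: request the same page again to read its response headers.
        try:
            headers = get_response_headers(next_page)
        except Exception:
            next_page = ""
            continue
        # Step 4: a missing Next-Page header is the expected "no more pages" signal.
        try:
            next_page = headers["Next-Page"]
        except KeyError:
            next_page = ""
            continue
        # Steps 5-6: increment through a temp variable, as the pipeline must.
        temppagecount = pagecount + 1
        pagecount = temppagecount
    return pagecount


if __name__ == "__main__":
    # Hypothetical three-page API: the last page carries no Next-Page header.
    fake_headers = {"/p0": {"Next-Page": "/p1"}, "/p1": {"Next-Page": "/p2"}, "/p2": {}}
    started = []
    total = run_until_loop("/p0", lambda page, n: started.append((page, n)),
                           lambda page: fake_headers[page])
    print(started, total)
```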
Inside the Until activity.

Obtaining the Next-Page Custom Header Value

To get the Next-Page custom header, we have to use a Web activity, because the Copy activity currently does not expose the response headers. The output of the Web activity Get Next Page is then referenced in the Set Variable activity Set Next Page.

Using a Web activity to capture the Next-page custom header after loading the previous page

Removing the Base URL from the Next-Page Custom Header and Inserting Count Value

The Next-Page custom header contains a complete URL. Since our Copy activity and Web activities go through the Web and Web v2 connections, which already supply the base URL, we need to strip the base URL and set the next-page variable to the URL's relative path only. We also need to replace the default &count= value with the value of the pagecount parameter.

Storing the custom header, Next-Page, in a variable and using string manipulation to edit it.
@replace(replace(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], substring(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'],0,add(indexOf(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'],'.com' ),length('.com') )),''), substring(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], lastindexof(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'],'&count='),
sub(
length(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page']),
lastindexof(activity('Get Next Page').output.ADFWebActivityResponseHeaders['Next-Page'], '&count=')
)),
concat('&count=',pipeline().parameters.pagecount)
)
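The same transformation is easier to follow in Python. This sketch assumes, as the expression does, that the base URL ends with ".com" and that &count= is the last query parameter; it uses slicing instead of nested `replace()` calls, which gives the same result when each piece occurs once. The function name, the `page_size` argument (standing in for the pagecount parameter), and the example host are hypothetical.

```python
def to_relative_next_page(next_page_header: str, page_size: int) -> str:
    """Strip the base URL and replace the trailing &count= value."""
    # Drop everything up to and including '.com' (the base URL).
    cut = next_page_header.index(".com") + len(".com")
    relative = next_page_header[cut:]
    # Replace the final '&count=...' segment with the requested page size.
    count_pos = relative.rindex("&count=")
    return relative[:count_pos] + "&count=" + str(page_size)


if __name__ == "__main__":
    header = ("https://example.com/api/v1/Tenants/t1/Namespaces/ns1/DataViews/Emissions"
              "/Data/Interpolated?startIndex=2021-06-23T05:00:00Z"
              "&continuationtoken=abc&count=1000")
    print(to_relative_next_page(header, 100000))
```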

Breaking Out of the Until Loop

The AVEVA Data Hub REST API stops returning the custom header, Next-Page, once we have reached the end of the requested pull. Currently, the expression builder cannot check whether a field exists before referencing it, and referencing a field that doesn't exist causes the activity to fail. We can use that failure as a signal: an On Failure dependency routes the pipeline's flow to a Set Variable activity that sets the next-page variable to an empty string, breaking out of the loop.

On Failure is expected when the custom header, Next-Page, no longer exists; this lets us set the next-page variable to an empty string, which is what the Until loop evaluates.

Designing Parallel Copy activities

The Until activity iterates sequentially. To introduce some concurrency within it, we can use the Invoke Pipeline activity with the Wait on Completion option disabled. This hands the longer-running process off to a separate pipeline run while immediately moving on to the subsequent activities. Since the Until loop cycles faster than each child pipeline finishes, multiple child pipelines run Copy activities simultaneously, increasing overall throughput.
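The fire-and-forget idea can be illustrated with a thread pool in Python. This is only an analogy under stated assumptions: `copy_page` is a hypothetical stand-in that sleeps instead of copying data, and `max_workers=4` is arbitrary rather than a Fabric concurrency limit. The key point is that `submit()` returns immediately, just as Invoke Pipeline does with Wait on Completion disabled; the sketch collects results at the end only so it has something to return.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List


def copy_page(relative_url: str) -> str:
    """Hypothetical stand-in for the child pipeline's Copy activity."""
    time.sleep(0.2)  # pretend the copy takes a while
    return relative_url


def dispatch_pages(pages: List[str], max_workers: int = 4) -> List[str]:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures: List[Future] = []
        for page in pages:
            # submit() does not block, so the loop moves straight to the next page.
            futures.append(pool.submit(copy_page, page))
        # Gathering results here is only for the sketch; the loop itself never waited.
        return [f.result() for f in futures]


if __name__ == "__main__":
    started = time.perf_counter()
    print(dispatch_pages([f"/page{i}" for i in range(4)]))
    print(f"elapsed: {time.perf_counter() - started:.2f}s")  # roughly one copy's duration
```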

The Invoke Pipeline activity allows us to contain the data movement logic into another pipeline, enabling concurrent execution.
The child pipeline that handles the data movement logic along with the file path and name logic.

The Copy activity

The source of the Copy activity uses a Web connector with the Connection Type of HTTP, which allows us to specify the File Format. For the best performance, we use the Binary file format. This also lets the form parameter specify various file formats, so we can choose the file format at execution time.

The data movement Copy activity settings, highlighting the File Format of Binary and dynamic content using the parameters passed in.

For the destination, we use the built-in integration with our workspace to push these files to a Lakehouse Files destination. This gives the most efficient data loading and avoids having to maintain a schema mapping at this stage; that can be handled by a separate downstream process.

The destination of the Copy activity uses the variables to specify the file path and name.

Creating a Custom Counter as a file name suffix

In Fabric Data Pipelines, as well as in Azure Data Factory and Azure Synapse Analytics Pipelines, we are unable to reference a variable within the same Set Variable activity where you are setting that variable.

If we try, we receive the following error:

The expression contains self referencing variable. A variable cannot reference itself in the expression.

As a workaround, we create a secondary (temp) variable and use a Set Variable activity to assign it a value computed from the primary variable. Then, after that Set Variable activity, we add another Set Variable activity that sets the primary variable to the value of the temp variable.

In our scenario, our primary variable is pagecount, and our secondary temp variable is temppagecount.

At the start of our Data Pipeline, we want to initialize our pagecount variable to 0 because we are passing this variable into the Invoke Pipeline as a parameter.

In the main pipeline, we initialize the pagecount variable to 0, starting the counter.

Inside the Until activity, after the Invoke Pipeline activity, we want to add 1 to the pagecount variable. To do this, we are going to create the Set Variable: Increment Page Count which will add 1 to the current value of the pagecount variable and set this value to the variable, temppagecount.

Inside the Until activity, after the data movement and next page logic, we increment the temppagecount variable.

Finally, we update the current value of pagecount to our newly incremented temppagecount value.

Setting the pagecount variable to the temppagecount variable's value, for use in the next iteration.

Parsing the Relative URL to create Folder Path and File Name

In the child pipeline where we run the Copy activity, we also build the file path and file name used in the destination, using string manipulation of the relative path.

Relative Path Example:

/api/v1/Tenants/<>/Namespaces/<>/DataViews/Emissions/Data/Interpolated?startIndex=2021-06-23T05:00:00Z&endIndex=2023-06-23T05:00:00Z&interval=00.00:01:00&form=parquet&continuationtoken=<>&count=100000

The File Path Format: Dataview/YYYY/MM/DD

The File Name Format: Dataview-YYYY-MM-DD-StartIndex-EndIndex-PageCount.form

File Path

Using the expression language to create the value for the FilePath variable.
@decodeUriComponent(concat(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ), length('/DataViews/')),
sub(indexOf(pipeline().parameters.RelativeURL,'/Data/'), add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')))),'/', replace(split(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),length('?startIndex=')),
sub(indexOf(pipeline().parameters.RelativeURL,'&endIndex='),
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')))), 'T')[0], '-', '/')))
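Read step by step, the FilePath expression takes the text between "/DataViews/" and "/Data/", appends "/", then takes the date part of startIndex and turns its hyphens into slashes. The Python sketch below follows those steps; the function name is hypothetical, and `unquote` plays the role of `decodeUriComponent`. The example uses the relative path shown above plus a hypothetical data view name containing an encoded space.

```python
from urllib.parse import unquote


def file_path_from_relative_url(relative_url: str) -> str:
    """Build 'Dataview/YYYY/MM/DD' from the DataView name and the startIndex date."""
    # Data view name: text between the first '/DataViews/' and the first '/Data/'.
    view_start = relative_url.index("/DataViews/") + len("/DataViews/")
    data_view = relative_url[view_start:relative_url.index("/Data/")]
    # startIndex value: text between '?startIndex=' and '&endIndex='.
    start = relative_url.index("?startIndex=") + len("?startIndex=")
    start_index = relative_url[start:relative_url.index("&endIndex=")]
    date_part = start_index.split("T")[0]  # e.g. '2021-06-23'
    return unquote(data_view + "/" + date_part.replace("-", "/"))


if __name__ == "__main__":
    url = ("/api/v1/Tenants/<>/Namespaces/<>/DataViews/Emissions/Data/Interpolated"
           "?startIndex=2021-06-23T05:00:00Z&endIndex=2023-06-23T05:00:00Z"
           "&interval=00.00:01:00&form=parquet&continuationtoken=<>&count=100000")
    print(file_path_from_relative_url(url))  # Emissions/2021/06/23
```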

File Name

Using dynamic expressions to set the variable, FileName, by manipulating the RelativeURL parameter.
@replace(decodeUriComponent(concat(substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')), sub(indexOf(pipeline().parameters.RelativeURL,'/Data/'),
add(indexOf(pipeline().parameters.RelativeURL,'/DataViews/' ),
length('/DataViews/')))), '-', substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')),sub(indexOf(pipeline().parameters.RelativeURL,'&endIndex='),
add(indexOf(pipeline().parameters.RelativeURL,'?startIndex=' ),
length('?startIndex=')))), '-', substring(pipeline().parameters.RelativeURL,
add(indexOf(pipeline().parameters.RelativeURL,'&endIndex=' ),
length('&endIndex=')),
sub(indexOf(pipeline().parameters.RelativeURL,'&interval='),
add(indexOf(pipeline().parameters.RelativeURL,'&endIndex=' ),
length('&endIndex=')))), '-', pipeline().parameters.pagecount, '.parquet')),' ', '-')
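The FileName expression joins the data view name, the raw startIndex and endIndex values, and the page counter with hyphens, decodes the result, and replaces spaces with hyphens. A Python sketch of the same logic follows; the helper names are hypothetical. The startIndex value is taken from immediately after "?startIndex=", and, like the pipeline expression, the extension is fixed to .parquet rather than taken from the form parameter.

```python
from urllib.parse import unquote


def _between(text: str, start_marker: str, end_marker: str) -> str:
    """Text between the first start_marker and the first end_marker (like indexOf/substring)."""
    start = text.index(start_marker) + len(start_marker)
    return text[start:text.index(end_marker)]


def file_name_from_relative_url(relative_url: str, pagecount: int) -> str:
    data_view = _between(relative_url, "/DataViews/", "/Data/")
    start_index = _between(relative_url, "?startIndex=", "&endIndex=")
    end_index = _between(relative_url, "&endIndex=", "&interval=")
    # Decode first (decodeUriComponent), then turn spaces into hyphens (outer replace).
    name = unquote(f"{data_view}-{start_index}-{end_index}-{pagecount}.parquet")
    return name.replace(" ", "-")


if __name__ == "__main__":
    url = ("/api/v1/Tenants/<>/Namespaces/<>/DataViews/Emissions/Data/Interpolated"
           "?startIndex=2021-06-23T05:00:00Z&endIndex=2023-06-23T05:00:00Z"
           "&interval=00.00:01:00&form=parquet&continuationtoken=<>&count=100000")
    print(file_name_from_relative_url(url, 3))
```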

Conclusion

After running this pipeline, we will see the parquet files within our Lakehouse Files, given the file path and name that we specified. Now that we have landed these files from the AVEVA Data Hub into the Lakehouse Files, we can leverage Data pipelines, Dataflows Gen2, or Notebooks to manipulate and move these files to either Lakehouse Tables, KQL Database, or the Warehouse.

Parquet files uploaded to the Lakehouse Files.
