AI Ready Apps: Build a RAG data pipeline from Azure Blob Storage to SQL Database in Microsoft Fabric within minutes
Microsoft Fabric is a unified, secure, and user-friendly data platform equipped with features necessary for developing enterprise-grade applications with minimal or no coding required. Last year, the platform was enhanced by introducing SQL Database in Fabric, facilitating AI application development within Microsoft Fabric. In a previous blog post, we discussed how to build a chatbot using SQL Database in Fabric. This blog post will provide step-by-step instructions for creating a Retrieval Augmented Generation (RAG) pipeline to prepare your data for AI integration using SQL Database in Microsoft Fabric.
For this blog post, we will consider a use case involving Contoso Recruiting Agency, which wants to turn its stored candidate resumes into a searchable database so recruiters can match candidates to specific skill requirements.
RAG pipeline activity workflow
Each recruiter uploads resumes (in PDF format) to their designated folder in Azure Blob Storage. Whenever a file is uploaded, the RAG pipeline is triggered. The pipeline processes the file by extracting the text content, chunking the text, redacting any Personally Identifiable Information (PII), generating embeddings, and finally storing the embeddings in a SQL database in Fabric, which serves as the vector store. These stored embeddings can then be used to build AI applications such as smart search, recommendation engines, chatbots, and other tools.
Prerequisites
This blog post requires users to bring their own key (BYOK) for AI services, which also means creating these services outside of the Microsoft Fabric platform.
- Download Git and clone the rag-pipeline repository.
- Azure subscription: Create a free account.
- Microsoft Fabric subscription: Create a free trial account.
- Azure OpenAI access: Apply for access in the desired Azure subscription as needed.
- Azure OpenAI resource: Deploy an embedding model (e.g. text-embedding-3-small).
- Azure AI multi-service resource: we will be using the Document Intelligence and Azure AI Language services from this resource.
- Azure portal: Create a storage account and assign yourself the Storage Blob Data Contributor role.
- Optionally, download Azure Storage Explorer to manage the storage account from your desktop.
- Optionally, download Visual Studio Code for free and install Azure Functions Core Tools if you plan to edit User Data Functions using Visual Studio Code.
Dataset
Given the file formats supported by the Document Intelligence service, we use the PDF files from the Resume Dataset on Kaggle.
Steps
1. Create a workspace named “IntelligentApp”.
2. Create a Lakehouse named “blob_filestorage”.
3. Create a SQL Database in Fabric named “datamart”.
4. Navigate to the workspace “IntelligentApp”, click Import – Notebook – From this computer, and then import the notebook “cp_azblob_lakehouse.ipynb” from the cloned repository’s notebook folder.
Import a Notebook
5. Attach the Lakehouse “blob_filestorage” to the “cp_azblob_lakehouse” notebook:
   - Open the notebook and, on the explorer, click “Add sources”.
   - Select “Existing data sources”.
   - Select “blob_filestorage” from the OneLake catalog and then click “Connect”.
Attach a Lakehouse to the Notebook
6. Create a User Data Function:
   - Navigate to the “IntelligentApp” workspace and click “+ New item”.
   - Search for “Function” and select “User data functions”.
   - Provide the name “file_processor”.
   - Click “New function”.
   - Add the Lakehouse and the SQL database as managed connections:
     - On the Functions explorer screen, click “Manage connections” and then select “+ Add data connection”.
     - From the OneLake catalog, select datamart (SQL database) and then click “Connect”.
     - Repeat the previous step to add blob_filestorage (Lakehouse) as a managed connection.
   - Click “Library management” and add the following dependencies (click “+ Add from PyPI” for each). The dependencies are also listed in the “/functions/requirements.txt” file of the cloned repository. Ensure you are using fabric-user-data-functions version 0.2.28rc0 or higher.
Add User Data Function dependencies
   - In the function editor, replace the existing content with the contents of “function\function_app.py” from the cloned repository. A sketch of the programming model these functions follow appears after this step.
   - Click “Publish” (top right) to deploy the functions. After the functions are deployed, click “Refresh”.
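For orientation before you paste the file, here is a minimal, hedged sketch of the Fabric User Data Functions programming model that “function_app.py” follows. The function body below is illustrative only; the real implementations of extract_text, chunk_text, redact_text, generate_embeddings, create_table, and insert_data live in the cloned repository.

```python
# Minimal sketch of the Fabric User Data Functions programming model (illustrative only).
import fabric.functions as fn

udf = fn.UserDataFunctions()

# Managed connections added under "Manage connections" are injected by their alias.
@udf.connection(argName="sqlDB", alias="datamart")
@udf.function()
def ping_database(sqlDB: fn.FabricSqlConnection) -> str:
    # Illustrative function: opens the managed SQL connection and runs a trivial query.
    conn = sqlDB.connect()
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    return "connection ok"
```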
7. Create a data pipeline by navigating to the workspace and clicking “+ New item”:
   - Search for and select “Data pipeline”.
   - Provide the name “blob_ingest_pipeline”.
8. Create a data pipeline storage trigger by clicking the “Add trigger (preview)” button and providing the following configuration:
   - Source: Select “Azure Blob Storage events”.
   - Storage account: “Connect to existing Azure Blob Storage account”.
   - Subscription: Select your Azure subscription.
   - Azure Blob Storage account: Select the blob storage account under your subscription.
   - Eventstream name: blob_ingest_stream
Create Event stream
Click “Next” to configure the event type and source:
- Event type(s): Select only the Microsoft.Storage.BlobCreated event. This ensures that an event is generated each time a new blob is uploaded.
Click “Next” to review the configuration. Then click “Connect” to connect to the blob storage; a successful connection is indicated by the status “Successful”. Finally, click “Save”.
On the Set alert screen, under Save location, configure the following settings:
- Select “Create a new item”.
- New item name: blob_activator
Click “Create” to create and save the alert.
Create Fabric Activator
Now that we have set up the stream, it’s time to define the “blob_ingest_pipeline”.
Pipeline definition
The pipeline can be defined in either of the two ways outlined below.
Import template
Templates offer a quick way to begin building data pipelines. Importing a template brings in all required activities for orchestrating a pipeline.
To import a template:
- Navigate to the Home menu of the data pipeline.
- Click “Use a template”.
- From the “Pipeline templates” page, click “Import template”.
- Import the file “template/AI-Develop RAG pipeline using SQL database in Fabric.zip” from the cloned repository.
Import Data pipeline from template
The imported data pipeline is preloaded with all necessary activities, variables, and connectors required for end-to-end orchestration.
Important!
If you are using a template, there is no need to manually add a variable or an activity. Instead, you can proceed directly to configuring values for the variables and each activity parameter in the pipeline, as detailed in the Blank Canvas section.
Blank Canvas
1. Establish pipeline variables: Click on the pipeline canvas, select the “Variables” menu, and then click “+ New” to add and configure values for the following variables:
Configure pipeline variables
| Name | Type | Value | Comment |
| --- | --- | --- | --- |
| fileName | String | @pipeline()?.TriggerEvent?.FileName | |
| container | String | @pipeline()?.TriggerEvent?.FolderPath | |
| source | String | @pipeline()?.TriggerEvent?.Source | |
| cognitiveServiceEndpoint | String | <<YOUR-MULTI-SERVICE-ACCOUNT-ENDPOINT>> | Replace <<YOUR-MULTI-SERVICE-ACCOUNT-ENDPOINT>> with your multi-service account endpoint, for example https://myservices.cognitiveservices.azure.com/ |
| apiKey | String | <<YOUR-MULTI-SERVICE-ACCOUNT-APIKEY>> | Replace <<YOUR-MULTI-SERVICE-ACCOUNT-APIKEY>> with the API key of your multi-service account |
| openAIEndpoint | String | <<YOUR-OPENAI-SERVICE-ENDPOINT>> | Replace <<YOUR-OPENAI-SERVICE-ENDPOINT>> with the endpoint of your Azure OpenAI account, for example https://myopenaiservice.openai.azure.com/ |
| openAIKey | String | <<YOUR-OPENAI-APIKEY>> | Replace <<YOUR-OPENAI-APIKEY>> with the API key of your Azure OpenAI account |
| embeddingModel | String | text-embedding-3-small | |
| recepientEmailAddress | String | <<to-email-address>> | Recipient’s email address |
| senderEmailAddress | String | <<from-email-address>> | Sender’s email address |
Get multi-services account endpoint
2. Add a “Notebook” activity. The notebook associated with this activity uses NotebookUtils to manage the file system: during execution, a folder corresponding to the container name is created if it does not already exist, and the file is then copied from Azure Blob Storage to that Lakehouse folder. Configure this activity as outlined below (a sketch of the notebook logic follows this step):
   - General tab:
     - Name: azureblob_to_lakehouse
   - Settings tab:
     - Notebook: cp_azblob_lakehouse
Base parameters:
Click “+ New” to add the following parameters:
   - Name: fileName
     - Type: String
     - Value: @variables('fileName')
   - Name: container
     - Type: String
     - Value: @variables('container')
   - Name: source
     - Type: String
     - Value: @variables('source')
Use the On Success connector of this activity to link to the subsequent Function (Extract Text) activity.
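Conceptually, the notebook does something along the lines of the sketch below. This is a hedged approximation, not the repository’s notebook: the parameter names match the base parameters above, but the path construction and the copy call are assumptions.

```python
# Hedged sketch of the cp_azblob_lakehouse notebook logic (the real notebook is in the repository).

# Parameters cell: values are injected by the pipeline's base parameters.
fileName = ""   # e.g. "resume_001.pdf"
container = ""  # e.g. "resume"
source = ""     # blob storage endpoint from the trigger event

import notebookutils  # NotebookUtils, available in Fabric notebooks

# Create a Lakehouse folder named after the container if it does not already exist.
lakehouse_dir = f"Files/{container}"
if not notebookutils.fs.exists(lakehouse_dir):
    notebookutils.fs.mkdirs(lakehouse_dir)

# Copy the uploaded blob into the Lakehouse Files area (assumes the notebook can
# reach the storage account, e.g. via an authorized abfss/https path).
notebookutils.fs.cp(f"{source}/{container}/{fileName}", f"{lakehouse_dir}/{fileName}")

# Return the Lakehouse path; the pipeline reads it via ...output.result.exitValue.
notebookutils.notebook.exit(f"{lakehouse_dir}/{fileName}")
```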
3. Add a “Function” activity. The function “extract_text” associated with this activity uses the Azure AI Document Intelligence service to extract the text content from the file copied into the Lakehouse by the previous activity. Configure this activity as outlined below (a sketch of the extraction logic follows this step):
   - General tab:
     - Name: Extract Text
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using your workspace credentials.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: extract_text
Parameters:
   - Name: filePath
     - Type: str
     - Value: @activity('azureblob_to_lakehouse').output.result.exitValue
   - Name: cognitiveServicesEndpoint
     - Type: str
     - Value: @variables('cognitiveServiceEndpoint')
   - Name: apiKey
     - Type: str
     - Value: @variables('apiKey')
Use the On Completion connector of this activity to link to the subsequent If Condition (Text Extraction Results) activity.
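As a rough, hedged illustration of what extract_text does, the standalone sketch below uses the azure-ai-formrecognizer SDK (the Document Intelligence “prebuilt-read” model) against a local file path; the repository’s UDF reads the file through the Lakehouse managed connection and may use a different SDK or model.

```python
# Hedged sketch of the text-extraction step (not the repository's implementation).
from azure.core.credentials import AzureKeyCredential
from azure.ai.formrecognizer import DocumentAnalysisClient

def extract_text(filePath: str, cognitiveServicesEndpoint: str, apiKey: str) -> str:
    client = DocumentAnalysisClient(cognitiveServicesEndpoint, AzureKeyCredential(apiKey))
    with open(filePath, "rb") as f:
        # "prebuilt-read" performs OCR/text extraction on PDFs and images.
        poller = client.begin_analyze_document("prebuilt-read", document=f)
    result = poller.result()
    return result.content  # the full extracted text of the document
```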
4. Add an “If Condition” activity to verify the success of the text extraction in the previous step. If the extraction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:
   - General tab:
     - Name: Text Extraction Results
   - Activities tab:
     - Expression: @empty(activity('Extract Text').error)
     - Case: False. Edit the “False” condition using the edit (pencil) icon and add the following activities:
“Office 365 Outlook” activity: To send alert emails.
   - General tab:
     - Name: Text Extraction Failure Email Alert
   - Settings tab:
     - Signed in as: Sign in (if not already) using the same credentials as your workspace.
     - To: @variables('recepientEmailAddress')
     - Subject: Text Extraction Error
     - Body: <p>@{replace(string(activity('Extract Text').error.message), '\','')}</p>
   - Advanced:
     - From: @variables('senderEmailAddress')
     - Importance: High
Use the On Success connector of this activity to link to the subsequent Fail activity.
“Fail” activity: To terminate the pipeline
   - General tab:
     - Name: Text Extraction Process Failure
   - Settings tab:
     - Fail message: @{replace(string(activity('Extract Text').error), '\','')}
     - Error code: @{activity('Extract Text').statuscode}
Return to the main canvas by clicking the pipeline name (blob_ingest_pipeline) and use the On Success connector of the “If Condition” activity to link to the subsequent Function (Generate Chunks) activity.
5. Add a “Function” activity. The function “chunk_text” associated with this activity uses the tiktoken tokenizer to generate chunks from the text extracted by the previous activity. Configure this activity as outlined below (a sketch of the chunking logic follows this step):
   - General tab:
     - Name: Generate Chunks
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using credentials that have complete access to your workspace.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: chunk_text
Parameters:
   - Name: text
     - Type: str
     - Value: @activity('Extract Text').output.output
   - Name: maxToken
     - Type: int
     - Value: 500
   - Name: encoding
     - Type: str
     - Value: cl100k_base
Use the On Success connector of this activity to link to the subsequent Function (Redact PII Data) activity.
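The chunking logic itself is small. A minimal sketch using tiktoken, assuming a simple fixed-size token window (the repository’s chunk_text may use a different strategy), looks like this:

```python
# Hedged sketch of the chunking step: split text into chunks of at most maxToken tokens.
import tiktoken

def chunk_text(text: str, maxToken: int = 500, encoding: str = "cl100k_base") -> list[str]:
    enc = tiktoken.get_encoding(encoding)
    tokens = enc.encode(text)
    # Decode each fixed-size token window back into a text chunk.
    return [enc.decode(tokens[i:i + maxToken]) for i in range(0, len(tokens), maxToken)]
```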
6. Add a “Function” activity. The function “redact_text” associated with this activity uses the Azure AI Language service to redact PII from the chunks generated by the preceding activity. The text is chunked prior to redaction to comply with the service limits of the PII detection feature. Configure this activity as outlined below (a sketch of the redaction logic follows this step):
   - General tab:
     - Name: Redact PII Data
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using credentials that have complete access to your workspace.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: redact_text
Parameters:
   - Name: text
     - Type: list
     - Value: @activity('Generate Chunks').output.output
   - Name: cognitiveServicesEndpoint
     - Type: str
     - Value: @variables('cognitiveServiceEndpoint')
   - Name: apiKey
     - Type: str
     - Value: @variables('apiKey')
Configure a User Defined Function
Use the On Completion connector of this activity to link to the subsequent If Condition (PII Redaction Results) activity.
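For reference, a hedged sketch of the redaction step using the azure-ai-textanalytics SDK is shown below; the batch size and error handling are assumptions, and the repository’s redact_text may differ.

```python
# Hedged sketch of the PII-redaction step (not the repository's implementation).
from azure.core.credentials import AzureKeyCredential
from azure.ai.textanalytics import TextAnalyticsClient

def redact_text(text: list[str], cognitiveServicesEndpoint: str, apiKey: str) -> list[str]:
    client = TextAnalyticsClient(cognitiveServicesEndpoint, AzureKeyCredential(apiKey))
    redacted: list[str] = []
    # Process chunks in small batches to respect the PII detection document limits.
    for i in range(0, len(text), 5):
        for doc in client.recognize_pii_entities(text[i:i + 5]):
            # redacted_text contains the chunk with detected PII masked out.
            redacted.append(doc.redacted_text if not doc.is_error else "")
    return redacted
```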
7. Add an “If Condition” activity to verify the success of the PII redaction in the previous step. If the redaction was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:
   - General tab:
     - Name: PII Redaction Results
   - Activities tab:
     - Expression: @empty(activity('Redact PII Data').error)
     - Case: False. Edit the “False” condition using the pencil icon and add the following activities:
“Office 365 Outlook” activity: To send alert emails.
   - General tab:
     - Name: Redaction Failure Email Alert
   - Settings tab:
     - Signed in as: Sign in (if not already) using the same credentials as your workspace.
     - To: @variables('recepientEmailAddress')
     - Subject: Text Redaction Error
     - Body: <p>@{replace(string(activity('Redact PII Data').error.message), '\','')}</p>
   - Advanced:
     - From: @variables('senderEmailAddress')
     - Importance: High
Use the On Success connector of this activity to link to the subsequent Fail activity.
“Fail” activity: To terminate the pipeline
   - General tab:
     - Name: Text Redaction Process Failure
   - Settings tab:
     - Fail message: @{replace(string(activity('Redact PII Data').error), '\','')}
     - Error code: @{activity('Redact PII Data').statuscode}
Return to the main canvas and use the On Success connector of the “If Condition” activity to link to the subsequent Function (Generate Embeddings) activity.
8. Add a “Function” activity. The function “generate_embeddings” associated with this activity uses an Azure OpenAI Service embedding model to convert the redacted chunks into embeddings. Configure this activity as outlined below (a sketch of the embedding logic follows this step):
   - General tab:
     - Name: Generate Embeddings
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using credentials that have complete access to your workspace.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: generate_embeddings
Parameters:
   - Name: text
     - Type: list
     - Value: @activity('Redact PII Data').output.output
   - Name: openAIServiceEndpoint
     - Type: str
     - Value: @variables('openAIEndpoint')
   - Name: embeddingModel
     - Type: str
     - Value: @variables('embeddingModel')
   - Name: openAIKey
     - Type: str
     - Value: @variables('openAIKey')
   - Name: fileName
     - Type: str
     - Value: @variables('fileName')
Use the On Completion connector of this activity to link to the subsequent If Condition (Generate Embeddings Results) activity.
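A hedged sketch of the embedding step using the Azure OpenAI embeddings API is shown below; the returned row shape (file name, chunk, embedding) is an assumption made for illustration and may differ from the repository’s generate_embeddings.

```python
# Hedged sketch of the embedding step (not the repository's implementation).
from openai import AzureOpenAI

def generate_embeddings(text: list[str], openAIServiceEndpoint: str, openAIKey: str,
                        embeddingModel: str, fileName: str) -> list[dict]:
    client = AzureOpenAI(azure_endpoint=openAIServiceEndpoint, api_key=openAIKey,
                         api_version="2024-02-01")
    response = client.embeddings.create(input=text, model=embeddingModel)
    # Pair each redacted chunk with its embedding vector.
    return [
        {"fileName": fileName, "chunk": chunk, "embedding": item.embedding}
        for chunk, item in zip(text, response.data)
    ]
```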
9. Add an “If Condition” activity to verify the success of the embedding generation in the previous step. If embedding generation was unsuccessful, an email is sent to the configured recipient and the pipeline is terminated. Configure this activity as outlined below:
   - General tab:
     - Name: Generate Embeddings Results
   - Activities tab:
     - Expression: @empty(activity('Generate Embeddings').error)
     - Case: False. Edit the “False” condition using the pencil icon and add the following activities:
“Office 365 Outlook” activity: To send alert emails.
   - General tab:
     - Name: Generate Embeddings Failure Email Alert
   - Settings tab:
     - Signed in as: Sign in (if not already) using the same credentials as your workspace.
     - To: @variables('recepientEmailAddress')
     - Subject: Generate Embeddings Error
     - Body: <p>@{replace(string(activity('Generate Embeddings').error.message), '\','')}</p>
   - Advanced:
     - From: @variables('senderEmailAddress')
     - Importance: High
Use the On Success connector of this activity to link to the subsequent Fail activity.
“Fail” activity: To terminate the pipeline
   - General tab:
     - Name: Generate Embeddings Processing Failure
   - Settings tab:
     - Fail message: @{replace(string(activity('Generate Embeddings').error), '\','')}
     - Error code: @{activity('Generate Embeddings').statuscode}
Return to the main canvas and use the On Success connector of the “If Condition” activity to link to the subsequent Function (Create Database Objects) activity.
10. Add a “Function” activity. The function “create_table” associated with this activity executes a SQL command to create a documents table within the previously created “datamart” SQL database. Configure this activity as outlined below (a sketch of the table definition follows this step):
   - General tab:
     - Name: Create Database Objects
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using credentials that have complete access to your workspace.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: create_table
Use the On Success connector of this activity to link to the subsequent Function (Save Data) activity.
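The exact table definition lives in the repository’s function_app.py; as a hedged illustration only, the create_table step might run DDL along these lines, here storing the embedding as a JSON array in an NVARCHAR(MAX) column.

```python
# Hedged, illustrative DDL for the documents table; the real schema is in function_app.py.
CREATE_DOCUMENTS_TABLE = """
IF OBJECT_ID('dbo.documents', 'U') IS NULL
CREATE TABLE dbo.documents (
    id        INT IDENTITY(1,1) PRIMARY KEY,
    fileName  NVARCHAR(400),
    chunk     NVARCHAR(MAX),
    embedding NVARCHAR(MAX)  -- JSON array of floats
);
"""

def create_table(cursor) -> None:
    # In the UDF, the cursor comes from the managed datamart connection (sqlDB.connect().cursor()).
    cursor.execute(CREATE_DOCUMENTS_TABLE)
    cursor.connection.commit()
```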
11. Add a “Function” activity. The function “insert_data” associated with this activity executes a SQL command to bulk insert rows into the documents table created by the previous activity. Configure this activity as outlined below (a sketch of the insert logic follows this step):
   - General tab:
     - Name: Save Data
   - Settings tab:
     - Type: Fabric user data functions
     - Connection: Sign in (if not already) using credentials that have complete access to your workspace.
     - Workspace: IntelligentApp (selected by default)
     - User data functions: file_processor
     - Function: insert_data
Parameters:
   - Name: data
     - Type: list
     - Value: @activity('Generate Embeddings').output.output
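As a hedged sketch (mirroring the illustrative schema above, which may differ from the repository’s), the insert_data step could bulk insert the Generate Embeddings output like this:

```python
# Hedged sketch of the bulk-insert step (illustrative column names and row shape).
import json

def insert_data(cursor, data: list[dict]) -> int:
    # "data" is the Generate Embeddings output: one dict per chunk with
    # fileName, chunk, and embedding keys (assumed shape).
    rows = [(r["fileName"], r["chunk"], json.dumps(r["embedding"])) for r in data]
    cursor.fast_executemany = True  # pyodbc optimization for bulk inserts
    cursor.executemany(
        "INSERT INTO dbo.documents (fileName, chunk, embedding) VALUES (?, ?, ?)",
        rows,
    )
    cursor.connection.commit()
    return len(rows)
```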
Troubleshooting
- When adding a Python library from PyPI to User Data Functions, you might see an error indicator (a squiggly line under the library name, e.g. “azure-ai-textanalytics”, as if it were a spelling mistake). Ensure the library name is spelled correctly, then ignore the indicator: tab out to the Version dropdown and select the correct version, and this transient error should resolve itself.
- The imported pipeline may not preload the parameter values. For each activity in the pipeline, ensure that the parameter values are provided and correct. Refer to the Blank Canvas section for the required parameters and their values.
Execute pipeline (pipeline in action)
It’s now time to put everything we have done so far together and see the pipeline in action.
- Upload a PDF file:
  - Using Azure Storage Explorer or the Azure portal, create a blob container named “resume”.
  - Upload a PDF file from the Kaggle dataset.
Create a container and upload a PDF file
- Review the pipeline execution:
  - From the pipeline’s “Run” menu, select “View run history” and open the most recent pipeline run.
  - In the details view, check that the status is “Succeeded”.
  - In case of a failure, rerun the pipeline using the “Rerun” option.
Pipeline activity execution
- Review the Lakehouse:
  - A folder with the same name as the container (resume) is created.
  - The PDF file is copied from Azure Blob Storage to the Lakehouse Files area.
PDF file copied to Lakehouse
- Review the database:
  - The documents table should be created automatically by the pipeline.
  - The redacted chunk data and the embeddings are stored in the documents table.
Save data in SQL Database in Fabric
Conclusion
We hope this blog post has provided useful insights for preparing your data for AI applications. Your feedback is important, and we look forward to seeing how you use this information to develop innovative AI solutions.
We would also like to express our sincere gratitude to Aaron Saidi for his invaluable contribution in reviewing this blog post. Thank you, Aaron, for your support.
Submit your feedback on Fabric Ideas and join the conversation on the Fabric Community.