Microsoft Fabric Updates Blog

Building Custom AI Applications with Microsoft Fabric: Implementing Retrieval Augmented Generation for Enhanced Language Models

We are excited to share guidance for how you can use Microsoft Fabric to turn your data into knowledge for Generative AI applications. This guide will walk you through implementing a RAG (Retrieval Augmented Generation) system in Microsoft Fabric using Azure OpenAI and Azure AI Search. By the end, you’ll be more familiar with how to customize your AI solutions with your specific data, significantly improving the effectiveness of your AI applications. You can also access and run the Notebook here.

What is Retrieval Augmented Generation?

Large Language Models (LLMs), like OpenAI’s ChatGPT, have revolutionized how we interact with and utilize artificial intelligence. These models are adept at generating human-like text and can be harnessed for a wide range of applications. However, to maximize their utility in specific contexts, such as business environments or specialized domains, customization is essential.

Out-of-the-box, LLMs are trained on broad datasets that encompass a wide array of general knowledge. While this makes them versatile, it also means they might lack the precision needed for specialized tasks. For instance, a legal professional might need the LLM to understand specific legal terminology and case law, or a customer service bot might need detailed knowledge about a company’s products and policies.

One effective method to enhance the relevance and accuracy of LLM outputs is through Retrieval Augmented Generation (RAG). RAG supplements the model’s responses with dynamically retrieved information, ensuring that the output is both contextually relevant and up-to-date. This is particularly valuable when dealing with specific business data or customer queries, where generic responses fall short.

In RAG, the LLM is augmented by an information retrieval system that searches a database of unstructured text data for relevant chunks. These chunks are then used to inform and enhance the model’s responses. The Vector Search Index is a powerful tool in this context, enabling efficient searches through vast datasets by ranking text chunks based on their relevance to the input query.

Prerequisites

To follow this guide, you will need to ensure that you have access to the following services and have the necessary credentials and keys set up.

  • Microsoft Fabric to orchestrate the data pipeline.
  • Azure OpenAI Studio to manage and deploy OpenAI models.
  • Azure AI Search to create and manage the vector search index.

Step 1: Overview of Azure Setup

First, set up your Azure subscription. If you don’t have one, you can create a free account here. Once done, you need to take following steps to set up Azure OpenAI and Azure AI Search Services.

  • Create Azure OpenAI Service: Follow the instructions here to create an Azure OpenAI Service.
  • Set Up Keys and Endpoints: Define the endpoints and necessary keys to set up your Azure AI services. For deploying your own OpenAI models outside of Microsoft Fabric, Azure AI Studio for OpenAI is recommended. Two models are required for RAG: text-embedding-ada-002 for embedding and gpt-35-turbo or a similar chat model.
  • Set Up Azure AI Search: Create a vector database through Azure AI Search by following the instructions here.
# Fill in the following lines with your Azure OpenAI service information
aoai_endpoint = "https://<your-openai-endpoint>.openai.azure.com" # Provide the URL endpoint for your created Azure OpenAI
aoai_key = "<your-aoai-key>" # Fill in your API key from Azure OpenAI 
aoai_deployment_name_embeddings = "text-embedding-ada-002"
aoai_model_name_query = "gpt-35-turbo"
aoai_api_version = "2024-02-01"

# Setup key accesses to Azure AI Search
aisearch_index_name = "<your-index-name>" # Create a new index name: must only contain lowercase, numbers, and dashes
aisearch_api_key = "<your-aisearch-key>" # Fill in your API key from Azure AI Search
aisearch_endpoint = "https://<your-aisearch-endpoint>.search.windows.net" # Provide the URL endpoint for your created Azure AI Search 
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning) 

import os, requests, json, warnings

from datetime import datetime, timedelta
from azure.core.credentials import AzureKeyCredential
from azure.search.documents import SearchClient

from pyspark.sql import functions as F
from pyspark.sql.functions import to_timestamp, current_timestamp, concat, col, split, explode, udf, monotonically_increasing_id, when, rand, coalesce, lit, input_file_name, regexp_extract, concat_ws, length, ceil
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, ArrayType, FloatType
from pyspark.sql import Row
import pandas as pd
from azure.search.documents.indexes import SearchIndexClient
from azure.search.documents.models import VectorizedQuery
from azure.search.documents.indexes.models import (  
    SearchIndex,  
    SearchField,  
    SearchFieldDataType,  
    SimpleField,  
    SearchableField,   
    SemanticConfiguration,  
    SemanticPrioritizedFields,
    SemanticField,  
    SemanticSearch,
    VectorSearch,  
    HnswAlgorithmConfiguration,
    HnswParameters,  
    VectorSearchProfile,
    VectorSearchAlgorithmKind,
    VectorSearchAlgorithmMetric,
)

import openai
import matplotlib.pyplot as plt
from synapse.ml.featurize.text import PageSplitter
import ipywidgets as widgets  
from IPython.display import display as w_display

Step 2: Load the Data into the Lakehouse and Spark

This guide uses the Carnegie Mellon University Question-Answer dataset version 1.2, which includes Wikipedia articles, factual questions, and corresponding answers, all manually generated. The data has been cleaned and organized into a single structured table with fields for article title, question, answer, difficulty ratings, and cleaned article text. The dataset is divided into semesters S08, S09, and S10, but due to licensing, only S08 and S09 are used, consolidated into one table. This guide focuses on sets 1 and 2 from S08, specifically highlighting wildlife and countries.

# Publicly hosted refined dataset inspired by https://www.cs.cmu.edu/~ark/QA-data/
storage_account_name = "appliedaipublicdata"
container_name = "cmuqa-08-09"

wasbs_path = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/output/part-00000-c258b030-b04e-4f9d-b887-c85225af4332-c000.snappy.parquet"  

# Save as a delta lake, parquet table to Tables section of the default lakehouse
spark.read.parquet(wasbs_path).write.mode("overwrite").format("delta").saveAsTable("cmu_qa_08_09")

# Read parquet table from default lakehouse into Spark DataFrame
df_dataset = spark.sql("SELECT * FROM cmu_qa_08_09")
display(df_dataset)

# Filter the DataFrame to include only the specified paths
df_selected = df_dataset.filter((col("ExtractedPath").like("S08/data/set1/%")) | (col("ExtractedPath").like("S08/data/set2/%")))

# Select only the required columns
filtered_df = df_selected.select('ExtractedPath', 'ArticleTitle', 'text')

# Drop duplicate rows based on ExtractedPath, ArticleTitle, and text
df_wiki = filtered_df.dropDuplicates(['ExtractedPath', 'ArticleTitle', 'text'])

# Show the result
display(df_wiki)

Step 3: Chunk the Text

When large documents are inputted into RAG, it needs to extract the most important information to answer user queries. Chunking involves breaking down large text into smaller segments or chunks. In the RAG context, embedding smaller chunks rather than entire documents for the knowledge base means retrieving only the most relevant chunks in response to a user’s query. This approach reduces input tokens and provides more focused context for the LLM to process.

To perform chunking, you should use the PageSplitter implementation from the SynapseML library for distributed processing. Adjusting the page length parameters (in characters) is crucial for optimizing the performance based on the text size supported by the language model and the number of chunks selected as context for the conversational bot. For demonstration, a page length of 4000 characters is recommended.

ps = (
    PageSplitter()
    .setInputCol("text")
    .setMaximumPageLength(4000)
    .setMinimumPageLength(3000)
    .setOutputCol("chunks")
)

df_splitted = ps.transform(df_wiki) 
display(df_splitted.limit(10)) 

Each row can contain multiple chunks from the same document represented as a vector. When you use the function explode on a DataFrame column that contains arrays, it will create a new row for every element within the array of each original row

df_chunks = df_splitted.select('ExtractedPath', 'ArticleTitle', 'text', explode(col("chunks")).alias("chunk"))
display(df_chunks)

Now, you will add a unique id for each row.

df_chunks_id = df_chunks.withColumn("Id", monotonically_increasing_id())
display(df_chunks_id)

Step 4: Create Embeddings

In RAG, embedding refers to incorporating relevant chunks of information from documents into the model’s knowledge base. These chunks are chosen based on their relevance to potential user queries, allowing the model to retrieve specific and targeted information rather than entire documents. Embedding helps optimize the retrieval process by providing pertinent context for generating accurate responses to user inputs.

In this section, you will utilize Azure OpenAI to generate embeddings for each chunk of text. The integration between Azure and Spark offers a significant advantage for handling large datasets efficiently. Although the current dataset is not particularly large, employing Spark User-Defined Functions (UDFs) ensures readiness for future scalability. UDFs enable the creation of custom functions that can process Spark DataFrames, enhancing Spark’s capabilities.

Next, you will encapsulate the embedding function with a UDF decorator to compute embeddings for each chunk in the DataFrame. By doing so, you transform the function into a UDF that can be applied to the DataFrame for parallel processing.

# UDF definition
@udf(returnType=ArrayType(FloatType()))
def get_openai_embedding(content):
    response = openai.Embedding.create(
        input=content,
        engine=aoai_deployment_name_embeddings,
        api_key=aoai_key,
        api_base=aoai_endpoint,
        api_version=aoai_api_version
    )
    return response['data'][0]['embedding']

# Apply UDF to DataFrame
df_embeddings = df_chunks_id.withColumn("Embedding", get_openai_embedding(col("chunk")))

# Display the result
display(df_embeddings.limit(10))

Step 5: Create Vector Index with Azure AI Search

In RAG, creating a vector index helps quickly retrieve the most relevant information for user queries. By organizing document chunks into a vector space, RAG can match and generate responses based on similar content rather than just keywords. This makes the responses more accurate and meaningful, improving how well the system understands and responds to user inputs.

In the next steps, you will set up a search index in Azure AI Search that integrates both semantic and vector search capabilities. You can begin by initializing the SearchIndexClient with the required endpoint and API key. Then, define the data structure using a list of fields, specifying their types and attributes. The Chunk field will hold the text to be retrieved, while the Embedding field will facilitate vector-based searches. Additional fields like ArticleTitle and ExtractedPath can be included for filtering purposes. For custom datasets, you can adjust the fields as necessary.

For vector search, configure the Hierarchical Navigable Small Worlds (HNSW) algorithm by specifying its parameters and creating a usage profile. You can also set up semantic search by defining a configuration that emphasizes specific fields for improved relevance. Finally, create the search index with these configurations and utilize the client to create or update the index, ensuring it supports advanced search operations.

Note that while this guide focuses on vector search, Azure Search offers text search, filtering, and semantic ranking capabilities that are beneficial for various applications. Here is the code to create the index:

index_client = SearchIndexClient(
    endpoint=aisearch_endpoint,
    credential=AzureKeyCredential(aisearch_api_key),
)

fields = [
    SimpleField(name="Id", type=SearchFieldDataType.String, key=True, sortable=True, filterable=True, facetable=True),
    SearchableField(name="ArticleTitle", type=SearchFieldDataType.String, filterable=True),
    SearchableField(name="ExtractedPath", type=SearchFieldDataType.String, filterable=True),
    SearchableField(name="Chunk", type=SearchFieldDataType.String, searchable=True),
    SearchField(name="Embedding",
        type=SearchFieldDataType.Collection(SearchFieldDataType.Single),
        searchable=True,
        vector_search_dimensions=1536,
        vector_search_profile_name="my-vector-config"
    ),
]

vector_search = VectorSearch(
    algorithms=[
        HnswAlgorithmConfiguration(
            name="myHnsw",
            kind=VectorSearchAlgorithmKind.HNSW,
            parameters=HnswParameters(
                m=4,
                ef_construction=400,
                ef_search=500,
                metric=VectorSearchAlgorithmMetric.COSINE
            )
        )
    ],
    profiles=[
        VectorSearchProfile(
            name="my-vector-config",
            algorithm_configuration_name="myHnsw",
        )
    ]
)

semantic_config = SemanticConfiguration(
    name="my-semantic-config",
    prioritized_fields=SemanticPrioritizedFields(
        title_field=SemanticField(field_name="ArticleTitle"),
        prioritized_content_fields=[SemanticField(field_name="Chunk")]
    )
)

# Create the semantic settings with the configuration
semantic_search = SemanticSearch(configurations=[semantic_config])

# Create the search index with the semantic settings
index = SearchIndex(
    name=aisearch_index_name,
    fields=fields,
    vector_search=vector_search,
    semantic_search=semantic_search
)

result = index_client.create_or_update_index(index)
print(f'{result.name} created')

Next, you define a UDF named insertToAISearch that inserts data into the Azure AI Search index. This function, annotated with @udf(returnType=StringType()), specifies the return type as a string. The function constructs a URL for the Azure AI Search API, creates a payload in JSON format, and sends a POST request with the headers and payload.

@udf(returnType=StringType())
def insertToAISearch(Id, ArticleTitle, ExtractedPath, Chunk, Embedding):
    url = f"{aisearch_endpoint}/indexes/{aisearch_index_name}/docs/index?api-version=2023-11-01"
    payload = json.dumps({
        "value": [
            {
                "Id": str(Id),
                "ArticleTitle": ArticleTitle,
                "ExtractedPath": ExtractedPath,
                "Chunk": Chunk, 
                "Embedding": Embedding,
                "@search.action": "upload",
            }
        ]
    })
    headers = {
        "Content-Type": "application/json",
        "api-key": aisearch_api_key,
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    print(response.text)

    if response.status_code == 200 or response.status_code == 201:
        return "Success"
    else:
        return response.text

Using the insertToAISearch function, you can upload data from a DataFrame to the Azure AI Search index:

df_embeddings_ingested = df_embeddings.withColumn(
    "errorAISearch",
    insertToAISearch(
        df_embeddings["Id"],
        df_embeddings["ArticleTitle"],
        df_embeddings["ExtractedPath"],
        df_embeddings["Chunk"],
        df_embeddings["Embedding"]
    )
)

display(df_embeddings_ingested)

Perform sanity checks to ensure the data has been correctly uploaded to the Azure AI Search index:

# Count the number of successful uploads
successful_uploads = df_embeddings_ingested.filter(col("errorAISearch") == "Success").count()

# Identify and display unsuccessful uploads
unsuccessful_uploads = df_embeddings_ingested.filter(col("errorAISearch") != "Success")
unsuccessful_uploads_count = unsuccessful_uploads.count()

# Display the results
print(f"Number of successful uploads: {successful_uploads}")
print(f"Number of unsuccessful uploads: {unsuccessful_uploads_count}")

# Show details of unsuccessful uploads if any
if unsuccessful_uploads_count > 0:
    unsuccessful_uploads.show()

Step 6: Demonstrate Retrieval Augmented Generation

Once you’ve chunked, embedded, and created a vector index, the final step is to use this indexed data to find and retrieve the most relevant information based on user queries. This allows the system to generate accurate responses or recommendations by leveraging the indexed data’s organization and similarity scores from the embeddings.

First, create a function to retrieve chunks of relevant Wikipedia articles from the vector index using Azure AI Search.

def get_context_source(question, topN=3):
    """
    Retrieves contextual information and sources related to a given question using embeddings and a vector search.  
    Parameters:  
    question (str): The question for which the context and sources are to be retrieved.  
    topN (int, optional): The number of top results to retrieve. Default is 3.     
    Returns:  
    List: A list containing two elements:  
        1. A string with the concatenated retrieved context.  
        2. A list of retrieved source paths.  
    """
    embed_client = openai.AzureOpenAI(
        api_version=aoai_api_version,
        azure_endpoint=aoai_endpoint,
        api_key=aoai_key,
    )

    query_embedding = embed_client.embeddings.create(input=question, model=aoai_deployment_name_embeddings).data[0].embedding
    vector_query = VectorizedQuery(vector=query_embedding, k_nearest_neighbors=topN, fields="Embedding")

    search_client = SearchClient(
        aisearch_endpoint,
        aisearch_index_name,
        credential=AzureKeyCredential(aisearch_api_key)
    )

    results = search_client.search(vector_queries=[vector_query], top=topN)
    retrieved_context = ""
    retrieved_sources = []
    
    for result in results:
        retrieved_context += result['ExtractedPath'] + "\n" + result['Chunk'] + "\n\n"
        retrieved_sources.append(result['ExtractedPath'])

    return [retrieved_context, retrieved_sources]

Next, create a function to get the response from the OpenAI Chat model. This function combines the user question with the context retrieved from Azure AI Search.

def get_answer(question, context):
    """  
    Generates a response to a given question using provided context and an   Azure OpenAI model.  
    
    Parameters:  
        question (str): The question that needs to be answered.  
        context (str): The contextual information related to the question that will help generate a relevant response.  
    
    Returns:  
        str: The response generated by the Azure OpenAI model based on the provided question and context.  
    """
    messages = [
        {
            "role": "system",
            "content": "You are a helpful chat assistant who will be provided text information for you to refer to in response."
        }
    ]

    messages.append(
        {
            "role": "user", 
            "content": question + "\n" + context,
        },
    )

    chat_client = openai.AzureOpenAI(
        azure_endpoint=aoai_endpoint,
        api_key=aoai_key,
        api_version=aoai_api_version,
    )

    chat_completion = chat_client.chat.completions.create(
        model=aoai_model_name_query,
        messages=messages,
    )

    return chat_completion.choices[0].message.content

Finally, you can test the retrieval and answering functions.

question = "How do elephants communicate over long distances?"
retrieved_context, retrieved_sources = get_context_source(question)
answer = get_answer(question, retrieved_context)
print(answer)

As a bonus, you can use the following code to create a basic ipywidget to serve as a chatbot interface.

# Create a text box for input  
text = widgets.Text(
    value='',
    placeholder='Type something',
    description='Question:',
    disabled=False,
    continuous_update=False,
    layout=widgets.Layout(width='800px')  # Adjust the width as needed
)

# Create an HTML widget to display the answer 
label = widgets.HTML(
    value='',
    layout=widgets.Layout(width='800px')  # Adjust the width as needed
)

# Define what happens when the text box value changes  
def on_text_change(change):
    if change['type'] == 'change' and change['name'] == 'value':
        retrieved_context, retrieved_sources = get_context_source(change['new'])
        label.value = f"{get_answer(change['new'], retrieved_context)}"

text.observe(on_text_change)

# Display the text box and label  
w_display(text, label)

Note that this solution can be used to answer queries accurately by leveraging both semantic and vector search capabilities.

Conclusion

This guide provided a comprehensive overview of implementing a RAG application using Azure OpenAI and Azure AI Search in Microsoft Fabric. By following these steps, you enhanced the language model’s ability to provide accurate and contextually relevant answers to user queries. By implementing the RAG strategy, you can create powerful and customized AI solutions in Microsoft Fabric for business applications, ensuring that the generated responses are both precise and tailored to specific datasets. For next steps, you can access the notebook here and refine the RAG application using your own datasets.

Post Author(s):
Amir Jafari – Senior Product Manager in Azure Data.
Journey McDowell – Senior AI Engineer in Azure Data.
Hossein Khadivi Heris – Principal AI Engineer in Azure Data.
Alexandra Savelieva – Principal AI Engineer in Azure Data.

Related blog posts

Building Custom AI Applications with Microsoft Fabric: Implementing Retrieval Augmented Generation for Enhanced Language Models

September 25, 2024 by Santhosh Kumar Ravindran

We’re excited to introduce high concurrency mode for notebooks in pipelines, bringing session sharing to one of the most popular orchestration mechanisms for enterprise data ingestion and transformation. Notebooks will now automatically be packed into an active high concurrency session without compromising performance or security, while paying for a single session. Key Benefits: Why Use … Continue reading “Introducing High Concurrency Mode for Notebooks in Pipelines for Fabric Spark”

September 25, 2024 by Jenny Jiang

Fabric Apache Spark Diagnostic Emitter for Logs and Metrics is now in public preview. This new feature allows Apache Spark users to collect Spark logs, job events, and metrics from their Spark applications and send them to various destinations, including Azure Event Hubs, Azure Storage, and Azure Log Analytics. It provides robust support for monitoring … Continue reading “Announcing the Fabric Apache Spark Diagnostic Emitter: Collect Logs and Metrics”