Microsoft Fabric Updates Blog

Unleashing the Power of Microsoft Fabric and SynapseML: A Guide to Q&A on PDF Documents with LLMs

Author(s):
Amir Jafari, Senior Product Manager in Azure Data.
Mark Hamilton, Senior Software Engineer in Azure Data.
Aydan Aksoylar, Senior Applied AI Engineer in Azure Data.
Kartavya Neema, Principal Applied AI Engineer in Azure Data.
Nellie Gustafsson, Principal PM Manager in Azure Data.

Generative AI models, particularly Large Language Models (LLMs), have recently drawn a lot of attention for their ability to generate content and to help solve otherwise complex tasks with little effort. They can play a pivotal role in addressing real-world problems, notably question answering. Thanks to extensive pre-training, contextual understanding, and language fluency, these models can interpret intricate queries, extract relevant information from large amounts of text, and produce coherent, human-like answers grounded in the given context, which makes them valuable across many fields.

In this blog post, we explore the functionality of Microsoft Fabric and SynapseML and demonstrate how to use LLMs to perform question answering (Q&A) over any PDF document. Together, Microsoft Fabric and SynapseML provide LLM tooling that integrates directly with your data, making it possible to extract meaningful insights from unstructured documents such as PDFs.

We will explore the LLM capabilities of Microsoft Fabric and SynapseML and show how they help users efficiently extract information and gain insights through Q&A. We will cover the following key steps:

  1. Azure OpenAI Integration: Learn how to apply LLMs at distributed scale by integrating Azure OpenAI Service with SynapseML. Azure OpenAI can solve a variety of natural language tasks through its completion API, and its SynapseML integration lets you use the Apache Spark distributed computing framework to process millions of prompts in parallel.
  2. Preprocessing PDF Documents: Learn how to load the PDF documents into a Spark DataFrame, read the documents using Azure AI Document Intelligence in Azure AI Services, and use SynapseML to split the documents into chunks.
  3. Embedding Generation and Storage: Learn how to generate embeddings for the chunks using SynapseML and Azure OpenAI Service, store the embeddings in a vector store using Azure Cognitive Search, and search the vector store to answer the user’s question.
  4. Question Answering Pipeline: Learn how to retrieve relevant documents based on the user’s question and answer it with LangChain, using the retrieved documents as context.

Throughout the blog, we will highlight the LLM features of Microsoft Fabric and SynapseML that make them stand out for Q&A over PDF documents. These features include:

  1. Scalability and Performance: See how Microsoft Fabric and SynapseML handle large-scale data processing through simple, composable, and distributed APIs on top of the Apache Spark distributed computing framework, making them suitable for enterprise-level applications.
  2. Seamless Integration: See how Microsoft Fabric and SynapseML integrate with each other, so you can combine the strengths of both tools to build Q&A over PDF documents.

With this guide, you can now embark on your journey to leverage Microsoft Fabric and SynapseML LLM capabilities to perform Q&A tasks and gain valuable knowledge from your PDF documents. You can access and run the Q&A Notebook.

Step 1: Provide the keys for Azure AI Services and Azure OpenAI to authenticate the applications.

To authenticate Azure AI Services and Azure OpenAI applications, you need to provide the respective API keys. Here is an example of how you can provide the keys in Python code.

# Set up key access to Azure AI Services
ai_services_key = "YOUR_AI_SERVICES_API_KEY"  # your AI Services key
ai_services_location = "YOUR_AI_SERVICES_LOCATION"  # your AI Services location (region)

# Set up key access to Azure OpenAI Service
aoai_service_name = "YOUR_OPENAI_SERVICE_NAME"  # your Azure OpenAI resource name
aoai_endpoint = "YOUR_OPENAI_API_ENDPOINT"  # your Azure OpenAI endpoint
aoai_key = "YOUR_OPENAI_API_KEY"  # your Azure OpenAI key
# The deployment names below assume you named each deployment after its model
aoai_deployment_name_embeddings = "text-embedding-ada-002"  # embedding model deployment
aoai_deployment_name_query = "text-davinci-003"  # completion model deployment
aoai_model_name_query = "text-davinci-003"  # underlying completion model

# Azure Cognitive Search
cogsearch_name = "YOUR_COGNITIVE_SEARCH_NAME"  # your Cognitive Search service name
cogsearch_index_name = "YOUR_COGNITIVE_SEARCH_INDEX_NAME"  # index name: lowercase letters, numbers, and dashes only
cogsearch_api_key = "YOUR_COGNITIVE_SEARCH_API_KEY"  # an admin API key (needed to create the index and upload documents)


Replace the placeholders as follows:
  • “YOUR_AI_SERVICES_API_KEY” with your Azure AI Services API key
  • “YOUR_AI_SERVICES_LOCATION” with your Azure AI Services location
  • “YOUR_OPENAI_SERVICE_NAME” with your Azure OpenAI service name
  • “YOUR_OPENAI_API_ENDPOINT” with your Azure OpenAI endpoint
  • “YOUR_OPENAI_API_KEY” with your Azure OpenAI API key
  • “YOUR_COGNITIVE_SEARCH_NAME” with the name of your Azure Cognitive Search service
  • “YOUR_COGNITIVE_SEARCH_INDEX_NAME” with the name of the index to create
  • “YOUR_COGNITIVE_SEARCH_API_KEY” with your Azure Cognitive Search admin API key.
If your Azure OpenAI deployments are not named after their models, also update aoai_deployment_name_embeddings and aoai_deployment_name_query.
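Hardcoding keys in a notebook makes them easy to leak. As a minimal sketch, assuming the values are exposed as environment variables, the settings above could instead be loaded and validated as shown here. The environment variable names and the load_settings helper are hypothetical, and the index-name check covers only the rule stated in the setup code.

```python
import os
import re

# Hypothetical environment variable names: adjust them to whatever your
# environment or secret store actually provides.
REQUIRED_SETTINGS = {
    "ai_services_key": "AI_SERVICES_KEY",
    "ai_services_location": "AI_SERVICES_LOCATION",
    "aoai_service_name": "AOAI_SERVICE_NAME",
    "aoai_endpoint": "AOAI_ENDPOINT",
    "aoai_key": "AOAI_KEY",
    "cogsearch_name": "COGSEARCH_NAME",
    "cogsearch_index_name": "COGSEARCH_INDEX_NAME",
    "cogsearch_api_key": "COGSEARCH_API_KEY",
}


def load_settings(env=None):
    """Read every setting from the environment, failing fast on gaps or a bad index name."""
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_SETTINGS.values() if not env.get(var)]
    if missing:
        raise KeyError(f"Missing environment variables: {', '.join(missing)}")
    settings = {name: env[var] for name, var in REQUIRED_SETTINGS.items()}
    # Only checks the rule stated in the setup code (lowercase letters, digits, dashes);
    # the service may impose further naming constraints.
    if not re.fullmatch(r"[a-z0-9-]+", settings["cogsearch_index_name"]):
        raise ValueError("Index name may only contain lowercase letters, numbers, and dashes")
    return settings
```

For example, call settings = load_settings() and then read aoai_key = settings["aoai_key"]. Failing fast on a missing value surfaces configuration mistakes before any Spark job starts.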

Step 2: Load the PDF documents into a Spark DataFrame.

To load PDF documents into a Spark DataFrame, you can use the spark.read.format("binaryFile") data source provided by Apache Spark. Here’s an example code snippet:

# Import required libraries from PySpark
# (udf and StringType are used later, in Step 6)
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

document_path = "path_to_pdf_folder_or_file"  # path to your document(s)
df = (
    spark.read.format("binaryFile")
    .load(document_path)
    .limit(10)  # only process the first 10 files to keep the demo quick
    .cache()
)

Make sure to replace “path_to_pdf_folder_or_file” with the actual path to a folder containing PDF documents or to a single PDF file. This code reads the files and creates a Spark DataFrame named df with one row per file. The binaryFile data source returns the columns path, modificationTime, length, and content, where content holds the raw bytes of the file; no text is extracted at this stage, as that happens in the next step.
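To make the shape of df concrete, here is a local, Spark-free sketch that builds the same four fields (path, modificationTime, length, content) for every PDF in a folder. The read_binary_files helper is hypothetical and only illustrates the row layout; in the notebook, Spark produces these rows for you.

```python
from datetime import datetime, timezone
from pathlib import Path


def read_binary_files(folder, pattern="*.pdf"):
    """Return one dict per matching file with the same fields Spark's binaryFile source exposes."""
    records = []
    for file_path in sorted(Path(folder).glob(pattern)):
        if not file_path.is_file():
            continue
        stat = file_path.stat()
        records.append({
            "path": str(file_path),
            "modificationTime": datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc),
            "length": stat.st_size,
            "content": file_path.read_bytes(),  # raw bytes only; no text extraction yet
        })
    return records
```

Like the sketch's pattern argument, Spark's binaryFile source accepts a pathGlobFilter option (for example .option("pathGlobFilter", "*.pdf")) if the folder also contains non-PDF files.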

Step 3: Read the documents using Azure AI Document Intelligence. 

We use SynapseML, an ecosystem of tools that extends the Apache Spark distributed computing framework. SynapseML adds networking capabilities to the Spark ecosystem, provides easy-to-use SparkML transformers for many Azure AI services, and can deploy Spark pipelines as web services with sub-millisecond latencies.

In particular, we use the AnalyzeDocument transformer for Azure AI Document Intelligence to extract the complete content of each document and store it in columns named “output_content” (the full text) and “paragraphs” (the detected paragraphs).

# Import required libraries from SynapseML and PySpark
from synapse.ml.cognitive import AnalyzeDocument
from pyspark.sql.functions import col

analyze_document = (
    AnalyzeDocument()
    .setPrebuiltModelId("prebuilt-layout")
    .setSubscriptionKey(ai_services_key)
    .setLocation(ai_services_location)
    .setImageBytesCol("content")
    .setOutputCol("result")
    .setPages("1-15")  # for the sake of quick processing, only read the first 15 pages of each document
)

analyzed_df = (
    analyze_document.transform(df)
    .withColumn("output_content", col("result.analyzeResult.content"))
    .withColumn("paragraphs", col("result.analyzeResult.paragraphs"))
    .cache()
)

We can inspect the analyzed Spark DataFrame, analyzed_df, with the following code. Note that we first drop the “content” column, since the raw PDF bytes are no longer needed.

analyzed_df = analyzed_df.drop("content")
display(analyzed_df)
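The result column follows the shape of the Document Intelligence analyzeResult response, which is why result.analyzeResult.content and result.analyzeResult.paragraphs can be selected above. The sketch below walks the same fields on a plain Python dict, for example a response parsed from JSON. The sample data is hand-written and heavily trimmed, the extract_text helper is hypothetical, and the optional role filter (dropping paragraphs such as page numbers) is an illustrative extra, not something the notebook does.

```python
def extract_text(result, skip_roles=frozenset()):
    """Return the full text and the paragraph texts from an analyzeResult-shaped dict."""
    analyze_result = result.get("analyzeResult", {})
    content = analyze_result.get("content", "")
    paragraphs = [
        paragraph.get("content", "")
        for paragraph in analyze_result.get("paragraphs", [])
        if paragraph.get("role") not in skip_roles  # body paragraphs have no role
    ]
    return content, paragraphs


# Hand-written sample, trimmed to the fields used above (real responses carry much more)
sample_result = {
    "analyzeResult": {
        "content": "Annual Report\nRevenue grew this year.\nPage 1",
        "paragraphs": [
            {"role": "title", "content": "Annual Report"},
            {"content": "Revenue grew this year."},
            {"role": "pageNumber", "content": "Page 1"},
        ],
    }
}

content, paragraphs = extract_text(sample_result, skip_roles={"pageNumber"})
print(paragraphs)  # ['Annual Report', 'Revenue grew this year.']
```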

Step 4: Split the documents into chunks.

After analyzing the documents, we use SynapseML’s PageSplitter to divide each document’s text into smaller chunks of roughly 3,000 to 4,000 characters, which are stored in the “chunks” column. This allows for more granular representation and processing of the document content.

# Import required libraries from SynapseML
from synapse.ml.featurize.text import PageSplitter

ps = (PageSplitter()
    .setInputCol("output_content")
    .setMaximumPageLength(4000)
    .setMinimumPageLength(3000)
    .setOutputCol("chunks"))

splitted_df = ps.transform(analyzed_df)
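To build intuition for the two length parameters, here is a simplified, hypothetical splitter: it cuts each chunk at the last space that falls between min_len and max_len characters and falls back to a hard cut when no such space exists. This is not PageSplitter's implementation, and PageSplitter's exact boundary rules may differ; it only illustrates the length-range idea.

```python
def split_text(text, min_len=3000, max_len=4000):
    """Greedy split into chunks of at most max_len characters, preferring to cut at a space."""
    if not 0 < min_len <= max_len:
        raise ValueError("Require 0 < min_len <= max_len")
    chunks = []
    start = 0
    while len(text) - start > max_len:
        window = text[start:start + max_len]
        # Last space at or after min_len inside the window; -1 if there is none
        cut = window.rfind(" ", min_len)
        if cut <= 0:
            cut = max_len  # no suitable space: hard cut at max_len
        chunks.append(text[start:start + cut])
        start += cut
    if start < len(text):
        chunks.append(text[start:])  # remainder, possibly shorter than min_len
    return chunks
```

Because every cut lands exactly between characters, joining the chunks reproduces the original text; only the last chunk may be shorter than min_len.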

Note that the chunks for each document are stored in a single row as an array. To embed every chunk in the following cells, we need one chunk per row. To accomplish that, we explode the arrays so that each row holds a single chunk, and then select only the document path and the chunk.

# Import required libraries from PySpark
from pyspark.sql.functions import explode, col

# Each row's "chunks" column holds an array with all chunks of one document.
# explode() emits one row per array element, repeating the document path.
exploded_df = splitted_df.select("path", explode(col("chunks")).alias("chunk"))
display(exploded_df)
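In plain Python, the explode step corresponds to flattening a list of (path, chunks) pairs. The explode_chunks helper below is a hypothetical illustration; like Spark's explode, it drops documents whose chunk array is empty or null (Spark's explode_outer would keep them).

```python
def explode_chunks(rows):
    """Turn (path, [chunk, ...]) rows into one (path, chunk) row per chunk."""
    # `chunks or []` skips None and empty lists, mirroring explode()
    return [(path, chunk) for path, chunks in rows for chunk in (chunks or [])]


rows = [
    ("a.pdf", ["a-1", "a-2"]),
    ("b.pdf", []),
    ("c.pdf", None),
    ("d.pdf", ["d-1"]),
]
print(explode_chunks(rows))  # [('a.pdf', 'a-1'), ('a.pdf', 'a-2'), ('d.pdf', 'd-1')]
```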

Step 5: Generate Embeddings.

To produce embeddings for each chunk, we use SynapseML together with Azure OpenAI Service. Because SynapseML runs the Azure OpenAI calls on Apache Spark, the embedding client generates embeddings in a distributed manner across the cluster, which enables efficient processing of large volumes of text. If you’re interested in applying large language models at distributed scale using Azure OpenAI and Azure Synapse Analytics, you can refer to this approach. Learn more about generating embeddings with Azure OpenAI.

# Import required libraries from SynapseML
from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(aoai_key)
    .setDeploymentName(aoai_deployment_name_embeddings)
    .setCustomServiceName(aoai_service_name)
    .setTextCol("chunk")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

df_embeddings = embedding.transform(exploded_df)
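Under the hood, each embedding comes from a call to the Azure OpenAI embeddings REST endpoint. The sketch below builds that request and parses the response for a single chunk, assuming the resource is reachable at https://{service_name}.openai.azure.com and that the 2023-05-15 API version is available to you. The helper names are hypothetical, and SynapseML's own request handling (batching, retries, error columns) may differ.

```python
def build_embedding_request(service_name, deployment_name, api_key, text, api_version="2023-05-15"):
    """Return (url, headers, body) for one Azure OpenAI embeddings call."""
    url = (
        f"https://{service_name}.openai.azure.com/openai/deployments/"
        f"{deployment_name}/embeddings?api-version={api_version}"
    )
    headers = {"Content-Type": "application/json", "api-key": api_key}
    body = {"input": text}
    return url, headers, body


def parse_embedding_response(response_json):
    """Pull the embedding vector for the (single) input out of the response JSON."""
    return response_json["data"][0]["embedding"]
```

To actually send the request you could call requests.post(url, headers=headers, json=body) and pass response.json() to parse_embedding_response. OpenAIEmbedding saves you from writing this loop yourself and distributes it across the cluster.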

Step 6: Store the embeddings in Azure Cognitive Search Vector Store.

Azure Cognitive Search offers a user-friendly interface for creating a vector database, as well as storing and retrieving data using vector search.

Storing data in the Azure Cognitive Search vector store involves two main steps:

Creating the Index: The first step is to establish the index or schema of the vector database. This entails defining the structure and properties of the data that will be stored and indexed in the vector database.

Adding Chunked Documents and Embeddings: The second step involves adding the chunked documents, along with their corresponding embeddings, to the vector datastore. This allows for efficient storage and retrieval of the data using vector search capabilities.

Once both steps are done, your chunked documents and their embeddings are stored in Azure Cognitive Search, and relevant chunks can be retrieved with vector search.

# Import required libraries
import requests 
import json

EMBEDDING_LENGTH = 1536  # length of the embedding vector (text-embedding-ada-002 produces 1536-dimensional embeddings)

# Create the Cognitive Search index with the fields id, content, and contentVector

url = f"https://{cogsearch_name}.search.windows.net/indexes/{cogsearch_index_name}?api-version=2023-07-01-Preview"
payload = json.dumps(
    {
        "name": cogsearch_index_name,
        "fields": [
            {"name": "id", "type": "Edm.String", "key": True, "filterable": True},
            {
                "name": "content",
                "type": "Edm.String",
                "searchable": True,
                "retrievable": True,
            },
            {
                "name": "contentVector",
                "type": "Collection(Edm.Single)",
                "searchable": True,
                "retrievable": True,
                "dimensions": EMBEDDING_LENGTH,
                "vectorSearchConfiguration": "vectorConfig",
            },
        ],
        "vectorSearch": {
            "algorithmConfigurations": [
                {
                    "name": "vectorConfig",
                    "kind": "hnsw",
                }
            ]
        },
    }
)
headers = {"Content-Type": "application/json", "api-key": cogsearch_api_key}

response = requests.request("PUT", url, headers=headers, data=payload)
print(response.status_code)

The udf() decorator wraps a Python function as a Spark user-defined function (UDF) that can be applied directly to DataFrame columns, without registering it by name as you would for use in Spark SQL. Here, we define a UDF that inserts each entry into Cognitive Search.

# Use a Spark UDF to insert entries into Cognitive Search.
# Spark runs the UDF on the executors, so the uploads happen in a distributed fashion.
import json
import requests
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(returnType=StringType())
def insert_to_cog_search(idx, content, contentVector):
    url = f"https://{cogsearch_name}.search.windows.net/indexes/{cogsearch_index_name}/docs/index?api-version=2023-07-01-Preview"

    # Upload a single document: the chunk text and its embedding
    payload = json.dumps({
        "value": [
            {
                "id": str(idx),
                "content": content,
                "contentVector": contentVector.tolist(),  # Spark ML vector -> Python list
                "@search.action": "upload",
            },
        ]
    })
    headers = {
        "Content-Type": "application/json",
        "api-key": cogsearch_api_key,
    }

    response = requests.request("POST", url, headers=headers, data=payload)

    if response.status_code in (200, 201):
        return "Success"
    else:
        return "Failure"

Next, we add a unique id column and apply the UDF to the id, chunk, and embeddings columns. The UDF’s return value goes into a new column, errorCogSearch, which records whether each upload succeeded.

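# Import required libraries
from pyspark.sql.functions import monotonically_increasing_id

# Add a unique (but not necessarily consecutive) id for each chunk
df_embeddings = df_embeddings.withColumn("idx", monotonically_increasing_id())

df_embeddings = df_embeddings.withColumn(
    "errorCogSearch",
    insert_to_cog_search(df_embeddings["idx"], df_embeddings["chunk"], df_embeddings["embeddings"]),
).cache()

# Spark is lazy and show() only computes the rows it displays, so first aggregate
# over every row to make sure all chunks are uploaded, then show the results
df_embeddings.groupBy("errorCogSearch").count().show()
df_embeddings.show()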
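The UDF above sends one HTTP request per chunk. The docs/index endpoint accepts several documents in its value array, so grouping chunks into batches reduces the number of round trips. Below is a minimal sketch, assuming rows arrive as (id, content, vector) tuples; build_upload_batches and the default batch size of 100 are hypothetical choices, and the service enforces its own maximum batch size, so check the current limits. In Spark, a function like this could run inside DataFrame.foreachPartition so that each partition uploads its own batches.

```python
def build_upload_batches(rows, batch_size=100, expected_dim=1536):
    """Group (id, content, vector) rows into docs/index payloads of at most batch_size documents."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    batches = []
    current = []
    for idx, content, vector in rows:
        vector = list(vector)  # accepts lists, NumPy arrays, or Spark ML dense vectors
        if len(vector) != expected_dim:
            raise ValueError(f"Document {idx}: expected {expected_dim} dims, got {len(vector)}")
        current.append({
            "@search.action": "upload",
            "id": str(idx),
            "content": content,
            "contentVector": vector,
        })
        if len(current) == batch_size:
            batches.append({"value": current})
            current = []
    if current:
        batches.append({"value": current})  # final, partially filled batch
    return batches
```

Each returned dict can be serialized with json.dumps and POSTed to the same docs/index URL the UDF uses; the dimension check catches a mismatch with EMBEDDING_LENGTH before the service rejects the upload.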

Step 7: Ask a Question.

After processing the documents, we can ask a question. We use SynapseML to convert the user’s question into an embedding and then use vector search in Azure Cognitive Search to retrieve the top K document chunks that most closely match the question. Other similarity metrics can also be used.

# Import required libraries from SynapseML
from synapse.ml.cognitive import OpenAIEmbedding

user_question = "YOUR_QUESTION" # type the user's question
retrieve_k = 2 # retrieve the top K (i.e., 2) documents from vector database

def gen_question_embedding(user_question): 
    # Use synapseML to convert the question to embeddings 

    df_ques = spark.createDataFrame([(user_question, 1)], ["questions", "dummy"])
    embedding = (
        OpenAIEmbedding()
        .setSubscriptionKey(aoai_key)
        .setDeploymentName(aoai_deployment_name_embeddings)
        .setCustomServiceName(aoai_service_name)
        .setTextCol("questions")
        .setErrorCol("errorQ")
        .setOutputCol("embeddings")
    )
    df_ques_embeddings = embedding.transform(df_ques)
    row = df_ques_embeddings.collect()[0]
    question_embedding = row.embeddings.tolist()
    return question_embedding

def retrieve_k_chunk(k, question_embedding):
    # Retrieve the K chunks whose embeddings are closest to the question embedding
    url = f"https://{cogsearch_name}.search.windows.net/indexes/{cogsearch_index_name}/docs/search?api-version=2023-07-01-Preview"

    payload = json.dumps({
        "vector": {
            "value": question_embedding,
            "fields": "contentVector",
            "k": k,  # number of nearest neighbors to return
        }
    })
    headers = {
        "Content-Type": "application/json",
        "api-key": cogsearch_api_key,
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    print(response.status_code)
    output = json.loads(response.text)
    return output

question_embedding = gen_question_embedding(user_question)
output = retrieve_k_chunk(retrieve_k, question_embedding)

Replace “YOUR_QUESTION” with the actual question for which you are looking for an answer.
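Azure Cognitive Search answers the vector query with an approximate nearest-neighbor index (HNSW, as configured above). For a small collection, an exact brute-force scan is a useful reference, sketched below assuming cosine similarity as the metric. The helper names are hypothetical, and the sketch only illustrates what the service computes approximately; it is not a replacement for it.

```python
import math


def cosine_similarity(a, b):
    """Cosine of the angle between two vectors; 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def top_k_chunks(question_embedding, chunks, k=2):
    """Score every (text, embedding) pair against the question and return the k best."""
    scored = [(cosine_similarity(question_embedding, emb), text) for text, emb in chunks]
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return scored[:k]
```

OpenAI documents its embeddings as normalized to unit length, in which case cosine similarity and the dot product produce the same ranking.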

Step 8: Respond to the User’s Question.

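To answer the user’s question, we use LangChain, a framework designed to simplify building applications with large language models. Before proceeding, install LangChain and the OpenAI Python library with the following command. Note that the code in this step uses the older LangChain imports (langchain.llms, LLMChain) and the module-level configuration of the pre-1.0 openai library (openai.api_type, openai.api_base); newer releases of both packages have moved or changed these APIs, so you may need to install earlier versions.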

%pip install langchain openai

Once the libraries are installed, we place the retrieved chunks and the user’s question into a prompt and ask the LLM, through LangChain, to generate the answer.

# Import required libraries from LangChain and set up OpenAI
from langchain.llms import AzureOpenAI
from langchain import PromptTemplate
from langchain.chains import LLMChain
import openai

openai.api_type = "azure"
openai.api_base = aoai_endpoint
openai.api_version = "2022-12-01"
openai.api_key = aoai_key


# Define a Question Answering chain function using LangChain
def qa_chain_func():

    # Define the LLM model
    llm = AzureOpenAI(
        deployment_name=aoai_deployment_name_query,
        model_name=aoai_model_name_query,
        openai_api_key=aoai_key,
        openai_api_version="2022-12-01",
    )

    # Write a preprompt with context and query as variables 
    template = """
    context :{context}
    Answer the question based on the context above. If the
    information to answer the question is not present in the given context then reply "I don't know".
    Question: {query}
    Answer: """

    # Define a prompt template
    prompt_template = PromptTemplate(
        input_variables=["context", "query"],
        template=template
    )
    # Define a chain
    qa_chain = LLMChain(llm=llm, prompt=prompt_template)
    return qa_chain

# Concatenate the content of the retrieved chunks into a single context string
context = "\n\n".join(i["content"] for i in output["value"])

# Build the Q&A chain and pass it the context and the user's question
qa_chain = qa_chain_func()
answer = qa_chain.run({
    "context": context,
    "query": user_question,
})
print(answer)
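To see what the chain sends to the model, here is a LangChain-free sketch that fills the same template with the joined chunk texts and the question. The build_prompt helper is hypothetical; LangChain's PromptTemplate performs a similar substitution, but its internals may differ.

```python
# Same wording as the template in qa_chain_func, without the function's indentation
TEMPLATE = """
context :{context}
Answer the question based on the context above. If the
information to answer the question is not present in the given context then reply "I don't know".
Question: {query}
Answer: """


def build_prompt(search_output, question, separator="\n\n"):
    """Join the retrieved chunk texts and substitute them, with the question, into TEMPLATE."""
    chunks = [doc["content"] for doc in search_output.get("value", [])]
    return TEMPLATE.format(context=separator.join(chunks), query=question)


sample_output = {"value": [{"content": "Chunk A"}, {"content": "Chunk B"}]}
print(build_prompt(sample_output, "What does chunk A say?"))
```

If the search returns no chunks, the context is empty and the template's instruction steers the model toward answering "I don't know".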

This concludes the steps required to build a Q&A framework with Microsoft Fabric and SynapseML. To wrap up, we ask a question and check the answer:

user_question = "What did the astronaut Edgar Mitchell call Earth?"
retrieve_k = 1  # retrieve only the top chunk from the vector database
# Answer: Edgar Mitchell once called Earth “a sparkling blue and white jewel”.

In this blog, we have shown an end-to-end demonstration of using Microsoft Fabric, Azure OpenAI, and SynapseML to perform Q&A on PDF documents of your choice, using Spark to scale out each step.

Through our step-by-step guide, we showed how these tools can be integrated to extract insights from PDFs, and how the resulting framework answers questions using the content of the documents, highlighting its potential for extracting information from PDF documents.

We hope this guide was useful. Stay tuned for additional guidance on leveraging LLMs with your data in Microsoft Fabric.

Get started with Microsoft Fabric

Microsoft Fabric is currently in preview. Try out everything Fabric has to offer by signing up for a free trial; no credit card information required!

If you want to learn more about Microsoft Fabric, consider:

Learning Resources

To help you get started with Microsoft Fabric, there are several resources we recommend:

  • Microsoft Fabric Learning Paths: experience a high-level tour of Microsoft Fabric and how to get started
  • Microsoft Fabric Tutorials: get detailed tutorials with a step-by-step guide on how to create an end-to-end solution in Microsoft Fabric. These tutorials focus on a few different common patterns including a Lakehouse architecture, data warehouse architecture, real-time analytics, and data science projects.
  • Microsoft Fabric Documentation: read Fabric docs to see detailed documentation for all aspects of Microsoft Fabric.
