Integrate Unstructured Serverless with Astra DB Serverless

This tutorial explains how to integrate the Unstructured Serverless API with Astra DB Serverless to quickly convert common unstructured file types, such as PDFs and word-processing documents, into LLM-ready vector data for highly relevant GenAI similarity searches. Unstructured.io provides a no-code platform and cloud service that can process complex enterprise document types. Unstructured.io transforms and cleans the content and generates embeddings, with Astra DB Serverless serving as the vector data store destination.

You can also use the built-in Unstructured data loader integration to load unstructured data into your Serverless (Vector) databases.

In this Python tutorial, you will use the Unstructured Serverless API to process a PDF document, load the extracted vector data into an Astra DB Serverless vector store, and then query the index with LangChain.

Prerequisites

This tutorial requires the following:

Install dependencies

You can install the dependencies in one of two ways:

  • RAGStack package

  • Manual install

To use the RAGStack package, install ragstack-ai, which includes the Unstructured.io Python client, the Astra DB Data API Python client, and the required LangChain dependencies:

pip install ragstack-ai

If you don’t want to use the RAGStack package, you must install the Unstructured.io Python client, the Astra DB Data API Python client, and the LangChain dependencies manually.

Set up your environment

Define the required environment variables in a .env file in your application directory, or in the Colab environment if you’re using this tutorial’s Google Colab notebook.

UNSTRUCTURED_API_KEY=UNSTRUCTURED_API_KEY
UNSTRUCTURED_API_URL=https://api.unstructuredapp.io/general/v0/general
ASTRA_DB_API_ENDPOINT=https://ASTRA_DB_ID-ASTRA_DB_REGION.apps.astra.datastax.com
ASTRA_DB_APPLICATION_TOKEN=APPLICATION_TOKEN
OPENAI_API_KEY=OPENAI_API_KEY
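Before running the tutorial, you can optionally verify that all of the required variables are set. This is a minimal sketch; the `missing_vars` helper is a hypothetical convenience function, not part of the tutorial's code.

```python
import os

REQUIRED_VARS = [
    "UNSTRUCTURED_API_KEY",
    "UNSTRUCTURED_API_URL",
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_APPLICATION_TOKEN",
    "OPENAI_API_KEY",
]

def missing_vars(env, required=REQUIRED_VARS):
    """Return the names of required variables that are unset or empty."""
    return [name for name in required if not env.get(name)]

# Check the real environment and print a hint if anything is missing.
unset = missing_vars(os.environ)
if unset:
    print("Missing environment variables:", ", ".join(unset))
```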

Create a RAG pipeline

  1. Import dependencies and load environment variables:

    import os
    import requests
    
    from dotenv import load_dotenv
    from langchain_astradb import AstraDBVectorStore
    from langchain_core.documents import Document
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import PromptTemplate
    from langchain_core.runnables import RunnablePassthrough
    
    from langchain_community.document_loaders import (
        unstructured,
        UnstructuredAPIFileLoader,
    )
    
    from langchain_openai import (
        ChatOpenAI,
        OpenAIEmbeddings,
    )
    
    load_dotenv()
  2. For this tutorial, download a PDF to parse:

    url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
    file_path = "./attention_pages_9_10.pdf"
    
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_path, "wb") as file:
            file.write(response.content)
        print("Download complete.")
    else:
        print("Error downloading the file.")

    This file contains pages nine and ten of the "Attention Is All You Need" paper from arXiv.

  3. Use Unstructured to parse the PDF into elements for indexing. You can use either Simple Parsing or Advanced Parsing. This tutorial uses Advanced Parsing.

    • Simple parsing

    • Advanced parsing

    Simple Parsing mode is best for documents that don’t contain complex formatting or tables.

    loader = UnstructuredAPIFileLoader(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        url=os.getenv("UNSTRUCTURED_API_URL"),
    )
    simple_docs = loader.load()
    
    print(len(simple_docs))
    print(simple_docs[0].page_content[0:400])

    By default, the parser returns one document per PDF file, and the output includes a sample of the document content. For this tutorial, the simple parsing output shows the first table’s description and the start of a poorly formatted table.

    If you change the processing strategy and response mode, you get a more detailed document structure. Unstructured can break the document into elements of different types, which can be helpful for improving your RAG system.

    For example, the Table element type can format tables as simple HTML, which can help the LLM answer questions from the table data, and you can exclude Footer type elements from your vector store. See the Unstructured documentation for a list of accepted element types.

    elements = unstructured.get_elements_from_api(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        api_url=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res",  # default is "auto"
        pdf_infer_table_structure=True,
    )
    
    print(len(elements))
    tables = [el for el in elements if el.category == "Table"]
    print(tables[1].metadata.text_as_html)

    With Advanced Parsing mode, you get 27 elements instead of a single document, and the table structure is available as HTML.

    For more information about the benefits of Advanced Parsing mode, see this tutorial’s Colab notebook.
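    To see which element types advanced parsing produced, you can tally the category attribute of each element. The sketch below uses a hypothetical list of category strings in place of real Unstructured elements (equivalent to `[el.category for el in elements]`):

```python
from collections import Counter

# Hypothetical categories, standing in for [el.category for el in elements]
categories = [
    "Title", "NarrativeText", "Table", "Footer",
    "Title", "NarrativeText", "NarrativeText", "Table",
]
counts = Counter(categories)
print(counts.most_common())
# [('NarrativeText', 3), ('Title', 2), ('Table', 2), ('Footer', 1)]
```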

  4. Create a Serverless (Vector) store instance:

    astra_db_store = AstraDBVectorStore(
        collection_name="langchain_unstructured",
        embedding=OpenAIEmbeddings(),
        token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
        api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
    )
  5. Use the following code to create LangChain documents. This code chunks the text after Table elements and before Title elements, uses the HTML output format for table data, and then inserts the documents into the Astra DB vector store instance.

    documents = []
    current_doc = None
    
    for el in elements:
        if el.category in ["Header", "Footer"]:
            continue  # skip headers and footers
        if el.category == "Title":
            if current_doc is not None:
                documents.append(current_doc)
            current_doc = None
        if not current_doc:
            current_doc = Document(page_content="", metadata=el.metadata.to_dict())
        current_doc.page_content += el.metadata.text_as_html if el.category == "Table" else el.text
        if el.category == "Table":
            if current_doc is not None:
                documents.append(current_doc)
            current_doc = None
    
    if current_doc is not None:
        documents.append(current_doc)  # don't drop the trailing document
    
    astra_db_store.add_documents(documents)
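    The grouping logic can be exercised offline with stand-in elements. The sketch below is a simplified, hypothetical version of the chunking rules (new chunk at each Title, close a chunk after each Table, skip Header and Footer, flush any trailing chunk) using plain strings instead of LangChain Document objects:

```python
from dataclasses import dataclass

# Hypothetical stand-in for an Unstructured element; real elements also
# carry metadata such as text_as_html for tables.
@dataclass
class FakeElement:
    category: str
    text: str

def chunk(elements):
    """Group element text into chunks: start a new chunk at each Title,
    close a chunk after each Table, and skip Header/Footer elements."""
    chunks, current = [], None
    for el in elements:
        if el.category in ("Header", "Footer"):
            continue
        if el.category == "Title" and current is not None:
            chunks.append(current)
            current = None
        if current is None:
            current = ""
        current += el.text
        if el.category == "Table":
            chunks.append(current)
            current = None
    if current is not None:  # flush the final chunk
        chunks.append(current)
    return chunks

elements = [
    FakeElement("Title", "Results. "),
    FakeElement("NarrativeText", "Table 1 shows scores. "),
    FakeElement("Table", "<table>...</table>"),
    FakeElement("Footer", "page 9"),
    FakeElement("Title", "Conclusion. "),
    FakeElement("NarrativeText", "We conclude."),
]
print(chunk(elements))
# ['Results. Table 1 shows scores. <table>...</table>', 'Conclusion. We conclude.']
```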
  6. Build a RAG pipeline using the populated vector store:

    prompt = """
    Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
    Context: {context}
    Question: {question}
    Your answer:
    """
    
    llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)
    
    chain = (
        {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
        | PromptTemplate.from_template(prompt)
        | llm
        | StrOutputParser()
    )
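    At query time, the retriever’s results are substituted into the {context} slot of the prompt before it reaches the LLM. The following sketch shows that substitution with plain str.format and invented context text; LangChain’s PromptTemplate performs the equivalent step inside the chain:

```python
prompt = """
Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
Context: {context}
Question: {question}
Your answer:
"""

# Invented retrieved context, standing in for the documents the retriever returns
filled = prompt.format(
    context="Reducing the attention key size hurts model quality.",
    question="What does reducing the attention key size do?",
)
print(filled)
```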

Query the vector store with LangChain

  1. Ask a question that the model can answer from the text in the parsed document:

    response_1 = chain.invoke("What does reducing the attention key size do?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_1)

    This query should return "Reducing the attention key size hurts model quality."

  2. Ask a question that the model can answer from the table data in the parsed document:

    response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]'?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_2)

    This query should return "The 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]' was 91.7", which comes from the second table in the parsed PDF.

  3. Ask a question with an expected lack of context. This can be any query that has no relationship to the data in the parsed document.

    response_3 = chain.invoke("When was George Washington born?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_3)

    Queries that the model can’t answer return a response like "I don’t know. The context does not provide any information about when George Washington was born."

Complete code example

The following Python script combines the commands to Create a RAG pipeline with advanced parsing, and then Query the vector store with LangChain. Before you use this script, make sure that you complete the Prerequisites, Install dependencies, and Set up your environment sections.

unstructured-example.py
import os
import requests

from dotenv import load_dotenv
from langchain_astradb import AstraDBVectorStore
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_community.document_loaders import (
    unstructured,
    UnstructuredAPIFileLoader,
)

from langchain_openai import (
    ChatOpenAI,
    OpenAIEmbeddings,
)

load_dotenv()

# download pdf
url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
file_path = "./attention_pages_9_10.pdf"

response = requests.get(url)
if response.status_code == 200:
    with open(file_path, "wb") as file:
        file.write(response.content)
    print("Download complete.")
else:
    print("Error downloading the file.")

# simple parse
loader = UnstructuredAPIFileLoader(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    url=os.getenv("UNSTRUCTURED_API_URL"),
)
simple_docs = loader.load()

print(len(simple_docs))
print(simple_docs[0].page_content[0:400])

# complex parse
elements = unstructured.get_elements_from_api(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    api_url=os.getenv("UNSTRUCTURED_API_URL"),
    strategy="hi_res", # default "auto"
    pdf_infer_table_structure=True,
)

print(len(elements))
tables = [el for el in elements if el.category == "Table"]
print(tables[1].metadata.text_as_html)

# create vector store
astra_db_store = AstraDBVectorStore(
    collection_name="langchain_unstructured",
    embedding=OpenAIEmbeddings(),
    token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
    api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
)

# load documents
documents = []
current_doc = None

for el in elements:
    if el.category in ["Header", "Footer"]:
        continue  # skip headers and footers
    if el.category == "Title":
        if current_doc is not None:
            documents.append(current_doc)
        current_doc = None
    if not current_doc:
        current_doc = Document(page_content="", metadata=el.metadata.to_dict())
    current_doc.page_content += el.metadata.text_as_html if el.category == "Table" else el.text
    if el.category == "Table":
        if current_doc is not None:
            documents.append(current_doc)
        current_doc = None

if current_doc is not None:
    documents.append(current_doc)  # don't drop the trailing document

astra_db_store.add_documents(documents)

# prompt and query
prompt = """
Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
Context: {context}
Question: {question}
Your answer:
"""

llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)

chain = (
    {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
    | PromptTemplate.from_template(prompt)
    | llm
    | StrOutputParser()
)

response_1 = chain.invoke("What does reducing the attention key size do?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_1)

response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]'?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_2)

response_3 = chain.invoke("When was George Washington born?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_3)
