RAG with Unstructured.io and Astra DB Serverless

Build a RAG pipeline with RAGStack, Astra DB Serverless, and Unstructured.io.

This example demonstrates parsing a PDF document with Unstructured.io, loading the parsed content into an Astra DB Serverless vector store, and then querying the index with LangChain.

Prerequisites

Unstructured

To use Unstructured.io, you need an API key. Sign up for one here: https://unstructured.io/api-key-hosted.

A key will be emailed to you.

Astra DB Serverless

You will need a vector-enabled Astra DB Serverless database.

pip install ragstack-ai

See the Prerequisites page for more details.

Set up your environment

Create a .env file in your application with the following environment variables:

UNSTRUCTURED_API_KEY=...
UNSTRUCTURED_API_URL=https://api.unstructured.io/general/v0/general
ASTRA_DB_API_ENDPOINT=https://<ASTRA_DB_ID>-<ASTRA_DB_REGION>.apps.astra.datastax.com
ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
OPENAI_API_KEY=sk-...

If you’re using Google Colab, you’ll be prompted for these values in the Colab environment.

See the Prerequisites page for more details.
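
Before running the pipeline, it can help to confirm that every variable from the .env file is actually set. The following is a minimal sketch, not part of RAGStack: `missing_env_vars` is a hypothetical helper, and the variable names are the ones listed above.

```python
import os

# Variable names taken from the .env file described above.
REQUIRED_VARS = [
    "UNSTRUCTURED_API_KEY",
    "UNSTRUCTURED_API_URL",
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_APPLICATION_TOKEN",
    "OPENAI_API_KEY",
]


def missing_env_vars(env=None):
    """Return the required names that are unset or empty (hypothetical helper)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


missing = missing_env_vars()
if missing:
    print("Missing environment variables: " + ", ".join(missing))
else:
    print("All required environment variables are set.")
```

Call this after `load_dotenv()` so that values from the .env file are already in the process environment.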

Create RAG pipeline

  1. Import dependencies and load environment variables.

    import os
    import requests
    
    from dotenv import load_dotenv
    from langchain_astradb import AstraDBVectorStore
    from langchain_core.documents import Document
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import PromptTemplate
    from langchain_core.runnables import RunnablePassthrough
    
    from langchain_community.document_loaders import (
        unstructured,
        UnstructuredAPIFileLoader,
    )
    
    from langchain_openai import (
        ChatOpenAI,
        OpenAIEmbeddings,
    )
    
    load_dotenv()
  2. Download the sample PDF. This example focuses on pages 9 and 10 of a paper about attention mechanisms in transformer model architectures. The original paper is available here: https://arxiv.org/pdf/1706.03762.pdf

    url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
    file_path = "./attention_pages_9_10.pdf"
    
    response = requests.get(url, timeout=30)
    if response.status_code == 200:
        with open(file_path, "wb") as file:
            file.write(response.content)
        print("Download complete.")
    else:
        print("Error downloading the file.")
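
A 200 status code does not guarantee that the response body is the expected file, because a server can also return an HTML page with that status. As an optional extra check, the sketch below verifies that the downloaded bytes start with the `%PDF-` header before writing them to disk. `looks_like_pdf` and `save_pdf` are hypothetical helper names, not part of any library used in this example.

```python
PDF_MAGIC = b"%PDF-"  # PDF files begin with this header marker


def looks_like_pdf(data: bytes) -> bool:
    """Return True if the bytes begin with the PDF header marker."""
    return data.startswith(PDF_MAGIC)


def save_pdf(data: bytes, path: str) -> None:
    """Write the bytes to path, refusing content that is not a PDF."""
    if not looks_like_pdf(data):
        raise ValueError("Downloaded content does not look like a PDF.")
    with open(path, "wb") as file:
        file.write(data)


print(looks_like_pdf(b"%PDF-1.5 example bytes"))
print(looks_like_pdf(b"<html>Not Found</html>"))
```

In the download step above, this would be used as `save_pdf(response.content, file_path)` in place of the direct `file.write` call.
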
  3. Parse the downloaded PDF with Unstructured into elements for indexing. Choose either Simple Parsing or Advanced Parsing:

    Simple Parsing:

    This works well if your document doesn’t contain any complex formatting or tables.

    loader = UnstructuredAPIFileLoader(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        url = os.getenv("UNSTRUCTURED_API_URL"),
    )
    simple_docs = loader.load()
    
    print(len(simple_docs))
    print(simple_docs[0].page_content[0:400])

    By default, the parser returns one document per PDF file. The sample output shows the first table’s description and the start of a poorly formatted table.

    Advanced Parsing:

    By changing the processing strategy and response mode, we can get more detailed document structure. Unstructured can break the document into elements of different types, which can be helpful for improving your RAG system.

    For example, the Table element type includes the table formatted as simple HTML, which can help the LLM answer questions from the table data. We could also exclude elements of type Footer from our vector store.

    A list of all the different element types can be found here: https://unstructured-io.github.io/unstructured/introduction/overview.html#id1

    elements = unstructured.get_elements_from_api(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        api_url=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res", # default "auto"
        pdf_infer_table_structure=True,
    )
    
    print(len(elements))
    tables = [el for el in elements if el.category == "Table"]
    print(tables[1].metadata.text_as_html)

    In the Advanced Parsing mode, we now get 27 elements instead of a single document, and the table structure is available as HTML.

    See the Colab notebook linked at the top of this page for a more detailed investigation into the benefits of using the Advanced Parsing mode.
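
To get a feel for what Advanced Parsing returns, it can be useful to count elements by category and to filter out the ones you do not want to index. The sketch below runs without calling the API: `StubElement` is a stand-in that carries only the `category` and `text` attributes used in this example, and the sample values are placeholders rather than real parser output.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class StubElement:
    """Stand-in for a parsed element; only the fields used here."""
    category: str
    text: str


def summarize_categories(elements):
    """Count how many elements fall into each category."""
    return Counter(el.category for el in elements)


def drop_categories(elements, excluded=("Header", "Footer")):
    """Return the elements whose category is not in the excluded set."""
    return [el for el in elements if el.category not in excluded]


# Placeholder elements, not real output from the Unstructured API.
sample = [
    StubElement("Title", "A section heading"),
    StubElement("NarrativeText", "A paragraph of body text."),
    StubElement("Table", "A table rendered as plain text"),
    StubElement("Footer", "A page footer"),
]

print(summarize_categories(sample))
print([el.category for el in drop_categories(sample)])
```

The same two functions should work on the `elements` list from the step above, since they only read `el.category`.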

  4. Create an Astra DB Serverless vector store instance.

    astra_db_store = AstraDBVectorStore(
        collection_name="langchain_unstructured",
        embedding=OpenAIEmbeddings(),
        token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
        api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
    )
  5. Create LangChain documents by chunking the elements: start a new chunk at each Title element and close the current chunk after each Table element. Use the HTML output format for table data. Then insert the documents into Astra DB Serverless.

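    documents = []
    current_doc = None
    
    for el in elements:
        if el.category in ["Header", "Footer"]:
            continue  # skip page headers and footers
        if el.category == "Title":
            # a title starts a new chunk, so close the one in progress
            if current_doc is not None:
                documents.append(current_doc)
            current_doc = None
        if current_doc is None:
            current_doc = Document(page_content="", metadata=el.metadata.to_dict())
        if el.category == "Table":
            # use the HTML rendering for tables, falling back to plain text if it is missing
            current_doc.page_content += el.metadata.text_as_html or el.text
            # a table closes the current chunk
            documents.append(current_doc)
            current_doc = None
        else:
            current_doc.page_content += el.text
    
    # keep any trailing text that follows the last title or table
    if current_doc is not None:
        documents.append(current_doc)
    
    astra_db_store.add_documents(documents)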
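
The chunking rule in this step can be tried out without the API or a database. The sketch below applies the same rule (skip Header and Footer, start a chunk at each Title, close a chunk after each Table) to stand-in elements and returns plain strings. `StubElement` and `chunk_elements` are hypothetical names, the sample values are placeholders, and the sketch assumes that any text left over after the last Title or Table should be kept as a final chunk. Like the step above, it concatenates element text without a separator.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class StubElement:
    """Stand-in for a parsed element; only the fields used here."""
    category: str
    text: str
    text_as_html: Optional[str] = None


def chunk_elements(elements):
    """Group elements into text chunks split at Title and Table boundaries."""
    chunks = []
    current = None  # text parts of the chunk being built, or None
    for el in elements:
        if el.category in ("Header", "Footer"):
            continue  # not indexed
        if el.category == "Title" and current is not None:
            chunks.append("".join(current))  # a title closes the open chunk
            current = None
        if current is None:
            current = []
        if el.category == "Table":
            current.append(el.text_as_html or el.text)
            chunks.append("".join(current))  # a table closes the chunk
            current = None
        else:
            current.append(el.text)
    if current is not None:
        chunks.append("".join(current))  # assumption: keep trailing text
    return chunks


sample = [
    StubElement("Title", "A"),
    StubElement("NarrativeText", " one."),
    StubElement("Table", "t", text_as_html="<table>t</table>"),
    StubElement("NarrativeText", "After table."),
    StubElement("Footer", "9"),
    StubElement("Title", "B"),
    StubElement("NarrativeText", " two."),
]

for chunk in chunk_elements(sample):
    print(repr(chunk))
```

For the sample above, the result is three chunks: the first title with its text and table, the text between the table and the next title, and the second title with its text.
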
  6. Build a RAG pipeline using the populated Astra DB Serverless vector store.

    prompt = """
    Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
    Context: {context}
    Question: {question}
    Your answer:
    """
    
    llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)
    
    chain = (
        {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
        | PromptTemplate.from_template(prompt)
        | llm
        | StrOutputParser()
    )
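
In the chain above, the retriever returns a list of Document objects, and the prompt template appears to render that list using its default string form. A common alternative is to join the retrieved page contents into plain text before filling the template. The sketch below shows that idea with the standard library only: `format_docs` and `build_prompt` are hypothetical helpers, and the stand-in documents carry just a `page_content` attribute.

```python
from types import SimpleNamespace

# Same template text as the prompt in the step above.
PROMPT = """
Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
Context: {context}
Question: {question}
Your answer:
"""


def format_docs(docs):
    """Join the page contents of retrieved documents into one context string."""
    return "\n\n".join(doc.page_content for doc in docs)


def build_prompt(docs, question):
    """Fill the template with the joined context and the user's question."""
    return PROMPT.format(context=format_docs(docs), question=question)


# Placeholder documents standing in for retriever results.
docs = [
    SimpleNamespace(page_content="First chunk."),
    SimpleNamespace(page_content="Second chunk."),
]

print(build_prompt(docs, "What does the first chunk say?"))
```

With LangChain, a function like this is typically piped after the retriever, for example `"context": astra_db_store.as_retriever() | format_docs`. Treat that as a suggestion to try rather than a required change to this example.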

Execute queries

  1. Ask a question that should be answered by the text of the document. This query should return "Reducing the attention key size hurts model quality."

    response_1 = chain.invoke("What does reducing the attention key size do?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_1)
  2. Ask a question that can be answered from the table data. This query should return "The 'WSJ 23 F1' value for 'Dyer et al. (2016) (5]' was 91.7." because the table data contains this information. This shows the benefit of parsing tables with Unstructured.io.

    response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) (5]'?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_2)
  3. Ask a question that the document cannot answer. This query should return "I don’t know. The context does not provide any information about George Washington’s birthdate." because your document does not contain information about George Washington.

    response_3 = chain.invoke("When was George Washington born?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_3)
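
The three queries above repeat the same invoke-and-print pattern. One way to tidy this up is a small helper that takes any callable and a list of questions. This is a sketch only: `run_queries` is a hypothetical helper, and `fake_invoke` stands in for `chain.invoke` so that the block runs without API keys.

```python
def run_queries(invoke, questions):
    """Call invoke on each question and return a question-to-answer mapping."""
    return {question: invoke(question) for question in questions}


def fake_invoke(question):
    """Stand-in for chain.invoke; always returns the fallback answer."""
    return "I don't know."


QUESTIONS = [
    "What does reducing the attention key size do?",
    "When was George Washington born?",
]

for question, answer in run_queries(fake_invoke, QUESTIONS).items():
    print(f"Q: {question}\nA: {answer}\n")
```

With the pipeline from this example, the call would be `run_queries(chain.invoke, QUESTIONS)`.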

Complete code (Advanced Parsing)

import os
import requests

from dotenv import load_dotenv
from langchain_astradb import AstraDBVectorStore
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_community.document_loaders import (
    unstructured,
    UnstructuredAPIFileLoader,
)

from langchain_openai import (
    ChatOpenAI,
    OpenAIEmbeddings,
)

load_dotenv()

# download pdf
url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
file_path = "./attention_pages_9_10.pdf"

response = requests.get(url, timeout=30)
if response.status_code == 200:
    with open(file_path, "wb") as file:
        file.write(response.content)
    print("Download complete.")
else:
    print("Error downloading the file.")

# simple parse
loader = UnstructuredAPIFileLoader(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    url = os.getenv("UNSTRUCTURED_API_URL"),
)
simple_docs = loader.load()

print(len(simple_docs))
print(simple_docs[0].page_content[0:400])

# advanced parse
elements = unstructured.get_elements_from_api(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    api_url=os.getenv("UNSTRUCTURED_API_URL"),
    strategy="hi_res", # default "auto"
    pdf_infer_table_structure=True,
)

print(len(elements))
tables = [el for el in elements if el.category == "Table"]
print(tables[1].metadata.text_as_html)

# create vector store
astra_db_store = AstraDBVectorStore(
    collection_name="langchain_unstructured",
    embedding=OpenAIEmbeddings(),
    token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
    api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
)

# load documents
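documents = []
current_doc = None

for el in elements:
    if el.category in ["Header", "Footer"]:
        continue  # skip page headers and footers
    if el.category == "Title":
        # a title starts a new chunk, so close the one in progress
        if current_doc is not None:
            documents.append(current_doc)
        current_doc = None
    if current_doc is None:
        current_doc = Document(page_content="", metadata=el.metadata.to_dict())
    if el.category == "Table":
        # use the HTML rendering for tables, falling back to plain text if it is missing
        current_doc.page_content += el.metadata.text_as_html or el.text
        # a table closes the current chunk
        documents.append(current_doc)
        current_doc = None
    else:
        current_doc.page_content += el.text

# keep any trailing text that follows the last title or table
if current_doc is not None:
    documents.append(current_doc)

astra_db_store.add_documents(documents)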

# prompt and query
prompt = """
Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
Context: {context}
Question: {question}
Your answer:
"""

llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)

chain = (
    {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
    | PromptTemplate.from_template(prompt)
    | llm
    | StrOutputParser()
)

response_1 = chain.invoke("What does reducing the attention key size do?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_1)

response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) (5]'?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_2)

response_3 = chain.invoke("When was George Washington born?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_3)
