Integrate Unstructured.io with Astra DB Serverless

15 min

This tutorial explains how to integrate Unstructured.io and Astra DB Serverless to quickly convert common document types into LLM-ready vector data for highly relevant GenAI similarity searches.

In this Python tutorial, you will load and parse a PDF document with Unstructured.io into an Astra DB Serverless vector store, and then query the index with LangChain.

To use the Unstructured CLI or Python to batch process documents, see the Astra Source Connector in the Unstructured.io documentation.

Prerequisites

This tutorial requires the following:

Install the RAGStack package

If you haven’t done so already, install the RAGStack package:

pip install ragstack-ai

This command installs the Unstructured.io Python client, the Astra DB Data API Python client, and the required LangChain dependencies.

Set up your environment

Create a .env file in your application directory with the following environment variables:

UNSTRUCTURED_API_KEY=*UNSTRUCTURED_API_KEY*
UNSTRUCTURED_API_URL=https://api.unstructuredapp.io/general/v0/general
ASTRA_DB_API_ENDPOINT=https://*ASTRA_DB_ID*-*ASTRA_DB_REGION*.apps.astra.datastax.com
ASTRA_DB_APPLICATION_TOKEN=*APPLICATION_TOKEN*
OPENAI_API_KEY=*OPENAI_API_KEY*

If you’re using this tutorial’s Google Colab notebook, you must enter these values in the Colab environment.
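
Before running the pipeline, it can help to confirm that every variable from the .env file is actually set. The following is a minimal sketch, not part of the original tutorial: the helper name `missing_env_vars` is hypothetical, and the variable names are the ones listed above.

```python
import os

# Variable names taken from the .env file described above.
REQUIRED_VARS = [
    "UNSTRUCTURED_API_KEY",
    "UNSTRUCTURED_API_URL",
    "ASTRA_DB_API_ENDPOINT",
    "ASTRA_DB_APPLICATION_TOKEN",
    "OPENAI_API_KEY",
]


def missing_env_vars(required, env=None):
    """Return the names in `required` that are unset or empty in `env`.

    `env` defaults to the process environment; pass a dict to check other values.
    """
    env = os.environ if env is None else env
    return [name for name in required if not env.get(name)]


if __name__ == "__main__":
    missing = missing_env_vars(REQUIRED_VARS)
    if missing:
        print("Missing environment variables: " + ", ".join(missing))
    else:
        print("All required environment variables are set.")
```

If you load the .env file with `load_dotenv()`, call it before running this check so that the values are present in the process environment.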

Create a RAG pipeline

  1. Import dependencies and load environment variables:

    import os
    import requests
    
    from dotenv import load_dotenv
    from langchain_astradb import AstraDBVectorStore
    from langchain_core.documents import Document
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.prompts import PromptTemplate
    from langchain_core.runnables import RunnablePassthrough
    
    from langchain_community.document_loaders import (
        unstructured,
        UnstructuredAPIFileLoader,
    )
    
    from langchain_openai import (
        ChatOpenAI,
        OpenAIEmbeddings,
    )
    
    load_dotenv()

  2. For this tutorial, download a PDF to parse:

    url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
    file_path = "./attention_pages_9_10.pdf"
    
    response = requests.get(url)
    if response.status_code == 200:
        with open(file_path, "wb") as file:
            file.write(response.content)
        print("Download complete.")
    else:
        print("Error downloading the file.")

    This file contains pages nine and ten of the paper "Attention Is All You Need" from arXiv.
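
The download code checks only the HTTP status code, so an unexpected response body, such as an HTML error page, would still be written to disk. As an optional sanity check, the sketch below tests whether data starts with the `%PDF-` header that PDF files normally begin with. The helper names `looks_like_pdf` and `file_looks_like_pdf` are hypothetical and are not part of the tutorial or of any library used here.

```python
from pathlib import Path

# PDF files normally begin with this header, followed by a version number.
PDF_MAGIC = b"%PDF-"


def looks_like_pdf(data: bytes) -> bool:
    """Return True if the byte string starts with the PDF header."""
    return data.startswith(PDF_MAGIC)


def file_looks_like_pdf(path) -> bool:
    """Return True if `path` is an existing file that starts with the PDF header."""
    candidate = Path(path)
    if not candidate.is_file():
        return False
    with candidate.open("rb") as handle:
        # Read only as many bytes as the header needs.
        return looks_like_pdf(handle.read(len(PDF_MAGIC)))
```

For example, you could call `file_looks_like_pdf(file_path)` after the download and stop before parsing if it returns `False`.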

  3. Use Unstructured to parse the PDF into elements for indexing. You can use either Simple Parsing or Advanced Parsing. This tutorial uses Advanced Parsing.

    Simple Parsing mode is best for documents that don’t contain complex formatting or tables.

    loader = UnstructuredAPIFileLoader(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        url=os.getenv("UNSTRUCTURED_API_URL"),
    )
    simple_docs = loader.load()
    
    print(len(simple_docs))
    print(simple_docs[0].page_content[0:400])

    By default, the parser returns one document per PDF file, and the output includes a sample of the document content. For this tutorial, the simple parsing output shows the first table’s description and the start of a poorly formatted table.

    In Advanced Parsing mode, you change the processing strategy and response mode to get a more detailed document structure. Unstructured can break the document into elements of different types, which can be helpful for improving your RAG system.

    For example, the Table element type can format tables as simple HTML, which can help the LLM answer questions from the table data, and you can exclude Footer type elements from your vector store. See the Unstructured documentation for a list of accepted element types.

    elements = unstructured.get_elements_from_api(
        file_path="./attention_pages_9_10.pdf",
        api_key=os.getenv("UNSTRUCTURED_API_KEY"),
        api_url=os.getenv("UNSTRUCTURED_API_URL"),
        strategy="hi_res", # default "auto"
        pdf_infer_table_structure=True,
    )
    
    print(len(elements))
    tables = [el for el in elements if el.category == "Table"]
    print(tables[1].metadata.text_as_html)

    With Advanced Parsing mode, you get 27 elements instead of a single document, and the table structure is available as HTML.

    For more information about the benefits of Advanced Parsing mode, see this tutorial’s Colab notebook.
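
To see which element types a parse produced, you can count elements by category and drop the types you don't want to index. The sketch below is illustrative only: it uses `SimpleNamespace` stand-ins with made-up text instead of real Unstructured elements, and it assumes only that each element exposes a `category` attribute, as the tutorial code does. The helper names are hypothetical.

```python
from collections import Counter
from types import SimpleNamespace


def count_categories(elements):
    """Count how many elements fall into each category."""
    return Counter(el.category for el in elements)


def drop_categories(elements, excluded=("Header", "Footer")):
    """Return the elements whose category is not in `excluded`."""
    return [el for el in elements if el.category not in excluded]


# Stand-in elements with made-up text; real elements come from the Unstructured API.
sample = [
    SimpleNamespace(category="Title", text="Example heading"),
    SimpleNamespace(category="NarrativeText", text="Example paragraph."),
    SimpleNamespace(category="Table", text="Example table text"),
    SimpleNamespace(category="Footer", text="9"),
]

print(count_categories(sample))
print([el.category for el in drop_categories(sample)])
```

With real data, you would pass the `elements` list returned by the advanced parse instead of `sample`.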

  4. Create an Astra DB Serverless vector store instance:

    astra_db_store = AstraDBVectorStore(
        collection_name="langchain_unstructured",
        embedding=OpenAIEmbeddings(),
        token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
        api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
    )

  5. To create LangChain documents, do the following:

    1. Chunk the text after Table elements and before Title elements.

    2. Use the HTML output format for table data.

    3. Insert the documents into the Astra DB vector store instance.

      documents = []
      current_doc = None

      for el in elements:
          # Skip page headers and footers.
          if el.category in ["Header", "Footer"]:
              continue
          # A Title starts a new chunk, so close the chunk in progress.
          if el.category == "Title":
              if current_doc is not None:
                  documents.append(current_doc)
              current_doc = None
          if current_doc is None:
              current_doc = Document(page_content="", metadata=el.metadata.to_dict())
          # Store tables as HTML and all other elements as plain text.
          current_doc.page_content += el.metadata.text_as_html if el.category == "Table" else el.text
          # A Table ends the current chunk.
          if el.category == "Table":
              documents.append(current_doc)
              current_doc = None

      # Keep any text that follows the last Table or Title.
      if current_doc is not None:
          documents.append(current_doc)

      astra_db_store.add_documents(documents)
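
To make the chunking rule easier to follow, here is a minimal sketch that applies the same rule to stand-in elements and returns plain strings instead of LangChain documents. The helpers `make_element` and `chunk_texts` are hypothetical, and the sample text is made up. The sketch appends any text still pending after the final element, so trailing narrative text is not lost.

```python
from types import SimpleNamespace


def make_element(category, text, html=None):
    """Build a stand-in exposing the attributes that the chunking loop reads."""
    return SimpleNamespace(
        category=category,
        text=text,
        metadata=SimpleNamespace(text_as_html=html),
    )


def chunk_texts(elements):
    """Group element text into chunks that end after a Table or before a Title."""
    chunks = []
    current = None
    for el in elements:
        if el.category in ("Header", "Footer"):
            continue  # headers and footers are not indexed
        if el.category == "Title" and current is not None:
            chunks.append(current)  # a Title closes the chunk in progress
            current = None
        if current is None:
            current = ""
        # Tables contribute their HTML form; other elements contribute plain text.
        current += el.metadata.text_as_html if el.category == "Table" else el.text
        if el.category == "Table":
            chunks.append(current)  # a Table ends the current chunk
            current = None
    if current is not None:
        chunks.append(current)  # keep text that follows the last Table or Title
    return chunks


sample = [
    make_element("Title", "A"),
    make_element("NarrativeText", "b"),
    make_element("Table", "t", html="<table>t</table>"),
    make_element("NarrativeText", "c"),
    make_element("Footer", "9"),
    make_element("Title", "D"),
    make_element("NarrativeText", "e"),
]

print(chunk_texts(sample))  # prints ['Ab<table>t</table>', 'c', 'De']
```

As in the tutorial loop, element text is concatenated without a separator. If you prefer spaces or newlines between elements, that is a small change to the concatenation line.
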
  6. Build a RAG pipeline using the populated vector store:

    prompt = """
    Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
    Context: {context}
    Question: {question}
    Your answer:
    """
    
    llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)
    
    chain = (
        {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
        | PromptTemplate.from_template(prompt)
        | llm
        | StrOutputParser()
    )
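
In the chain above, the retriever output is passed to the prompt as is, so the `{context}` placeholder appears to receive the string form of a list of documents, including their metadata. A common alternative is to join only the text of the retrieved documents before it reaches the prompt. The sketch below shows such a helper on stand-in objects; the name `format_docs` is a convention rather than a library function, and the sample documents are made up.

```python
from types import SimpleNamespace


def format_docs(docs, separator="\n\n"):
    """Join the page_content of retrieved documents into one context string."""
    return separator.join(doc.page_content for doc in docs)


# Stand-ins for retrieved documents; real ones expose the same page_content attribute.
retrieved = [
    SimpleNamespace(page_content="First chunk of text."),
    SimpleNamespace(page_content="<table><tr><td>cell</td></tr></table>"),
]

print(format_docs(retrieved))
```

In a LangChain chain, a helper like this is typically piped after the retriever, for example `astra_db_store.as_retriever() | format_docs` as the value for `"context"`. Treat that wiring as an assumption to verify against your installed LangChain version.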

Query the vector store with LangChain

  1. Ask a question that the model can answer from the text in the parsed document:

    response_1 = chain.invoke("What does reducing the attention key size do?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_1)

    This query should return "Reducing the attention key size hurts model quality."

  2. Ask a question that the model can answer from the table data in the parsed document:

    response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]'?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_2)

    This query should return "The 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]' was 91.7", which comes from the second table in the parsed PDF.

  3. Ask a question with an expected lack of context. This can be any query that has no relationship to the data in the parsed document.

    response_3 = chain.invoke("When was George Washington born?")
    print("\n***********New Unstructured Basic Query Engine***********")
    print(response_3)

    Queries that the model can’t answer return a response such as "I don’t know. The context does not provide any information about UNRELATED_QUERY."
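
If you want to detect this fallback in application code, you can check whether a response starts with the phrase that the prompt asks for. The following is a minimal sketch under that assumption: the helper name `is_fallback_answer` is hypothetical, and the model is not guaranteed to use the exact wording every time.

```python
def is_fallback_answer(answer: str) -> bool:
    """Return True if the answer starts with the "I don't know" fallback phrase."""
    # Normalize case, surrounding whitespace, and curly apostrophes before comparing.
    normalized = answer.strip().lower().replace("\u2019", "'")
    return normalized.startswith("i don't know")


print(is_fallback_answer("I don't know. The context does not mention this."))  # prints True
print(is_fallback_answer("Reducing the attention key size hurts model quality."))  # prints False
```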

Complete code example

The following Python script covers the steps in Create a RAG pipeline, using advanced parsing for the indexed documents, and the queries in Query the vector store with LangChain. Before you run this script, complete the steps in Prerequisites, Install the RAGStack package, and Set up your environment.

unstructured-example.py
import os
import requests

from dotenv import load_dotenv
from langchain_astradb import AstraDBVectorStore
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import PromptTemplate
from langchain_core.runnables import RunnablePassthrough

from langchain_community.document_loaders import (
    unstructured,
    UnstructuredAPIFileLoader,
)

from langchain_openai import (
    ChatOpenAI,
    OpenAIEmbeddings,
)

load_dotenv()

# download pdf
url = "https://raw.githubusercontent.com/datastax/ragstack-ai/48bc55e7dc4de6a8b79fcebcedd242dc1254dd63/examples/notebooks/resources/attention_pages_9_10.pdf"
file_path = "./attention_pages_9_10.pdf"

response = requests.get(url)
if response.status_code == 200:
    with open(file_path, "wb") as file:
        file.write(response.content)
    print("Download complete.")
else:
    print("Error downloading the file.")

# simple parse
loader = UnstructuredAPIFileLoader(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    url=os.getenv("UNSTRUCTURED_API_URL"),
)
simple_docs = loader.load()

print(len(simple_docs))
print(simple_docs[0].page_content[0:400])

# complex parse
elements = unstructured.get_elements_from_api(
    file_path="./attention_pages_9_10.pdf",
    api_key=os.getenv("UNSTRUCTURED_API_KEY"),
    api_url=os.getenv("UNSTRUCTURED_API_URL"),
    strategy="hi_res", # default "auto"
    pdf_infer_table_structure=True,
)

print(len(elements))
tables = [el for el in elements if el.category == "Table"]
print(tables[1].metadata.text_as_html)

# create vector store
astra_db_store = AstraDBVectorStore(
    collection_name="langchain_unstructured",
    embedding=OpenAIEmbeddings(),
    token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
    api_endpoint=os.getenv("ASTRA_DB_API_ENDPOINT")
)

# load documents
documents = []
current_doc = None

for el in elements:
    # Skip page headers and footers.
    if el.category in ["Header", "Footer"]:
        continue
    # A Title starts a new chunk, so close the chunk in progress.
    if el.category == "Title":
        if current_doc is not None:
            documents.append(current_doc)
        current_doc = None
    if current_doc is None:
        current_doc = Document(page_content="", metadata=el.metadata.to_dict())
    # Store tables as HTML and all other elements as plain text.
    current_doc.page_content += el.metadata.text_as_html if el.category == "Table" else el.text
    # A Table ends the current chunk.
    if el.category == "Table":
        documents.append(current_doc)
        current_doc = None

# Keep any text that follows the last Table or Title.
if current_doc is not None:
    documents.append(current_doc)

astra_db_store.add_documents(documents)

# prompt and query
prompt = """
Answer the question based only on the supplied context. If you don't know the answer, say "I don't know".
Context: {context}
Question: {question}
Your answer:
"""

llm = ChatOpenAI(model="gpt-3.5-turbo-16k", streaming=False, temperature=0)

chain = (
    {"context": astra_db_store.as_retriever(), "question": RunnablePassthrough()}
    | PromptTemplate.from_template(prompt)
    | llm
    | StrOutputParser()
)

response_1 = chain.invoke("What does reducing the attention key size do?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_1)

response_2 = chain.invoke("For the transformer to English constituency results, what was the 'WSJ 23 F1' value for 'Dyer et al. (2016) [8]'?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_2)

response_3 = chain.invoke("When was George Washington born?")
print("\n***********New Unstructured Basic Query Engine***********")
print(response_3)

© 2024 DataStax | Privacy policy | Terms of use
