Integrate Haystack with Astra DB Serverless

query_builder 15 min

Haystack can use Astra DB Serverless to store and retrieve vectors for ML applications.

Prerequisites

The code samples on this page assume the following:

Create a document store

  1. Create a .env file in the root of your program.

  2. Populate the file with the Astra token and endpoint values from the Database Details section of your database’s Overview tab, and your OpenAI API key. If you’re running this code in Google Colab, you’ll be prompted for the .env values instead.

    .env
    ASTRA_DB_APPLICATION_TOKEN=AstraCS:...
    ASTRA_DB_ID=bxe07f45-8ab4-4d81-aa7d-7f58dbed4ret
    ASTRA_DB_REGION=us-east-1
    ASTRA_DB_COLLECTION_NAME=test
    ASTRA_DB_KEYSPACE_NAME=default_keyspace
    OPENAI_API_KEY=sk-...

    The endpoint format is https://ASTRA_DB_ID-ASTRA_DB_REGION.apps.astra.datastax.com/api/json/v1/ASTRA_DB_KEYSPACE_NAME/ASTRA_DB_COLLECTION_NAME.

  3. Import your dependencies.

    haystack-rag.py
    import os
    from haystack import Document, Pipeline
    from haystack.components.builders.answer_builder import AnswerBuilder
    from haystack.components.builders.prompt_builder import PromptBuilder
    from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
    from haystack.components.generators import OpenAIGenerator
    from haystack.components.writers import DocumentWriter
    from haystack.document_stores.types import DuplicatePolicy
    from astra_haystack.document_store import AstraDocumentStore
    from astra_haystack.retriever import AstraRetriever
    from dotenv import load_dotenv
    
    load_dotenv()
  4. Create a DocumentStore object with values from your .env file.

    haystack-rag.py
    document_store = AstraDocumentStore(
        astra_id=os.getenv("ASTRA_DB_ID"),
        astra_region=os.getenv("ASTRA_DB_REGION"),
        astra_collection=os.getenv("ASTRA_DB_COLLECTION_NAME"),
        astra_keyspace=os.getenv("ASTRA_DB_KEYSPACE_NAME"),
        astra_application_token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
        duplicates_policy=DuplicatePolicy.SKIP,
        embedding_dim=384, # match the dimension of your sentence transformer
    )
    
    # ...

    duplicates_policy is set to SKIP to avoid inserting duplicate documents.

    The embedding_dim parameter is the dimension of the vector embedding. This example uses the all-MiniLM-L6-v2 sentence transformer, which has an embedding dimension of 384. This number should match the embedding_dim you declare in the AstraDocumentStore object.

    For more on sentence transformers, see pretrained models.

Create an indexing pipeline

One of Haystack’s core features is the ability to construct reusable pipelines of components.

In this example, you’ll create a pipeline that indexes documents in the AstraDocumentStore and prints the number of documents embedded in the store.

  1. Create a list containing three Document objects. You’ll index these documents into the AstraDocumentStore with the pipeline.

    haystack-rag.py
    # ...
    
    documents = [
        Document(content="There are over 7,000 languages spoken around the world today."),
        Document(
            content=("Elephants have been observed to behave in a way that indicates"
                     " a high level of self-awareness, such as recognizing themselves in mirrors.")
        ),
        Document(
            content=("In certain parts of the world, like the Maldives, Puerto Rico, "
                     "and San Diego, you can witness the phenomenon of bioluminescent waves.")
        ),
    ]
    
    # ...
  2. Create a Haystack indexing pipeline.

    1. The first component in the pipeline is a SentenceTransformersDocumentEmbedder, which transforms the documents into vectors.

    2. The second component is a DocumentWriter, which writes the list of documents to the AstraDocumentStore.

    3. The index_pipeline.connect() method connects the output of the first component to the input of the second component.

      haystack-rag.py
      # ...
      
      embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"
      
      index_pipeline = Pipeline()
      index_pipeline.add_component(instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name), name="embedder")
      index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
      index_pipeline.connect("embedder.documents", "writer.documents")
      
      # ...
  3. When you run the complete code, this step runs the pipeline, embeds your documents, and prints the document count to confirm your AstraDocumentStore is populated.

    haystack-rag.py
    # ...
    
    index_pipeline.run({"embedder": {"documents": documents}})
    print(document_store.count_documents())
    
    # ...

Create a RAG pipeline

Use the populated AstraDocumentStore in a Haystack RAG pipeline with the AstraRetriever as the document store retriever.

  1. Define a template for the OpenAI prompt.

    haystack-rag.py
    # ...
    
    prompt_template = """
                    Given these documents, answer the question.
                    Documents:
                    {% for doc in documents %}
                        {{ doc.content }}
                    {% endfor %}
                    Question: {{question}}
                    Answer:
                    """
    
    # ...
  2. Create the Haystack RAG pipeline.

    This might look like a lot of code, but it works just like the indexing pipeline you created before. Define each component, and wire them into a series.

    haystack-rag.py
    # ...
    
    rag_pipeline = Pipeline()
    
    # SentenceTransformersTextEmbedder transforms the question into a vector
    rag_pipeline.add_component(instance=SentenceTransformersTextEmbedder(model=embedding_model), name="embedder")
    
    # AstraRetriever retrieves the most similar documents from the AstraDocumentStore
    rag_pipeline.add_component(instance=AstraRetriever(document_store=document_store), name="retriever")
    
    # PromptBuilder creates a prompt for the OpenAI API from the defined prompt_template
    rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
    
    # OpenAIGenerator generates an answer from the prompt
    rag_pipeline.add_component(instance=OpenAIGenerator(api_key=os.getenv("OPENAI_API_KEY")), name="llm")
    
    # AnswerBuilder extracts the answer from the OpenAI response and metadata (`meta`)
    rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
    
    # The rag_pipeline.connect() method connects each component to the next
    rag_pipeline.connect("embedder", "retriever")
    rag_pipeline.connect("retriever", "prompt_builder.documents")
    rag_pipeline.connect("prompt_builder", "llm")
    rag_pipeline.connect("llm.replies", "answer_builder.replies")
    rag_pipeline.connect("llm.meta", "answer_builder.meta")
    rag_pipeline.connect("retriever", "answer_builder.documents")
  3. When the pipeline runs, your question passes through the pipeline and returns an answer, thanks to the AstraRetriever.

    question = "How many languages are there in the world today?"
    result = rag_pipeline.run(
        {
            "embedder": {"text": question},
            "retriever": {"top_k": 2},
            "prompt_builder": {"question": question},
            "answer_builder": {"query": question},
        }
    )
    
    print(result)

    If you get Disabling parallelism to avoid deadlocks…​ errors when running, set TOKENIZERS_PARALLELISM=false in your environment variables.

Complete code example

haystack-rag.py
haystack-rag.py
import os
from haystack import Document, Pipeline
from haystack.components.builders.answer_builder import AnswerBuilder
from haystack.components.builders.prompt_builder import PromptBuilder
from haystack.components.embedders import SentenceTransformersTextEmbedder, SentenceTransformersDocumentEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.writers import DocumentWriter
from haystack.document_stores.types import DuplicatePolicy
from astra_haystack.document_store import AstraDocumentStore
from astra_haystack.retriever import AstraRetriever
from dotenv import load_dotenv

load_dotenv()

# Create document store
document_store = AstraDocumentStore(
    astra_id=os.getenv("ASTRA_DB_ID"),
    astra_region=os.getenv("ASTRA_DB_REGION"),
    astra_collection=os.getenv("ASTRA_DB_COLLECTION_NAME"),
    astra_keyspace=os.getenv("ASTRA_DB_KEYSPACE_NAME"),
    astra_application_token=os.getenv("ASTRA_DB_APPLICATION_TOKEN"),
    duplicates_policy=DuplicatePolicy.SKIP,
    embedding_dim=384,
)

# Add documents
documents = [
    Document(content="There are over 7,000 languages spoken around the world today."),
    Document(
        content="Elephants have been observed to behave in a way that indicates"
        " a high level of self-awareness, such as recognizing themselves in mirrors."
    ),
    Document(
        content="In certain parts of the world, like the Maldives, Puerto Rico, "
        "and San Diego, you can witness the phenomenon of bioluminescent waves."
    ),
]
print(documents[0])

embedding_model_name = "sentence-transformers/all-MiniLM-L6-v2"

# Indexing pipeline
index_pipeline = Pipeline()
index_pipeline.add_component(
    instance=SentenceTransformersDocumentEmbedder(model=embedding_model_name),
    name="embedder",
)
index_pipeline.add_component(instance=DocumentWriter(document_store=document_store, policy=DuplicatePolicy.SKIP), name="writer")
index_pipeline.connect("embedder.documents", "writer.documents")

# Run the indexing pipeline and print document count
index_pipeline.run({"embedder": {"documents": documents}})
print(document_store.count_documents())

# Build RAG pipeline
prompt_template = """
                Given these documents, answer the question.
                Documents:
                {% for doc in documents %}
                    {{ doc.content }}
                {% endfor %}
                Question: {{question}}
                Answer:
                """

rag_pipeline = Pipeline()
rag_pipeline.add_component(
    instance=SentenceTransformersTextEmbedder(model=embedding_model_name),
    name="embedder",
)
rag_pipeline.add_component(instance=AstraRetriever(document_store=document_store), name="retriever")
rag_pipeline.add_component(instance=PromptBuilder(template=prompt_template), name="prompt_builder")
rag_pipeline.add_component(instance=OpenAIGenerator(api_key=os.getenv("OPENAI_API_KEY")), name="llm")
rag_pipeline.add_component(instance=AnswerBuilder(), name="answer_builder")
rag_pipeline.connect("embedder", "retriever")
rag_pipeline.connect("retriever", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "llm")
rag_pipeline.connect("llm.replies", "answer_builder.replies")
rag_pipeline.connect("llm.meta", "answer_builder.meta")
rag_pipeline.connect("retriever", "answer_builder.documents")

# Run the pipeline
question = "How many languages are there in the world today?"
result = rag_pipeline.run(
    {
        "embedder": {"text": question},
        "retriever": {"top_k": 2},
        "prompt_builder": {"question": question},
        "answer_builder": {"query": question},
    }
)

print(result)

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com