The RAGStack project combines the intelligence of large language models with the agility of stream processing to create powerful Generative AI applications.

This guide will help you build and deploy a RAGStack application to Astra Streaming.

Private preview feature: RAGStack is available only in a private preview. This feature is not intended for production use, has not been certified for production workloads, and might contain bugs and other functional issues. There is no guarantee that a preview feature will ever become generally available. If you are interested in participating in the private preview, contact us at Astra-PM@datastax.com. We will contact you with more information.

Install RAGStack CLI

Install the RAGStack CLI:

brew install datastax/ragstack/ragstack

For more on the RAGStack CLI, see RAGStack CLI↗.

Enable RAGStack in Astra

Create an Astra Streaming tenant in the GCP us-east1 region.

Your tenant will be created with a default namespace, which is a logical grouping of topics.

Your tenant will be listed in the RAGStack tab. Select Enable to enable RAGStack for your tenant.

Under the hood, this enables the Starlight for Kafka API so that your tenant can connect to your Kafka cluster.

Connect RAGStack to your tenant

Select the Generate Configuration Command button to generate a CLI configuration file for your tenant.

Run the generated command in your local environment to connect your tenant to the RAGStack CLI.

RAGStack CLI:

ragstack profiles import astra-rs-tenant --inline 'base64:...' --set-current -u

Result:

profile astra-rs-tenant created
profile astra-rs-tenant set as current

The configuration values will look something like this.

tenant: rs-tenant
webServiceUrl: https://pulsar-gcp-useast1.api.streaming.datastax.com/ragstack
apiGatewayUrl: wss://lsgwy-gcp-useast1.streaming.datastax.com/ragstack-api-gateway/
token: AstraCS:...

Your tenant is now connected to the RAGStack CLI.

You can also establish a connection by including the configuration values from the Astra Streaming Connect tab in your RAGStack application’s instance.yaml file. See the instance.yaml example under Populate YAML files.

Build a RAGStack Application

Build a RAGStack application by creating YAML files to describe the application. The application folder structure looks like this:

|- project-folder
    |- application
        |- pipeline.yaml
        |- gateways.yaml
        |- configuration.yaml
    |- secrets.yaml
    |- instance.yaml

Here’s a shortcut:

mkdir project-folder && cd project-folder
touch secrets.yaml instance.yaml
mkdir application && cd application
touch pipeline.yaml gateways.yaml configuration.yaml

The instance.yaml and secrets.yaml files cannot be in the "application" directory, because the application directory is passed as a zip at runtime. Next, you will populate the YAML files to connect your application to your Astra Streaming tenant.
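A quick sanity check of that layout can be scripted. The sketch below assumes the folder and file names used in this guide; check_layout is a hypothetical helper name, not a RAGStack CLI command.

```shell
#!/bin/sh
# check_layout DIR: hypothetical helper (not part of the RAGStack CLI).
# Verifies that DIR matches the project layout described in this guide.
check_layout() {
    root="$1"
    status=0
    for f in instance.yaml secrets.yaml; do
        # These two files must sit next to the application directory...
        if [ ! -f "$root/$f" ]; then
            echo "missing: $root/$f"
            status=1
        fi
        # ...and must not be inside it, because that directory is zipped.
        if [ -e "$root/application/$f" ]; then
            echo "misplaced: $root/application/$f"
            status=1
        fi
    done
    # Files that describe the application itself.
    for f in pipeline.yaml gateways.yaml configuration.yaml; do
        if [ ! -f "$root/application/$f" ]; then
            echo "missing: $root/application/$f"
            status=1
        fi
    done
    return "$status"
}
```

Running check_layout . from inside project-folder prints nothing and returns 0 when every file is where this guide expects it.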

Populate YAML files

The instance.yaml file declares the application’s processing infrastructure, including where streaming and compute take place. The secrets for tokens and passwords are stored in the secrets.yaml file, which you’ll populate in the next step. An instance.yaml file can be downloaded from the Connect tab of your Astra Streaming tenant. Paste its contents into your instance.yaml file to connect your application to your tenant.

instance:
  streamingCluster:
    type: "kafka"
    configuration:
      admin:
        bootstrap.servers: "${ secrets.kafka.bootstrapServers }"
        security.protocol: "${ secrets.kafka.securityProtocol }"
        sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required username='${ secrets.kafka.username }' password='${ secrets.kafka.password }';"
        sasl.mechanism: PLAIN
        session.timeout.ms: "45000"

  computeCluster:
    type: "kubernetes"

The secrets.yaml file contains auth information for connecting to other services. Secret values can be modified directly in secrets.yaml, or you can pass your secrets as environment variables or in a .env file. Placeholders in secrets.yaml are resolved from these environment variables.

export ASTRA_CLIENT_ID=...
export ASTRA_SECRET=...
export ASTRA_TOKEN=...

When you go to production, create a dedicated secrets.yaml file for each environment.

The secret values are found in the following places:

  • The Astra client-id, token, and secret are found in the Astra UI.

  • The values for the Kafka bootstrap server are found in your Astra Streaming tenant or in the Starlight for Kafka ssl.properties file.

  • The Azure access key and URL are found in your Azure OpenAI deployment.

A secrets.yaml file can be downloaded from the Connect tab of your Astra Streaming tenant. Paste its contents into your secrets.yaml file to authorize your application with your tenant. For more on finding values for secrets, see Secrets↗.

secrets:
  - id: astra
    data:
      clientId: ${ASTRA_CLIENT_ID:-}
      secret: ${ASTRA_SECRET:-}
      token: ${ASTRA_TOKEN:-}
      database: ${ASTRA_DATABASE:-}
      secureBundle: ${ASTRA_SECURE_BUNDLE:-}
      environment: ${ASTRA_ENVIRONMENT:-PROD}
  - id: open-ai
    data:
      access-key: "${OPEN_AI_ACCESS_KEY:-}"
      url: "${OPEN_AI_URL:-}"
      provider: "${OPEN_AI_PROVIDER:-azure}"
      embeddings-model: "${OPEN_AI_EMBEDDINGS_MODEL:-text-embedding-ada-002}"
      chat-completions-model: "${OPEN_AI_CHAT_COMPLETIONS_MODEL:-gpt-35-turbo}"
  - id: google
    data:
      client-id: "${GOOGLE_CLIENT_ID:-}"
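The ${NAME:-default} placeholders in secrets.yaml have the same shape as POSIX shell parameter expansion: use the variable if it is set and non-empty, otherwise fall back to the text after :-. The sketch below demonstrates that rule in the shell itself. It assumes RAGStack resolves its placeholders the same way, and it does not call RAGStack.

```shell
#!/bin/sh
# Demonstrates the ${NAME:-default} rule using the shell's own expansion.
# Assumption: secrets.yaml placeholders are resolved with the same semantics.

# With the variable unset, the default after ":-" is used.
unset ASTRA_ENVIRONMENT
env_unset="${ASTRA_ENVIRONMENT:-PROD}"

# With the variable set, its value wins over the default.
ASTRA_ENVIRONMENT=DEV
env_set="${ASTRA_ENVIRONMENT:-PROD}"

# An empty default, as in ${ASTRA_TOKEN:-}, resolves to an empty string.
unset ASTRA_TOKEN
token_unset="${ASTRA_TOKEN:-}"

echo "unset -> $env_unset"      # prints: unset -> PROD
echo "set   -> $env_set"        # prints: set   -> DEV
echo "token -> '$token_unset'"  # prints: token -> ''
```

Under that assumption, a placeholder with an empty default, such as the Astra token, resolves to an empty value when its variable has not been exported.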

You can replace the placeholders in secrets.yaml with the actual values, use a .env file, or export the secrets as shown below:

export OPEN_AI_URL=https://company-openai-dev.openai.azure.com/
export OPEN_AI_ACCESS_KEY=your-openai-access-key
export OPEN_AI_EMBEDDINGS_MODEL=text-embedding-ada-002
export OPEN_AI_PROVIDER=azure
export KAFKA_USERNAME=rs-tenant
export KAFKA_PASSWORD=eyRrr...
export KAFKA_BOOTSTRAP_SERVERS=kafka-gcp-useast1.streaming.datastax.com:9093
export KAFKA_TENANT=rs-tenant
export ASTRA_CLIENT_ID=xxxx
export ASTRA_TOKEN=AstraCS:...
export GOOGLE_CLIENT_ID=xxxx.apps.googleusercontent.com

For more on creating a Google client ID, see Google Service Account↗.
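If you keep these values in a .env file instead, one common shell idiom is to source the file with auto-export turned on, then confirm that nothing required is empty before deploying. This is a sketch under assumptions: the .env file contains plain NAME=value lines that the shell can read, and load_env and require_vars are hypothetical helper names, not RAGStack commands.

```shell
#!/bin/sh
# load_env FILE: source FILE so that every NAME=value line is exported.
# Assumes FILE holds shell-compatible assignments (no spaces around "=").
load_env() {
    set -a      # auto-export every variable assigned from here on
    . "$1"
    set +a      # stop auto-exporting
}

# require_vars NAME...: report each named variable that is unset or empty.
# Returns non-zero if at least one is missing.
require_vars() {
    missing=0
    for name in "$@"; do
        # Indirect lookup of the variable whose name is stored in $name.
        eval "value=\${$name:-}"
        if [ -z "$value" ]; then
            echo "not set: $name" >&2
            missing=1
        fi
    done
    return "$missing"
}
```

For example, load_env ./.env followed by require_vars OPEN_AI_URL OPEN_AI_ACCESS_KEY KAFKA_USERNAME KAFKA_PASSWORD reports any of those values that are still missing.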

Pipeline.yaml contains the chain of agents that makes up your program, and the input and output topics that they communicate with. For more on building pipelines, see Pipelines↗.

topics:
  - name: "input-topic"
    creation-mode: create-if-not-exists
  - name: "output-topic"
    creation-mode: create-if-not-exists
  - name: "history-topic"
    creation-mode: create-if-not-exists
pipeline:
  - name: "convert-to-json"
    type: "document-to-json"
    input: "input-topic"
    configuration:
      text-field: "question"
  - name: "ai-chat-completions"
    type: "ai-chat-completions"
    output: "history-topic"
    configuration:
      model: "${secrets.open-ai.chat-completions-model}" # This needs to be set to the model deployment name, not the base name
      # on the history-topic we add a field with the answer
      completion-field: "value.answer"
      # we are also logging the prompt we sent to the LLM
      log-field: "value.prompt"
      # here we configure the streaming behavior
      # as soon as the LLM answers with a chunk we send it to the output-topic
      stream-to-topic: "output-topic"
      # on the streaming answer we send the answer as whole message
      # the 'value' syntax is used to refer to the whole value of the message
      stream-response-completion-field: "value"
      # we want to stream the answer as soon as we have 10 chunks
      # in order to reduce latency for the first message the agent sends the first message
      # with 1 chunk, then with 2 chunks....up to the min-chunks-per-message value
      # eventually we want to send bigger messages to reduce the overhead of each message on the topic
      min-chunks-per-message: 10
      messages:
        - role: user
          content: "You are a helpful assistant. Below you can find a question from the user. Please try to help them the best way you can.\n\n{{ value.question }}"

Gateways.yaml contains API gateways for communicating with your application. For more on gateways and authentication, see API Gateways↗.

gateways:
  - id: produce-input
    type: produce
    topic: input-topic
    parameters:
      - sessionId
    produceOptions:
      headers:
        - key: ragstack-client-session-id
          value-from-parameters: sessionId

  - id: chat
    type: chat
    chat-options:
      answers-topic: output-topic
      questions-topic: input-topic

  - id: consume-output
    type: consume
    topic: output-topic
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: ragstack-client-session-id
            value-from-parameters: sessionId

  - id: consume-history
    type: consume
    topic: history-topic
    parameters:
      - sessionId
    consumeOptions:
      filters:
        headers:
          - key: ragstack-client-session-id
            value-from-parameters: sessionId

  - id: produce-input-auth
    type: produce
    topic: input-topic
    parameters:
      - sessionId
    authentication:
      provider: google
      configuration:
        clientId: "${secrets.google.client-id}"
    produceOptions:
      headers:
        - key: ragstack-client-user-id
          value-from-authentication: subject

  - id: consume-output-auth
    type: consume
    topic: output-topic
    parameters:
      - sessionId
    authentication:
      provider: google
      configuration:
        clientId: "${secrets.google.client-id}"
    consumeOptions:
      filters:
        headers:
          - key: ragstack-client-user-id
            value-from-authentication: subject

Configuration.yaml contains additional configuration and resources for your application. A configuration.yaml file can be downloaded from the Connect tab of your Astra Streaming tenant (under AstraDB). For more on configuration, see Configuration↗.

configuration:
  resources:
    - type: "open-ai-configuration"
      name: "OpenAI Azure configuration"
      configuration:
        url: "${secrets.open-ai.url}"
        access-key: "${secrets.open-ai.access-key}"
        provider: "${secrets.open-ai.provider}"

Remember to save all your YAML files.

Deploy the RAGStack application on Astra

To deploy the application, run the following commands from project-folder, the directory that contains the application folder, instance.yaml, and secrets.yaml. The first command deploys the application from the YAML files you created above, and the second command gets the status of the application. For more on RAGStack CLI commands, see RAGStack CLI↗.

RAGStack CLI:

ragstack apps deploy sample-app -app ./application -i ./instance.yaml -s ./secrets.yaml
ragstack apps get sample-app

Result:

packaging app: /Users/mendon.kissling/sample-app/./application
app packaged
deploying application: sample-app (1 KB)
application sample-app deployed
ID               STREAMING        COMPUTE          STATUS           EXECUTORS        REPLICAS
sample-app       kafka            kubernetes       DEPLOYED         1/1              1/1

Ensure your app is running: a Kubernetes pod should be deployed with your application, and STATUS changes to DEPLOYED.
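To check the STATUS column from a script rather than by eye, the table printed by ragstack apps get can be filtered with awk. This is a minimal sketch, assuming the whitespace-separated column layout shown in the deploy result (ID first, STATUS fourth); app_status is a hypothetical helper name.

```shell
#!/bin/sh
# app_status APP_ID: read the `ragstack apps get` table on stdin and
# print the STATUS column of the row whose ID equals APP_ID.
# Assumes STATUS is the fourth whitespace-separated column.
app_status() {
    awk -v id="$1" '$1 == id { print $4 }'
}
```

A typical use would be ragstack apps get sample-app | app_status sample-app, which prints the status word for that row and nothing if the application is not listed.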

Your application should be listed in your RAGStack tenant:

App deployed

You should see a map of your application in the RAGStack UI:

App map

If your application shows an error, use ragstack apps logs <my-application> to get its logs.

RAGStack CLI connection values

If you’re running into issues, ensure the values in your CLI profile match the values in your Astra Streaming tenant.

If you’re unsure of the profile name, use ragstack profiles list, then ragstack profiles get <my-profile> -o=json to display the current values.

{
  "webServiceUrl" : "https://pulsar-gcp-useast1.api.streaming.datastax.com/langstream",
  "apiGatewayUrl" : "wss://lsgwy-gcp-useast1.streaming.datastax.com/langstream-api-gateway/",
  "tenant" : "ragstack-tenant",
  "token" : "AstraCS:<token>",
  "name" : "astra-ragstack-tenant"
}
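When comparing these values against your tenant, it can help to pull a single field out of the JSON. The sketch below uses sed so that it has no extra dependencies. It assumes each field sits on its own line as a quoted string, as in the profile output shown here, and profile_field is a hypothetical helper name. A real JSON parser is more robust if one is available.

```shell
#!/bin/sh
# profile_field NAME: read profile JSON on stdin and print the string
# value of the field NAME.
# Assumes one "name" : "value" pair per line.
profile_field() {
    sed -n 's/^[[:space:]]*"'"$1"'"[[:space:]]*:[[:space:]]*"\([^"]*\)".*$/\1/p'
}
```

For example, ragstack profiles get <my-profile> -o=json | profile_field webServiceUrl prints only the URL, which can then be compared with the value shown for your tenant.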

To update these values, use ragstack profiles update astra-ragstack-tenant --command-option="value".

Command Option       Description
--set-current        Set this profile as current
--web-service-url    webServiceUrl of the profile
--api-gateway-url    apiGatewayUrl of the profile
--tenant             tenant of the profile
--token              token of the profile

If you get lost along the way, here are the default profile values:

webServiceUrl: "http://localhost:8090"
apiGatewayUrl: "ws://localhost:8091"
tenant: "default"
token: null

Issue a curl call to your RAGStack tenant to find the connection values for your tenant. The X-DataStax-Current-Org value is the client-id associated with the Astra token, and can be found in the Astra UI.

curl:

curl --location --request POST 'https://pulsar-gcp-useast1.api.streaming.datastax.com/langstream/ragstack-tenant' \
--header 'X-DataStax-Current-Org:lzAiCLsTMKruZZZUxieNgYhe' \
--header 'X-DataStax-Pulsar-Cluster: pulsar-gcp-useast1' \
--header 'Authorization: Bearer AstraCS:<token value>'

Result:

  "token":"{astra token}"}%

Ensure the values returned from the curl call match the values in your RAGStack CLI profile.

Check connection to Astra

In the RAGStack CLI, run the following command to open a gateway connection to your Astra Streaming tenant. This command will connect to your tenant and consume from the output-topic and produce to the input-topic.

ragstack gateway chat sample-app -cg consume-output -pg produce-input -p sessionId=$(uuidgen)

In Astra Streaming, confirm that your application is connected to your tenant. Select the Websocket tab of your RAGStack-enabled tenant, and choose to consume from output-topic and to produce to input-topic. If the Websocket tab is not visible, you may need to refresh the page or try opening it in Incognito mode. Send a message to your application, and confirm that it is received by the Astra websocket:

ragstack gateway chat sample-app -cg consume-output -pg produce-input -p sessionId=$(uuidgen)
Connected to wss://lsgwy-gcp-useast1.streaming.datastax.com/langstream-api-gateway//v1/consume/ragstack-tenant/sample-app/consume-output?param:sessionId=F85E4665-BE00-4513-A5C5-E59B42646490&option:position=latest
Connected to wss://lsgwy-gcp-useast1.streaming.datastax.com/langstream-api-gateway//v1/produce/ragstack-tenant/sample-app/produce-input?param:sessionId=F85E4665-BE00-4513-A5C5-E59B42646490

> Hi Astra, it's me, K8s. How are you?
Websocket chat
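The "Connected to" lines show the WebSocket addresses that the CLI opens. The sketch below rebuilds them from the profile's apiGatewayUrl, the tenant, the application, and the gateway id, which can be useful when pointing another WebSocket client at the same gateway. The URL shape is inferred only from the CLI output in this guide, not from a published specification, and gateway_url is a hypothetical helper name.

```shell
#!/bin/sh
# gateway_url BASE KIND TENANT APP GATEWAY SESSION_ID [POSITION]
# BASE is the profile's apiGatewayUrl; KIND is "consume" or "produce".
# The path and query layout mirror the CLI's "Connected to" lines (an inference).
# A BASE that ends in "/" yields "//v1", exactly as the CLI output shows.
gateway_url() {
    url="$1/v1/$2/$3/$4/$5?param:sessionId=$6"
    # Optional start position, e.g. "latest", as seen on the chat consume URL.
    if [ -n "${7:-}" ]; then
        url="$url&option:position=$7"
    fi
    printf '%s\n' "$url"
}
```

Called with the apiGatewayUrl from the profile, consume, ragstack-tenant, sample-app, consume-output, the session id, and latest, it prints the same consume address that the chat command reports.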

Your gateway connection is confirmed, and you can send messages to your application. This sample-app also produces messages to history-topic, which is exposed through the consume-history gateway, to provide more context to the AI model. To consume from this gateway, run the following command:

RAGStack CLI:

ragstack gateway consume sample-app consume-history -p sessionId=F85E4665-BE00-4513-A5C5-E59B42646490

Result:

Connected to wss://lsgwy-gcp-useast1.streaming.datastax.com/langstream-api-gateway//v1/consume/ragstack-tenant/sample-app/consume-history?param:sessionId=F85E4665-BE00-4513-A5C5-E59B42646490
{"record":{"key":null,"value":"Hi K8s, it's me, Astra.","headers":{}},"offset":"eyJvZmZzZXRzIjp7IjAiOiIxIn19"}
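The offset value in each consumed record looks like Base64-encoded text. Decoding the value from the record shown here suggests that it wraps a small JSON document of offsets. Treat that reading as an observation from this one sample, not a documented format. decode_offset is a hypothetical helper name, and the sketch assumes a base64 command that accepts -d.

```shell
#!/bin/sh
# decode_offset TOKEN: print the Base64-decoded form of an offset token.
# Assumes a `base64` command that accepts -d (GNU coreutils, BusyBox, recent macOS).
decode_offset() {
    printf '%s' "$1" | base64 -d
    echo    # finish with a newline for readable terminal output
}
```

Running decode_offset eyJvZmZzZXRzIjp7IjAiOiIxIn19 prints {"offsets":{"0":"1"}}.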

What’s next?

RAGStack is built with the LangStream framework, which is a set of tools for building Generative AI streaming applications.

For more, see GitHub↗.

© 2024 DataStax | Privacy policy | Terms of use
