Produce and consume messages with Starlight for RabbitMQ

Most of these examples assume you deployed Starlight for RabbitMQ as a protocol handler. You might need to make some modifications if you deployed Starlight for RabbitMQ as a standalone application or a proxy extension.

After you deploy Starlight for RabbitMQ, use your Pulsar tenant’s connection details with a RabbitMQ client to produce and consume messages with Starlight for RabbitMQ.

Starlight for RabbitMQ supports many different use cases. With a Pulsar cluster between publishers and consumers, you can change the type of publisher and consumer to fit your needs.

The examples on this page aren’t exhaustive. For other connection methods, see the documentation for your preferred RabbitMQ client library.

Connect RabbitMQ to Astra Streaming with a Python client

The following example uses a Python script to create a connection between RabbitMQ and your Astra Streaming tenant. It also declares a message queue named queuename, publishes ten messages, and then closes the connection.

  1. Enable Starlight for RabbitMQ and get the rabbitmq.conf connection details, as explained in Get started with the Starlight for RabbitMQ extension.

  2. Create a connect-test.py file containing the following code, and then replace the placeholders with the values from your rabbitmq.conf file:

    connect-test.py
    import ssl
    import pika
    
    virtual_host = "VIRTUAL_HOST"
    token = "PASSWORD"
    
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
    context.verify_mode = ssl.CERT_NONE
    context.check_hostname = False
    context.load_default_certs()
    ssl_options = pika.SSLOptions(context)
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        virtual_host=virtual_host,
        host="HOST",
        ssl_options=ssl_options,
        port=PORT,
        credentials=pika.PlainCredentials("", token)))
    print("connection success")
    
    channel = connection.channel()
    print("started a channel")
    
    channel.queue_declare(queue='queuename')
    
    for x in range(10):
        channel.basic_publish(exchange='',
                          routing_key='routingkey',
                          body='message body goes here')
        print(" sent one")
    
    connection.close()
  3. Optional: Change the values for queue, routing_key, and body.

    queuename and routingkey become the names of Pulsar topics in your Astra Streaming tenant. The body is the content of each message that is sent.

  4. Save and run the connect-test.py script:

    python3 connect-test.py
  5. Make sure the output shows that the connection was successful and ten messages were sent:

    connection success
    started a channel
     sent one
     sent one
     sent one
     sent one
     sent one
     sent one
     sent one
     sent one
     sent one
     sent one
  6. In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the rabbitmq namespace.

    If everything was configured correctly, then the rabbitmq namespace should have topics named amq.default.__queuename and amq.default_routingkey that were created by the Python script. Additionally, the namespace’s metrics should reflect that at least 10 messages were published and consumed by your Astra Streaming Pulsar topics.
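
As an alternative to passing each connection field separately, the same details can be combined into a single AMQPS URI, much like the Java example on this page does. The following is a minimal sketch that uses only the Python standard library. The function name build_amqps_uri and the sample virtual host my-tenant/rabbitmq are illustrative assumptions, not values from your rabbitmq.conf file. The point it shows is that a slash inside the virtual host must be percent-encoded before it is placed in the URI path.

```python
from urllib.parse import quote


def build_amqps_uri(username: str, password: str, host: str, port: int, virtual_host: str) -> str:
    """Build an amqps:// URI, percent-encoding the parts that may need it."""
    # safe="" makes quote() encode "/" as %2F, so a virtual host such as
    # "tenant/namespace" stays a single path segment in the URI.
    return "amqps://{}:{}@{}:{}/{}".format(
        quote(username, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(virtual_host, safe=""),
    )


# Hypothetical values: replace them with the ones from your rabbitmq.conf file.
uri = build_amqps_uri("", "PASSWORD", "HOST", 5671, "my-tenant/rabbitmq")
print(uri)
```

If you use pika, the resulting string can be passed to pika.URLParameters in place of the pika.ConnectionParameters call. TLS verification behavior in that case follows the library's own defaults, so check the pika documentation before relying on it.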

Connect RabbitMQ to Astra Streaming with a Java client

The following example uses a Java program to create a connection between RabbitMQ and your Astra Streaming tenant, and then it establishes a message queue and sends a message:

  1. Enable Starlight for RabbitMQ and get the rabbitmq.conf connection details, as explained in Get started with the Starlight for RabbitMQ extension.

  2. Create a new Maven project:

    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=StarlightForRabbitMqClient \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
  3. Change to the new project directory:

    cd StarlightForRabbitMqClient
  4. Add the RabbitMQ client dependency to the pom.xml file:

    pom.xml
        <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>5.16.0</version>
        </dependency>
  5. Open the App.java file at src/main/java/org/example/App.java, and then delete any preexisting code in this file.

    In the next steps, you will add code to this file to create a complete program that produces and consumes messages.

  6. Paste the following code in the file, and then replace the placeholders with the values from your rabbitmq.conf file.

    Your editor will report errors because this isn’t a complete program yet.

    /src/main/java/org/example/App.java
    package org.example;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.TimeoutException;
    
    public class App {
      private static final String username = "";
      private static final String password = "PASSWORD";
      private static final String host = "HOST";
      private static final int port = 5671;
      private static final String virtual_host = "VIRTUAL_HOST";
      private static final String queueName = "queuename";
      private static final String amqp_URI = String.format("amqps://%s:%s@%s:%d/%s", username, password, host, port, virtual_host.replace("/","%2f"));
    
      public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException {
  7. Optional: Replace queuename with another name for the queue that publishes and consumes messages.

    This name is also used as the corresponding topic name in Astra Streaming. If the topic doesn’t exist, it is created automatically when the producer sends the first message.

  8. Add the following code to create a connection, channel, and queue that are used by both the producer and consumer:

    /src/main/java/org/example/App.java
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri(amqp_URI);
    
        /*
        You could also set each value individually
          factory.setHost(host);
          factory.setPort(port);
          factory.setUsername(username);
          factory.setPassword(password);
          factory.setVirtualHost(virtual_host);
          factory.useSslProtocol();
         */
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(queueName, false, false, false, null);
  9. Add the producer code, which is a simple flow that sends a single message:

    /src/main/java/org/example/App.java
        String publishMessage = "Hello World!";
        channel.basicPublish("", queueName, null, publishMessage.getBytes());
        System.out.println(" Sent '" + publishMessage + "'");
  10. Add the consumer code, which creates a basic consumer with callback on message receipt:

    /src/main/java/org/example/App.java
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" Received '" + consumeMessage + "'");
        };
    
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    
        Thread.sleep(4000); // wait a bit for messages to be received
    
        channel.close();
        connection.close();
      }
    }

    Because the consumer isn’t a blocking thread, the sleep allows time for messages to be received and processed.

  11. Save App.java, and then build and run the JAR file for the complete program:

    mvn clean package assembly:single
    java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar
  12. Make sure the result shows that a message was sent and received:

    Sent 'Hello World!'
    Received 'Hello World!'
  13. In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the rabbitmq namespace.

    If everything was configured correctly, then the rabbitmq namespace should have a topic named amq.default.__queuename that was created by the Java program. Additionally, the namespace’s metrics should reflect that at least one message was published and consumed by your Astra Streaming Pulsar topic.
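
The fixed four-second sleep in the consumer step is a simplification: the program always waits the full time, even if the message arrives immediately. The following sketch shows the general alternative of waiting on a signal that the callback sets. It is written in Python with the standard threading module, and the delivery thread is simulated with a timer, so it stands in for a client library's consumer thread rather than reproducing it. The names received, first_delivery, and on_message are illustrative.

```python
import threading

received = []                       # messages collected by the callback
first_delivery = threading.Event()  # set once a message has arrived


def on_message(body: bytes) -> None:
    """Callback that runs on the delivery thread, similar to a DeliverCallback."""
    received.append(body.decode("utf-8"))
    first_delivery.set()


# Simulated delivery thread: calls on_message after a short delay.
delivery_thread = threading.Timer(0.1, on_message, args=(b"Hello World!",))
delivery_thread.start()

# Wait up to 4 seconds, but return as soon as the callback fires.
arrived = first_delivery.wait(timeout=4.0)
delivery_thread.join()
print("received" if arrived else "timed out", received)
```

In a Java program, java.util.concurrent.CountDownLatch can play the same role as the event used here.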

Connect RabbitMQ to IBM Elite Support for Apache Pulsar with a Python client

IBM Elite Support for Apache Pulsar was formerly DataStax Luna Streaming.

To use a RabbitMQ client with Starlight for RabbitMQ, you use your IBM Elite Support for Apache Pulsar tenant as the AMQP listener. The following examples use a connection on localhost:5672 with Starlight for RabbitMQ as a protocol handler. For other connection methods, see the documentation for your preferred RabbitMQ client library.

The following example uses the Pika RabbitMQ Python library to produce and consume messages from Pulsar:

  1. Save the following Python script to a safe place as test-queue.py. The example script assumes you have opened the localhost:5672 port, as explained in Get started with the Starlight for RabbitMQ extension.

    test-queue.py
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672))
    channel = connection.channel()
    
    try:
        channel.queue_declare("test-queue")
        print("created test-queue queue")
    
        channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8'))
        print("published message test")
    
        _, _, res = channel.basic_get(queue="test-queue", auto_ack=True)
        assert res is not None, "should have received a message"
        print("received message: " + res.decode())
    
        channel.queue_delete("test-queue")
        print("deleted test-queue queue")
    
    finally:
        connection.close()
  2. Optional: Replace test-queue with another name for the queue and routing key.

    These names are also used as the corresponding topic names in your Pulsar tenant. If the topic doesn’t exist, it is created automatically when the producer sends the first message.

  3. Save and run the test-queue.py script:

    python ./test-queue.py
  4. Make sure the output shows that the queue was created, a message was published and received, and the queue was deleted:

    created test-queue queue
    published message test
    received message: test
    deleted test-queue queue
  5. Use the Apache Pulsar admin CLI or the IBM Elite Support for Apache Pulsar Admin Console to inspect your tenant’s activity. Make sure the test-queue topic was created and a message was published and consumed.
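
The script above reads exactly one message and asserts that it exists. When a queue might hold several messages, or none, a polling loop is one option. The following is a minimal sketch, assuming a pika-style channel whose basic_get returns a (method, properties, body) tuple and returns None values when the queue is empty. The helper drain_queue and the FakeChannel class are hypothetical: FakeChannel exists only so that the sketch runs without a broker.

```python
def drain_queue(channel, queue: str, max_messages: int = 100) -> list:
    """Read messages with basic_get until the queue is empty or the cap is hit."""
    bodies = []
    while len(bodies) < max_messages:
        method, _properties, body = channel.basic_get(queue=queue, auto_ack=True)
        if method is None:  # an empty queue yields (None, None, None)
            break
        bodies.append(body.decode("utf-8"))
    return bodies


class FakeChannel:
    """Hypothetical in-memory stand-in for a pika channel, for demonstration only."""

    def __init__(self, messages):
        self._messages = list(messages)

    def basic_get(self, queue, auto_ack=False):
        if not self._messages:
            return None, None, None
        return object(), None, self._messages.pop(0)


print(drain_queue(FakeChannel([b"test", b"test2"]), "test-queue"))
```

With a real connection, you would pass the channel returned by connection.channel() instead of FakeChannel.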

Connect RabbitMQ to IBM Elite Support for Apache Pulsar with a Java client

IBM Elite Support for Apache Pulsar was formerly DataStax Luna Streaming.

To use a RabbitMQ client with Starlight for RabbitMQ, you use your IBM Elite Support for Apache Pulsar tenant as the AMQP listener. The following examples use a connection on localhost:5672 with Starlight for RabbitMQ as a protocol handler. For other connection methods, see the documentation for your preferred RabbitMQ client library.

In addition to the following example, you can modify the Astra Streaming Java client example for IBM Elite Support for Apache Pulsar by replacing the connection details with those for your cluster.

The following example uses a Java program to create a connection between RabbitMQ and your IBM Elite Support for Apache Pulsar tenant, and then it establishes a message queue and sends a message:

  1. Create a new Maven project:

    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=StarlightForRabbitMqClient \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
  2. Change to the new project directory:

    cd StarlightForRabbitMqClient
  3. Open the new project in your IDE, and then add the RabbitMQ client dependency to pom.xml:

    pom.xml
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>5.16.0</version>
    </dependency>
  4. Open the App.java file at src/main/java/org/example/App.java, and then delete any preexisting code in this file. In the next steps, you will add code to this file to create a complete program that produces and consumes messages.

  5. Paste the following code in the file.

    This code creates a connection, channel, and queue that are used by both the producer and consumer. It uses the default connection values to connect on localhost:5672 with port forwarding, as explained in Get started with the Starlight for RabbitMQ extension.

    Your editor will report errors because this isn’t a complete program yet.

    App.java
    package org.example;
    
    import com.rabbitmq.client.*;
    
    import java.io.IOException;
    import java.net.URISyntaxException;
    import java.nio.charset.StandardCharsets;
    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.util.concurrent.TimeoutException;
    
    public class App {
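      private static final String queueName = "queuename";
    
      // Connection values that match the RabbitMQ client defaults.
      // These are assumptions for a local, port-forwarded setup; change them for your cluster.
      private static final String userName = "guest";
      private static final String password = "guest";
      private static final String virtualHost = "/";
      private static final String hostName = "localhost";
      private static final int portNumber = 5672;
    
      public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException {
    
        // Connect on localhost:5672 using the values defined above
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername(userName);
        factory.setPassword(password);
        factory.setVirtualHost(virtualHost);
        factory.setHost(hostName);
        factory.setPort(portNumber);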
    
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
    
        channel.queueDeclare(queueName, false, false, false, null);
  6. Optional: Replace queuename with another name for the queue that publishes and consumes messages.

    This name is also used as the corresponding topic name in your Pulsar tenant. If the topic doesn’t exist, it is created automatically when the producer sends the first message.

  7. Add the producer code, which is a simple flow that sends a single message:

    App.java
        String publishMessage = "Hello World!";
        channel.basicPublish("", queueName, null, publishMessage.getBytes());
        System.out.println(" Sent '" + publishMessage + "'");
  8. Add the consumer code, which creates a basic consumer with callback on message receipt:

    App.java
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
          String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8);
          System.out.println(" Received '" + consumeMessage + "'");
        };
    
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
    
        Thread.sleep(4000); // wait a bit for messages to be received
    
        channel.close();
        connection.close();
      }
    }

    Because the consumer isn’t a blocking thread, the sleep allows time for messages to be received and processed.

  9. Save App.java, and then build and run the JAR file for the complete program:

    mvn clean package assembly:single
    java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar
  10. Make sure the result shows that a message was sent and received:

    Sent 'Hello World!'
    Received 'Hello World!'
  11. Use the Pulsar Admin CLI or the IBM Elite Support for Apache Pulsar Admin Console to inspect your tenant’s activity. Make sure the queuename topic was created and a message was published and consumed.

Use Starlight for RabbitMQ with a self-managed Pulsar cluster

  1. Deploy Starlight for RabbitMQ on a self-managed Pulsar cluster.

  2. Before connecting your production applications with Starlight for RabbitMQ, you can use a RabbitMQ/AMQP-0.9.1 client or a tool such as RabbitMQ PerfTest to test your Starlight for RabbitMQ deployment. For example, the following Python script creates a queue, publishes a message that is routed to the queue, reads the message from the queue, and then deletes the queue:

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672))
    channel = connection.channel()
    
    try:
        channel.queue_declare("test-queue")
        print("created test-queue queue")
        channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8'))
        print("published message test")
        _, _, res = channel.basic_get(queue="test-queue", auto_ack=True)
        assert res is not None, "should have received a message"
        print("received message: " + res.decode())
        channel.queue_delete("test-queue")
        print("deleted test-queue queue")
    finally:
        connection.close()
  3. To use a RabbitMQ client with Starlight for RabbitMQ, use your Pulsar tenant as the AMQP listener.

    You can also connect on localhost if you have port forwarded the RabbitMQ port (5672) to your local machine. For localhost examples, see the IBM Elite Support for Apache Pulsar Python client example and IBM Elite Support for Apache Pulsar Java client example.

    You can also modify the Astra Streaming Java client example by replacing the connection details with your cluster’s connection details.

    For more information about expected connection parameters, see the documentation for your preferred RabbitMQ client library.
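
Because the same test script may run against localhost or against a remote listener, one option is to read the connection settings from the environment and fall back to the localhost values. The following is a minimal sketch. The environment variable names AMQP_HOST, AMQP_PORT, and AMQP_VHOST, the function name amqp_settings, and the sample host name are assumptions made for illustration and are not defined by Starlight for RabbitMQ.

```python
import os


def amqp_settings(env=None) -> dict:
    """Collect connection settings, falling back to the localhost defaults."""
    env = os.environ if env is None else env
    return {
        "host": env.get("AMQP_HOST", "localhost"),
        "port": int(env.get("AMQP_PORT", "5672")),
        "virtual_host": env.get("AMQP_VHOST", "/"),
    }


# With no variables set, the settings point at a port-forwarded localhost.
print(amqp_settings({}))
# Hypothetical remote listener.
print(amqp_settings({"AMQP_HOST": "pulsar.example.internal", "AMQP_PORT": "5671"}))
```

The keys match keyword arguments accepted by pika.ConnectionParameters, so the dictionary can be unpacked as pika.ConnectionParameters(**amqp_settings()).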

© Copyright IBM Corporation 2026 | Privacy policy | Terms of use Manage Privacy Choices
