Produce and consume messages with Starlight for RabbitMQ
|
Most of these examples assume you deployed Starlight for RabbitMQ as a protocol handler. You might need to make some modifications if you deployed Starlight for RabbitMQ as a standalone application or a proxy extension. |
After you deploy Starlight for RabbitMQ, use your Pulsar tenant’s connection details with a RabbitMQ client to produce and consume messages with Starlight for RabbitMQ.
Starlight for RabbitMQ supports many different use cases. With a Pulsar cluster between publishers and consumers, you can change the type of publisher and consumer to fit your needs.
|
The examples on this page aren’t exhaustive. For other connection methods, see the documentation for your preferred RabbitMQ client library. |
Connect RabbitMQ to Astra Streaming with a Python client
The following example uses a Python script to create a connection between RabbitMQ and your Astra Streaming tenant.
It also establishes a message queue named queuename, prints ten messages, and then closes the connection.
-
Enable Starlight for RabbitMQ and get the
rabbitmq.confconnection details, as explained in Get started with the Starlight for RabbitMQ extension. -
Create a
connect-test.pyfile containing the following code, and then replace the placeholders with the values from yourrabbitmq.conffile:connect-test.pyimport ssl import pika virtual_host = "VIRTUAL_HOST" token = "PASSWORD" context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_NONE context.check_hostname = False context.load_default_certs() ssl_options = pika.SSLOptions(context) connection = pika.BlockingConnection(pika.ConnectionParameters( virtual_host=virtual_host, host="HOST", ssl_options=ssl_options, port=PORT, credentials=pika.PlainCredentials("", token))) print("connection success") channel = connection.channel() print("started a channel") channel.queue_declare(queue='queuename') for x in range(10): channel.basic_publish(exchange='', routing_key='routingkey', body='message body goes here') print(" sent one") connection.close() -
Optional: Change the values for
queue,routing_key, andbody.queuenameandroutingkeybecome the names of Pulsar topics in your Astra Streaming tenant. Thebodyis the content of each message that is sent. -
Save and run the
connect-test.pyscript:python3 connect-test.py -
Make sure the output shows that the connection was successful and ten messages were sent:
connection success started a channel sent one sent one sent one sent one sent one sent one sent one sent one sent one sent one -
In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the
rabbitmqnamespace.If you everything was configured correctly, then the
rabbitmqnamespace should have topics namedamq.default.__queuenameandamq.default_routingkeythat were created by the Python script. Additionally, the namespace’s metrics should reflect that at least 10 messages were published and consumed by your Astra Streaming Pulsar topics.
Connect RabbitMQ to Astra Streaming with a Java client
The following example uses a Java program to create a connection between RabbitMQ and your Astra Streaming tenant, and then it establishes a message queue and sends a message:
-
Enable Starlight for RabbitMQ and get the
rabbitmq.confconnection details, as explained in Get started with the Starlight for RabbitMQ extension. -
Create a new Maven project:
mvn archetype:generate \ -DgroupId=org.example \ -DartifactId=StarlightForRabbitMqClient \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false -
Change to the new project directory:
cd StarlightForRabbitMqClient -
Add the RabbitMQ client dependency to the
pom.xmlfile:pom.xml<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> -
Open the
App.javafile atsrc/main/java/org/example/App.java, and then delete any preeixsting code in this file.In the next steps, you will add code to this file to create a complete program that produces and consumes messages.
-
Paste the following code in the file, and then replace the placeholders with the values from your
rabbitmq.conffile.Your editor will report errors because this isn’t a complete program yet.
/src/main/java/org/example/App.javapackage org.example; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class App { private static final String username = ""; private static final String password = "PASSWORD"; private static final String host = "HOST"; private static final int port = 5671; private static final String virtual_host = "VIRTUAL_HOST"; private static final String queueName = "queuename"; private static final String amqp_URI = String.format("amqps://%s:%s@%s:%d/%s", username, password, host, port, virtual_host.replace("/","%2f")); public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException { -
Optional: Replace
queuenamewith another name for the queue that publishes and consumes messages.This name is also used as the corresponding topic name in Astra Streaming. If the topic doesn’t exist, it is created automatically when the producer sends the first message.
-
Add the following code to create a connection, channel, and queue that is used by both the producer and consumer:
/src/main/java/org/example/App.javaConnectionFactory factory = new ConnectionFactory(); factory.setUri(amqp_URI); /* You could also set each value individually factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtual_host); factory.useSslProtocol(); */ Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); -
Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:
/src/main/java/org/example/App.javaString publishMessage = "Hello World!"; channel.basicPublish("", queueName, null, publishMessage.getBytes()); System.out.println(" Sent '" + publishMessage + "'"); -
Add the consumer code, which creates a basic consumer with callback on message receipt:
/src/main/java/org/example/App.javaDeliverCallback deliverCallback = (consumerTag, delivery) -> { String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" Received '" + consumeMessage + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); Thread.sleep(4000); // wait a bit for messages to be received channel.close(); connection.close(); } }Because the consumer isn’t a blocking thread, the
sleepallows time for messages to be received and processed. -
Save
App.java, and then build and run the JAR file for the complete program:mvn clean package assembly:single java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar -
Make sure the result shows that a message was sent and received:
Sent 'Hello World!' Received 'Hello World!' -
In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the
rabbitmqnamespace.If you everything was configured correctly, then the
rabbitmqnamespace should have a topic namedamq.default.__queuenamethat was created by the Java program. Additionally, the namespace’s metrics should reflect that at least one message was published and consumed by your Astra Streaming Pulsar topic.
Connect RabbitMQ to IBM Elite Support for Apache Pulsar with a Python client
|
IBM Elite Support for Apache Pulsar was formerly DataStax Luna Streaming. |
To use a RabbitMQ client with Starlight for RabbitMQ, you use your IBM Elite Support for Apache Pulsar tenant as the AMQP listener.
The following examples use a connection on localhost:5672 with Starlight for RabbitMQ as a protocol handler.
For other connection methods, see the documentation for your preferred RabbitMQ client library.
The following example uses the Pika RabbitMQ Python library to produce and consume messages from Pulsar:
-
Save the following Python script to a safe place as
test-queue.py. The example script assumes you have opened thelocalhost:5672port, as explained in Get started with the Starlight for RabbitMQ extension.test-queue.py#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672)) channel = connection.channel() try: channel.queue_declare("test-queue") print("created test-queue queue") channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8')) print("published message test") _, _, res = channel.basic_get(queue="test-queue", auto_ack=True) assert res is not None, "should have received a message" print("received message: " + res.decode()) channel.queue_delete("test-queue") print("deleted test-queue queue") finally: connection.close() -
Optional: Replace
test-queuewith another name for the queue and routing key.These names are also used as the corresponding topic names in your Pulsar tenant. If the topic doesn’t exist, it is created automatically when the producer sends the first message.
-
Save and run the
test-queue.pyscript:python ./test-queue.py -
Make sure the output shows that the queue was created and a message was sent:
created test-queue queue published message test received message: test deleted test-queue queue -
Use the Apache Pulsar admin CLI or the IBM Elite Support for Apache Pulsar Admin Console to inspect your tenant’s activity. Make sure the
test-queuetopic was created and a message was published and consumed.
Connect RabbitMQ to IBM Elite Support for Apache Pulsar with a Java client
|
IBM Elite Support for Apache Pulsar was formerly DataStax Luna Streaming. |
To use a RabbitMQ client with Starlight for RabbitMQ, you use your IBM Elite Support for Apache Pulsar tenant as the AMQP listener.
The following examples use a connection on localhost:5672 with Starlight for RabbitMQ as a protocol handler.
For other connection methods, see the documentation for your preferred RabbitMQ client library.
|
In addition to the following example, you can modify the Astra Streaming Java client example for IBM Elite Support for Apache Pulsar by replacing the connection details with those for your cluster. |
The following example uses a Java program to create a connection between RabbitMQ and your IBM Elite Support for Apache Pulsar tenant, and then it establishes a message queue and sends a message:
-
Create a new Maven project:
mvn archetype:generate \ -DgroupId=org.example \ -DartifactId=StarlightForRabbitMqClient \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false -
Change to the new project directory:
cd StarlightForRabbitMqClient -
Open the new project in your IDE, and then add the RabbitMQ client dependency to
pom.xml:pom.xml<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> -
Open the
App.javafile atsrc/main/java/org/example/App.java, and then delete any preeixsting code in this file. In the next steps, you will add code to this file to create a complete program that produces and consumes messages. -
Paste the following code in the file.
This code creates a connection, channel, and queue that is used by both the producer and consumer. It uses the default connection values to connect on
localhost:5672with port forwarding, as explained in Get started with the Starlight for RabbitMQ extension.Your editor will report errors because this isn’t a complete program yet.
App.javapackage org.example; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class App { private static final String queueName = "queuename"; public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException { // Use the default values to connect on localhost:5672 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); -
Optional: Replace
queuenamewith another name for the queue that publishes and consumes messages.This name is also used as the corresponding topic name in Astra Streaming. If the topic doesn’t exist,it is created automatically when the producer sends the first message.
-
Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:
App.javaString publishMessage = "Hello World!"; channel.basicPublish("", queueName, null, publishMessage.getBytes()); System.out.println(" Sent '" + publishMessage + "'"); -
Add the consumer code, which creates a basic consumer with callback on message receipt.
App.javaDeliverCallback deliverCallback = (consumerTag, delivery) -> { String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" Received '" + consumeMessage + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); Thread.sleep(4000); // wait a bit for messages to be received channel.close(); connection.close(); } }Because the consumer isn’t a blocking thread, the
sleepallows time for messages to be received and processed. -
Save
App.java, and then build and run the JAR file for the complete program:mvn clean package assembly:single java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar -
Make sure the result shows that a message was sent and received:
Sent 'Hello World!' Received 'Hello World!' -
Use the Pulsar Admin CLI or the IBM Elite Support for Apache Pulsar Admin Console to inspect your tenant’s activity. Make sure the
queuenametopic was created and a message was published and consumed.
Use Starlight for RabbitMQ with a self-managed Pulsar cluster
-
Deploy Starlight for RabbitMQ on a self-managed Pulsar cluster.
-
Before connecting your production applications with Starlight for RabbitMQ, you can use a
RabbitMQ/AMQP-0.9.1client or a tool such as RabbitMQ PerfTest to test your Starlight for RabbitMQ deployment. For example, the following Python script creates a queue, publishes a message that is routed to the queue, reads the message from the queue, and then deletes the queue:#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672)) channel = connection.channel() try: channel.queue_declare("test-queue") print("created test-queue queue") channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8')) print("published message test") _, _, res = channel.basic_get(queue="test-queue", auto_ack=True) assert res is not None, "should have received a message" print("received message: " + res.decode()) channel.queue_delete("test-queue") print("deleted test-queue queue") finally: connection.close() -
To use a RabbitMQ client with Starlight for RabbitMQ, use your Pulsar tenant as the AMQP listener.
You can also connect on
localhostif you have port forwarded the RabbitMQ port (5672) to your local machine. Forlocalhostexamples, see the IBM Elite Support for Apache Pulsar Python client example and IBM Elite Support for Apache Pulsar Java client example.You can also modify the Astra Streaming Java client example by replacing the connection details with your cluster’s connection details.
For more information about expected connection parameters, see the documentation for your preferred RabbitMQ client library.