Get started with the Starlight for RabbitMQ extension
The Starlight for RabbitMQ extension enables RabbitMQ protocol support for Apache Pulsar™.
The extension acts as a proxy between your RabbitMQ application and Pulsar cluster. It implements the AMQP 0.9.1 protocol used by RabbitMQ clients and translates AMQP frames and concepts to Pulsar concepts.
Deploy Starlight for RabbitMQ
There are several ways to run Starlight for RabbitMQ:
- Pulsar pluggable protocol handler
-
Deploying Starlight for RabbitMQ in the broker as a protocol handler means that you don’t need to deploy another runtime. However, the resources used by Starlight for RabbitMQ to handle traffic can have an impact on the load balancing algorithms used by Pulsar.
By adding the Starlight for RabbitMQ protocol handler to your Pulsar cluster, you can migrate your existing RabbitMQ applications and services to Pulsar without modifying the code.

- Pulsar Proxy extension
-
Deploying Starlight for RabbitMQ in the Pulsar Proxy as a proxy extension means that you don’t need to deploy another runtime if you already use the Pulsar Proxy. Additionally, there is no significant impact on the Pulsar Proxy’s performance. Proxy extensions require Pulsar version 2.9 or later.
- Standalone Java application
-
Running Starlight for RabbitMQ as a standalone Java application has the advantage of not impacting the resources used by the Pulsar cluster but on the other hand you need to deploy and manage the lifecycle of an additional runtime.
The deployment options you can use depend on your Pulsar deployment.
-
Astra Streaming
-
Luna Streaming
-
Self-managed
When you enable Starlight for RabbitMQ in Astra Streaming, it is deployed as a protocol handler in the Pulsar cluster. Because Astra Streaming manages the Pulsar cluster for you, you cannot use the other Starlight for RabbitMQ deployment options.
The following steps explain how to enable the Starlight for RabbitMQ protocol handler in an Astra Streaming Pulsar cluster, and then get the connection details for your RabbitMQ client.
-
In the Astra Portal header, click Applications, and then select Streaming.
-
Create a tenant or click the name of an existing tenant.
-
Go to your tenant’s Connect tab, select RabbitMQ, and then click Enable RabbitMQ.
-
Review the information about the Starlight for RabbitMQ extension, and then click Enable RabbitMQ to confirm that you want to enable this extension on your tenant.
This action creates a configuration file and a
rabbitmqnamespace in your Astra Streaming tenant automatically.The
rabbitmqnamespace is required for the Starlight for RabbitMQ extension to function properly. It is a permanent namespace that cannot be removed except by deleting the entire tenant and all of its data. -
Save the RabbitMQ connection details to a file named
rabbitmq.conf. The actual values depend on your Astra Streaming tenant’s configuration and cloud provider.rabbitmq.confusername: TENANT_NAME password: ****** host: rabbitmq-PROVIDER-REGION.streaming.datastax.com port: 5671 virtual_host: TENANT_NAME/rabbitmq amqp_URI: ******@rabbitmq-PROVIDER-REGION.streaming.datastax.com:5671/TENANT_NAME/rabbitmqIn Astra Streaming, if you click Download or Copy, then a Pulsar token is included in the
passwordfield automatically. You can also generate a token on your tenant’s Settings tab.Continue to the next section to learn how to use these connection details in a RabbitMQ client script.
-
Protocol handler (Helm chart)
-
Protocol handler (
narfile) -
Proxy extension
-
Standalone Java application
The Starlight for RabbitMQ extension is included in the luna-streaming-all image used to deploy a Luna cluster.
The Luna Helm chart simplifies deployment of Starlight for RabbitMQ as a protocol handler.
The following steps explain how to deploy a Luna Streaming Helm chart to create a simple Pulsar cluster with the Starlight for RabbitMQ extension ready for use:
-
Make sure you meet the following prerequisites:
-
Install Helm 3 CLI version 3.8.0 or later.
-
Install kubectl CLI version 1.23.4 or later.
-
Have access to a Kubernetes cluster with permission to create a namespace, deployments, and pods.
-
-
Add the DataStax Helm chart repo to your Helm store:
helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart -
Install the Helm chart using a minimal values file. The following command creates a Helm release named
my-pulsar-clusterusing the DataStax Luna Helm chart within a Kubernetes namespace nameddatastax-pulsar. This minimal configuration creates only the essential components and has no ingress or load balanced services.VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/starlight-for-rabbitmq/values.yaml" helm install \ --namespace datastax-pulsar \ --create-namespace \ --values $VALUES_URL \ --version 3.0.4 \ my-pulsar-cluster \ datastax-pulsar/pulsar -
Wait for the broker pod to reach a running state. It might restart a few times while the components start up.
kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s -
Enable port forwarding for the Pulsar Admin and Starlight for RabbitMQ services that are running on the Kubernetes cluster.
You don’t need to open the Pulsar binary port to accept new messages when using Starlight for RabbitMQ. This is because RabbitMQ clients communicate using the AMQP protocol over the RabbitMQ port (5672).
-
In a new terminal, port forward the Pulsar Admin service:
kubectl port-forward -n datastax-pulsar service/pulsar-broker 8080:8080 -
In a separate terminal window, port forward the Starlight for RabbitMQ service:
kubectl port-forward -n datastax-pulsar service/pulsar-proxy 5672:5672The Luna Helm chart deployed Starlight for RabbitMQ on the Pulsar proxy and opened the correct port. The forwarded ports allow you to access Pulsar and Starlight for RabbitMQ services when running RabbitMQ client scripts and using CLI tools locally. Your applications can now communicate with Pulsar as if it were a real RabbitMQ host, as explained in the next section.
-
You can embed Starlight for RabbitMQ directly into the Pulsar brokers by loading it as a protocol handler.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzCopy the
starlight-rabbitmq-VERSION.narfile, which is used to run Starlight for RabbitMQ as a protocol handler, to the appropriate directory in your Pulsar broker installation directory, such as the/protocolsdirectory. -
Create or edit a Pulsar broker configuration file, such as
broker.conforstandalone.conf. -
Set the Starlight for RabbitMQ protocol handler configuration. For example, if the
narfile is in the./protocolsdirectory:broker.confmessagingProtocols=rabbitmq protocolHandlerDirectory=./protocols -
Set the AMQP service listeners. For example:
broker.confamqpListeners=amqp://127.0.0.1:5672 advertisedAddress=127.0.0.1The hostname value in
amqpListenersis the same as Pulsar broker’sadvertisedAddress. -
Start the Pulsar broker.
You can embed Starlight for RabbitMQ into the Pulsar Proxy by loading it as a proxy extension.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzCopy the
starlight-rabbitmq-VERSION.narfile, which is used to run Starlight for RabbitMQ as a proxy extension, to the appropriate directory in your Pulsar Proxy installation directory, such as the/proxyextensionsdirectory. -
Create or edit a Pulsar Proxy configuration file, such as
proxy.conf. -
Set the Starlight for RabbitMQ proxy extension configuration. For example, if the
narfile is in the./proxyextensionsdirectoryproxy.confproxyExtensions=rabbitmq proxyExtensionsDirectory=./proxyextensions -
Set the AMQP service listeners. For example:
proxy.confamqpListeners=amqp://127.0.0.1:5672 advertisedAddress=127.0.0.1The hostname value in listeners is the same as Pulsar proxy’s
advertisedAddress. -
Start the Pulsar proxy.
You can run Starlight for RabbitMQ as a standalone Java application.
The jar file for running Starlight for RabbitMQ as a standalone Java application is available in the Starlight for RabbitMQ tar file.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzThe path to the
jarfile is/starlight-rabbitmq/target/starlight-rabbitmq-VERSION-jar-with-dependencies.jar. -
In a configuration file, set the URLs of the Pulsar brokers and the Apache ZooKeeper™ configuration store. For example:
brokerServiceURL=pulsar://localhost:6650 brokerWebServiceURL=http://localhost:8080 configurationStoreServers=localhost:2181 -
Run the
jarfile as a Java application with the configuration file path in the--config(-c) option:java -jar ./starlight-rabbitmq/target/starlight-rabbitmq-${version}-jar-with-dependencies.jar -c conf/starlight-for-rabbitmq.conf
-
Protocol handler
-
Proxy extension
-
Standalone Java application
You can embed Starlight for RabbitMQ directly into the Pulsar brokers by loading it as a protocol handler.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzCopy the
starlight-rabbitmq-VERSION.narfile, which is used to run Starlight for RabbitMQ as a protocol handler, to the appropriate directory in your Pulsar broker installation directory, such as the/protocolsdirectory. -
Create or edit a Pulsar broker configuration file, such as
broker.conforstandalone.conf. -
Set the Starlight for RabbitMQ protocol handler configuration. For example, if the
narfile is in the./protocolsdirectory:broker.confmessagingProtocols=rabbitmq protocolHandlerDirectory=./protocols -
Set the AMQP service listeners. For example:
broker.confamqpListeners=amqp://127.0.0.1:5672 advertisedAddress=127.0.0.1The hostname value in
amqpListenersis the same as Pulsar broker’sadvertisedAddress. -
Start the Pulsar broker.
You can embed Starlight for RabbitMQ into the Pulsar Proxy by loading it as a proxy extension.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzCopy the
starlight-rabbitmq-VERSION.narfile, which is used to run Starlight for RabbitMQ as a proxy extension, to the appropriate directory in your Pulsar Proxy installation directory, such as the/proxyextensionsdirectory. -
Create or edit a Pulsar Proxy configuration file, such as
proxy.conf. -
Set the Starlight for RabbitMQ proxy extension configuration. For example, if the
narfile is in the./proxyextensionsdirectoryproxy.confproxyExtensions=rabbitmq proxyExtensionsDirectory=./proxyextensions -
Set the AMQP service listeners. For example:
proxy.confamqpListeners=amqp://127.0.0.1:5672 advertisedAddress=127.0.0.1The hostname value in listeners is the same as Pulsar proxy’s
advertisedAddress. -
Start the Pulsar proxy.
You can run Starlight for RabbitMQ as a standalone Java application.
The jar file for running Starlight for RabbitMQ as a standalone Java application is available in the Starlight for RabbitMQ tar file.
-
Download the Starlight for RabbitMQ
tarfile from the Starlight for RabbitMQ GitHub repository. -
Extract the files from the
tar:tar xvfz starlight-rabbitmq-VERSION-all.tar.gzThe path to the
jarfile is/starlight-rabbitmq/target/starlight-rabbitmq-VERSION-jar-with-dependencies.jar. -
In a configuration file, set the URLs of the Pulsar brokers and the Apache ZooKeeper™ configuration store. For example:
brokerServiceURL=pulsar://localhost:6650 brokerWebServiceURL=http://localhost:8080 configurationStoreServers=localhost:2181 -
Run the
jarfile as a Java application with the configuration file path in the--config(-c) option:java -jar ./starlight-rabbitmq/target/starlight-rabbitmq-${version}-jar-with-dependencies.jar -c conf/starlight-for-rabbitmq.conf
Produce and consume messages with Starlight for RabbitMQ
|
Most of these examples assume you deployed Starlight for RabbitMQ as a protocol handler. You might need to make some modifications if you deployed Starlight for RabbitMQ as a standalone application or a proxy extension. |
This section explains how to use your Pulsar tenant’s connection details with a RabbitMQ client to produce and consume messages with Starlight for RabbitMQ.
Starlight for RabbitMQ supports many different use cases. With a Pulsar cluster between publishers and consumers, you can change the type of publisher and consumer to fit your needs.
The following examples use Python and Java. The Java example uses Maven, but you can also use Gradle. For complete source code examples and examples for other languages, see the DataStax streaming examples repository. These examples are written for Astra Streaming but they can be adapted for Luna Streaming or self-managed Pulsar clusters by replacing the connection details with those for your cluster.
-
Astra Streaming
-
Luna Streaming
-
Self-managed
-
Python client
-
Java client
The following example uses a Python script to create a connection between RabbitMQ and your Astra Streaming tenant.
It also establishes a message queue named queuename, prints ten messages, and then closes the connection.
-
Enable Starlight for RabbitMQ and get the
rabbitmq.confconnection details, as explained in Deploy Starlight for RabbitMQ. -
Create a
connect-test.pyfile containing the following code, and then replace the placeholders with the values from yourrabbitmq.conffile:connect-test.pyimport ssl import pika virtual_host = "VIRTUAL_HOST" token = "PASSWORD" context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) context.verify_mode = ssl.CERT_NONE context.check_hostname = False context.load_default_certs() ssl_options = pika.SSLOptions(context) connection = pika.BlockingConnection(pika.ConnectionParameters( virtual_host=virtual_host, host="HOST", ssl_options=ssl_options, port=PORT, credentials=pika.PlainCredentials("", token))) print("connection success") channel = connection.channel() print("started a channel") channel.queue_declare(queue='queuename') for x in range(10): channel.basic_publish(exchange='', routing_key='routingkey', body='message body goes here') print(" sent one") connection.close() -
Optional: Change the values for
queue,routing_key, andbody.queuenameandroutingkeybecome the names of Pulsar topics in your Astra Streaming tenant. Thebodyis the content of each message that is sent. -
Save and run the
connect-test.pyscript:python3 connect-test.py -
Make sure the output shows that the connection was successful and ten messages were sent:
connection success started a channel sent one sent one sent one sent one sent one sent one sent one sent one sent one sent one -
In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the
rabbitmqnamespace.If you everything was configured correctly, then the
rabbitmqnamespace should have topics namedamq.default.__queuenameandamq.default_routingkeythat were created by the Python script. Additionally, the namespace’s metrics should reflect that at least 10 messages were published and consumed by your Astra Streaming Pulsar topics.
The following example uses a Java program to create a connection between RabbitMQ and your Astra Streaming tenant, and then it establishes a message queue and sends a message.
-
Enable Starlight for RabbitMQ and get the
rabbitmq.confconnection details, as explained in Deploy Starlight for RabbitMQ. -
Create a new Maven project:
mvn archetype:generate \ -DgroupId=org.example \ -DartifactId=StarlightForRabbitMqClient \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false -
Change to the new project directory:
cd StarlightForRabbitMqClient -
Open the new project in your IDE, and then add the RabbitMQ client dependency to
pom.xml:pom.xml<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> -
Open the
App.javafile atsrc/main/java/org/example/App.java, and then delete any preeixsting code in this file. In the next steps, you will add code to this file to create a complete program that produces and consumes messages. -
Paste the following code in the file, and then replace the placeholders with the values from your
rabbitmq.conffile. Your editor will report errors because this isn’t a complete program yet.App.javapackage org.example; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class App { private static final String username = ""; private static final String password = "PASSWORD"; private static final String host = "HOST"; private static final int port = 5671; private static final String virtual_host = "VIRTUAL_HOST"; private static final String queueName = "queuename"; private static final String amqp_URI = String.format("amqps://%s:%s@%s:%d/%s", username, password, host, port, virtual_host.replace("/","%2f")); public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException { -
Optional: Replace
queuenamewith another name for the queue that publishes and consumes messages. This name is also used as the corresponding topic name in Astra Streaming. If the topic doesn’t exist, it is created automatically when the producer sends the first message. -
Add the following code to create a connection, channel, and queue that is used by both the producer and consumer:
App.javaConnectionFactory factory = new ConnectionFactory(); factory.setUri(amqp_URI); /* You could also set each value individually factory.setHost(host); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); factory.setVirtualHost(virtual_host); factory.useSslProtocol(); */ Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); -
Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:
App.javaString publishMessage = "Hello World!"; channel.basicPublish("", queueName, null, publishMessage.getBytes()); System.out.println(" Sent '" + publishMessage + "'"); -
Add the consumer code, which creates a basic consumer with callback on message receipt. Because the consumer isn’t a blocking thread, the
sleepallows time for messages to be received and processed.App.javaDeliverCallback deliverCallback = (consumerTag, delivery) -> { String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" Received '" + consumeMessage + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); Thread.sleep(4000); // wait a bit for messages to be received channel.close(); connection.close(); } } -
Save
App.java, and then build and run the JAR file for the complete program:mvn clean package assembly:single java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar -
Make sure the result shows that a message was sent and received:
Sent 'Hello World!' Received 'Hello World!' -
In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the
rabbitmqnamespace.If you everything was configured correctly, then the
rabbitmqnamespace should have a topic namedamq.default.__queuenamethat was created by the Java program. Additionally, the namespace’s metrics should reflect that at least one message was published and consumed by your Astra Streaming Pulsar topic.
To use a RabbitMQ client with Starlight for RabbitMQ, you use your Luna Streaming Pulsar tenant as the AMQP listener.
The following examples use a connection on localhost:5672 with Starlight for RabbitMQ as a protocol handler.
For other connection methods, see the documentation for your preferred RabbitMQ client library.
-
Python client
-
Java client
The following example uses the Pika RabbitMQ Python library to produce and consume messages from Pulsar.
-
Save the following Python script to a safe place as
test-queue.py. The example script assumes you have opened thelocalhost:5672port, as explained in Deploy Starlight for RabbitMQ.test-queue.py#!/usr/bin/env python import pika connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672)) channel = connection.channel() try: channel.queue_declare("test-queue") print("created test-queue queue") channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8')) print("published message test") _, _, res = channel.basic_get(queue="test-queue", auto_ack=True) assert res is not None, "should have received a message" print("received message: " + res.decode()) channel.queue_delete("test-queue") print("deleted test-queue queue") finally: connection.close() -
Optional: Replace
test-queuewith another name for the queue and routing key. These names are also used as the corresponding topic names in your Pulsar tenant. If the topic doesn’t exist, it is created automatically when the producer sends the first message. -
Save and run the
test-queue.pyscript:python ./test-queue.py -
Make sure the output shows that the queue was created and a message was sent:
created test-queue queue published message test received message: test deleted test-queue queue -
Use the Apache Pulsar admin CLI or the Luna Streaming Pulsar Admin Console to inspect your tenant’s activity. Make sure the
test-queuetopic was created and a message was published and consumed.
The following example uses a Java program to create a connection between RabbitMQ and your Luna Streaming Pulsar tenant, and then it establishes a message queue and sends a message.
-
Create a new Maven project:
mvn archetype:generate \ -DgroupId=org.example \ -DartifactId=StarlightForRabbitMqClient \ -DarchetypeArtifactId=maven-archetype-quickstart \ -DinteractiveMode=false -
Change to the new project directory:
cd StarlightForRabbitMqClient -
Open the new project in your IDE, and then add the RabbitMQ client dependency to
pom.xml:pom.xml<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.16.0</version> </dependency> -
Open the
App.javafile atsrc/main/java/org/example/App.java, and then delete any preeixsting code in this file. In the next steps, you will add code to this file to create a complete program that produces and consumes messages. -
Paste the following code in the file. This code creates a connection, channel, and queue that is used by both the producer and consumer. It uses the default connection values to connect on
localhost:5672, which was port forwarded in Deploy Starlight for RabbitMQ. Your editor will report errors because this isn’t a complete program yet.App.javapackage org.example; import com.rabbitmq.client.*; import java.io.IOException; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; import java.util.concurrent.TimeoutException; public class App { private static final String queueName = "queuename"; public static void main(String[] args) throws IOException, TimeoutException, URISyntaxException, NoSuchAlgorithmException, KeyManagementException, InterruptedException { // Use the default values to connect on localhost:5672 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queueName, false, false, false, null); -
Optional: Replace
queuenamewith another name for the queue that publishes and consumes messages. This name is also used as the corresponding topic name in Astra Streaming. If the topic doesn’t exist,it is created automatically when the producer sends the first message. -
Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:
App.javaString publishMessage = "Hello World!"; channel.basicPublish("", queueName, null, publishMessage.getBytes()); System.out.println(" Sent '" + publishMessage + "'"); -
Add the consumer code, which creates a basic consumer with callback on message receipt. Because the consumer isn’t a blocking thread, the
sleepallows time for messages to be received and processed.App.javaDeliverCallback deliverCallback = (consumerTag, delivery) -> { String consumeMessage = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" Received '" + consumeMessage + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); Thread.sleep(4000); // wait a bit for messages to be received channel.close(); connection.close(); } } -
Save
App.java, and then build and run the JAR file for the complete program:mvn clean package assembly:single java -jar target/StarlightForRabbitMqClient-1.0-SNAPSHOT-jar-with-dependencies.jar -
Make sure the result shows that a message was sent and received:
Sent 'Hello World!' Received 'Hello World!' -
Use the Pulsar Admin CLI or the Luna Streaming Pulsar Admin Console to inspect your tenant’s activity. Make sure the
queuenametopic was created and a message was published and consumed.
To use a RabbitMQ client with Starlight for RabbitMQ, you use your Pulsar tenant as the AMQP listener.
You can also connect on localhost if you have port forwarded the RabbitMQ port (5672) to your local machine.
For localhost examples, see the Luna Streaming tab.
For more information about expected connection parameters, see the documentation for your preferred RabbitMQ client library.
Before connecting your production applications with Starlight for RabbitMQ, you can use a RabbitMQ/AMQP-0.9.1 client or a tool such as RabbitMQ PerfTest to test Starlight for RabbitMQ after deploying it.
For example, the following Python script creates a queue, publishes a message that is routed to the queue, reads the message from the queue, and then deletes the queue:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(port=5672))
channel = connection.channel()
try:
channel.queue_declare("test-queue")
print("created test-queue queue")
channel.basic_publish(exchange="", routing_key="test-queue", body="test".encode('utf-8'))
print("published message test")
_, _, res = channel.basic_get(queue="test-queue", auto_ack=True)
assert res is not None, "should have received a message"
print("received message: " + res.decode())
channel.queue_delete("test-queue")
print("deleted test-queue queue")
finally:
connection.close()