DataStax Fast JMS for Apache Pulsar Astra Streaming quick start

This quick start will get you up and running with a simple command-line Java JMS client that can talk to an Astra Streaming Pulsar instance. The client:

  1. Initializes a DataStax Fast JMS for Apache Pulsar queue.

  2. Attaches a message listener to that queue.

  3. Produces 10 messages that are then consumed and printed out in an onMessage callback.

Prerequisites

Create your Maven configuration file

We’ll use Apache Maven to handle dependency management so you don’t have to manually download and install required libraries.

  1. Create a project directory in a convenient location.

  2. Save the following file as pom.xml in your project directory.

    <?xml version="1.0" encoding="UTF-8"?>
    <project>
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.datastax.oss</groupId>
      <artifactId>pulsar-fast-jms-example</artifactId> (1)
      <packaging>jar</packaging>
      <version>1.0</version>
      <name>DataStax Fast JMS for Apache Pulsar (R) example</name>
      <description>A simple JMS client designed to test the DataStax Fast JMS for Apache Pulsar library.</description>
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <java.release.version>8</java.release.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
      </properties>
        <dependencies>
          <dependency>
            <groupId>com.datastax.oss</groupId>
            <artifactId>pulsar-jms-all</artifactId> (2)
            <version>1.0.0</version>
          </dependency>
          <dependency>
            <groupId>org.slf4j</groupId> (3)
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.30</version>
          </dependency>
        </dependencies>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.3.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef> (4)
                </descriptorRefs>
                <archive>
                  <manifest>
                    <mainClass>example.AstraTest</mainClass> (5)
                  </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>assemble-all</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
          </plugin>
        </plugins>
      </build>
    </project>

Note the following elements:

  1. The artifact ID, which forms the base name of the compiled JAR file.

  2. The DataStax Fast JMS for Apache Pulsar dependency. In this case we’re using a "fat" JAR file, pulsar-jms-all, that includes all dependencies.

  3. The SLF4J dependency. The slf4j-simple binding prints log messages to the console.

  4. The assembly descriptor, whose name is appended to the name of the JAR file that bundles all dependencies.

  5. The main class, example.AstraTest, written to the JAR manifest so you can run the JAR file without naming a class on the command line.
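
To make notes 1 and 4 concrete, the following sketch shows how the two JAR file names produced by this pom.xml are put together. JarNames is a hypothetical helper written for this illustration. It mirrors Maven's default naming pattern and is not part of Maven or the DataStax library.

```java
// Hypothetical helper that mirrors Maven's default JAR naming; not a Maven API.
class JarNames {

    // Base name is artifactId-version; an assembly descriptor, if any, is appended.
    static String jarName(String artifactId, String version, String descriptor) {
        String base = artifactId + "-" + version;
        return (descriptor == null ? base : base + "-" + descriptor) + ".jar";
    }

    public static void main(String[] args) {
        // Plain JAR built from the artifactId and version in pom.xml.
        System.out.println(jarName("pulsar-fast-jms-example", "1.0", null));
        // Runnable JAR produced by the assembly plugin.
        System.out.println(jarName("pulsar-fast-jms-example", "1.0", "jar-with-dependencies"));
    }
}
```

The second name is the file you run later in this quick start.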

Create the Astra Streaming example app

To create the Astra Streaming example app:

  1. In the project directory you created above, create the directory hierarchy <project-directory>/src/main/java/example:

    cd <project-directory>
    mkdir -p src/main/java/example

  2. Copy the following code into a file named AstraTest.java, and save the file in the src/main/java/example directory:

    package example;
    
    import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
    
    import javax.jms.JMSContext;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import java.util.HashMap;
    import java.util.Map;
    
    public class AstraTest {
    
        public static void main(String ... args) throws Exception {
    
            String topic = "<astra-topic>"; (1)
    
            String token = "<astra-token>"; (2)
    
            Map<String, Object> properties = new HashMap<>(); (3)
            properties.put("webServiceUrl","<astra-web-service-url>");
            properties.put("brokerServiceUrl","<astra-broker-service-url>");
            properties.put("authPlugin","org.apache.pulsar.client.impl.auth.AuthenticationToken");
            properties.put("authParams",token);
    
            try (PulsarConnectionFactory factory = new PulsarConnectionFactory(properties);
                 JMSContext context = factory.createContext()) {
                Queue queue = context.createQueue(topic); (4)
    
                // Listen for messages...
                context.createConsumer(queue).setMessageListener(new MessageListener() { (5)
                    @Override
                    public void onMessage(Message message) { (6)
                        try {
                            System.out.println("Received: " + message.getBody(String.class));
                        } catch (Exception err) {
                            err.printStackTrace();
                        }
                    }
                });
    
                for (int i = 0; i < 10; i++) { (7)
                    String message = "Hello world! " + i;
                    System.out.println("Sending: "+message);
                    context.createProducer().send(queue, message);
                }
    
                Thread.sleep(10000); (8)
            }
        }
    }

In the code example above, note the following points of interest and make the required changes:

  1. The URI of the topic the client uses to publish and consume messages. You can create the topic in the Astra Streaming console. See the Astra Streaming quick start.

  2. Your Astra Streaming security token, which you can retrieve from the Connect tab for your tenant in the Astra Streaming console. See the Astra Streaming quick start.

  3. A properties object containing the web and broker service URLs, the authentication plugin class, and your Astra Streaming token. Copy the service URLs from the Connect tab for your tenant in the Astra Streaming console. See the Astra Streaming quick start.

  4. Creates a DataStax Fast JMS for Apache Pulsar queue. For more information on mapping Pulsar to JMS concepts, see Mapping Pulsar concepts to JMS specifications.

  5. Creates a consumer for the queue using the createConsumer method and attaches a message listener to it.

  6. Implements the onMessage callback, which prints each message as it arrives.

  7. Sends 10 "Hello world!" messages to the queue using the createProducer method.

  8. Sleeps for 10 seconds to make sure all of the messages are consumed.
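
As an alternative to editing the placeholders in notes 1 through 3 directly in the source file, the values can be assembled by a small helper. The following is a minimal sketch under stated assumptions: AstraSettings and the ASTRA_* setting names are hypothetical and not part of the DataStax library, and the topic URI layout (persistent://tenant/namespace/topic) is the usual Pulsar form, which you should confirm against the value shown in the Astra Streaming console. The property keys are the ones used in AstraTest.java.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper, not part of the DataStax library: it only assembles
// the configuration values that AstraTest passes to PulsarConnectionFactory.
class AstraSettings {

    // Builds a topic URI of the form persistent://tenant/namespace/topic.
    static String topicUri(String tenant, String namespace, String topic) {
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    // Copies the connection settings out of an environment-style map.
    // The ASTRA_* names are assumptions made for this sketch.
    static Map<String, Object> connectionProperties(Map<String, String> env) {
        Map<String, Object> properties = new HashMap<>();
        properties.put("webServiceUrl", required(env, "ASTRA_WEB_SERVICE_URL"));
        properties.put("brokerServiceUrl", required(env, "ASTRA_BROKER_SERVICE_URL"));
        properties.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
        properties.put("authParams", required(env, "ASTRA_TOKEN"));
        return properties;
    }

    // Fails fast with a clear message when a setting is absent or empty.
    private static String required(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // Placeholder values stand in for the real ones from the Connect tab.
        Map<String, String> sample = new HashMap<>();
        sample.put("ASTRA_WEB_SERVICE_URL", "<astra-web-service-url>");
        sample.put("ASTRA_BROKER_SERVICE_URL", "<astra-broker-service-url>");
        sample.put("ASTRA_TOKEN", "<astra-token>");

        Map<String, Object> properties = connectionProperties(sample);
        // Print only the keys so the token never appears in the console.
        System.out.println("Configured properties: " + properties.keySet());
        System.out.println("Topic: " + topicUri("<tenant>", "<namespace>", "<topic>"));
    }
}
```

In a real run you could pass System.getenv() to connectionProperties so that the token stays out of source control.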

Compile the application

To compile the sample application:

  1. Change to the <project-directory>.

  2. Run the Maven command:

    mvn clean install

    Results:

    Many status messages...
    
    [INFO] Installing /Users/john.francis/fast-jms/target/pulsar-fast-jms-example-1.0.jar to /Users/john.francis/.m2/repository/com/datastax/oss/pulsar-fast-jms-example/1.0/pulsar-fast-jms-example-1.0.jar
    [INFO] Installing /Users/john.francis/fast-jms/pom.xml to /Users/john.francis/.m2/repository/com/datastax/oss/pulsar-fast-jms-example/1.0/pulsar-fast-jms-example-1.0.pom
    [INFO] Installing /Users/john.francis/fast-jms/target/pulsar-fast-jms-example-1.0-jar-with-dependencies.jar to /Users/john.francis/.m2/repository/com/datastax/oss/pulsar-fast-jms-example/1.0/pulsar-fast-jms-example-1.0-jar-with-dependencies.jar
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time:  31.351 s
    [INFO] Finished at: 2021-05-07T11:11:02-05:00
    [INFO] ------------------------------------------------------------------------

The first time you build the project, Maven downloads all required dependencies. Subsequent builds are much faster.

Run the example

To run the sample app, from the <project-directory>:

    java -jar target/pulsar-fast-jms-example-1.0-jar-with-dependencies.jar

Results:

Sending: Hello world! 0

... many status messages...

Sending: Hello world! 1
Sending: Hello world! 2
Received: Hello world! 0
Received: Hello world! 1
Sending: Hello world! 3
Received: Hello world! 2
Sending: Hello world! 4
Received: Hello world! 3
Sending: Hello world! 5
Sending: Hello world! 6
Received: Hello world! 4
Received: Hello world! 5
Sending: Hello world! 7
Received: Hello world! 6
Sending: Hello world! 8
Received: Hello world! 7
Sending: Hello world! 9
Received: Hello world! 8
Received: Hello world! 9

... more status messages...

[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x2923ee4a, L:/192.168.50.153:49153 ! R:pulsar-aws-useast2.streaming.datastax.com/3.138.177.230:6651] Disconnected
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x662e6f64, L:/192.168.50.153:65535 ! R:pulsar-aws-useast2.streaming.datastax.com/3.138.177.230:6651] Disconnected
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0x6a5af606, L:/192.168.50.153:49157 ! R:pulsar-aws-useast2.streaming.datastax.com/3.16.119.226:6651] Disconnected
[pulsar-client-io-5-1] INFO org.apache.pulsar.client.impl.ClientCnx - [id: 0xcc2062bb, L:/192.168.50.153:49154 ! R:pulsar-aws-useast2.streaming.datastax.com/3.138.177.230:6651] Disconnected

Message 0 is produced at the very top of the output, where it can get lost among the status messages that follow. Also note that the messages are consumed asynchronously, so the Sending and Received lines interleave.
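
Because delivery is asynchronous, a fixed sleep is only a rough way to wait for every message. One common alternative is to count messages down with a latch and stop waiting as soon as the last one arrives. The following is a minimal stand-alone sketch of that pattern, not a change to the example app: LatchWaitSketch is a hypothetical class, and a single-thread executor stands in for the JMS listener thread so the code runs without a broker.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Stand-alone simulation: an executor thread plays the role of the JMS
// listener thread so the waiting pattern can run without a broker.
class LatchWaitSketch {

    static List<String> sendAndAwait(int count, long timeoutSeconds) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch allReceived = new CountDownLatch(count);
        ExecutorService listenerThread = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < count; i++) {
                String message = "Hello world! " + i;
                System.out.println("Sending: " + message);
                // Stand-in for the onMessage callback: record the message, then count down.
                listenerThread.submit(() -> {
                    System.out.println("Received: " + message);
                    received.add(message);
                    allReceived.countDown();
                });
            }
            // Returns as soon as every message has arrived, or gives up after the timeout.
            if (!allReceived.await(timeoutSeconds, TimeUnit.SECONDS)) {
                System.out.println("Timed out with " + allReceived.getCount() + " messages outstanding");
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return whatever has arrived so far.
            Thread.currentThread().interrupt();
        } finally {
            listenerThread.shutdown();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = sendAndAwait(10, 10);
        System.out.println("Messages received: " + received.size());
    }
}
```

In AstraTest.java, the equivalent change would be to call countDown() inside onMessage and replace Thread.sleep(10000) with a timed await on the latch.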

Next