Pulsar connector single instance quickstart for DSE

This quickstart provides the minimal configuration to connect a standalone Apache Pulsar™ instance to a single, standalone DataStax Enterprise (DSE) node.

It covers installation and configuration of DSE, Apache Pulsar, and the connector. Then, it demonstrates how to send simple key/value pair messages from Pulsar to DSE with the Pulsar client utility and verify that the records were written to the database table.

Install DSE

  1. If you don’t have DSE installed already, install the tarball distribution of DSE.

  2. Enable authentication or get the credentials for your DSE cluster.

    By default, DSE doesn’t enable authentication. While this may be acceptable for this quickstart, DataStax recommends enabling authentication for production clusters.

  3. Start DSE from the installation directory:

    bin/dse cassandra

    From the installation directory, you can run bin/nodetool status to verify that DSE is running and to get the node address and other connection details.

  4. Create a keyspace and table for your Pulsar records:

    1. Start cqlsh:

      bin/cqlsh
    2. Create a keyspace named pulsar_qs:

      cqlsh> CREATE KEYSPACE IF NOT EXISTS pulsar_qs
      WITH replication = {
        'class' : 'SimpleStrategy',
        'replication_factor' : 1
      };
    3. Create a table named pulsar_kv:

      cqlsh> CREATE TABLE pulsar_qs.pulsar_kv (
      	key text PRIMARY KEY,
      	content text
      );
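
The keyspace and table from step 4 could also be created non-interactively. The following is a minimal sketch, assuming you run it from the DSE installation directory. The file name pulsar_qs.cql and the IF NOT EXISTS clause on the table are additions for illustration, not part of the quickstart. It relies on the cqlsh -f option, which executes the CQL statements in a file.

```shell
# Write the quickstart schema to a file.
# The default file name is an assumption; override it with SCHEMA_FILE.
SCHEMA_FILE="${SCHEMA_FILE:-pulsar_qs.cql}"

cat > "$SCHEMA_FILE" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS pulsar_qs
WITH replication = {
  'class' : 'SimpleStrategy',
  'replication_factor' : 1
};

CREATE TABLE IF NOT EXISTS pulsar_qs.pulsar_kv (
  key text PRIMARY KEY,
  content text
);
EOF

# Apply the file only if cqlsh is where the quickstart expects it.
if [ -x bin/cqlsh ]; then
  bin/cqlsh -f "$SCHEMA_FILE"
else
  echo "bin/cqlsh not found; run this from the DSE installation directory" >&2
fi
```

If authentication is enabled on the node, cqlsh also needs credentials, for example through its -u and -p options.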

Install Apache Pulsar and the connector

  1. Download Apache Pulsar 2.7.0 or later:

    wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
  2. Untar the Pulsar binary:

    tar xvfz apache-pulsar-2.7.0-bin.tar.gz
  3. Download the DataStax Apache Pulsar connector tar file from the DataStax downloads site.

    For more information about system requirements for the connector, see Install DataStax Apache Pulsar™ connector.

  4. Extract the files, replacing VERSION with the version number of the tar file you downloaded:

    tar zxf cassandra-enhanced-pulsar-sink-VERSION.tar.gz
  5. In your Pulsar home directory, find the connectors directory. If there isn’t a connectors directory, create one.

  6. Move the DataStax Pulsar connector NAR file to the Pulsar connectors directory:

    mv installation_location/cassandra-enhanced-pulsar-sink-VERSION.nar pulsar_home/connectors
  7. In your Pulsar config directory, create a qs.yml file with the following contents:

    configs:
      verbose: false
      batchSize: 3000
      batchFlushTimeoutMs: 1000
      topics: example_topic
      contactPoints: localhost
      loadBalancing.localDc: Cassandra
      port: 9042
      cloud.secureConnectBundle:
      ignoreErrors: None
      maxConcurrentRequests: 500
      maxNumberOfRecordsInBatch: 32
      queryExecutionTimeout: 30
      connectionPoolLocalSize: 4
      jmx: true
      compression: None
      auth:
        provider: None
        username:
        password:
        gssapi:
          keyTab:
          principal:
          service: dse
      ssl:
        provider: None
        hostnameValidation: true
        keystore:
          password:
          path:
        openssl:
          keyCertChain:
          privateKey:
        truststore:
          password:
          path:
        cipherSuites:
      topic:
        example_topic:
          pulsar_qs:
            pulsar_kv:
              mapping: 'key=key,content=value'
              consistencyLevel: LOCAL_ONE
              ttl: -1
              ttlTimeUnit: SECONDS
              timestampTimeUnit: MICROSECONDS
              nullToUnset: true
              deletesEnabled: true
          codec:
            locale: en_US
            timeZone: UTC
            timestamp: CQL_TIMESTAMP
            date: ISO_LOCAL_DATE
            time: ISO_LOCAL_TIME
            unit: MILLISECONDS
  8. If your cluster has authentication enabled, you want to use an SSL-encrypted connection, or your cluster isn’t compatible with the connection properties in qs.yml, you must edit the configuration file as explained in Connect the DataStax Apache Pulsar™ connector.

  9. Ensure that the user running Pulsar has permission to access the configuration and NAR files.
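
Steps 5, 6, and 9 can be combined into one small helper. This is a sketch under stated assumptions, not a supported installer: install_nar is a hypothetical function name, and it assumes the script runs as the same user that will run Pulsar, so a readability check for the current user is meaningful.

```shell
# install_nar SRC_NAR PULSAR_HOME
# Hypothetical helper: creates PULSAR_HOME/connectors if it is missing,
# moves the NAR file into it, and checks that the current user can read it.
install_nar() {
  src="$1"
  pulsar_home="$2"
  dest_dir="$pulsar_home/connectors"

  if [ ! -f "$src" ]; then
    echo "NAR file not found: $src" >&2
    return 1
  fi

  # mkdir -p succeeds whether or not the directory already exists.
  mkdir -p "$dest_dir" || return 1
  mv "$src" "$dest_dir/" || return 1

  dest="$dest_dir/$(basename "$src")"
  if [ ! -r "$dest" ]; then
    echo "Current user cannot read $dest" >&2
    return 1
  fi
  echo "Installed $dest"
}

# Example call, using the placeholder paths from the steps above:
#   install_nar installation_location/cassandra-enhanced-pulsar-sink-VERSION.nar pulsar_home
```

The helper does not change file permissions. If Pulsar runs under a different account, adjust ownership or permissions on the NAR file and on qs.yml separately.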

Run Pulsar with the connector

  1. Start Apache Pulsar in standalone mode:

    bin/pulsar standalone
  2. In a new terminal, check that Pulsar has loaded the DataStax Pulsar connector:

    curl -s http://localhost:8080/admin/v2/functions/connectors

    Make sure the response includes the DataStax Pulsar connector:

    [{"name":"cassandra-enhanced","description":"A DataStax Pulsar Sink to load records from Pulsar topics to Apache Cassandra(R) or DataStax Enterprise(DSE)\n","sinkClass":"com.datastax.oss.sink.pulsar.RecordCassandraSinkTask"}]
  3. Create a Pulsar sink:

    bin/pulsar-admin sinks create \
    	--name dse-sink-kv \
    	--classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
    	--sink-config-file config/qs.yml \
    	--sink-type cassandra-enhanced \
    	--tenant public \
    	--namespace default \
    	--inputs "persistent://public/default/example_topic"

    The topic name and mapping are set in the connector configuration YAML file.

  4. Send some messages to your new sink:

    bin/pulsar-client produce -k "Message 1" -m "Content 1" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 2" -m "Content 2" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 3" -m "Content 3" persistent://public/default/example_topic
  5. To verify that the messages were written to DSE, start cqlsh, and then query your pulsar_kv table:

    cqlsh> SELECT * FROM pulsar_qs.pulsar_kv;

    The result should include the test messages you sent:

     key       | content
    -----------+-----------
     Message 3 | Content 3
     Message 2 | Content 2
     Message 1 | Content 1
    
    (3 rows)
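
The connector check in step 2 can be scripted instead of read by eye. A minimal sketch, assuming the admin endpoint returns JSON shaped like the sample response shown in that step. connector_listed is a hypothetical helper name, and it uses a plain-text match rather than a real JSON parser.

```shell
# connector_listed NAME
# Reads the connector list JSON on stdin and succeeds if it contains an
# entry whose "name" field equals NAME. Whitespace around the colon is
# tolerated; anything more elaborate calls for a proper JSON tool.
connector_listed() {
  grep -Eq "\"name\"[[:space:]]*:[[:space:]]*\"$1\""
}

# Example use against a running standalone Pulsar:
#   curl -s http://localhost:8080/admin/v2/functions/connectors \
#     | connector_listed cassandra-enhanced && echo "connector loaded"
```

The function returns a nonzero exit status when the name is absent, so it can gate later steps such as creating the sink.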

Next steps

Your schemas probably require more complex mappings than the example used in this quickstart. To get started with more complex mappings and schemas, see the DataStax Apache Pulsar™ connector documentation.
