Pulsar Connector single instance quick start

This quick start will walk you through the simplest possible configuration connecting a standalone Apache® Pulsar instance to a single standalone DataStax Enterprise (DSE) 6.8 node.

Once you’ve configured Pulsar, a Pulsar sink, the DataStax Apache® Pulsar Connector and the DSE instance, you’ll be able to send simple key/value pair messages using the Pulsar client utility, pulsar-client and retrieve them from a DSE keyspace and table.

Install and configure DSE 6.8

  1. Install the tarball distribution of DSE 6.8 as described in Installing DataStax Enterprise 6.8 using the binary tarball.

  2. Start DSE from the installation directory:

    bin/dse cassandra
  3. Verify that DataStax Enterprise is running from the installation directory:

    bin/nodetool status
    Datacenter: Cassandra
    =====================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving/Stopped
    --  Address    Load       Owns (effective)  Host ID    Token    Rack
    UN  127.0.0.1  152.24 KiB  100.0%           <host-id>  <token>  rack1
  4. Start cqlsh:

    bin/cqlsh
    Connected to Test Cluster at 127.0.0.1:9042.
    [cqlsh 6.8.0 | DSE 6.8.9 | CQL spec 3.4.5 | DSE protocol v2]
    Use HELP for help.
    cqlsh>
  5. Create a new keyspace, pulsar_qs:

    cqlsh> CREATE KEYSPACE IF NOT EXISTS pulsar_qs
    WITH replication = {
      'class' : 'SimpleStrategy',
      'replication_factor' : 1
    };
  6. Create a new table, pulsar_kv in the keyspace to hold the Pulsar message content:

    cqlsh> CREATE TABLE pulsar_qs.pulsar_kv (
    	key text PRIMARY KEY,
    	content text
    );

Install and configure Apache Pulsar

  1. Download Apache Pulsar:

    wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
  2. Untar the Pulsar binary:

    tar xvfz apache-pulsar-2.7.0-bin.tar.gz
  3. Download the DataStax Apache Pulsar Connector tar file from the DataStax downloads site. If you agree, enable the Terms checkbox and click the download icon.

  4. Extract the files:

    tar zxf cassandra-enhanced-pulsar-sink-1.4.0.tar.gz
  5. Create a connectors directory in the Pulsar home directory if it doesn’t exist.

  6. Copy the DataStax connector NAR to the Pulsar connectors directory:

    cp installation_location/cassandra-enhanced-pulsar-sink-1.4.0.nar pulsar_home/connectors
  7. Start Apache Pulsar in standalone mode:

    bin/pulsar standalone
  8. Open a new terminal and check that the DataStax Apache Pulsar Connector is running:

    curl -s http://localhost:8080/admin/v2/functions/connectors
    [{"name":"cassandra-enhanced","description":"A DataStax Pulsar Sink to load records from Pulsar topics to Apache Cassandra(R) or DataStax Enterprise(DSE)\n","sinkClass":"com.datastax.oss.sink.pulsar.RecordCassandraSinkTask"}]%

Configure the DataStax Apache Pulsar Connector

  1. Save following YAML contents to the Pulsar configuration directory, conf as qs.yml:

    configs:
      verbose: false
      batchSize: 3000
      batchFlushTimeoutMs: 1000
      topics: example_topic
      contactPoints: localhost
      loadBalancing.localDc: Cassandra
      port: 9042
      cloud.secureConnectBundle:
      ignoreErrors: None
      maxConcurrentRequests: 500
      maxNumberOfRecordsInBatch: 32
      queryExecutionTimeout: 30
      connectionPoolLocalSize: 4
      jmx: true
      compression: None
      auth:
        provider: None
        username:
        password:
        gssapi:
          keyTab:
          principal:
          service: dse
      ssl:
        provider: None
        hostnameValidation: true
        keystore:
          password:
          path:
        openssl:
          keyCertChain:
          privateKey:
        truststore:
          password:
          path:
        cipherSuites:
      topic:
        example_topic:
          pulsar_qs:
            pulsar_kv:
              mapping: 'key=key,content=value'
              consistencyLevel: LOCAL_ONE
              ttl: -1
              ttlTimeUnit : SECONDS
              timestampTimeUnit : MICROSECONDS
              nullToUnset: true
              deletesEnabled: true
          codec:
            locale: en_US
            timeZone: UTC
            timestamp: CQL_TIMESTAMP
            date: ISO_LOCAL_DATE
            time: ISO_LOCAL_TIME
            unit: MILLISECONDS
  2. Create a new Pulsar sink:

    bin/pulsar-admin sinks create \
    	--name dse-sink-kv \
    	--classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
    	--sink-config-file conf/qs.yml \
    	--sink-type cassandra-enhanced \
    	--tenant public \
    	--namespace default \
    	--inputs "persistent://public/default/example_topic"
    "Created successfully"
  3. Send some messages to DSE:

    bin/pulsar-client produce -k "Message 1" -m "Content 1" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 2" -m "Content 2" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 3" -m "Content 3" persistent://public/default/example_topic
  4. Start cqlsh and view the messages in the pulsar_kv table:

    cqlsh> SELECT * FROM pulsar_qs.pulsar_kv;
    
     key       | content
    -----------+-----------
     Message 3 | Content 3
     Message 2 | Content 2
     Message 1 | Content 1
    
    (3 rows)

Where to go from here…​

Now that you’ve run through a simple end-to-end configuration, you can start experimenting with more complicated Pulsar mappings and DSE schemas. For more details, see the following topics: