Pulsar Connector single instance quick start

This quick start will walk you through the simplest possible configuration connecting a standalone Apache Pulsar™ instance to a single standalone DataStax Enterprise (DSE) 6.8 node.

Once you’ve configured the DSE instance, Pulsar, the DataStax Apache Pulsar™ Connector, and a Pulsar sink, you can send simple key/value messages with the Pulsar client utility, pulsar-client, and read them back from a table in a DSE keyspace.

Install and configure DSE 6.8

  1. Install the tarball distribution of DSE 6.8 as described in Installing DataStax Enterprise 6.8 using the binary tarball.

  2. Start DSE from the installation directory:

    bin/dse cassandra
  3. From the installation directory, verify that DataStax Enterprise is running:

    bin/nodetool status
    Datacenter: Cassandra
    =====================
    Status=Up/Down
    |/ State=Normal/Leaving/Joining/Moving/Stopped
    --  Address    Load       Owns (effective)  Host ID    Token    Rack
    UN  127.0.0.1  152.24 KiB  100.0%           <host-id>  <token>  rack1
  4. Start cqlsh:

    bin/cqlsh
    Connected to Test Cluster at 127.0.0.1:9042.
    [cqlsh 6.8.0 | DSE 6.8.9 | CQL spec 3.4.5 | DSE protocol v2]
    Use HELP for help.
    cqlsh>
  5. Create a new keyspace, pulsar_qs:

    cqlsh> CREATE KEYSPACE IF NOT EXISTS pulsar_qs
    WITH replication = {
      'class' : 'SimpleStrategy',
      'replication_factor' : 1
    };
  6. Create a new table, pulsar_kv, in the keyspace to hold the Pulsar message content:

    cqlsh> CREATE TABLE pulsar_qs.pulsar_kv (
    	key text PRIMARY KEY,
    	content text
    );
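The check in step 3 can also be scripted. The following is a minimal bash sketch; `dse_datacenter` and `dse_node_up` are hypothetical helper names, not DSE tools. Both read `bin/nodetool status` output on stdin: `dse_node_up` succeeds only if a node reports UN (Up/Normal), and `dse_datacenter` prints the datacenter name. That name (Cassandra in this quick start) is the value the connector configuration uses for `loadBalancing.localDc`.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of DSE) that parse `bin/nodetool status` output
# read from stdin. Leading whitespace is tolerated on every line.

# Print the first datacenter name, for example "Cassandra".
dse_datacenter() {
  sed -n 's/^[[:space:]]*Datacenter:[[:space:]]*//p' | head -n 1
}

# Succeed if at least one node line starts with UN (Up/Normal).
dse_node_up() {
  grep -Eq '^[[:space:]]*UN[[:space:]]'
}
```

For example, `bin/nodetool status | dse_node_up && bin/nodetool status | dse_datacenter` prints Cassandra for the node shown in step 3.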

Install and configure Apache Pulsar

  1. Download Apache Pulsar:

    wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
  2. Untar the Pulsar binary:

    tar xvfz apache-pulsar-2.7.0-bin.tar.gz
  3. Download the DataStax Apache Pulsar Connector tar file from the DataStax downloads site. If you agree to the terms, select the Terms checkbox and click the download icon.

  4. Extract the files:

    tar zxf cassandra-enhanced-pulsar-sink-1.4.0.tar.gz
  5. Create a connectors directory in the Pulsar home directory if it doesn’t exist.

  6. Copy the DataStax connector NAR to the Pulsar connectors directory:

    cp installation_location/cassandra-enhanced-pulsar-sink-1.4.0.nar pulsar_home/connectors
  7. Start Apache Pulsar in standalone mode:

    bin/pulsar standalone
  8. Open a new terminal and check that Pulsar has loaded the DataStax Apache Pulsar Connector. The cassandra-enhanced connector should appear in the list of available connectors:

    curl -s http://localhost:8080/admin/v2/functions/connectors
    [{"name":"cassandra-enhanced","description":"A DataStax Pulsar Sink to load records from Pulsar topics to Apache Cassandra(R) or DataStax Enterprise(DSE)\n","sinkClass":"com.datastax.oss.sink.pulsar.RecordCassandraSinkTask"}]
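Pulsar may need a few seconds after `bin/pulsar standalone` starts before the admin API answers, so the check in step 8 can be wrapped in a retry loop. This is a minimal bash sketch, assuming the admin REST API at http://localhost:8080 as in step 8; `has_connector` and `wait_for_connector` are hypothetical helpers, not Pulsar commands, and the match is a plain text search of the response rather than a full JSON parse.

```shell
#!/usr/bin/env bash
# Hypothetical helpers (not part of Pulsar).

# Succeed if the JSON on stdin contains a connector entry named $1.
has_connector() {
  grep -Eq "\"name\"[[:space:]]*:[[:space:]]*\"$1\""
}

# Poll the connectors endpoint until connector $1 appears.
# $2 = maximum number of attempts (default 30), two seconds apart.
wait_for_connector() {
  local name="$1" attempts="${2:-30}" i
  for ((i = 0; i < attempts; i++)); do
    if curl -s http://localhost:8080/admin/v2/functions/connectors | has_connector "$name"; then
      return 0
    fi
    sleep 2
  done
  return 1
}
```

For example, `wait_for_connector cassandra-enhanced && echo "connector loaded"` returns as soon as the connector is listed, or fails after about a minute.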

Configure the DataStax Apache Pulsar Connector

  1. Save the following YAML as qs.yml in the Pulsar configuration directory, conf:

    configs:
      verbose: false
      batchSize: 3000
      batchFlushTimeoutMs: 1000
      topics: example_topic
      contactPoints: localhost
      loadBalancing.localDc: Cassandra
      port: 9042
      cloud.secureConnectBundle:
      ignoreErrors: None
      maxConcurrentRequests: 500
      maxNumberOfRecordsInBatch: 32
      queryExecutionTimeout: 30
      connectionPoolLocalSize: 4
      jmx: true
      compression: None
      auth:
        provider: None
        username:
        password:
        gssapi:
          keyTab:
          principal:
          service: dse
      ssl:
        provider: None
        hostnameValidation: true
        keystore:
          password:
          path:
        openssl:
          keyCertChain:
          privateKey:
        truststore:
          password:
          path:
        cipherSuites:
      topic:
        example_topic:
          pulsar_qs:
            pulsar_kv:
              mapping: 'key=key,content=value'
              consistencyLevel: LOCAL_ONE
              ttl: -1
              ttlTimeUnit: SECONDS
              timestampTimeUnit: MICROSECONDS
              nullToUnset: true
              deletesEnabled: true
          codec:
            locale: en_US
            timeZone: UTC
            timestamp: CQL_TIMESTAMP
            date: ISO_LOCAL_DATE
            time: ISO_LOCAL_TIME
            unit: MILLISECONDS
  2. Create a new Pulsar sink:

    bin/pulsar-admin sinks create \
    	--name dse-sink-kv \
    	--classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
    	--sink-config-file conf/qs.yml \
    	--sink-type cassandra-enhanced \
    	--tenant public \
    	--namespace default \
    	--inputs "persistent://public/default/example_topic"
    "Created successfully"
  3. Send some messages to the example_topic topic. The sink writes each message to DSE:

    bin/pulsar-client produce -k "Message 1" -m "Content 1" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 2" -m "Content 2" persistent://public/default/example_topic
    bin/pulsar-client produce -k "Message 3" -m "Content 3" persistent://public/default/example_topic
  4. Start cqlsh and view the messages in the pulsar_kv table:

    cqlsh> SELECT * FROM pulsar_qs.pulsar_kv;
    
     key       | content
    -----------+-----------
     Message 3 | Content 3
     Message 2 | Content 2
     Message 1 | Content 1
    
    (3 rows)
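Steps 3 and 4 are easy to repeat with more messages. The following minimal bash sketch assumes it runs from the Pulsar home directory; `produce_qs_messages`, `QS_TOPIC`, and the `PULSAR_CLIENT` override are hypothetical names, not part of Pulsar. It sends COUNT messages with the same key and content pattern as step 3.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of Pulsar): send COUNT key/value messages to the
# quick start topic, following step 3 ("Message i" -> "Content i").
# Assumes the current directory is the Pulsar home directory.
# Set PULSAR_CLIENT=echo to print the commands instead of sending messages.
QS_TOPIC="persistent://public/default/example_topic"

produce_qs_messages() {
  local count="$1" client="${PULSAR_CLIENT:-bin/pulsar-client}" i
  for ((i = 1; i <= count; i++)); do
    "$client" produce -k "Message $i" -m "Content $i" "$QS_TOPIC" || return 1
  done
}
```

After `produce_qs_messages 3`, running `bin/cqlsh -e "SELECT * FROM pulsar_qs.pulsar_kv;"` from the DSE installation directory should show the same three rows as step 4.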

Where to go from here

Now that you’ve run through a simple end-to-end configuration, you can start experimenting with more complicated Pulsar mappings and DSE schemas. For more details, see the following topics:

  • Determining topic data structure: Display messages to determine the data structure of the topic messages.

  • Mapping basic messages to table columns: Create a topic-table map for Pulsar messages that contain only a key and a value in each record.

  • Mapping a message that contains JSON fields: Map individual fields in a JSON structure to columns.

  • Mapping Avro messages: Map individual fields from an Avro-format field to columns.

  • Extract Pulsar record header values: Extract values from the Pulsar record header and write them to the database table.

  • Mapping messages to a table that has a User Defined Type: Write complex types directly into user-defined types (UDTs).

  • Mapping a topic to multiple tables: Ingest a single topic into multiple tables using a single connector instance.

  • Multiple topics to multiple tables: Ingest multiple topics and write them to different tables using a single connector instance.

  • Selectively update maps and UDTs based on Pulsar fields.

  • Provide CQL queries in mappings: Provide CQL queries to run when a new record arrives on the Pulsar topic.

  • The now() function in mappings: Use the now() function in mappings.
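The quick start mapping, `mapping: 'key=key,content=value'`, is the simplest form of the column-to-field mapping that the topics above build on: each entry pairs a table column (left of `=`) with a Pulsar record field (right of `=`), so the `key` column receives the record key and the `content` column receives the record value. The bash sketch below uses a hypothetical `print_mapping` helper, not part of the connector, to list those pairs. It only handles plain `column=field` entries, not functions such as now() or quoted identifiers.

```shell
#!/usr/bin/env bash
# Hypothetical helper (not part of the connector): print "column <- field"
# for each entry of a simple mapping string such as 'key=key,content=value'.
print_mapping() {
  local pairs pair column field
  # Split the mapping on commas into an array (no glob expansion).
  IFS=',' read -ra pairs <<< "$1"
  for pair in "${pairs[@]}"; do
    column="${pair%%=*}"    # text before the first '=' is the table column
    field="${pair#*=}"      # text after the first '=' is the Pulsar field
    column="${column// /}"  # drop spaces around the names
    field="${field// /}"
    printf '%s <- %s\n' "$column" "$field"
  done
}
```

For example, `print_mapping 'key=key,content=value'` prints `key <- key` followed by `content <- value`.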



© DataStax

DataStax, Titan, and TitanDB are registered trademarks of DataStax, Inc. and its subsidiaries in the United States and/or other countries.

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries.

Kubernetes is the registered trademark of the Linux Foundation.
