Pulsar connector single instance quickstart for DSE
This quickstart provides the minimal configuration to connect a standalone Apache Pulsar™ instance to a single, standalone DataStax Enterprise (DSE) node.
It covers installation and configuration of DSE, Apache Pulsar, and the connector. Then, it demonstrates how to send simple key/value pair messages from Pulsar to DSE with the Pulsar client utility and verify that the records were written to the database table.
Install DSE
-
If you don’t have DSE installed already, install the tarball distribution of DSE.
-
Enable authentication or get the credentials for your DSE cluster.
By default, DSE doesn’t enable authentication. While this may be acceptable for this quickstart, DataStax recommends enabling authentication for production clusters.
-
Start DSE from the installation directory:
bin/dse cassandra
To verify that DSE is running and to get the node address and other connection details, run the following command from the installation directory:
bin/nodetool status
-
Create a keyspace and table for your Pulsar records:
-
Start cqlsh:
bin/cqlsh
-
Create a keyspace named pulsar_qs:
cqlsh> CREATE KEYSPACE IF NOT EXISTS pulsar_qs WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
-
Create a table named pulsar_kv:
cqlsh> CREATE TABLE pulsar_qs.pulsar_kv ( key text PRIMARY KEY, content text );
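If you prefer to keep the schema in a file rather than typing it at the prompt, the two statements above could be written to a CQL script and run with cqlsh's -f option. The following is a minimal sketch, not part of the official procedure: the function name write_schema_cql and the file path are hypothetical, and IF NOT EXISTS is added to the CREATE TABLE statement so the script can be run more than once.

```shell
#!/bin/sh
# Sketch: write the quickstart schema to a CQL script so it can be replayed.
# The function name and the output path are hypothetical, chosen for this example.

# write_schema_cql OUTPUT_FILE
write_schema_cql() {
  schema_out="$1"
  cat > "$schema_out" <<'CQL'
-- Keyspace from the quickstart: one standalone node, so replication_factor is 1.
CREATE KEYSPACE IF NOT EXISTS pulsar_qs
  WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

-- Table from the quickstart. IF NOT EXISTS is an addition in this sketch
-- (it is not in the original statement) so that reruns do not fail.
CREATE TABLE IF NOT EXISTS pulsar_qs.pulsar_kv (
  key text PRIMARY KEY,
  content text
);
CQL
}

# Example, run from the DSE installation directory:
#   write_schema_cql /tmp/pulsar_qs_schema.cql
#   bin/cqlsh -f /tmp/pulsar_qs_schema.cql
```

If authentication is enabled on your cluster, cqlsh also needs credentials, which this sketch does not handle.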
Install Apache Pulsar and the connector
-
Download Apache Pulsar 2.7.0 or later:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
-
Untar the Pulsar binary:
tar xvfz apache-pulsar-2.7.0-bin.tar.gz
-
Download the DataStax Apache Pulsar connector tar file from the DataStax downloads site.
For more information about system requirements for the connector, see Install DataStax Apache Pulsar™ connector.
-
Extract the files, replacing VERSION with the version number of the tar file you downloaded:
tar zxf cassandra-enhanced-pulsar-sink-VERSION.tar.gz
-
In your Pulsar home directory, find the connectors directory. If there isn’t a connectors directory, create one.
-
Move the DataStax Pulsar connector NAR file to the Pulsar connectors directory:
mv installation_location/cassandra-enhanced-pulsar-sink-1.4.0.nar pulsar_home/connectors
-
In your Pulsar config directory, create a qs.yml file with the following contents:
configs:
  verbose: false
  batchSize: 3000
  batchFlushTimeoutMs: 1000
  topics: example_topic
  contactPoints: localhost
  loadBalancing.localDc: Cassandra
  port: 9042
  cloud.secureConnectBundle:
  ignoreErrors: None
  maxConcurrentRequests: 500
  maxNumberOfRecordsInBatch: 32
  queryExecutionTimeout: 30
  connectionPoolLocalSize: 4
  jmx: true
  compression: None
  auth:
    provider: None
    username:
    password:
    gssapi:
      keyTab:
      principal:
      service: dse
  ssl:
    provider: None
    hostnameValidation: true
    keystore:
      password:
      path:
    openssl:
      keyCertChain:
      privateKey:
    truststore:
      password:
      path:
    cipherSuites:
  topic:
    example_topic:
      pulsar_qs:
        pulsar_kv:
          mapping: 'key=key,content=value'
          consistencyLevel: LOCAL_ONE
          ttl: -1
          ttlTimeUnit : SECONDS
          timestampTimeUnit : MICROSECONDS
          nullToUnset: true
          deletesEnabled: true
      codec:
        locale: en_US
        timeZone: UTC
        timestamp: CQL_TIMESTAMP
        date: ISO_LOCAL_DATE
        time: ISO_LOCAL_TIME
        unit: MILLISECONDS
-
If your cluster has authentication enabled, you want to use an SSL-encrypted connection, or your cluster isn’t compatible with the connection properties in qs.yml, you must edit the configuration file as explained in Connect the DataStax Apache Pulsar™ connector.
-
Ensure that the user running Pulsar has permission to access the configuration and NAR files.
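The directory, move, and permission steps above can be sketched as two small shell functions. This is an illustrative sketch only: the function names install_nar and check_readable are hypothetical, and the paths are whatever you pass in (the installation_location and pulsar_home placeholders from the steps above).

```shell
#!/bin/sh
# Sketch of the connector install steps. All paths are supplied by the caller;
# the function names are hypothetical and not part of Pulsar or the connector.

# install_nar NAR_FILE PULSAR_HOME
# Creates PULSAR_HOME/connectors if it does not exist, then moves the NAR file into it.
install_nar() {
  nar_file="$1"
  pulsar_home="$2"
  mkdir -p "$pulsar_home/connectors" || return 1
  mv "$nar_file" "$pulsar_home/connectors/" || return 1
}

# check_readable FILE...
# Returns non-zero and reports the first file that the current user cannot read.
check_readable() {
  for candidate in "$@"; do
    if [ ! -r "$candidate" ]; then
      echo "not readable: $candidate" >&2
      return 1
    fi
  done
  return 0
}

# Example, with placeholders as in the steps above:
#   install_nar installation_location/cassandra-enhanced-pulsar-sink-VERSION.nar pulsar_home
#   check_readable pulsar_home/config/qs.yml \
#     pulsar_home/connectors/cassandra-enhanced-pulsar-sink-VERSION.nar
```

Because check_readable tests access for the current user, it is only meaningful when run as the same user that runs Pulsar.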
Run Pulsar with the connector
-
Start Apache Pulsar in standalone mode:
bin/pulsar standalone
-
In a new terminal, check that the DataStax Pulsar connector is running:
curl -s http://localhost:8080/admin/v2/functions/connectors
Make sure the response includes the DataStax Pulsar connector:
[{"name":"cassandra-enhanced","description":"A DataStax Pulsar Sink to load records from Pulsar topics to Apache Cassandra(R) or DataStax Enterprise(DSE)\n","sinkClass":"com.datastax.oss.sink.pulsar.RecordCassandraSinkTask"}]
-
Create a Pulsar sink:
bin/pulsar-admin sinks create \
  --name dse-sink-kv \
  --classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \
  --sink-config-file config/qs.yml \
  --sink-type cassandra-enhanced \
  --tenant public \
  --namespace default \
  --inputs "persistent://public/default/example_topic"
The topic name and mapping are set in the connector configuration YAML file.
-
Send some messages to your new sink:
bin/pulsar-client produce -k "Message 1" -m "Content 1" persistent://public/default/example_topic
bin/pulsar-client produce -k "Message 2" -m "Content 2" persistent://public/default/example_topic
bin/pulsar-client produce -k "Message 3" -m "Content 3" persistent://public/default/example_topic
-
To verify that the messages were written to DSE, start cqlsh, and then query your pulsar_kv table:
cqlsh> SELECT * FROM pulsar_qs.pulsar_kv;
The result should include the test messages you sent:
 key       | content
-----------+-----------
 Message 3 | Content 3
 Message 2 | Content 2
 Message 1 | Content 1

(3 rows)
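For repeated test runs, the check, produce, and verify steps above could be wrapped in small shell helpers. The following is a rough sketch under stated assumptions: the function names and the PULSAR_CLIENT override variable are hypothetical, the connector name cassandra-enhanced and the topic come from the examples above, and the row count is read from the "(N rows)" footer that cqlsh prints after a query.

```shell
#!/bin/sh
# Sketch of the run-and-verify steps. Function names and the PULSAR_CLIENT
# variable are hypothetical helpers for this example, not Pulsar features.

# Path to the Pulsar client utility; override it if you are not in the Pulsar home directory.
PULSAR_CLIENT="${PULSAR_CLIENT:-bin/pulsar-client}"
TOPIC="persistent://public/default/example_topic"

# has_connector
# Reads the connectors JSON response on stdin and succeeds if a connector
# named cassandra-enhanced is listed.
has_connector() {
  grep -Eq '"name"[[:space:]]*:[[:space:]]*"cassandra-enhanced"'
}

# produce_test_messages COUNT
# Sends COUNT key/value messages named "Message N" / "Content N" to the topic.
produce_test_messages() {
  msg_count="$1"
  msg_i=1
  while [ "$msg_i" -le "$msg_count" ]; do
    "$PULSAR_CLIENT" produce -k "Message $msg_i" -m "Content $msg_i" "$TOPIC" || return 1
    msg_i=$((msg_i + 1))
  done
}

# row_count
# Reads cqlsh query output on stdin and prints N from the "(N rows)" footer.
row_count() {
  sed -n 's/^[[:space:]]*(\([0-9][0-9]*\) rows*)[[:space:]]*$/\1/p'
}

# Example (Pulsar commands from the Pulsar home directory, cqlsh from the DSE directory):
#   curl -s http://localhost:8080/admin/v2/functions/connectors | has_connector
#   produce_test_messages 3
#   bin/cqlsh -e "SELECT * FROM pulsar_qs.pulsar_kv;" | row_count
```

The sink writes records asynchronously, so a row count taken immediately after producing may lag behind the number of messages sent.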
Next steps
Your schemas probably require more complex mappings than the example used in this quickstart. Explore the following documentation to get started with more complex mappings and schemas:
-
Print messages to determine the data structure of the topic messages.
-
Create key-value pair mappings, JSON mappings, and Avro mappings.
-
Try advanced and specialized mapping functionality, such as mapping UDTs.
-
Use one connector instance to ingest a single topic into multiple tables or ingest multiple topics to different tables.