Pulsar Connector single instance quickstart
This quickstart will walk you through the simplest possible configuration connecting a standalone Apache Pulsar™ instance to a single standalone DataStax Enterprise (DSE) 6.8 node.
Once you’ve configured Pulsar, a Pulsar sink, the DataStax Apache Pulsar™ Connector and the DSE instance, you’ll be able to send simple key/value pair messages using the Pulsar client utility, pulsar-client
and retrieve them from a DSE keyspace and table.
Install and configure DSE 6.8
-
Install the tarball distribution of DSE 6.8 as described in Installing DataStax Enterprise 6.8 using the binary tarball.
-
Start DSE from the installation directory:
bin/dse cassandra
-
Verify that DataStax Enterprise is running from the installation directory:
bin/nodetool status Datacenter: Cassandra ===================== Status=Up/Down |/ State=Normal/Leaving/Joining/Moving/Stopped -- Address Load Owns (effective) Host ID Token Rack UN 127.0.0.1 152.24 KiB 100.0% <host-id> <token> rack1
-
Start
cqlsh
:bin/cqlsh Connected to Test Cluster at 127.0.0.1:9042. [cqlsh 6.8.0 | DSE 6.8.9 | CQL spec 3.4.5 | DSE protocol v2] Use HELP for help. cqlsh>
-
Create a new keyspace,
pulsar_qs
:cqlsh> CREATE KEYSPACE IF NOT EXISTS pulsar_qs WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
-
Create a new table,
pulsar_kv
in the keyspace to hold the Pulsar message content:cqlsh> CREATE TABLE pulsar_qs.pulsar_kv ( key text PRIMARY KEY, content text );
Install and configure Apache Pulsar
-
Download Apache Pulsar:
wget https://archive.apache.org/dist/pulsar/pulsar-2.7.0/apache-pulsar-2.7.0-bin.tar.gz
-
Untar the Pulsar binary:
tar xvfz apache-pulsar-2.7.0-bin.tar.gz
-
Download the DataStax Apache Pulsar Connector tar file from the DataStax downloads site. If you agree, enable the Terms checkbox and click the download icon.
-
Extract the files:
tar zxf cassandra-enhanced-pulsar-sink-1.4.0.tar.gz
-
Create a
connectors
directory in the Pulsar home directory if it doesn’t exist. -
Copy the DataStax connector NAR to the Pulsar
connectors
directory:cp installation_location/cassandra-enhanced-pulsar-sink-1.4.0.nar pulsar_home/connectors
-
Start Apache Pulsar in standalone mode:
bin/pulsar standalone
-
Open a new terminal and check that the DataStax Apache Pulsar Connector is running:
curl -s http://localhost:8080/admin/v2/functions/connectors [{"name":"cassandra-enhanced","description":"A DataStax Pulsar Sink to load records from Pulsar topics to Apache Cassandra(R) or DataStax Enterprise(DSE)\n","sinkClass":"com.datastax.oss.sink.pulsar.RecordCassandraSinkTask"}]%
Configure the DataStax Apache Pulsar Connector
-
Save following YAML contents to the Pulsar configuration directory,
conf
asqs.yml
:configs: verbose: false batchSize: 3000 batchFlushTimeoutMs: 1000 topics: example_topic contactPoints: localhost loadBalancing.localDc: Cassandra port: 9042 cloud.secureConnectBundle: ignoreErrors: None maxConcurrentRequests: 500 maxNumberOfRecordsInBatch: 32 queryExecutionTimeout: 30 connectionPoolLocalSize: 4 jmx: true compression: None auth: provider: None username: password: gssapi: keyTab: principal: service: dse ssl: provider: None hostnameValidation: true keystore: password: path: openssl: keyCertChain: privateKey: truststore: password: path: cipherSuites: topic: example_topic: pulsar_qs: pulsar_kv: mapping: 'key=key,content=value' consistencyLevel: LOCAL_ONE ttl: -1 ttlTimeUnit : SECONDS timestampTimeUnit : MICROSECONDS nullToUnset: true deletesEnabled: true codec: locale: en_US timeZone: UTC timestamp: CQL_TIMESTAMP date: ISO_LOCAL_DATE time: ISO_LOCAL_TIME unit: MILLISECONDS
-
Create a new Pulsar sink:
bin/pulsar-admin sinks create \ --name dse-sink-kv \ --classname com.datastax.oss.sink.pulsar.StringCassandraSinkTask \ --sink-config-file conf/qs.yml \ --sink-type cassandra-enhanced \ --tenant public \ --namespace default \ --inputs "persistent://public/default/example_topic" "Created successfully"
-
Send some messages to DSE:
bin/pulsar-client produce -k "Message 1" -m "Content 1" persistent://public/default/example_topic bin/pulsar-client produce -k "Message 2" -m "Content 2" persistent://public/default/example_topic bin/pulsar-client produce -k "Message 3" -m "Content 3" persistent://public/default/example_topic
-
Start cqlsh and view the messages in the
pulsar_kv
table:cqlsh> SELECT * FROM pulsar_qs.pulsar_kv; key | content -----------+----------- Message 3 | Content 3 Message 2 | Content 2 Message 1 | Content 1 (3 rows)
Where to go from here…
Now that you’ve run through a simple end-to-end configuration, you can start experimenting with more complicated Pulsar mappings and DSE schemas. For more details, see the following topics:
-
Determining topic data structure Display messages to determine the data structure of the topic messages.
-
Mapping basic messages to table columns Create a topic-table map for Pulsar messages that only contain a key and value in each record.
-
Mapping a message that contain JSON fields For JSON fields, map individual fields in the structure to columns.
-
Mapping Avro messages Supports mapping individual fields from a Avro format field.
-
Extract Pulsar record header values Extract values from Pulsar record header and write to the database table.
-
Mapping messages to table that has a User Defined Type Write complex types directly into User-defined Types (UDT).
-
Mapping a topic to multiple tables Ingest a single topic into multiple tables using a single connector instance.
-
Multiple topics to multiple tables Ingest multiple topics and write to different tables using a single connector instance.
-
Selectively update maps and UDTs based on Pulsar fields Selectively update maps and UDTs based on Pulsar fields.
-
Provide CQL queries in mappings Provide CQL queries when new record arrives on the Pulsar topic.
-
The now() function in mappings You can use the now() function in mappings.