Cycling comments example
Step-by-step implementation for test or demonstration environments running Apache Kafka and the target database on the same system.
Before you begin
This example assumes that DSE and Apache Kafka™ are running on localhost. Before you begin, install DSE version 5.0 or later on the same system where you plan to set up the tutorial. See Installing DataStax Enterprise 6.8.
The cycling comments example shows how to map a Kafka topic to a database table. It assumes that Apache Kafka and the supported database are located on the same server.
The example uses a Kafka topic, CyclingComments, that contains records with key-value pairs, which are mapped to columns in the DSE table cycling.comments.
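The mapping is written as comma-separated column=value.field pairs, where the left side names a table column and the right side names a field in the record value. As a rough illustration, the following sketch splits the mapping string used in the connector configuration later in this tutorial into one pair per line. The show_mapping helper is hypothetical and exists only for this illustration.

```shell
# Mapping string copied from the connector configuration in this tutorial.
MAPPING='record_id=value.rid,id=value.id,commenter=value.author,comment=value.comment,created_at=value.created_at'

# show_mapping: hypothetical helper that prints one "column <- field" pair per line.
show_mapping() {
  printf '%s\n' "$1" | tr ',' '\n' | sed 's/=value\./ <- /'
}

show_mapping "$MAPPING"
# record_id <- rid
# id <- id
# commenter <- author
# comment <- comment
# created_at <- created_at
```

Only the rid and author fields are renamed on the way into the table. The other three fields keep their names.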
Setting up the cycling comments topic on Kafka
Install DataStax Apache Kafka Connector and create the CyclingComments topic. Later sections create the cycling.comments table, then configure the connector and register it with a running worker.
-
Make directories for the DataStax Apache Kafka Connector and Apache Kafka distributions.
mkdir kafka && mkdir dseconnectortmp
- Download and extract Apache Kafka from https://kafka.apache.org/quickstart.
- Download and extract the latest DataStax Apache Kafka Connector from the DataStax downloads site. The current version is 1.4.0.
-
Copy the DataStax Connector JAR to the Kafka plug-in directory.
mv dseconnectortmp/kafka-connect-cassandra-sink-1.4.0.jar kafka/libs/
-
In the kafka/config/connect-distributed.properties file, set plugin.path to the fully qualified path to the DataStax Apache Kafka Connector JAR file.
-
To simplify the setup for this tutorial, create a log directory and make it accessible to all users.
sudo -u root mkdir /var/log/kafka && sudo -u root chmod 777 /var/log/kafka
-
Start ZooKeeper and the Kafka server with the default configuration:
- Start ZooKeeper:
kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &> /var/log/kafka/zookeeper_start.log &
- Start Kafka:
kafka/bin/kafka-server-start.sh kafka/config/server.properties &> /var/log/kafka/kafka_start.log &
-
Configure the Kafka distributed worker key and value converters to match the
type of the example data (string).
-
Start the worker:
kafka/bin/connect-distributed.sh \
  kafka/config/connect-distributed.properties &> worker.log &
To verify that the worker is running, use the following command:
ps auwx | grep ConnectDistributed
-
Create the CyclingComments topic:
kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic CyclingComments
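The plugin.path and converter steps above name the settings but do not show the property lines. The following is a minimal sketch of one way to apply them, assuming that the "string" type refers to Kafka's built-in org.apache.kafka.connect.storage.StringConverter and that the commands run from the directory that contains kafka/. The set_prop helper is hypothetical and is not part of Kafka.

```shell
# set_prop FILE KEY VALUE: hypothetical helper (not part of Kafka) that
# replaces an existing KEY=... line in a properties file, or appends one.
set_prop() {
  file="$1"; key="$2"; value="$3"
  tmp="$(mktemp)"
  # Keep every line that does not start with "KEY=" (commented lines are kept).
  awk -v k="$key=" 'index($0, k) != 1' "$file" > "$tmp"
  printf '%s=%s\n' "$key" "$value" >> "$tmp"
  # Copy back instead of moving so the original file permissions are preserved.
  cat "$tmp" > "$file"
  rm -f "$tmp"
}

# Paths follow the directory layout used in this tutorial.
PROPS="kafka/config/connect-distributed.properties"
JAR="$PWD/kafka/libs/kafka-connect-cassandra-sink-1.4.0.jar"

if [ -f "$PROPS" ]; then
  set_prop "$PROPS" plugin.path "$JAR"
  # Assumption: the example data type "string" maps to StringConverter.
  set_prop "$PROPS" key.converter org.apache.kafka.connect.storage.StringConverter
  set_prop "$PROPS" value.converter org.apache.kafka.connect.storage.StringConverter
fi
```

Settings changed this way take effect only for a worker started afterward, so apply them before the "Start the worker" step.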
Setting up CQL cycling keyspace and comments table
Create a cycling keyspace with a comments table on the DataStax Enterprise database using CQL shell (cqlsh).
-
Open cqlsh:
cqlsh
-
Create the keyspace:
CREATE KEYSPACE cycling WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
-
Create the table:
CREATE TABLE cycling.comments ( id UUID, created_at TIMESTAMP, comment TEXT, commenter TEXT, record_id TIMEUUID, PRIMARY KEY (id, created_at)) WITH CLUSTERING ORDER BY (created_at DESC);
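If you expect to rebuild the environment more than once, the two statements above can also be kept in a file and applied non-interactively. The following sketch writes them to a file; the file name cycling_schema.cql and the IF NOT EXISTS clauses are additions made here so that the script can be rerun, and are not part of the original tutorial.

```shell
# Assumed file name; override by setting SCHEMA_FILE before running.
SCHEMA_FILE="${SCHEMA_FILE:-cycling_schema.cql}"

# Same keyspace and table as in the steps above, with IF NOT EXISTS added
# so that applying the file a second time does not fail.
cat > "$SCHEMA_FILE" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS cycling
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS cycling.comments (
  id UUID,
  created_at TIMESTAMP,
  comment TEXT,
  commenter TEXT,
  record_id TIMEUUID,
  PRIMARY KEY (id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC);
EOF

echo "Wrote $SCHEMA_FILE; apply it with: cqlsh -f $SCHEMA_FILE"
```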
Setting up DataStax Connector
Copy and paste the DataStax Connector configuration from this example into a file on the Kafka server.
-
In the Kafka plug-in configuration directory, create a
cycling-comments-sink.json configuration file:
touch kafka/config/cycling-comments-sink.json
-
Open the cycling-comments-sink.json file for editing. Copy
the following JSON configuration and paste it into the file:
{
  "name": "cycling-comments-sink",
  "config": {
    "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
    "tasks.max": "1",
    "topics": "CyclingComments",
    "topic.CyclingComments.cycling.comments.mapping": "record_id=value.rid,id=value.id,commenter=value.author,comment=value.comment,created_at=value.created_at"
  }
}
- Save and exit the file.
-
Register the cycling comments example configuration with the distributed
worker.
nohup curl -X POST -H "Content-Type: application/json" \
  -d @kafka/config/cycling-comments-sink.json \
  http://localhost:8083/connectors &>/dev/null &
-
Verify that the connector is running:
curl -X GET "http://127.0.0.1:8083/connectors/cycling-comments-sink/status"
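The status response is a JSON document that reports a state for the connector and for each of its tasks. The following is a minimal sketch of a scripted check, assuming that a healthy connector reports RUNNING in every state field. The connector_is_running helper is hypothetical, and it matches text patterns rather than parsing the JSON.

```shell
# connector_is_running: hypothetical helper that reads the status JSON on stdin
# and succeeds only if every "state" field it finds is RUNNING.
connector_is_running() {
  # Extract each "state":"..." pair; fail if the response contains none.
  states="$(grep -o '"state" *: *"[A-Z_]*"')" || return 1
  # Fail if any extracted state is something other than RUNNING.
  if printf '%s\n' "$states" | grep -qv '"RUNNING"'; then
    return 1
  fi
  return 0
}

# Usage, once the worker from the earlier steps is listening on port 8083:
#   curl -s "http://127.0.0.1:8083/connectors/cycling-comments-sink/status" \
#     | connector_is_running && echo "connector and tasks are running"
```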
Inserting data from a JSON file into the Kafka topic
Use the Apache Kafka producer to stream data into the CyclingComments topic.
-
Create a directory for the example data:
mkdir kafka_examples
-
Create a file for the example data:
touch kafka_examples/data_all.json
-
Open the data_all.json file for editing. Copy the following data and paste it into the file, keeping one record per line.
{"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-04-01 14:33:02.160Z", "comment": "LATE RIDERS SHOULD NOT DELAY THE START", "author": "Alex", "rid": "22d496d1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-03-21 21:11:09.999Z", "comment": "Second rest stop was out of water", "author": "Alex", "rid": "22d38561-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.234Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d225d1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.000Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d0c640-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2018-10-13 20:11:14.503Z", "comment": "The gift certificate for winning was the best", "author": "Amy", "rid": "22d61d71-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-04-01 13:43:08.030Z", "comment": "Last climb was a killer", "author": "Amy", "rid": "22da1511-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-03-22 01:16:59.001Z", "comment": "Great snacks at all reststops", "author": "Amy", "rid": "22d8dc91-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-02-17 08:43:20.234Z", "comment": "Glad you ran the race in the rain", "author": "Amy", "rid": "22d755f1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2018-10-13 20:11:14.536Z", "comment": "Fastest womens time ever way to go amy!", "author": "Maryanne", "rid": "22db4d90-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-04-14 11:16:52.009Z", "comment": "Not bad for a flatlander", "author": "Maryanne", "rid": "22de81e1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-03-20 21:45:10.101Z", "comment": "Saggers really rocked it", "author": "Maryanne", "rid": "22dd4961-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-02-13 17:20:17.020Z", "comment": "Great race on a crappy day", "author": "Maryanne", "rid": "22dc5f01-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2018-10-13 20:11:14.564Z", "comment": "Great course", "author": "Michael", "rid": "22df6c42-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-04-07 19:21:14.001Z", "comment": "Thanks for waiting for me!", "author": "Michael", "rid": "22e40021-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-22 09:19:44.060Z", "comment": "Awesome race glad you held it anyway", "author": "Michael", "rid": "22e2eeb1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-17 03:43:01.030Z", "comment": "Getting read for the race", "author": "Michael", "rid": "22e18f21-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-02-16 02:22:11.000Z", "comment": "Some entries complain a lot", "author": "Michael", "rid": "22e07db1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2018-10-13 20:11:14.601Z", "comment": "Can't wait for the next race", "author": "Katarzyna", "rid": "22e51192-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2017-01-01 17:20:17.020Z", "comment": "Gearing up for the seaon", "author": "Katarzyna", "rid": "22e64a11-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "5b6962dd-3f90-4c93-8f61-eabfa4a803e2", "created_at": "2018-10-13 20:11:14.621Z", "comment": "Thanks for all your hard work", "author": "Marianne", "rid": "22e81ed2-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "220844bf-4860-49d6-9a4b-6b5d3a79cbfb", "created_at": "2018-10-13 20:11:14.627Z", "comment": "A for effort!", "author": "Paolo", "rid": "22e90932-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "c4b65263-fe58-4846-83e8-f0e1c13d518f", "created_at": "2018-10-13 20:11:14.633Z", "comment": "Closing ceremony was a little lame", "author": "Rossella", "rid": "22e9f392-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2018-10-13 20:11:14.641Z", "comment": "Next time guys!", "author": "Marcia", "rid": "22eb2c12-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2017-02-11 14:09:56.000Z", "comment": "First race was amazing, can't wait for more", "author": "Marcia", "rid": "22ec3d81-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2018-10-13 20:11:14.655Z", "comment": "So many great races thanks y'all", "author": "Steven", "rid": "22ed4ef2-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-04-05 12:01:00.003Z", "comment": "Bike damaged in transit bummer", "author": "Steven", "rid": "234ab131-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-02-02 01:49:00.020Z", "comment": "Best of luck everybody I can't make it", "author": "Steven", "rid": "23499fc1-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "e7cd5752-bc0d-4157-a80f-7523add8dbcd", "created_at": "2018-10-13 20:11:15.273Z", "comment": "Go team, you rocked it", "author": "Anna", "rid": "234bc2a0-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "6d5f1663-89c0-45fc-8cfd-60a373b01622", "created_at": "2018-10-13 20:11:15.280Z", "comment": "Next year the tour of california!", "author": "Melissa", "rid": "234cad02-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2018-10-13 20:11:15.286Z", "comment": "Next year for sure!", "author": "Vera", "rid": "234d9762-cf24-11e8-a84b-2b44b2d77e7c"}
{"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2017-02-13 17:40:16.123Z", "comment": "I can do without the rain@@@@", "author": "Vera", "rid": "25f33bf1-cf24-11e8-a84b-2b44b2d77e7c"}
- Save the changes and close the file.
-
Load the cycling comments data using the Kafka producer:
kafka/bin/kafka-console-producer.sh \
  --broker-list localhost:9092 \
  --topic CyclingComments < kafka_examples/data_all.json &
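By default the console producer sends each input line as one record, so a record that is split across lines in data_all.json reaches the topic as broken fragments. The following sketch is a rough shape check to run before loading. It only confirms that every line starts with { and ends with }, it does not validate the JSON, and the check_records helper is hypothetical.

```shell
# check_records FILE: hypothetical helper that succeeds only if FILE is
# non-empty and every line starts with "{" and ends with "}".
check_records() {
  file="$1"
  # grep -c exits non-zero when the count is 0, so ignore its exit status.
  total="$(grep -c '' "$file" || true)"
  complete="$(grep -c '^{.*}$' "$file" || true)"
  echo "$complete of $total lines look like complete JSON objects"
  [ "$total" -gt 0 ] && [ "$complete" -eq "$total" ]
}

# Path follows the layout used in this tutorial.
DATA="kafka_examples/data_all.json"
if [ -f "$DATA" ]; then
  check_records "$DATA" || echo "Fix $DATA before loading it" >&2
fi
```

A line with trailing whitespace or a Windows line ending fails this check, which is usually worth fixing before loading the file.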
Verifying records processed and writes
Ensure that the tutorial data was received by Apache Kafka™ and that the records were processed by the DataStax Apache Kafka Connector.
-
Get a list of all messages in the CyclingComments topic:
kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic CyclingComments --from-beginning > ~/CyclingComments-records.log &
-
Count the number of lines, which is equal to the number of records:
cat ~/CyclingComments-records.log | wc -l
-
Compare the result with the number of rows written to the cycling.comments table:
dsbulk count -k cycling -t comments
For details about DataStax Bulk Loader, including download instructions and information about using dsbulk commands, start by reading the installation topic.
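The comparison in the last two steps can be scripted. The following is a minimal sketch, assuming that the row count is passed in as a plain number. The counts_match helper is hypothetical, and the usage line assumes that dsbulk prints only the count on standard output, which you should confirm for your version.

```shell
# counts_match LOG_FILE DB_COUNT: hypothetical helper that compares the number
# of lines in the consumer log with a row count supplied by the caller.
counts_match() {
  log_file="$1"; db_count="$2"
  # wc -l pads its output on some systems, so strip the spaces.
  topic_count="$(wc -l < "$log_file" | tr -d ' ')"
  echo "topic records: $topic_count, table rows: $db_count"
  [ "$topic_count" -eq "$db_count" ]
}

# Usage (assumption: dsbulk writes only the count to standard output):
#   counts_match ~/CyclingComments-records.log \
#     "$(dsbulk count -k cycling -t comments 2>/dev/null)"
```

Because the table's primary key is (id, created_at), two records with the same id and timestamp would overwrite each other, and the table would then hold fewer rows than the topic holds records.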