Cycling comments example

Step-by-step implementation for test or demonstration environments running Apache Kafka and the target database on the same system.

Before you begin

This example assumes that DSE and Apache Kafka are running on localhost. Before you begin, install DSE version 5.0 or later on the same system where you plan to set up the tutorial. See Installing DataStax Enterprise 6.8.

The cycling comments example shows a simple way to map a Kafka topic to a database table.

The example uses the Kafka topic CyclingComments. Its records contain JSON key-value pairs that are mapped to columns in the DSE table cycling.comments.
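
The connector performs this mapping itself. The following Python sketch only illustrates how the column=value.field pairs in this example's connector configuration select fields from each JSON record. The function names parse_mapping and map_record are hypothetical. The sketch skips the conversion to CQL column types (such as UUID or TIMESTAMP) that the connector must perform before writing.

```python
from __future__ import annotations

import json

# Mapping copied from the topic.CyclingComments.cycling.comments.mapping
# setting in this example's connector configuration.
MAPPING = ("record_id=value.rid,id=value.id,commenter=value.author,"
           "comment=value.comment,created_at=value.created_at")


def parse_mapping(mapping: str) -> dict[str, str]:
    """Split 'column=value.field,...' into {column: field} (illustration only)."""
    result = {}
    for pair in mapping.split(","):
        column, source = (part.strip() for part in pair.split("=", 1))
        if not source.startswith("value."):
            # This sketch handles only fields taken from the record value.
            raise ValueError(f"unsupported source in sketch: {source}")
        result[column] = source[len("value."):]
    return result


def map_record(record_value: str, mapping: dict[str, str]) -> dict[str, str]:
    """Return the column values that one JSON record supplies (no type conversion)."""
    fields = json.loads(record_value)
    return {column: fields[field] for column, field in mapping.items()}
```

For example, map_record(line, parse_mapping(MAPPING)) applied to the first record of the sample data returns a dictionary whose commenter entry is "Alex" and whose record_id entry is the record's rid value.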

Setting up the cycling comments topic on Kafka

Install DataStax Apache Kafka Connector and configure the CyclingComments topic.

Install DataStax Enterprise (DSE), Apache Kafka, and DataStax Apache Kafka Connector on the same system in a test or development environment. Set up a Kafka topic CyclingComments and the cycling.comments table. Configure DataStax Apache Kafka Connector and register it with a running worker.
Tip: For additional Kafka Connector examples, we recommend that you also visit our GitHub site. Feel free to clone and modify any of the examples for your own purposes. There is no warranty or implied official support. However, the examples are a great starting point to show you various ways of using the DataStax Apache Kafka Connector.

  1. Make directories for the DataStax Apache Kafka Connector and Apache Kafka distributions.
    mkdir kafka dseconnectortmp
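  2. Download Apache Kafka from https://kafka.apache.org/quickstart and extract it into the kafka directory, so that the scripts are in kafka/bin and the configuration files are in kafka/config.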
  3. Download the latest DataStax Apache Kafka Connector from the DataStax downloads site and extract it into the dseconnectortmp directory. At the time of writing, the current version is 1.4.0.
  4. Move the DataStax Connector JAR to the Kafka libs directory:
    mv dseconnectortmp/kafka-connect-cassandra-sink-1.4.0.jar kafka/libs/
  5. In the kafka/config/connect-distributed.properties file, set plugin.path to the fully qualified path of the DataStax Apache Kafka Connector JAR file.
  6. To simplify this tutorial, create a log directory that all users can write to:
    sudo mkdir -p /var/log/kafka &&
    sudo chmod 777 /var/log/kafka
  7. Start ZooKeeper and the Kafka server with the default configuration:
    • Start ZooKeeper:
      kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &> /var/log/kafka/zookeeper_start.log &
    • Start Kafka:
      kafka/bin/kafka-server-start.sh kafka/config/server.properties &> /var/log/kafka/kafka_start.log &
  8. Configure the Kafka distributed worker key and value converters to match the type of the example data (string).
    1. Comment out the default converters and converter schema settings in the connect-distributed.properties file:
      sed -e '/^key\.converter=/s/^/#/' -i kafka/config/connect-distributed.properties
      sed -e '/^value\.converter=/s/^/#/' -i kafka/config/connect-distributed.properties
      sed -e '/^key\.converter\.schemas\.enable=/s/^/#/' -i kafka/config/connect-distributed.properties
      sed -e '/^value\.converter\.schemas\.enable=/s/^/#/' -i kafka/config/connect-distributed.properties
    2. Add the string converter settings to the connect-distributed.properties file:
      echo 'key.converter=org.apache.kafka.connect.storage.StringConverter' >> kafka/config/connect-distributed.properties
      echo 'value.converter=org.apache.kafka.connect.storage.StringConverter' >> kafka/config/connect-distributed.properties
      echo 'key.converter.schemas.enable=false' >> kafka/config/connect-distributed.properties
      echo 'value.converter.schemas.enable=false' >> kafka/config/connect-distributed.properties
  9. Start the worker:
    kafka/bin/connect-distributed.sh \
    kafka/config/connect-distributed.properties &> worker.log &
    To verify that the worker is running, use the following command:
    ps auwx | grep ConnectDistributed
  10. Create the CyclingComments topic:
    kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 \
    --replication-factor 1 \
    --partitions 1 \
    --topic CyclingComments
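
To check what the converter edits in step 8 leave in connect-distributed.properties, the following Python sketch applies the same changes to the file's text and reports the settings that take effect. The function names are hypothetical. The parser handles only simple key=value lines, not the full Java properties syntax (":" separators, backslash line continuations). It assumes that when a key appears more than once, the later value wins, as java.util.Properties does when loading a file.

```python
# Converter settings that step 8 appends for the distributed worker.
STRING_CONVERTER_SETTINGS = {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
}


def use_string_converters(properties_text: str) -> str:
    """Comment out active converter lines, then append the string converter settings."""
    out = []
    for line in properties_text.splitlines():
        key = line.split("=", 1)[0].strip()
        if "=" in line and key in STRING_CONVERTER_SETTINGS:
            out.append("#" + line)  # same effect as the sed 's/^/#/' edits
        else:
            out.append(line)
    # Same effect as the echo ... >> commands.
    out.extend(f"{k}={v}" for k, v in STRING_CONVERTER_SETTINGS.items())
    return "\n".join(out) + "\n"


def effective_settings(properties_text: str) -> dict:
    """Parse simple key=value lines; a later duplicate key overrides an earlier one."""
    settings = {}
    for line in properties_text.splitlines():
        stripped = line.strip()
        if not stripped or stripped[0] in "#!" or "=" not in stripped:
            continue  # skip blanks, comments, and lines this sketch does not handle
        key, value = stripped.split("=", 1)
        settings[key.strip()] = value.strip()
    return settings
```

Running effective_settings on the edited file should show StringConverter for both converters and false for both schemas.enable keys. Other worker settings are unchanged.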

Setting up CQL cycling keyspace and comments table

Create the cycling keyspace and the comments table in the DataStax Enterprise database using the CQL shell (cqlsh).

  1. Open cqlsh:
    cqlsh
  2. Create the keyspace:
    CREATE KEYSPACE cycling WITH 
    replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
  3. Create the table:
    CREATE TABLE cycling.comments (     
        id UUID,
        created_at TIMESTAMP, 
        comment TEXT,
        commenter TEXT,
        record_id TIMEUUID,
        PRIMARY KEY (id, created_at))
      WITH CLUSTERING ORDER BY (created_at DESC);
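
The primary key (id, created_at) has two effects. Rows are grouped into partitions by id and ordered newest first by created_at. Writing a row with an existing (id, created_at) pair overwrites that row instead of adding a new one. The following in-memory Python model is a hypothetical illustration of those two rules only, not of how DSE stores data. In the sample data, two Alex comments differ only in milliseconds (.234 and .000), so they are modeled as two separate rows.

```python
from collections import defaultdict
from datetime import datetime


def parse_created_at(text: str) -> datetime:
    """Parse the 'YYYY-MM-DD HH:MM:SS.fffZ' strings used in this example's data.

    The trailing Z is matched literally; the naive result is treated as UTC.
    """
    return datetime.strptime(text, "%Y-%m-%d %H:%M:%S.%fZ")


class CommentsTableModel:
    """Hypothetical in-memory stand-in for cycling.comments (illustration only)."""

    def __init__(self):
        # partition key (id) -> {clustering key (created_at) -> other columns}
        self._partitions = defaultdict(dict)

    def insert(self, id_, created_at, **columns):
        # Writes are upserts: an existing (id, created_at) row is replaced.
        self._partitions[id_][created_at] = columns

    def select_partition(self, id_):
        """Rows of one partition, newest first (CLUSTERING ORDER BY created_at DESC)."""
        rows = self._partitions.get(id_, {})
        return [(created, rows[created]) for created in sorted(rows, reverse=True)]

    def count(self):
        """Total number of rows across all partitions."""
        return sum(len(rows) for rows in self._partitions.values())
```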

Setting up DataStax Connector

Copy and paste the DataStax Connector configuration from this example into a file on the Kafka server.

  1. In the Kafka configuration directory, create a cycling-comments-sink.json configuration file:
    touch kafka/config/cycling-comments-sink.json
  2. Open the cycling-comments-sink.json file for editing. Copy the following JSON configuration and paste it into the file:
    {
      "name": "cycling-comments-sink",
      "config": {
        "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
        "tasks.max": "1",
        "topics": "CyclingComments",
        "topic.CyclingComments.cycling.comments.mapping": 
             "record_id=value.rid,id=value.id,commenter=value.author,comment=value.comment,created_at=value.created_at"
      }
    }
  3. Save and exit the file.
  4. Register the cycling comments connector configuration with the distributed worker. The worker returns the new connector's information, or an error if, for example, a connector with that name already exists:
    curl -X POST -H "Content-Type: application/json" \
    -d @kafka/config/cycling-comments-sink.json \
    http://localhost:8083/connectors
  5. Verify that the connector is running:
    curl -X GET "http://127.0.0.1:8083/connectors/cycling-comments-sink/status"
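
The status response from the Kafka Connect REST API contains a connector entry and a tasks list, each with a state such as RUNNING or FAILED. A failed task also carries a trace field with the error. The following Python sketch, using only the standard library, fetches that document and checks it. The function names are hypothetical, and the URL assumes the worker listens on 127.0.0.1:8083 as in step 5.

```python
import json
import urllib.request

# Status endpoint from step 5; adjust host and port if your worker differs.
STATUS_URL = "http://127.0.0.1:8083/connectors/cycling-comments-sink/status"


def fetch_status(url: str = STATUS_URL, timeout: float = 5.0) -> dict:
    """GET the Kafka Connect status document and decode it as JSON."""
    with urllib.request.urlopen(url, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


def is_running(status: dict) -> bool:
    """True only when the connector and every one of its tasks report RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


def failed_task_traces(status: dict) -> list:
    """Return the error trace of each FAILED task (empty string if none is given)."""
    return [task.get("trace", "") for task in status.get("tasks", [])
            if task.get("state") == "FAILED"]
```

With the worker running, is_running(fetch_status()) should return True. If it returns False, failed_task_traces shows the errors reported by the connector's tasks.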

Inserting data from a JSON file into the Kafka topic

Use the Apache Kafka producer to stream data into the CyclingComments topic.

  1. Create a directory for the example data:
    mkdir kafka_examples
  2. Create a file for the example data:
    touch kafka_examples/data_all.json
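  3. Open the data_all.json file for editing. Copy the following data and paste it into the file, keeping each record on a single line, because the console producer sends each line as a separate message: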
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-04-01 14:33:02.160Z", "comment": "LATE RIDERS SHOULD NOT DELAY THE START", "author": "Alex", "rid": "22d496d1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-03-21 21:11:09.999Z", "comment": "Second rest stop was out of water", "author": "Alex", "rid": "22d38561-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.234Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d225d1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.000Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d0c640-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2018-10-13 20:11:14.503Z", "comment": "The gift certificate for winning was the best", "author": "Amy", "rid": "22d61d71-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-04-01 13:43:08.030Z", "comment": "Last climb was a killer", "author": "Amy", "rid": "22da1511-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-03-22 01:16:59.001Z", "comment": "Great snacks at all reststops", "author": "Amy", "rid": "22d8dc91-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-02-17 08:43:20.234Z", "comment": "Glad you ran the race in the rain", "author": "Amy", "rid": "22d755f1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2018-10-13 20:11:14.536Z", "comment": "Fastest womens time ever way to go amy!", "author": "Maryanne", "rid": "22db4d90-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-04-14 11:16:52.009Z", "comment": "Not bad for a flatlander", "author": "Maryanne", "rid": "22de81e1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-03-20 21:45:10.101Z", "comment": "Saggers really rocked it", "author": "Maryanne", "rid": "22dd4961-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-02-13 17:20:17.020Z", "comment": "Great race on a crappy day", "author": "Maryanne", "rid": "22dc5f01-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2018-10-13 20:11:14.564Z", "comment": "Great course", "author": "Michael", "rid": "22df6c42-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-04-07 19:21:14.001Z", "comment": "Thanks for waiting for me!", "author": "Michael", "rid": "22e40021-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-22 09:19:44.060Z", "comment": "Awesome race glad you held it anyway", "author": "Michael", "rid": "22e2eeb1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-17 03:43:01.030Z", "comment": "Getting read for the race", "author": "Michael", "rid": "22e18f21-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-02-16 02:22:11.000Z", "comment": "Some entries complain a lot", "author": "Michael", "rid": "22e07db1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2018-10-13 20:11:14.601Z", "comment": "Can't wait for the next race", "author": "Katarzyna", "rid": "22e51192-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2017-01-01 17:20:17.020Z", "comment": "Gearing up for the seaon", "author": "Katarzyna", "rid": "22e64a11-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "5b6962dd-3f90-4c93-8f61-eabfa4a803e2", "created_at": "2018-10-13 20:11:14.621Z", "comment": "Thanks for all your hard work", "author": "Marianne", "rid": "22e81ed2-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "220844bf-4860-49d6-9a4b-6b5d3a79cbfb", "created_at": "2018-10-13 20:11:14.627Z", "comment": "A for effort!", "author": "Paolo", "rid": "22e90932-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c4b65263-fe58-4846-83e8-f0e1c13d518f", "created_at": "2018-10-13 20:11:14.633Z", "comment": "Closing ceremony was a little lame", "author": "Rossella", "rid": "22e9f392-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2018-10-13 20:11:14.641Z", "comment": "Next time guys!", "author": "Marcia", "rid": "22eb2c12-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2017-02-11 14:09:56.000Z", "comment": "First race was amazing, can't wait for more", "author": "Marcia", "rid": "22ec3d81-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2018-10-13 20:11:14.655Z", "comment": "So many great races thanks y'all", "author": "Steven", "rid": "22ed4ef2-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-04-05 12:01:00.003Z", "comment": "Bike damaged in transit bummer", "author": "Steven", "rid": "234ab131-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-02-02 01:49:00.020Z", "comment": "Best of luck everybody I can't make it", "author": "Steven", "rid": "23499fc1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7cd5752-bc0d-4157-a80f-7523add8dbcd", "created_at": "2018-10-13 20:11:15.273Z", "comment": "Go team, you rocked it", "author": "Anna", "rid": "234bc2a0-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6d5f1663-89c0-45fc-8cfd-60a373b01622", "created_at": "2018-10-13 20:11:15.280Z", "comment": "Next year the tour of california!", "author": "Melissa", "rid": "234cad02-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2018-10-13 20:11:15.286Z", "comment": "Next year for sure!", "author": "Vera", "rid": "234d9762-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2017-02-13 17:40:16.123Z", "comment": "I can do without the rain@@@@", "author": "Vera", "rid": "25f33bf1-cf24-11e8-a84b-2b44b2d77e7c"}
  4. Save the changes and close the file.
  5. Load the cycling comments data using the Kafka producer:
    kafka/bin/kafka-console-producer.sh \
    --bootstrap-server localhost:9092 \
    --topic CyclingComments < kafka_examples/data_all.json
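
Before producing, you can check data_all.json so that every line parses and matches the target column types. The following Python sketch is hypothetical and not part of the tutorial. It requires one JSON object per line with the id, created_at, comment, author, and rid fields. It also requires id to be a UUID, rid to be a version 1 (time-based) UUID because it maps to the TIMEUUID column record_id, and created_at to use the YYYY-MM-DD HH:MM:SS.fffZ form found in this data set. Blank lines are rejected to keep exactly one record per line.

```python
import json
import uuid
from datetime import datetime
from typing import Iterable

REQUIRED_FIELDS = ("id", "created_at", "comment", "author", "rid")


def validate_line(line: str) -> dict:
    """Check one producer line and return the parsed record.

    Raises ValueError (json.JSONDecodeError is a subclass) on the first problem.
    """
    record = json.loads(line)
    if not isinstance(record, dict):
        raise ValueError("line is not a JSON object")
    missing = [name for name in REQUIRED_FIELDS if name not in record]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    uuid.UUID(record["id"])  # target column id is a UUID
    if uuid.UUID(record["rid"]).version != 1:  # record_id is a TIMEUUID
        raise ValueError(f"rid is not a version 1 (time-based) UUID: {record['rid']}")
    # The sample data writes timestamps as 'YYYY-MM-DD HH:MM:SS.fffZ'.
    datetime.strptime(record["created_at"], "%Y-%m-%d %H:%M:%S.%fZ")
    return record


def validate_lines(lines: Iterable[str]) -> int:
    """Validate every line and return the record count."""
    count = 0
    for number, line in enumerate(lines, start=1):
        if not line.strip():
            raise ValueError(f"line {number}: blank line")
        try:
            validate_line(line)
        except ValueError as exc:
            raise ValueError(f"line {number}: {exc}") from exc
        count += 1
    return count
```

To check the tutorial file, open kafka_examples/data_all.json and pass the file object to validate_lines. The function returns the number of records, or raises an error that names the first bad line.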

Verifying processed records and database writes

Ensure that the tutorial data was received by Apache Kafka and records were processed by the DataStax Apache Kafka Connector.

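  1. Write all messages in the CyclingComments topic to a file. The --timeout-ms option makes the consumer exit after 10 seconds without new messages:
    kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic CyclingComments --from-beginning --timeout-ms 10000 > ~/CyclingComments-records.log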
  2. Count the lines in the file. Each line is one record:
    wc -l < ~/CyclingComments-records.log
  3. Compare this count with the number of rows written to the cycling.comments table:
    dsbulk count -k cycling -t comments

    For details about DataStax Bulk Loader, including download instructions and information about using dsbulk commands, start by reading the installation topic.
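
The record count and the row count match only when every record has a distinct primary key. Records that share both id and created_at are written to the same row, so the table can contain fewer rows than the topic contains records. The following Python sketch computes the expected row count from the consumer output and compares it with the number reported by dsbulk count. The function names are hypothetical. The sketch compares created_at as text, so it assumes all timestamps use the same format, as they do in this example's data.

```python
import json


def expected_row_count(lines) -> int:
    """Distinct (id, created_at) pairs, the primary key of cycling.comments."""
    keys = set()
    for line in lines:
        if line.strip():
            record = json.loads(line)
            keys.add((record["id"], record["created_at"]))
    return len(keys)


def compare_counts(record_lines, table_rows: int) -> str:
    """Summarize records consumed versus rows reported for the table."""
    lines = list(record_lines)  # allow any iterable, such as an open file
    records = sum(1 for line in lines if line.strip())
    expected = expected_row_count(lines)
    if table_rows == expected:
        return f"OK: {records} records, {table_rows} rows"
    return f"MISMATCH: {records} records, expected {expected} rows, table has {table_rows}"
```

For example, open ~/CyclingComments-records.log and pass the file object to compare_counts, together with the count that dsbulk reports.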