Cycling comments example

Step-by-step implementation for test or demonstration environments that run Apache Kafka and the DataStax database on the same system.

The cycling comments example shows a simple mapping of a Kafka topic to a database table. It assumes that Apache Kafka and the DataStax database run on the same server.

The example uses the Kafka topic CyclingComments. The value of each record is a JSON object whose fields are mapped to columns in the DataStax Enterprise table cycling.comments.

Before you begin

This example assumes that both DataStax Enterprise (DSE) and Apache Kafka™ run on localhost. Before you begin, install DSE version 5.0 or later on the system where you plan to set up the tutorial. See Installing DataStax Enterprise 6.7.

Setting up the cycling comments topic on Kafka

In a test or development environment, install Apache Kafka and the DataStax Apache Kafka™ Connector on the same system as DSE, start ZooKeeper, the Kafka broker, and a Kafka Connect distributed worker, and create the CyclingComments topic.

  1. Make directories for the DataStax Connector and Apache Kafka distributions.
    mkdir kafka dseconnectortmp
  2. Download and extract Apache Kafka:
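    curl -L http://apache.spinellicreations.com/kafka/2.0.0/kafka_2.11-2.0.0.tgz \
    | tar xz -C kafka --strip-components=1
    If this mirror is no longer available, download kafka_2.11-2.0.0.tgz from another Apache mirror or from the Apache Kafka release archive.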
  3. Download and extract the DataStax Apache Kafka Connector from the DataStax Academy using your download key:
    curl --user username:downloadkey -L \
    https://downloads.datastax.com/kafka/kafka-connect-dse-1.0.0.tar.gz | \
    tar xz -C dseconnectortmp --strip-components=1
    Replace username and downloadkey with your own.
  4. Move the DataStax Connector JAR into the Kafka libs directory:
    mv dseconnectortmp/kafka-connect-dse-1.0.0.jar kafka/libs/
  5. In the kafka/config/connect-distributed.properties file, set plugin.path to the fully qualified path of the DataStax Connector JAR file, that is, the absolute path of kafka/libs/kafka-connect-dse-1.0.0.jar.
  6. Make a log directory and allow access to all (to ease setup of this example).
    sudo mkdir -p /var/log/kafka &&
    sudo chmod 777 /var/log/kafka
  7. Start ZooKeeper and the Kafka server with the default configuration:
    • Start ZooKeeper:
      kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties &> /var/log/kafka/zookeeper_start.log &
    • Start Kafka:
      kafka/bin/kafka-server-start.sh kafka/config/server.properties &> /var/log/kafka/kafka_start.log &
  8. Configure the Kafka distributed worker key and value converters to match the type of the example data (string).
    1. Comment out the default converters and converter schema settings in the connect-distributed.properties file:
      sed -e '/key\.converter=/s/^/#/g' -i kafka/config/connect-distributed.properties
      sed -e '/value\.converter=/s/^/#/g' -i kafka/config/connect-distributed.properties
      sed -e '/key\.converter\.schemas\.enable=/s/^/#/g' -i kafka/config/connect-distributed.properties
      sed -e '/value\.converter\.schemas\.enable=/s/^/#/g' -i kafka/config/connect-distributed.properties
    2. Add the string converter settings to the connect-distributed.properties file:
      echo 'key.converter=org.apache.kafka.connect.storage.StringConverter' >> kafka/config/connect-distributed.properties
      echo 'value.converter=org.apache.kafka.connect.storage.StringConverter' >> kafka/config/connect-distributed.properties
      echo 'key.converter.schemas.enable=false' >> kafka/config/connect-distributed.properties
      echo 'value.converter.schemas.enable=false' >> kafka/config/connect-distributed.properties
  9. Start the worker:
    kafka/bin/connect-distributed.sh \
    kafka/config/connect-distributed.properties &> worker.log &
    To verify that the worker is running, use the following command:
    ps auwx | grep ConnectDistributed
  10. Create the CyclingComments topic:
    kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic CyclingComments
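Steps 5 and 8 edit connect-distributed.properties by hand or with separate sed and echo commands, which leaves duplicate lines if they are run twice. As an alternative, the following bash sketch defines set_prop, a hypothetical helper (not part of Kafka or the DataStax Connector) that rewrites a key if it is already set and appends it otherwise. It assumes GNU sed and values without the characters |, & or backslash.

```shell
#!/usr/bin/env bash
# set_prop FILE KEY VALUE
# Hypothetical helper, not part of Kafka or the DataStax Connector.
# Sets KEY=VALUE in a Java .properties file: an existing uncommented KEY=
# line is rewritten in place; otherwise the line is appended.
# Assumes GNU sed (-i without a suffix) and a VALUE containing no '|', '&'
# or backslash characters, which would break the sed replacement.
set_prop() {
  local file="$1" key="$2" value="$3"
  local key_re
  # Escape the dots in the key so that grep and sed match them literally.
  key_re=$(printf '%s' "$key" | sed 's/\./\\./g')
  if grep -q "^${key_re}=" "$file"; then
    # '|' is the sed delimiter, so paths containing '/' need no escaping.
    sed -i "s|^${key_re}=.*|${key}=${value}|" "$file"
  else
    # Avoid gluing the new line onto a last line that lacks a newline.
    if [ -s "$file" ] && [ -n "$(tail -c1 "$file")" ]; then
      printf '\n' >> "$file"
    fi
    printf '%s=%s\n' "$key" "$value" >> "$file"
  fi
}
```

For example, the converter settings from step 8 and the plugin.path setting from step 5 could be applied as follows, from the directory that contains kafka. Run this before starting the worker in step 9, or restart the worker afterwards:

    props=kafka/config/connect-distributed.properties
    set_prop "$props" key.converter org.apache.kafka.connect.storage.StringConverter
    set_prop "$props" value.converter org.apache.kafka.connect.storage.StringConverter
    set_prop "$props" key.converter.schemas.enable false
    set_prop "$props" value.converter.schemas.enable false
    set_prop "$props" plugin.path "$PWD/kafka/libs/kafka-connect-dse-1.0.0.jar"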

Setting up the CQL cycling keyspace and comments table

Create the cycling keyspace and the comments table in the DataStax Enterprise database using the CQL shell (cqlsh).

  1. Open cqlsh:
    cqlsh
  2. Create the keyspace:
    CREATE KEYSPACE cycling WITH 
    replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
  3. Create the table:
    CREATE TABLE cycling.comments (     
        id UUID,
        created_at TIMESTAMP, 
        comment TEXT,
        commenter TEXT,
        record_id TIMEUUID,
        PRIMARY KEY (id, created_at))
      WITH CLUSTERING ORDER BY (created_at DESC);
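To create the schema non-interactively, the same statements can be written to a CQL script and run with cqlsh -f. This is a sketch: write_cycling_schema is a hypothetical function, and IF NOT EXISTS is an addition (not in the steps above) so that the script can be re-run without errors.

```shell
#!/usr/bin/env bash
# write_cycling_schema FILE
# Hypothetical helper: writes the keyspace and table definitions used in
# this example to a CQL script. IF NOT EXISTS is added so that re-running
# the script does not fail when the keyspace or table already exists.
write_cycling_schema() {
  cat > "$1" <<'EOF'
CREATE KEYSPACE IF NOT EXISTS cycling
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE IF NOT EXISTS cycling.comments (
  id UUID,
  created_at TIMESTAMP,
  comment TEXT,
  commenter TEXT,
  record_id TIMEUUID,
  PRIMARY KEY (id, created_at)
) WITH CLUSTERING ORDER BY (created_at DESC);
EOF
}
```

Usage:

    write_cycling_schema cycling_schema.cql && cqlsh -f cycling_schema.cql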

Setting up the DataStax Connector

Copy and paste the DataStax Connector configuration from this example into a file on the Kafka server.

  1. In the Kafka configuration directory, create a cycling-comments-sink.json configuration file:
    touch kafka/config/cycling-comments-sink.json
  2. Open the cycling-comments-sink.json file for editing. Copy the following JSON configuration and paste it into the file:
    {
      "name": "cycling-comments-sink",
      "config": {
        "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
        "tasks.max": "1",
        "topics": "CyclingComments",
        "topic.CyclingComments.cycling.comments.mapping": 
             "record_id=value.rid,id=value.id,commenter=value.author,comment=value.comment,created_at=value.created_at"
      }
    }
  3. Save and exit the file.
  4. Register the cycling comments connector configuration with the distributed worker:
    curl -X POST -H "Content-Type: application/json" \
    -d @kafka/config/cycling-comments-sink.json \
    http://localhost:8083/connectors
  5. Verify that the connector is running:
    curl -X GET "http://127.0.0.1:8083/connectors/cycling-comments-sink/status"
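The connector and its task may take a few seconds to start after registration. The status endpoint returns JSON with one state field for the connector and one for each task, so a single check can miss a task that failed later. The following bash sketch polls the endpoint until every state is RUNNING. all_running and wait_for_running are hypothetical helper names, and the grep-based parsing assumes the worker's usual compact JSON output rather than using a JSON parser.

```shell
#!/usr/bin/env bash
# all_running: reads a Kafka Connect status JSON document on stdin and
# succeeds only if it contains at least one "state" field and every
# state (connector and tasks) is RUNNING.
all_running() {
  local states
  # Extract each "state":"VALUE" pair, tolerating spaces around ':'.
  states=$(grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/')
  [ -n "$states" ] && ! printf '%s\n' "$states" | grep -vqx 'RUNNING'
}

# wait_for_running NAME [TRIES]: polls the worker REST API, assumed to
# listen on localhost:8083 as in the registration step, every 2 seconds
# until all_running succeeds or TRIES attempts (default 30) are used up.
wait_for_running() {
  local name="$1" tries="${2:-30}" i
  for ((i = 0; i < tries; i++)); do
    if curl -s "http://localhost:8083/connectors/${name}/status" | all_running; then
      return 0
    fi
    sleep 2
  done
  return 1
}
```

Usage:

    wait_for_running cycling-comments-sink && echo "cycling-comments-sink is running"

If the wait fails, the full status output shows which part is not running; a failed task normally includes a trace field with the error.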

Inserting data from a JSON file into the Kafka topic

Use the Apache Kafka producer to stream data into the CyclingComments topic.

  1. Create a directory for the example data:
    mkdir kafka_examples
  2. Create a file for the example data:
    touch kafka_examples/data_all.json
  3. Open the data_all.json file for editing. Copy the following data and paste it into the file.
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-04-01 14:33:02.160Z", "comment": "LATE RIDERS SHOULD NOT DELAY THE START", "author": "Alex", "rid": "22d496d1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-03-21 21:11:09.999Z", "comment": "Second rest stop was out of water", "author": "Alex", "rid": "22d38561-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.234Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d225d1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7ae5cf3-d358-4d99-b900-85902fda9bb0", "created_at": "2017-02-14 20:43:20.000Z", "comment": "Raining too hard should have postponed", "author": "Alex", "rid": "22d0c640-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2018-10-13 20:11:14.503Z", "comment": "The gift certificate for winning was the best", "author": "Amy", "rid": "22d61d71-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-04-01 13:43:08.030Z", "comment": "Last climb was a killer", "author": "Amy", "rid": "22da1511-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-03-22 01:16:59.001Z", "comment": "Great snacks at all reststops", "author": "Amy", "rid": "22d8dc91-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c7fceba0-c141-4207-9494-a29f9809de6f", "created_at": "2017-02-17 08:43:20.234Z", "comment": "Glad you ran the race in the rain", "author": "Amy", "rid": "22d755f1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2018-10-13 20:11:14.536Z", "comment": "Fastest womens time ever way to go amy!", "author": "Maryanne", "rid": "22db4d90-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-04-14 11:16:52.009Z", "comment": "Not bad for a flatlander", "author": "Maryanne", "rid": "22de81e1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-03-20 21:45:10.101Z", "comment": "Saggers really rocked it", "author": "Maryanne", "rid": "22dd4961-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "8566eb59-07df-43b1-a21b-666a3c08c08a", "created_at": "2017-02-13 17:20:17.020Z", "comment": "Great race on a crappy day", "author": "Maryanne", "rid": "22dc5f01-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2018-10-13 20:11:14.564Z", "comment": "Great course", "author": "Michael", "rid": "22df6c42-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-04-07 19:21:14.001Z", "comment": "Thanks for waiting for me!", "author": "Michael", "rid": "22e40021-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-22 09:19:44.060Z", "comment": "Awesome race glad you held it anyway", "author": "Michael", "rid": "22e2eeb1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-03-17 03:43:01.030Z", "comment": "Getting read for the race", "author": "Michael", "rid": "22e18f21-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "fb372533-eb95-4bb4-8685-6ef61e994caa", "created_at": "2017-02-16 02:22:11.000Z", "comment": "Some entries complain a lot", "author": "Michael", "rid": "22e07db1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2018-10-13 20:11:14.601Z", "comment": "Can't wait for the next race", "author": "Katarzyna", "rid": "22e51192-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "9011d3be-d35c-4a8d-83f7-a3c543789ee7", "created_at": "2017-01-01 17:20:17.020Z", "comment": "Gearing up for the seaon", "author": "Katarzyna", "rid": "22e64a11-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "5b6962dd-3f90-4c93-8f61-eabfa4a803e2", "created_at": "2018-10-13 20:11:14.621Z", "comment": "Thanks for all your hard work", "author": "Marianne", "rid": "22e81ed2-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "220844bf-4860-49d6-9a4b-6b5d3a79cbfb", "created_at": "2018-10-13 20:11:14.627Z", "comment": "A for effort!", "author": "Paolo", "rid": "22e90932-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "c4b65263-fe58-4846-83e8-f0e1c13d518f", "created_at": "2018-10-13 20:11:14.633Z", "comment": "Closing ceremony was a little lame", "author": "Rossella", "rid": "22e9f392-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2018-10-13 20:11:14.641Z", "comment": "Next time guys!", "author": "Marcia", "rid": "22eb2c12-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "38ab64b6-26cc-4de9-ab28-c257cf011659", "created_at": "2017-02-11 14:09:56.000Z", "comment": "First race was amazing, can't wait for more", "author": "Marcia", "rid": "22ec3d81-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2018-10-13 20:11:14.655Z", "comment": "So many great races thanks y'all", "author": "Steven", "rid": "22ed4ef2-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-04-05 12:01:00.003Z", "comment": "Bike damaged in transit bummer", "author": "Steven", "rid": "234ab131-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6ab09bec-e68e-48d9-a5f8-97e6fb4c9b47", "created_at": "2017-02-02 01:49:00.020Z", "comment": "Best of luck everybody I can't make it", "author": "Steven", "rid": "23499fc1-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "e7cd5752-bc0d-4157-a80f-7523add8dbcd", "created_at": "2018-10-13 20:11:15.273Z", "comment": "Go team, you rocked it", "author": "Anna", "rid": "234bc2a0-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "6d5f1663-89c0-45fc-8cfd-60a373b01622", "created_at": "2018-10-13 20:11:15.280Z", "comment": "Next year the tour of california!", "author": "Melissa", "rid": "234cad02-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2018-10-13 20:11:15.286Z", "comment": "Next year for sure!", "author": "Vera", "rid": "234d9762-cf24-11e8-a84b-2b44b2d77e7c"}
    {"id": "95addc4c-459e-4ed7-b4b5-472f19a67995", "created_at": "2017-02-13 17:40:16.123Z", "comment": "I can do without the rain@@@@", "author": "Vera", "rid": "25f33bf1-cf24-11e8-a84b-2b44b2d77e7c"}
  4. Save the changes and close the file.
  5. Load the cycling comments data using the Kafka producer:
    kafka/bin/kafka-console-producer.sh \
    --broker-list localhost:9092 \
    --topic CyclingComments < kafka_examples/data_all.json
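The console producer sends each line of the input file as one record, so a blank or malformed line becomes a record that the connector likely cannot map. A quick text check before loading, such as the following bash sketch, can catch such lines. check_records is a hypothetical helper: it only checks that each line looks like a single JSON object containing the fields used in the mapping (id, created_at, comment, author, rid); it is not a JSON validator.

```shell
#!/usr/bin/env bash
# check_records FILE
# Hypothetical pre-flight check, not a Kafka or DataStax tool.
# Verifies line by line that FILE holds one JSON object per line and that
# each object contains the fields referenced by the connector mapping.
# Prints the number of lines (records) checked; exits 1 if any looks wrong.
check_records() {
  awk '
    BEGIN { nf = split("id created_at comment author rid", field, " ") }
    {
      n++
      # A blank line also fails this test and is reported.
      if ($0 !~ /^[ \t]*\{.*\}[ \t]*$/) {
        printf "line %d: not a one-line JSON object\n", NR > "/dev/stderr"
        bad = 1
      }
      for (i = 1; i <= nf; i++) {
        # Look for the quoted field name followed by a colon, e.g. "rid":
        if (index($0, "\"" field[i] "\":") == 0) {
          printf "line %d: missing field %s\n", NR, field[i] > "/dev/stderr"
          bad = 1
        }
      }
    }
    END { print n + 0; exit (bad ? 1 : 0) }
  ' "$1"
}
```

Usage:

    check_records kafka_examples/data_all.json

The printed count is the number of records the producer sends, which you can compare with the counts in the verification steps.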

Verifying that records were processed and written

Ensure that the tutorial data was received by Apache Kafka™ and records were processed by the DataStax Connector.

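  1. Write all messages in the CyclingComments topic to a log file. The --timeout-ms option makes the consumer exit after 10 seconds without new messages, so the file is complete before you count it:
    kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
    --topic CyclingComments --from-beginning --timeout-ms 10000 \
    > ~/CyclingComments-records.log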
  2. Count the lines in the log file; each line is one record:
    wc -l < ~/CyclingComments-records.log
  3. Compare the result with the number of rows written to the cycling.comments table:
    cqlsh -e "SELECT COUNT(*) FROM cycling.comments;"
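The last two steps can be combined into a single comparison. cqlsh prints the count below a column header and a dashed separator line, so the following bash sketch takes the first line after the separator. cql_count and compare_counts are hypothetical helpers, and the parsing assumes the default cqlsh table output.

```shell
#!/usr/bin/env bash
# cql_count: reads cqlsh output for a SELECT COUNT(*) query on stdin and
# prints the value in the first row, i.e. the line after the '-----' line.
cql_count() {
  awk 'found { gsub(/[ \t]/, ""); print; exit } /^-+$/ { found = 1 }'
}

# compare_counts KAFKA_COUNT DB_COUNT: reports whether the number of
# records read from the topic matches the number of rows in the table.
# Returns 0 on a match, 1 on a mismatch, 2 if either count is missing.
compare_counts() {
  local kafka_count="$1" db_count="$2"
  if [ -z "$kafka_count" ] || [ -z "$db_count" ]; then
    echo "missing count" >&2
    return 2
  fi
  if [ "$kafka_count" -eq "$db_count" ]; then
    echo "OK: $db_count records in both Kafka and cycling.comments"
  else
    echo "MISMATCH: $kafka_count records in Kafka, $db_count rows in cycling.comments"
    return 1
  fi
}
```

Usage:

    kafka_count=$(wc -l < ~/CyclingComments-records.log)
    db_count=$(cqlsh -e "SELECT COUNT(*) FROM cycling.comments;" | cql_count)
    compare_counts "$kafka_count" "$db_count"

A lower row count does not necessarily mean lost records: writes to the table are upserts, so records that share the same primary key (id, created_at) produce a single row.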