DataStax Apache Kafka Connector
Deploy the DataStax Apache Kafka Connector to stream records from Apache Kafka® topics to your Astra Managed Cluster databases.
The Kafka Connector distribution package includes a sample JSON properties file (dse-sink-distributed.json.sample), located in the conf directory of the Kafka Connector installation. Use the sample file as a reference when configuring your deployment.
- Configure the distributed worker configuration file `connect-distributed.properties` for your use case. For example:

  ```properties
  bootstrap.servers=localhost:9092
  group.id=json-example-group

  key.converter=org.apache.kafka.connect.storage.StringConverter
  value.converter=org.apache.kafka.connect.json.JsonConverter
  key.converter.schemas.enable=false
  value.converter.schemas.enable=false

  # The internal converter used for offsets, config, and status data is configurable and
  # must be specified, but most users will always want to use the built-in default.
  # Offset, config, and status data is never visible outside of Kafka Connect in this format.
  internal.key.converter=org.apache.kafka.connect.json.JsonConverter
  internal.value.converter=org.apache.kafka.connect.json.JsonConverter
  internal.key.converter.schemas.enable=false
  internal.value.converter.schemas.enable=false

  offset.storage.topic=connect-offsets
  offset.storage.replication.factor=1
  config.storage.topic=connect-configs
  config.storage.replication.factor=1
  status.storage.topic=connect-status
  status.storage.replication.factor=1

  offset.flush.interval.ms=10000

  plugin.path=/home/automaton/kafka-connect-cassandra-sink-1.4.0.jar
  ```

  Specify converters for the `key.converter` and `value.converter` properties that match the form of your Kafka data. For more information, see Configuring converters in the Confluent documentation.
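As a point of reference, the `schemas.enable` settings above change the payload shape the JsonConverter expects. The records below are illustrative examples, not taken from the connector documentation:

```
With value.converter.schemas.enable=false, the record value is the bare JSON payload:

  {"name": "alice", "age": 30}

With value.converter.schemas.enable=true, each record must embed a schema envelope:

  {
    "schema": {"type": "struct", "fields": [
      {"field": "name", "type": "string"},
      {"field": "age", "type": "int32"}]},
    "payload": {"name": "alice", "age": 30}
  }
```

If your producers publish plain JSON, keep `schemas.enable=false` as shown in the sample file.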
- From your Kafka installation directory, start the distributed worker:

  ```shell
  bin/connect-distributed.sh config/connect-distributed.properties
  ```

  The worker startup process outputs a large number of informational messages. When startup is complete, the final output is similar to the following:

  ```
  [2019-10-13 19:49:25,385] INFO Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder:852)
  ```
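Once the worker is up, a quick sanity check is to query its REST interface. The commands below assume the default `rest.port` of `8083` on localhost:

```shell
# Query the root endpoint of the Kafka Connect REST API;
# a running worker responds with its version and commit info.
curl http://localhost:8083/

# List the installed connector plugins to confirm that the
# DataStax connector jar on plugin.path was picked up.
curl http://localhost:8083/connector-plugins
```

If the second command does not list the DataStax connector class, re-check the `plugin.path` setting and restart the worker.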
- Configure the JSON configuration file (such as `dse-sink.json`) to use the Secure Connect Bundle (SCB):

  ```json
  {
    "name": "dse-sink",
    "config": {
      "connector.class": "com.datastax.kafkaconnector.DseSinkConnector",
      "cloud.secureConnectBundle": "/path/to/scb.zip",
      "auth.username": "clientId",
      "auth.password": "clientSecret"
      ...
    }
  }
  ```

  Define the following configuration settings:
  - `name`: Unique name for the connector. Default: `dse-sink`.
  - `connector.class`: DataStax connector Java class provided in the `kafka-connect-dse-N.N.N.jar`. Default: `com.datastax.kafkaconnector.DseSinkConnector`.
  - `cloud.secureConnectBundle`: The full path to the SCB for your Managed Cluster database (`secure-connect-DATABASE_NAME.zip`). If this option is specified, you must also include the `auth.username` and `auth.password` for the database user.
  - `auth.username`: The database username or the literal string `token`.
  - `auth.password`: The database password or an Astra application token (`AstraCS:…`). The user or application token must have a minimum of `modify` privileges on the tables receiving data from the Kafka Connector.
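On a self-managed DSE or Cassandra cluster, the `modify` requirement maps to CQL grants like the following sketch; the keyspace, table, and role names here are placeholders, not values from this guide. On Astra, the equivalent is generating the application token from a role that has write access to the target keyspace.

```
-- Grant write access on every table in the target keyspace
-- (ks_name and kafka_role are hypothetical names).
GRANT MODIFY ON KEYSPACE ks_name TO kafka_role;

-- Or scope the grant to a single table receiving Kafka records.
GRANT MODIFY ON TABLE ks_name.table_name TO kafka_role;
```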
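The elided settings in the JSON example typically include the topic-to-table mapping. The fragment below is an illustrative sketch only; the topic, keyspace, table, and column names are hypothetical:

```
"topics": "json-example-topic",
"topic.json-example-topic.my_ks.my_table.mapping":
    "symbol=key, price=value.price, ts=value.ts"
```

Each mapping entry pairs a destination table column with a field extracted from the Kafka record key or value.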
- Register the connector configuration with the distributed worker:

  ```shell
  curl -X POST -H "Content-Type: application/json" -d @dse-sink.json "http://IP:PORT/connectors"
  ```

  Replace `IP` and `PORT` with the IP address and port number of the Kafka worker. Use the same port as the `rest.port` parameter set in `connect-distributed.properties`. The default port is `8083`.
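After registering, the same REST API can report whether the connector and its tasks started successfully. This sketch assumes the default port `8083` and the connector name `dse-sink` from the example above:

```shell
# List the registered connectors; the new connector name should appear.
curl http://localhost:8083/connectors

# Show the connector's state and the state of each of its tasks
# ("RUNNING" when data is flowing; "FAILED" includes a stack trace).
curl http://localhost:8083/connectors/dse-sink/status
```

If a task reports `FAILED`, the trace in the status response usually points at the misconfigured setting (for example, a bad SCB path or credentials).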