Kafka Streams in Starlight for Kafka
Starlight for Kafka can run Apache Kafka® Streams applications with minimal configuration changes. This guide explains how to make those changes and provides an example Java application for trying out Kafka Streams with Starlight for Kafka.
Configure username and password authentication (SASL with PLAIN mechanism)
To use username and password authentication in Kafka Streams, set the following properties through Kafka JAAS:
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="tenantname" password="token:XXXX-JWT-TOKEN";
security.protocol=SASL_PLAINTEXT
# security.protocol=SASL_SSL if an SSL/TLS connection is being used
sasl.mechanism=PLAIN
sasl.mechanism=PLAIN
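The same settings can also be supplied programmatically and merged into the Streams configuration. The sketch below is illustrative only; the helper class and method names are assumptions, and the full example application later in this guide sets these properties inline:

import java.util.Properties;

public class SaslConfigSketch {
    // Illustrative placeholders: pass your Pulsar tenant name and JWT token.
    static Properties saslProperties(String tenant, String token) {
        Properties props = new Properties();
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"" + tenant + "\" "
                + "password=\"" + token + "\";");
        // Use SASL_SSL when connecting over TLS, SASL_PLAINTEXT otherwise.
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        return props;
    }
}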
Configure replication factor
In Starlight for Kafka, the replication factor is handled by BookKeeper and set at the namespace level.
Kafka Streams still expects its own replication.factor setting for internal topics, but the broker applies the namespace-level replication policy from BookKeeper regardless.
Set replication.factor to 1 (or any positive integer) in your Kafka Streams configuration.
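If you manage the Pulsar cluster yourself, the replication that actually applies is the namespace persistence policy. The following is a hedged sketch with pulsar-admin, assuming the my-tenant-1/kafka namespace used in this guide and illustrative quorum values; adjust them to your durability requirements:

bin/pulsar-admin namespaces set-persistence my-tenant-1/kafka \
  --bookkeeper-ensemble 3 \
  --bookkeeper-write-quorum 3 \
  --bookkeeper-ack-quorum 2 \
  --ml-mark-delete-max-rate 0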
Application code
This Java application reads lines of text from the input topic, splits them into words, counts the occurrences of each word, and writes the running counts to the output topic (and to standard output).
- Create two topics in the kafka namespace of your Pulsar tenant, called streams-plaintext-input and streams-wordcount-output:

  bin/pulsar-admin topics create-partitioned-topic \
    persistent://my-tenant-1/kafka/streams-plaintext-input \
    --partitions 4

  bin/pulsar-admin topics create-partitioned-topic \
    persistent://my-tenant-1/kafka/streams-wordcount-output \
    --partitions 4
- Add your username (Pulsar tenant), auth token, and broker URL to the following application:
import org.apache.kafka.common.config.TopicConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Printed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Arrays;
import java.util.Properties;
import java.util.regex.Pattern;

public class WordCountExample {

  static final String username = "my-tenant-1";
  static final String password = "token:XXXX-JWT-TOKEN";
  static final String broker = "kafka-aws-useast1.streaming.datastax.com:9093";
  static final String inputTopic = "streams-plaintext-input";
  static final String outputTopic = "streams-wordcount-output";

  // The Streams application as a whole can be launched like any normal Java application that has a `main()` method
  public static void main(final String[] args) {
    final String bootstrapServers = args.length > 0 ? args[0] : broker;

    // Configure the Streams application
    final Properties streamsConfiguration = getStreamsConfiguration(bootstrapServers);

    // Define the processing topology of the Streams application
    final StreamsBuilder builder = new StreamsBuilder();
    createWordCountStream(builder);
    final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);

    // Always (and unconditionally) clean local state prior to starting the processing topology.
    // An unconditional call is used because it makes it easier for you to manipulate this example
    // when resetting the application for doing a re-run with the Application Reset Tool:
    // https://docs.confluent.io/platform/current/streams/developer-guide/app-reset-tool.html
    //
    // The drawback of cleaning up local state prior is that your app must rebuild its local state from scratch.
    // This takes time and requires reading all the state-relevant data from the Kafka cluster over the network.
    // In a typical production scenario, you don't want to clean up always as done here, but rather only when needed.
    // For example, only under certain conditions like the presence of a command line flag for your app.
    // See `ApplicationResetExample.java` for a production-like example.
    streams.cleanUp();

    // Run the processing topology with `start()` to begin processing its input data
    streams.start();

    // Add shutdown hook to respond to SIGTERM and gracefully close the Streams application
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
  }

  /**
   * Configure the Streams application.
   *
   * Various Kafka Streams related settings are defined here, such as the location of the target Kafka cluster to use.
   * Additionally, you could also define Kafka Producer and Kafka Consumer settings when needed.
   *
   * @param bootstrapServers Kafka cluster address
   * @return Properties getStreamsConfiguration
   */
  static Properties getStreamsConfiguration(final String bootstrapServers) {
    final Properties streamsConfiguration = new Properties();
    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
    // against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-lambda-example");
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "wordcount-lambda-example-client");
    // Where to find Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Specify default (de)serializers for record keys and for record values.
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    // Records should be flushed every 10 seconds. This is less than the default
    // in order to keep this example interactive.
    streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    // For this example, record caches are disabled.
    streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
    streamsConfiguration.put(StreamsConfig.topicPrefix(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG), 1);
    // Authentication
    streamsConfiguration.put("sasl.jaas.config",
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
            + "username=\"" + username + "\" "
            + "password=\"" + password + "\";");
    streamsConfiguration.put("security.protocol", "SASL_SSL");
    streamsConfiguration.put("sasl.mechanism", "PLAIN");
    // In Starlight for Kafka, replication is handled by BookKeeper and is usually set at the namespace level.
    // In Kafka Streams you can set it to 1; the broker applies the namespace policies.
    streamsConfiguration.put("replication.factor", "1");
    return streamsConfiguration;
  }

  /**
   * Define the processing topology for Word Count.
   *
   * @param builder StreamsBuilder to use
   */
  static void createWordCountStream(final StreamsBuilder builder) {
    // Construct a `KStream` from the input topic "streams-plaintext-input".
    // Here, message values represent lines of text.
    // This example ignores whatever is stored in the message keys.
    // The default key and value serdes are used.
    final KStream<String, String> textLines = builder.stream(inputTopic);

    final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    final KTable<String, Long> wordCounts = textLines
        // Split each text line, by whitespace, into words.
        // The text lines are the record values, the data in the record keys is ignored.
        // Then, the program can invoke `flatMapValues()` instead of the more generic `flatMap()`.
        .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
        // Group the split data by word so the program can subsequently count the occurrences per word.
        // This step re-keys (re-partitions) the input data with the new record key being the words.
        // There is no need to specify explicit serdes because the resulting key and value types
        // (String and String) match the application's default serdes.
        .groupBy((keyIgnored, word) -> word)
        // Count the occurrences of each word (record key).
        .count();

    // Write the `KTable<String, Long>` to the output topic.
    wordCounts.toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
    wordCounts.toStream().print(Printed.toSysOut());
  }

}
- Run the program. The output is similar to the following (a sketch for sending sample text to the input topic appears after the output):
[wordcount-lambda-example-client-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=wordcount-lambda-example-client-StreamThread-1-consumer, groupId=wordcount-lambda-example] Requesting the log end offset for streams-plaintext-input-0 in order to compute lag
[wordcount-lambda-example-client-StreamThread-1] INFO org.apache.kafka.clients.consumer.KafkaConsumer - [Consumer clientId=wordcount-lambda-example-client-StreamThread-1-consumer, groupId=wordcount-lambda-example] Requesting the log end offset for wordcount-lambda-example-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition-0 in order to compute lag
[KTABLE-TOSTREAM-0000000010]: hello, 1
[KTABLE-TOSTREAM-0000000010]: hello, 2
[KTABLE-TOSTREAM-0000000010]: again, 1
[KTABLE-TOSTREAM-0000000010]: hello, 3
[KTABLE-TOSTREAM-0000000010]: hello, 4
[KTABLE-TOSTREAM-0000000010]: again, 2
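The counts shown above only appear after some text reaches the input topic. The following producer is a minimal, illustrative sketch for generating that input; it assumes the same tenant, token, and broker placeholders as the Streams application, and the sample sentences are arbitrary:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class WordCountInputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Same placeholders as the Streams application: tenant, token, and broker address.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-aws-useast1.streaming.datastax.com:9093");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put("sasl.jaas.config",
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"my-tenant-1\" password=\"token:XXXX-JWT-TOKEN\";");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // A few lines of text for the word count to process.
            String[] lines = {"hello kafka streams", "hello again", "hello hello again"};
            for (String line : lines) {
                producer.send(new ProducerRecord<>("streams-plaintext-input", line));
            }
            producer.flush();
        }
    }
}

Any other Kafka-compatible producer works as well, as long as it uses the same SASL properties.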
Troubleshoot the Kafka Streams demo
If you are on an M1 Mac and encounter a jnilib error, add the following dependency to the example application’s pom.xml file:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.2.0</version>
</dependency>