Install the Starlight for Kafka extension
This guide explains how to install the Starlight for Kafka extension as a protocol handler, and then use it to produce and consume Apache Kafka® messages on an Apache Pulsar™ cluster.
To install Starlight for Kafka as a proxy extension, see Starlight for Kafka proxy extension.
Prerequisites
To use Starlight for Kafka, you need a Pulsar cluster from Astra Streaming, Luna Streaming, or a self-managed or standalone Pulsar deployment.
Starlight for Kafka version 2.10 is compatible with the following:
- Pulsar version 2.10.x
- Kafka client versions 0.10, 1.x, 2.x, and 3.x
- Kafka Streams
- Kafka CLI
Starlight for Kafka requires JDK 11.
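To check which JDK your shell uses, you can run the following command. The exact output varies by vendor and build, but it should report a version 11 release.

```
java -version
```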
Establish the Kafka protocol handler
Before a Kafka client can interact with your Pulsar cluster, you need the Starlight for Kafka protocol handler installed in the cluster.
Astra Streaming
These steps explain how to enable the Starlight for Kafka protocol handler in an Astra Streaming Pulsar cluster, and then get the connection details for your Kafka client:
- In the Astra Portal header, click Applications, and then select Streaming.
- Create a tenant or click the name of an existing tenant.
- Go to your tenant’s Connect tab, select Kafka, and then click Enable Kafka.
- Review the information about the Starlight for Kafka extension, and then click Enable Kafka to confirm that you want to enable this extension on your tenant.
  This action automatically creates a configuration file and the following three namespaces in your Astra Streaming tenant:
  - kafka: Produces and consumes messages
  - __kafka: Supports required Kafka functionality
  - __kafka_unlimited: Stores metadata
  These namespaces are required for the Starlight for Kafka extension to function properly. They are permanent namespaces that cannot be removed except by deleting the entire tenant and all of its data.
- Save the Kafka connection details to a file named ssl.properties. The actual values depend on your Astra Streaming tenant’s configuration and cloud provider.

```
username: TENANT_NAME
password: token:******
bootstrap.servers: kafka-PROVIDER-REGION.streaming.datastax.com:9093
schema.registry.url: https://kafka-PROVIDER-REGION.streaming.datastax.com:8081
security.protocol: SASL_SSL
sasl.mechanism: PLAIN
```

  In Astra Streaming, if you click Download or Copy, then a Pulsar token is included in the password field automatically. You can also generate a token on your tenant’s Settings tab.
- Optional: Add a session timeout value to the ssl.properties file to avoid connection issues with some Kafka clients:

```
session.timeout.ms=45000
```

Optionally, verify your connection details with the Kafka CLI, as shown in the following check, and then continue to the next section to learn how to use the connection details in a Kafka client script.
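To confirm that the credentials in ssl.properties work before you write any client code, you can list the topics visible to your tenant. This is a minimal sketch: the bootstrap server is a placeholder from your connection details, the file path assumes you saved ssl.properties in your Kafka installation's config directory, and some setups require a complete sasl.jaas.config entry in the file rather than separate username and password lines.

```
bin/kafka-topics.sh \
  --bootstrap-server kafka-PROVIDER-REGION.streaming.datastax.com:9093 \
  --command-config config/ssl.properties \
  --list
```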
Luna Streaming
The Starlight for Kafka extension is included in the luna-streaming-all image used to deploy a Luna Streaming cluster, and the Luna Helm chart simplifies deployment of the Kafka extension.
The following steps explain how to deploy a Luna Streaming Helm chart to create a simple Pulsar cluster with the Starlight for Kafka extension ready to use.
- Make sure you meet the following prerequisites:
  - Install Helm 3 CLI version 3.8.0 or later.
  - Install Kafka CLI version 3.3.1 or later.
  - Install kubectl CLI version 1.23.4 or later.
  - Have access to a Kubernetes cluster with permission to create a namespace, deployments, and pods.
- Add the DataStax Helm chart repo to your Helm store:

```
helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
```

- Install the Helm chart using a minimal values file. The following command creates a Helm release named my-pulsar-cluster using the DataStax Luna Helm chart within a Kubernetes namespace named datastax-pulsar. This minimal configuration creates only the essential components and has no ingress or load balanced services.

```
VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/starlight-for-kafka/values.yaml"
helm install \
  --namespace datastax-pulsar \
  --create-namespace \
  --values $VALUES_URL \
  --version 3.0.4 \
  my-pulsar-cluster \
  datastax-pulsar/pulsar
```
- Wait for the broker pod to reach a running state. It might restart a few times while the components start up.

```
kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s
```
- Use the Pulsar Admin CLI to inspect the tenants, namespaces, and topics that were created in your Pulsar cluster when you deployed the Helm chart with the Starlight for Kafka extension enabled.
  By default, the Helm chart creates a tenant named public with a namespace named default. In addition, the Starlight for Kafka extension creates several namespaces and topics that are required for its functionality. These namespaces are prefixed with __kafka.
  List the namespaces in the public tenant:

```
./bin/pulsar-admin namespaces list public
```

  Make sure the output includes the default and __kafka namespaces:

```
public/__kafka
public/__kafka_producerid
public/default
```

  Optionally, you can inspect these system namespaces more closely, as shown in the following check.
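For example, to see the internal topics that the extension manages, you can list the topics in one of the __kafka namespaces. This is an optional check, and the tenant and namespace names are the defaults created by this Helm chart.

```
./bin/pulsar-admin topics list public/__kafka
```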
- Enable port forwarding for the Pulsar Admin and Starlight for Kafka services that are running on the Kubernetes cluster:
  - In a new terminal, port forward the Pulsar Admin service:

```
kubectl port-forward -n datastax-pulsar service/pulsar-broker 8080:8080
```

  - In a separate terminal window, port forward the Starlight for Kafka service:

```
kubectl port-forward -n datastax-pulsar service/pulsar-proxy 9092:9092
```
You don’t need to open the Pulsar binary port to accept new messages when using Starlight for Kafka. This is because Kafka clients communicate using the Kafka protocol on port 9092, which is handled by the Starlight for Kafka extension.
After deploying the Helm chart and enabling port forwarding, your applications can communicate with Pulsar as if it were a real Kafka host, as explained in the next section.
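As a quick smoke test, you can list topics through the forwarded Kafka port with the Kafka CLI. This is a sketch that assumes the port-forward commands above are still running and that you run it from your Kafka installation directory.

```
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```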
Self-managed
- If this is the first time you’ve used protocol handlers with your Pulsar deployment, create a protocols folder in the root of your Pulsar directory.
- Download the pulsar-protocol-handler-kafka-VERSION.nar file from the Starlight for Kafka GitHub repository.
- Copy the .nar file to your Pulsar/protocols directory.
- Configure the Pulsar broker to run the Starlight for Kafka protocol handler as a plugin by adding configurations to the Pulsar configuration file broker.conf. For standalone deployments, add these configurations to standalone.conf.
  - Add the following configuration values to your Pulsar configuration file:

```
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
```

| Property | Default Value | Suggested Value |
|---|---|---|
| messagingProtocols | | kafka |
| protocolHandlerDirectory | ./protocols | Location of the Starlight for Kafka .nar file |
| allowAutoTopicCreationType | non-partitioned | partitioned |

  allowAutoTopicCreationType is set to non-partitioned by default. Since topics are partitioned by default in Kafka, it’s better to avoid creating non-partitioned topics unless Kafka clients need to interact with existing non-partitioned topics.
  - Set Kafka listeners. kafkaListeners is a comma-separated list of listeners and the host/IP and port to which Kafka binds for listening.

```
kafkaListeners=PLAINTEXT://127.0.0.1:9092
```
  - Set Kafka advertised listeners. kafkaAdvertisedListeners is a comma-separated list of listeners with their host/IP and port. kafkaAdvertisedListeners isn’t required unless you want to expose another address to the Kafka client; it defaults to the same address as kafkaListeners.

```
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
```
  - Set offset management. Offset management is required because Starlight for Kafka depends upon Pulsar broker-level entry metadata.

```
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
```
  - Disable the deletion of inactive topics:

```
brokerDeleteInactiveTopicsEnabled=false
```

  This setting isn’t required, but it is critical for Starlight for Kafka. By default, Pulsar deletes inactive partitions of a partitioned topic without deleting the partitioned topic’s metadata, and Starlight for Kafka cannot recreate the missing partitions if brokerDeleteInactiveTopicsEnabled is set to true.
- Set retention and time-to-live (TTL) policies for Starlight for Kafka namespaces, as shown in the sketch after this list. If you configure retention without configuring TTL, then messages on Starlight for Kafka topics can never be deleted because Starlight for Kafka doesn’t update a durable cursor.
  If a Pulsar consumer and a Kafka consumer both subscribe to the same topic with the same subscription or group name, the two consumers consume messages independently and don’t share a subscription, even though the subscription name of the Pulsar client is the same as the group name of the Kafka client.
- If your topic is used by the Pulsar client only or the Kafka client only, then you can set entryFormat=kafka in broker.conf for better performance. Starlight for Kafka supports interaction between the Pulsar client and Kafka client by default.
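For example, the following Pulsar Admin CLI commands set a retention policy and a matching TTL. This is a sketch with illustrative values and the public/default namespace; adjust the namespace, sizes, and times for your deployment.

```
# Retain up to 10 GB or 3 days of messages per topic in the namespace
./bin/pulsar-admin namespaces set-retention public/default --size 10G --time 3d

# Expire messages after 3 days (259200 seconds)
./bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 259200
```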
Build from source
Build the protocol handler from source if you want to customize the code or contribute to the project.
- Clone the Starlight for Kafka GitHub repository to your local machine:

```
git clone https://github.com/datastax/starlight-for-kafka.git
```

- Change to the local repository directory:

```
cd starlight-for-kafka
```
- If needed, set jenv global 11 to use JDK 11, because Starlight for Kafka requires JDK 11.
- Build the Maven project:

```
mvn clean install -DskipTests
```
- If this is the first time you’ve used protocol handlers with your Pulsar deployment, create a protocols folder in the root of your Pulsar directory.
- Copy the protocol handler .nar file to your Pulsar protocols directory. After building the project, the .nar file is available at starlight-for-kafka/kafka-impl/target/pulsar-protocol-handler-kafka-VERSION.nar.
- Configure the Pulsar broker to run the Starlight for Kafka protocol handler as a plugin by adding configurations to the Pulsar configuration file broker.conf. For standalone deployments, add these configurations to standalone.conf. A consolidated example appears after these steps.
  - Add the following configuration values to your Pulsar configuration file:

```
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
allowAutoTopicCreationType=partitioned
```

| Property | Default Value | Suggested Value |
|---|---|---|
| messagingProtocols | | kafka |
| protocolHandlerDirectory | ./protocols | Location of the Starlight for Kafka .nar file |
| allowAutoTopicCreationType | non-partitioned | partitioned |

  allowAutoTopicCreationType is set to non-partitioned by default. Since topics are partitioned by default in Kafka, it’s better to avoid creating non-partitioned topics unless Kafka clients need to interact with existing non-partitioned topics.
  - Set Kafka listeners. kafkaListeners is a comma-separated list of listeners and the host/IP and port to which Kafka binds for listening.

```
kafkaListeners=PLAINTEXT://127.0.0.1:9092
```
  - Set Kafka advertised listeners. kafkaAdvertisedListeners is a comma-separated list of listeners with their host/IP and port. kafkaAdvertisedListeners isn’t required unless you want to expose another address to the Kafka client; it defaults to the same address as kafkaListeners.

```
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
```
  - Set offset management. Offset management is required because Starlight for Kafka depends upon Pulsar broker-level entry metadata.

```
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
```
  - Disable the deletion of inactive topics:

```
brokerDeleteInactiveTopicsEnabled=false
```

  This setting isn’t required, but it is critical for Starlight for Kafka. By default, Pulsar deletes inactive partitions of a partitioned topic without deleting the partitioned topic’s metadata, and Starlight for Kafka cannot recreate the missing partitions if brokerDeleteInactiveTopicsEnabled is set to true.
- Set retention and time-to-live (TTL) policies for Starlight for Kafka namespaces. If you configure retention without configuring TTL, then messages on Starlight for Kafka topics can never be deleted because Starlight for Kafka doesn’t update a durable cursor.
  If a Pulsar consumer and a Kafka consumer both subscribe to the same topic with the same subscription or group name, the two consumers consume messages independently and don’t share a subscription, even though the subscription name of the Pulsar client is the same as the group name of the Kafka client.
- If your topic is used by the Pulsar client only or the Kafka client only, then you can set entryFormat=kafka in broker.conf for better performance. Starlight for Kafka supports interaction between the Pulsar client and Kafka client by default.
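Putting the preceding steps together, a broker.conf excerpt for a local test broker might look like the following. This is a sketch rather than a definitive configuration: the listener addresses assume a broker on 127.0.0.1, and entryFormat=kafka is optional, as noted above.

```
# Load the Starlight for Kafka protocol handler
messagingProtocols=kafka
protocolHandlerDirectory=./protocols

# Create partitioned topics automatically, matching Kafka's default behavior
allowAutoTopicCreationType=partitioned

# Kafka protocol listeners
kafkaListeners=PLAINTEXT://127.0.0.1:9092
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

# Required for offset management
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

# Keep inactive partitions; Starlight for Kafka cannot recreate missing partitions
brokerDeleteInactiveTopicsEnabled=false

# Optional: better performance when topics are used by only one client type
# entryFormat=kafka
```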
Produce and consume messages with Starlight for Kafka
This section explains how to use your Pulsar tenant’s connection details with a Kafka client to produce and consume messages with Starlight for Kafka.
Starlight for Kafka supports many different use cases. With a Pulsar cluster between publishers and consumers, you can change the type of publisher and consumer to fit your needs.
The following examples use the Kafka CLI and Java. For complete source code examples and examples for other languages, see the DataStax streaming examples repository. These examples are written for Astra Streaming but they can be adapted for Luna Streaming or self-managed Pulsar clusters by replacing the connection details with those for your cluster.
Astra Streaming
Kafka CLI
This example uses command line tools included with the Apache Kafka tarball.
- Enable Starlight for Kafka and get the ssl.properties connection details, as explained in Establish the Kafka protocol handler.
- In Astra Streaming, in the kafka namespace, create a topic to receive messages from the kafka-console-producer. You can use any valid name for the topic.
- Move your Astra Streaming ssl.properties file into your Kafka installation’s config subdirectory, such as kafka_2.13-3.1.0/config.
- In a terminal window, use the kafka-console-producer tool to create a Kafka producer that produces messages on the topic you created in the kafka namespace:

```
bin/kafka-console-producer \
  --broker-list kafka-PROVIDER-REGION.streaming.datastax.com:9093 \
  --topic TENANT_NAME/kafka/TOPIC_NAME \
  --producer.config config/ssl.properties
```

  When the producer is ready, it accepts standard input from the user. To send a message, type it in the terminal, and then press Return. Optionally, you can send some messages before starting the consumer to see how it retrieves past messages.
- In a new terminal window, use the kafka-console-consumer tool to create a Kafka consumer that consumes messages from the beginning of your topic:

```
bin/kafka-console-consumer \
  --bootstrap-server kafka-PROVIDER-REGION.streaming.datastax.com:9093 \
  --topic TENANT_NAME/kafka/TOPIC_NAME \
  --consumer.config config/ssl.properties \
  --from-beginning
```

  Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.
- Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant. As you send messages and the consumer receives them, they appear in the kafka-console-consumer terminal.
- In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the kafka namespace. To verify that your Kafka messages are being produced and consumed in your Astra Streaming Pulsar cluster, check the Data In metrics. The number of messages should equal the number of messages you sent with the kafka-console-producer.
- To exit the producer and consumer shells, press Ctrl+C in each terminal.
Java client
The following example uses a Java program to create a connection between Kafka and your Astra Streaming tenant, configure a producer and consumer, and then send and consume a message. This example uses Maven, but you can also use Gradle.
- Enable Starlight for Kafka and get the ssl.properties connection details, as explained in Establish the Kafka protocol handler.
- Create a new Maven project:

```
mvn archetype:generate \
  -DgroupId=org.example \
  -DartifactId=StarlightForKafkaClient \
  -DarchetypeArtifactId=maven-archetype-quickstart \
  -DinteractiveMode=false
```
- Change to the new project directory:

```
cd StarlightForKafkaClient
```

- Open the new project in your IDE, and then add the Kafka client dependency to pom.xml:

```
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>3.3.1</version>
</dependency>
```
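The build step at the end of this procedure runs mvn clean package assembly:single. For that goal to produce a runnable jar-with-dependencies, the pom.xml typically needs the Maven Assembly Plugin configured with a main class, as in the following sketch. The plugin version and the org.example.App main class are assumptions based on this example project.

```
<!-- In pom.xml, under <build><plugins> -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <version>3.4.2</version>
  <configuration>
    <descriptorRefs>
      <!-- Bundle the application and all dependencies into one JAR -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
      <manifest>
        <!-- Entry point used by java -jar -->
        <mainClass>org.example.App</mainClass>
      </manifest>
    </archive>
  </configuration>
</plugin>
```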
- Open the App.java file at src/main/java/org/example/App.java, and then delete any preexisting code in this file. In the next steps, you add code to this file to create a complete program that produces and consumes messages.
- Paste the following code in the file, and then replace the placeholder values with the values from your ssl.properties file. Your editor will report errors because this isn’t a complete program yet.

```
package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class App {

    private static String bootstrapServers = "BOOTSTRAP_SERVER_URL";
    private static String pulsarToken = "PASSWORD";
    private static String tenantName = "USERNAME";
    private static final String namespace = "kafka";
    private static final String topicName = "quickstart";
    private static final String topic = String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

    public static void main(String[] args) {
```
- Optional: Replace quickstart with another name for the topic that receives messages from the Kafka producer. If the topic doesn’t exist, it is created automatically when the producer sends the first message.
- Add the following code that builds the configuration for the producer and consumer:

```
Properties config = new Properties();
config.put("bootstrap.servers", bootstrapServers);
config.put("security.protocol", "SASL_SSL");
config.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='token:%s';", tenantName, pulsarToken));
config.put("sasl.mechanism", "PLAIN");
config.put("session.timeout.ms", "45000");
config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
config.put("group.id", "my-consumer-group");
```
- Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:

```
KafkaProducer<Long, String> producer = new KafkaProducer<>(config);
final ProducerRecord<Long, String> producerRecord = new ProducerRecord<>(topic, System.currentTimeMillis(), "Hello World");
producer.send(producerRecord, new Callback() {
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null)
            System.out.println(String.format("Send failed for record, %s. \nRecord data: %s", e.getMessage(), producerRecord));
        else
            System.out.println("Successfully sent message");
    }
});
producer.flush();
producer.close();
```
- Add the consumer code, which creates a basic subscription and retrieves the latest messages on the topic. The record types are String because the configuration uses string deserializers for both keys and values, and the final two braces close the main method and the App class:

```
final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);
consumer.subscribe(Collections.singletonList(topic));
ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
System.out.println(String.format("Found %d total record(s)", consumerRecords.count()));
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
    System.out.println(consumerRecord);
}
consumer.commitSync();
consumer.close();
    }
}
```
- Save App.java, and then build and run the JAR file for the complete program:

```
mvn clean package assembly:single
java -jar target/StarlightForKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar
```
- Make sure the result shows that a message was produced and consumed:

```
Successfully sent message
Found 1 total record(s)
ConsumerRecord(topic = persistent://my-tenant-007/my-namespace/my-topic, partition = 0, leaderEpoch = null, offset = 22, CreateTime = 1673545962124, serialized key size = 8, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = xxxxx, value = Hello World)
```
- In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the kafka namespace.
  If everything was configured correctly, then the kafka namespace should have a topic named quickstart that was created by the Java program. Additionally, the namespace’s metrics should reflect that at least one message was published and consumed on your Astra Streaming Pulsar topic.
Luna Streaming
To use the Kafka CLI or a Kafka client with Starlight for Kafka, you use your Luna Streaming Pulsar tenant as the Kafka bootstrap server.
The following example uses command line tools included with the Apache Kafka tarball, and it connects on localhost:9092 because port forwarding was enabled in Establish the Kafka protocol handler.
- In your Kafka installation directory, start the kafka-console-producer shell:

```
bin/kafka-console-producer.sh \
  --topic quickstart \
  --bootstrap-server localhost:9092
```

  You can replace quickstart with any other name for the topic that will receive messages from the producer. If the specified topic doesn’t exist in your Luna Streaming tenant, it is created automatically when the producer sends the first message. You can configure this behavior and other Starlight for Kafka parameters in the Helm chart.

  Connect to a remote cluster: To connect to a remote Luna Streaming cluster, provide the full topic name and the bootstrap server URL for your Luna Streaming tenant:

```
bin/kafka-console-producer.sh --topic "TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME" --bootstrap-server "SERVICE_URL"
```
- When the producer is ready, type a message, and then press Return to send it. Optionally, you can send several messages before starting the consumer to see how it retrieves past messages.
- In a new terminal window, start the kafka-console-consumer shell:

```
bin/kafka-console-consumer.sh \
  --topic quickstart \
  --bootstrap-server localhost:9092 \
  --from-beginning
```

  Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.
- Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant. As you send messages and the consumer receives them, they appear in the kafka-console-consumer terminal.
- Use the Pulsar Admin CLI or the Luna Streaming Pulsar Admin Console to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic.
- To exit the producer and consumer shells, press Ctrl+C in each terminal.
Self-managed
To use the Kafka CLI or a Kafka client with Starlight for Kafka, you use your Pulsar tenant as the Kafka bootstrap server.
The following example uses command line tools included with the Apache Kafka tarball, and it assumes port forwarding is enabled so that you can connect on localhost:9092.
- Restart your Pulsar brokers if you haven’t done so already.
- In your Kafka installation directory, start the kafka-console-producer shell:

```
bin/kafka-console-producer.sh \
  --topic quickstart \
  --bootstrap-server localhost:9092
```

  You can replace quickstart with any other name for the topic that will receive messages from the producer. If the specified topic doesn’t exist in your Pulsar tenant, it is created automatically when the producer sends the first message. You can configure this behavior and other Starlight for Kafka parameters.

  Connect to a remote cluster: To connect to a remote Pulsar cluster, provide the full topic name and the bootstrap server URL for your tenant:

```
bin/kafka-console-producer.sh --topic "TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME" --bootstrap-server "SERVICE_URL"
```
- When the producer is ready, type a message, and then press Return to send it. Optionally, you can send several messages before starting the consumer to see how it retrieves past messages. You can also start the consumer first, and then start the producer afterwards.
- In a new terminal window, start the kafka-console-consumer shell:

```
bin/kafka-console-consumer.sh \
  --topic quickstart \
  --bootstrap-server localhost:9092 \
  --from-beginning
```

  Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.
  The --bootstrap-server is the address of your Pulsar cluster where the Starlight for Kafka extension is installed. Another way to format the localhost address is PLAINTEXT://IP_ADDRESS:9092.
  Alternatively, you can create the consumer with the Pulsar Client CLI. For example, the following command creates a consumer on the quickstart topic with a subscription named my-subscription. The -n 0 option means the consumer continues running instead of closing the connection after consuming a message:

```
pulsar-client consume quickstart -s "my-subscription" -n 0
```
- Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant. As you send messages and the consumer receives them, they appear in the terminal where you started the consumer.
- Use the Pulsar Admin CLI to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic, as shown in the example after this list.
- To exit the producer and consumer shells, press Ctrl+C in each terminal.
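For example, the following command, a sketch assuming the default public/default namespace and a topic named quickstart, prints per-topic statistics, including message in and out counters. If the topic was auto-created as a partitioned topic, which is the suggested configuration, use partitioned-stats; for a non-partitioned topic, use stats instead.

```
./bin/pulsar-admin topics partitioned-stats persistent://public/default/quickstart
```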
Next steps
Learn how to configure and manage Starlight for Kafka.