Install the Starlight for Kafka extension

This guide explains how to install the Starlight for Kafka extension as a protocol handler, and then use it to produce and consume Apache Kafka® messages on an Apache Pulsar™ cluster.

To install Starlight for Kafka as a proxy extension, see Starlight for Kafka proxy extension.

Prerequisites

To use Starlight for Kafka, you need a Pulsar cluster from Astra Streaming, Luna Streaming, or a self-managed or standalone Pulsar deployment.

Starlight for Kafka version 2.10 is compatible with the following:

  • Pulsar version 2.10.x

  • Kafka client versions 0.10, 1.x, 2.x, and 3.x

  • Kafka Streams

  • Kafka CLI

Starlight for Kafka requires JDK 11.
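
To confirm that JDK 11 is the active version before you install, you can run the standard version check:

  java -version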

Establish the Kafka protocol handler

Before a Kafka client can interact with your Pulsar cluster, you need the Starlight for Kafka protocol handler installed in the cluster.

  • Astra Streaming

  • Luna Streaming

  • Self-managed

  • Build from source

These steps explain how to enable the Starlight for Kafka protocol handler in an Astra Streaming Pulsar cluster, and then get the connection details for your Kafka client:

  1. In the Astra Portal header, click Applications, and then select Streaming.

  2. Create a tenant or click the name of an existing tenant.

  3. Go to your tenant’s Connect tab, select Kafka, and then click Enable Kafka.

  4. Review the information about the Starlight for Kafka extension, and then click Enable Kafka to confirm that you want to enable this extension on your tenant.

    This action creates a configuration file and the following three namespaces in your Astra Streaming tenant automatically:

    • kafka: Produces and consumes messages

    • __kafka: Supports required Kafka functionality

    • __kafka_unlimited: Stores metadata

    These namespaces are required for the Starlight for Kafka extension to function properly. These are permanent namespaces that cannot be removed except by deleting the entire tenant and all of its data.

  5. Save the Kafka connection details to a file named ssl.properties. The actual values depend on your Astra Streaming tenant’s configuration and cloud provider.

    ssl.properties
    username: TENANT_NAME
    password: token:******
    bootstrap.servers: kafka-PROVIDER-REGION.streaming.datastax.com:9093
    schema.registry.url: https://kafka-PROVIDER-REGION.streaming.datastax.com:8081
    security.protocol: SASL_SSL
    sasl.mechanism: PLAIN

    In Astra Streaming, if you click Download or Copy, then a Pulsar token is included in the password field automatically. You can also generate a token on your tenant’s Settings tab.

  6. Optional: Add a session timeout value to the ssl.properties file to avoid connection issues with some Kafka clients:

    session.timeout.ms=45000
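
    Depending on your Kafka client, you might also need to express the credentials as a JAAS configuration rather than separate username and password fields. The following is a minimal sketch of a complete client properties file, using placeholder values and the same PlainLoginModule settings as the Java example later in this guide:

    bootstrap.servers=kafka-PROVIDER-REGION.streaming.datastax.com:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='TENANT_NAME' password='token:PULSAR_TOKEN';
    session.timeout.ms=45000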

    Continue to the next section to learn how to use the connection details in a Kafka client script.

The Starlight for Kafka extension is included in the luna-streaming-all image used to deploy a Luna cluster. The Luna Helm chart simplifies deployment of the Kafka extension.

The following steps explain how to deploy a Luna Streaming Helm chart to create a simple Pulsar cluster with the Starlight for Kafka extension ready to use.

  1. Make sure you meet the following prerequisites:

    • Install Helm 3 CLI version 3.8.0 or later.

    • Install Kafka CLI version 3.3.1 or later.

    • Install kubectl CLI version 1.23.4 or later.

    • Have access to a Kubernetes cluster with permission to create a namespace, deployments, and pods.

  2. Add the DataStax Helm chart repo to your Helm store:

    helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
  3. Install the Helm chart using a minimal values file. The following command creates a Helm release named my-pulsar-cluster using the DataStax Luna Helm chart within a Kubernetes namespace named datastax-pulsar. This minimal configuration creates only the essential components and has no ingress or load-balanced services.

    VALUES_URL="https://raw.githubusercontent.com/datastaxdevs/luna-streaming-examples/main/starlight-for-kafka/values.yaml"
    helm install \
      --namespace datastax-pulsar \
      --create-namespace \
      --values $VALUES_URL \
      --version 3.0.4 \
      my-pulsar-cluster \
      datastax-pulsar/pulsar
  4. Wait for the broker pod to reach a running state. It might restart a few times while the components start up.

    kubectl -n datastax-pulsar wait --for=condition=Ready pod/pulsar-broker-0 --timeout=120s
  5. Use the Pulsar Admin CLI to inspect the tenants, namespaces, and topics that were created in your Pulsar cluster when you deployed the Helm chart with the Starlight for Kafka extension enabled.

    By default, the Helm chart creates a tenant named public with a namespace named default. In addition, the Starlight for Kafka extension creates several namespaces and topics that are required for its functionality. These namespaces are prefixed with __kafka.

    List the namespaces in the public tenant:

    ~/apache-pulsar-3.1.3$ ./bin/pulsar-admin namespaces list public

    Make sure the output includes the default and __kafka namespaces:

    public/__kafka
    public/__kafka_producerid
    public/default
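
    To also see the Kafka support topics, you can list the topics in one of these namespaces, for example:

    ~/apache-pulsar-3.1.3$ ./bin/pulsar-admin topics list public/__kafka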
  6. Enable port forwarding for the Pulsar Admin and Starlight for Kafka services that are running on the Kubernetes cluster:

    1. In a new terminal, port forward the Pulsar Admin service:

      kubectl port-forward -n datastax-pulsar service/pulsar-broker 8080:8080
    2. In a separate terminal window, port forward the Starlight for Kafka service:

      kubectl port-forward -n datastax-pulsar service/pulsar-proxy 9092:9092

    You don’t need to open the Pulsar binary port to accept new messages when using Starlight for Kafka. This is because Kafka clients communicate using the Kafka protocol on port 9092, which is handled by the Starlight for Kafka extension.

    After deploying the Helm chart and enabling port forwarding, your applications can communicate with Pulsar as if it were a real Kafka host, as explained in the next section.
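
    As a quick connectivity check, you can point the Kafka CLI at the forwarded port and list topics. This sketch assumes a local Apache Kafka installation; a short or empty list is normal on a fresh cluster:

      bin/kafka-topics.sh --bootstrap-server localhost:9092 --list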

  1. If this is the first time you’ve used protocol handlers with your Pulsar deployment, create a protocols folder in the root of your Pulsar directory.

  2. Download the pulsar-protocol-handler-kafka-VERSION.nar from the Starlight for Kafka GitHub repository.

  3. Copy the .nar file to your Pulsar protocols directory.

  4. Configure the Pulsar broker to run the Starlight for Kafka protocol handler as a plugin by adding configurations in the Pulsar configuration file broker.conf. For standalone deployments, add these configurations to standalone.conf.

    1. Add the following configuration values to your Pulsar configuration file:

      messagingProtocols=kafka
      protocolHandlerDirectory=./protocols
      allowAutoTopicCreationType=partitioned
      Property                     Default Value     Suggested Value
      messagingProtocols           (not set)         kafka
      protocolHandlerDirectory     ./protocols       Location of the Starlight for Kafka .nar file
      allowAutoTopicCreationType   non-partitioned   partitioned

      allowAutoTopicCreationType is set to non-partitioned by default. Since topics are partitioned by default in Kafka, it’s better to avoid creating non-partitioned topics unless Kafka clients need to interact with existing non-partitioned topics.

    2. Set the Kafka listeners. kafkaListeners is a comma-separated list of listeners, each specifying the host/IP and port that the Kafka protocol handler binds to for listening.

      kafkaListeners=PLAINTEXT://127.0.0.1:9092
    3. Set Kafka advertised listeners. kafkaAdvertisedListeners is a comma-separated list of listeners with their host/IP and port. kafkaAdvertisedListeners isn’t required unless you want to expose another address to the Kafka client. If not set, it defaults to the same address as kafkaListeners.

      kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
    4. Set offset management. Offset management is required because Starlight for Kafka depends upon Pulsar broker-level entry metadata.

      brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
    5. Disable the deletion of inactive topics:

      brokerDeleteInactiveTopicsEnabled=false

      This setting isn’t strictly required, but it is critical for Starlight for Kafka.

      By default, Pulsar deletes inactive partitions of a partitioned topic, but the metadata of the partitioned topic isn’t deleted.

      Starlight for Kafka cannot recreate the missing partitions if brokerDeleteInactiveTopicsEnabled is set to true.

  5. Set retention and time-to-live (TTL) policies for Starlight for Kafka namespaces. If you configure retention without configuring TTL, then messages on Starlight for Kafka topics can never be deleted because Starlight for Kafka doesn’t update a durable cursor. For example commands, see the sketch after these steps.

    If a Pulsar consumer and a Kafka consumer both subscribe to the same topic with the same subscription or group name, the two consumers consume messages independently and don’t share a subscription, even though the Pulsar client’s subscription name is the same as the Kafka client’s group name.

  6. If your topic is used by the Pulsar client only or the Kafka client only, then you can set entryFormat=kafka in broker.conf for better performance. Starlight for Kafka supports interaction between the Pulsar client and Kafka client by default.
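
The following Pulsar Admin CLI commands are a minimal sketch of setting retention and TTL on a namespace, assuming the public/default namespace and illustrative values of one day or one gigabyte of retention and a three-day (259200 second) TTL. Adjust the namespace and values for your deployment:

    ./bin/pulsar-admin namespaces set-retention public/default --time 1d --size 1G
    ./bin/pulsar-admin namespaces set-message-ttl public/default --messageTTL 259200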

Build the protocol handler from source if you want to customize the code or contribute to the project.

  1. Clone the Starlight for Kafka GitHub repository to your local machine:

    git clone https://github.com/datastax/starlight-for-kafka.git
  2. Change to the local repository directory:

    cd starlight-for-kafka
  3. If needed, run jenv global 11 to switch to JDK 11, because Starlight for Kafka requires JDK 11.

  4. Build the Maven project:

    mvn clean install -DskipTests
  5. If this is the first time you’ve used protocol handlers with your Pulsar deployment, create a protocols folder in the root of your Pulsar directory.

  6. Copy the protocol handler .nar file to your Pulsar protocols directory. After building the project, the .nar file is available at starlight-for-kafka/kafka-impl/target/pulsar-protocol-handler-kafka-VERSION.nar.

  7. Configure the Pulsar broker to run the Starlight for Kafka protocol handler as a plugin by adding configurations in the Pulsar configuration file broker.conf. For standalone deployments, add these configurations to standalone.conf.

    1. Add the following configuration values to your Pulsar configuration file:

      messagingProtocols=kafka
      protocolHandlerDirectory=./protocols
      allowAutoTopicCreationType=partitioned
      Property                     Default Value     Suggested Value
      messagingProtocols           (not set)         kafka
      protocolHandlerDirectory     ./protocols       Location of the Starlight for Kafka .nar file
      allowAutoTopicCreationType   non-partitioned   partitioned

      allowAutoTopicCreationType is set to non-partitioned by default. Since topics are partitioned by default in Kafka, it’s better to avoid creating non-partitioned topics unless Kafka clients need to interact with existing non-partitioned topics.

    2. Set the Kafka listeners. kafkaListeners is a comma-separated list of listeners, each specifying the host/IP and port that the Kafka protocol handler binds to for listening.

      kafkaListeners=PLAINTEXT://127.0.0.1:9092
    3. Set Kafka advertised listeners. kafkaAdvertisedListeners is a comma-separated list of listeners with their host/IP and port. kafkaAdvertisedListeners isn’t required unless you want to expose another address to the Kafka client. If not set, it defaults to the same address as kafkaListeners.

      kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
    4. Set offset management. Offset management is required because Starlight for Kafka depends upon Pulsar broker-level entry metadata.

      brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
    5. Disable the deletion of inactive topics:

      brokerDeleteInactiveTopicsEnabled=false

      This setting isn’t strictly required, but it is critical for Starlight for Kafka.

      By default, Pulsar deletes inactive partitions of a partitioned topic, but the metadata of the partitioned topic isn’t deleted.

      Starlight for Kafka cannot recreate the missing partitions if brokerDeleteInactiveTopicsEnabled is set to true.

  8. Set retention and time-to-live (TTL) policies for Starlight for Kafka namespaces. If you configure retention without configuring TTL, then messages on Starlight for Kafka topics can never be deleted because Starlight for Kafka doesn’t update a durable cursor.

    If a Pulsar consumer and a Kafka consumer both subscribe to the same topic with the same subscription or group name, the two consumers consume messages independently and don’t share a subscription, even though the Pulsar client’s subscription name is the same as the Kafka client’s group name.

  9. If your topic is used by the Pulsar client only or the Kafka client only, then you can set entryFormat=kafka in broker.conf for better performance. Starlight for Kafka supports interaction between the Pulsar client and Kafka client by default.

Produce and consume messages with Starlight for Kafka

This section explains how to use your Pulsar tenant’s connection details with a Kafka client to produce and consume messages with Starlight for Kafka.

Starlight for Kafka supports many different use cases. With a Pulsar cluster between publishers and consumers, you can change the type of publisher and consumer to fit your needs.

The following examples use the Kafka CLI and Java. For complete source code examples and examples for other languages, see the DataStax streaming examples repository. These examples are written for Astra Streaming but they can be adapted for Luna Streaming or self-managed Pulsar clusters by replacing the connection details with those for your cluster.

  • Astra Streaming

  • Luna Streaming

  • Self-managed

  • Kafka CLI

  • Java client

This example uses command line tools included with the Apache Kafka tarball.

  1. Enable Starlight for Kafka and get the ssl.properties connection details, as explained in Establish the Kafka protocol handler.

  2. In Astra Streaming, in the kafka namespace, create a topic to receive messages from the kafka-console-producer. You can use any valid name for the topic.

  3. Move your Astra Streaming ssl.properties file into your Kafka installation’s config subdirectory, such as kafka_2.13-3.1.0/config.

  4. In a terminal window, use the kafka-console-producer tool to create a Kafka producer that produces messages on the topic you created in the kafka namespace:

    bin/kafka-console-producer \
    --broker-list kafka-PROVIDER-REGION.streaming.datastax.com:9093 \
    --topic TENANT_NAME/kafka/TOPIC_NAME \
    --producer.config config/ssl.properties

    When the producer is ready, it accepts standard input from the user. To send a message, type it in the terminal, and then press Return. Optionally, you can send some messages before starting the consumer to see how it retrieves past messages.

  5. In a new terminal window, use the kafka-console-consumer tool to create a Kafka consumer that consumes messages from the beginning of your topic:

    bin/kafka-console-consumer \
    --bootstrap-server kafka-PROVIDER-REGION.streaming.datastax.com:9093 \
    --topic TENANT_NAME/kafka/TOPIC_NAME \
    --consumer.config config/ssl.properties \
    --from-beginning

    Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.

  6. Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant.

    As you send messages and the consumer receives them, they appear in the kafka-console-consumer terminal.

  7. In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the kafka namespace. To verify that your Kafka messages are being produced and consumed in your Astra Streaming Pulsar cluster, check the Data In metrics. The number of messages should be equal to the number of messages you sent with the kafka-console-producer.

  8. To exit the producer and consumer shells, press Ctrl+C in each terminal.

The following example uses a Java program that creates a connection between a Kafka client and your Astra Streaming tenant, configures a producer and a consumer, and then sends and consumes a message. This example uses Maven, but you can also use Gradle.

  1. Enable Starlight for Kafka and get the ssl.properties connection details, as explained in Establish the Kafka protocol handler.

  2. Create a new Maven project:

    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=StarlightForKafkaClient \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
  3. Change to the new project directory:

    cd StarlightForKafkaClient
  4. Open the new project in your IDE, and then add the Kafka client dependency to pom.xml:

    pom.xml
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.3.1</version>
    </dependency>
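
    The build command in a later step (mvn clean package assembly:single) relies on the Maven Assembly Plugin to produce the jar-with-dependencies file. If your generated pom.xml doesn’t already configure this plugin, the following sketch shows one way to add it inside the <build><plugins> section, assuming the main class is org.example.App:

    pom.xml
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <!-- Entry point for the runnable jar; matches the App class in this guide -->
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <!-- Bundle all dependencies into a single runnable jar -->
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>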
  5. Open the App.java file at src/main/java/org/example/App.java, and then delete any preexisting code in this file. In the next steps, you will add code to this file to create a complete program that produces and consumes messages.

  6. Paste the following code in the file, and then replace the placeholder values with the values from your ssl.properties file. Your editor will report errors because this isn’t a complete program yet.

    package org.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class App {
      private static String bootstrapServers = "BOOTSTRAP_SERVER_URL";
      private static String pulsarToken = "PASSWORD";
      private static String tenantName = "USERNAME";
      private static final String namespace = "kafka";
      private static final String topicName = "quickstart";
      private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
    
      public static void main(String[] args) {
  7. Optional: Replace quickstart with another name for the topic that receives messages from the Kafka producer. If the topic doesn’t exist, it is created automatically when the producer sends the first message.

  8. Add the following code that builds the configuration for the producer and consumer:

        Properties config = new Properties();
        config.put("bootstrap.servers",bootstrapServers);
        config.put("security.protocol","SASL_SSL");
        config.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='token:%s';", tenantName, pulsarToken));
        config.put("sasl.mechanism","PLAIN");
        config.put("session.timeout.ms","45000");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put("group.id", "my-consumer-group");
  9. Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:

        KafkaProducer<Long, String> producer = new KafkaProducer<>(config);
    
        final ProducerRecord<Long, String> producerRecord = new ProducerRecord<>(topic, System.currentTimeMillis(), "Hello World");
        producer.send(producerRecord, new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
              System.out.println(String.format("Send failed for record, %s. \nRecord data: %s",e.getMessage(), producerRecord));
            else
              System.out.println("Successfully sent message");
          }
        });
    
        producer.flush();
        producer.close();
  10. Add the consumer code, which creates a basic subscription and retrieves the latest messages on the topic:

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config);

        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));

        System.out.println(String.format("Found %d total record(s)", consumerRecords.count()));

        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
          System.out.println(consumerRecord);
        }
    
        consumer.commitSync();
        consumer.close();
      }
    }
  11. Save App.java, and then build and run the JAR file for the complete program:

    mvn clean package assembly:single
    java -jar target/StarlightForKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar
  12. Make sure the result shows that a message was produced and consumed:

    Successfully sent message
    
    Found 1 total record(s)
    ConsumerRecord(topic = persistent://my-tenant-007/my-namespace/my-topic, partition = 0, leaderEpoch = null, offset = 22, CreateTime = 1673545962124, serialized key size = 8, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key =   xxxxx, value = Hello World)
  13. In Astra Streaming, go to your tenant’s Namespaces and Topics tab to inspect the activity in the kafka namespace.

    If everything was configured correctly, then the kafka namespace should have a topic named quickstart that was created by the Java program. Additionally, the namespace’s metrics should reflect that at least one message was published and consumed on your Astra Streaming Pulsar topic.

To use the Kafka CLI or a Kafka client with Starlight for Kafka, you use your Luna Streaming Pulsar tenant as the Kafka bootstrap server. The following example uses command line tools included with the Apache Kafka tarball, and it connects to localhost:9092 because port forwarding was enabled in Establish the Kafka protocol handler.

  1. In your Kafka installation directory, start the kafka-console-producer shell:

    bin/kafka-console-producer.sh \
    --topic quickstart \
    --bootstrap-server localhost:9092

    You can replace quickstart with any other name for the topic that will receive messages from the producer. If the specified topic doesn’t exist in your Luna Streaming tenant, it is created automatically when the producer sends the first message. You can configure this behavior and other Starlight for Kafka parameters in the Helm chart.

    Connect to a remote cluster

    To connect to a remote Luna Streaming cluster, provide the full topic name and the bootstrap server URL for your Luna Streaming tenant:

    bin/kafka-console-producer.sh --topic "TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME" --bootstrap-server "SERVICE_URL"
  2. When the producer is ready, type a message, and then press Return to send it. Optionally, you can send several messages before starting the consumer to see how it retrieves past messages.

  3. In a new terminal window, start the kafka-console-consumer shell:

    bin/kafka-console-consumer.sh \
    --topic quickstart \
    --bootstrap-server localhost:9092 \
    --from-beginning

    Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.

  4. Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant.

    As you send messages and the consumer receives them, they appear in the kafka-console-consumer terminal.

  5. Use the Pulsar Admin CLI or the Luna Streaming Pulsar Admin Console to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic. For a Pulsar Admin CLI example, see the sketch after these steps.

  6. To exit the producer and consumer shells, press Ctrl+C in each terminal.
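
For example, the following Pulsar Admin CLI command is a minimal sketch of checking topic statistics, assuming the default public/default namespace and the quickstart topic used above. In the output, fields such as msgInCounter and msgOutCounter reflect produced and consumed messages. If the topic was created as a partitioned topic, use partitioned-stats instead of stats:

    ./bin/pulsar-admin topics stats persistent://public/default/quickstart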

To use the Kafka CLI or a Kafka client with Starlight for Kafka, you use your Pulsar tenant as the Kafka bootstrap server. The following example uses command line tools included with the Apache Kafka tarball, and it assumes port forwarding is enabled so that you can connect on localhost:9092.

  1. Restart your Pulsar brokers if you haven’t done so already.

  2. In your Kafka installation directory, start the kafka-console-producer shell:

    bin/kafka-console-producer.sh \
    --topic quickstart \
    --bootstrap-server localhost:9092

    You can replace quickstart with any other name for the topic that will receive messages from the producer. If the specified topic doesn’t exist in your Pulsar tenant, it is created automatically when the producer sends the first message. You can configure this behavior and other Starlight for Kafka parameters.

    Connect to a remote cluster

    To connect to a remote Pulsar cluster, provide the full topic name and the bootstrap server URL for your tenant:

    bin/kafka-console-producer.sh --topic "TENANT_NAME/NAMESPACE_NAME/TOPIC_NAME" --bootstrap-server "SERVICE_URL"
  3. When the producer is ready, type a message, and then press Return to send it. Optionally, you can send several messages before starting the consumer to see how it retrieves past messages.

    You can also start the consumer first, and then start the producer.

  4. In a new terminal window, start the kafka-console-consumer shell:

    bin/kafka-console-consumer.sh \
    --topic quickstart \
    --bootstrap-server localhost:9092 \
    --from-beginning

    Because this command uses the --from-beginning option, when the consumer starts, it retrieves any messages that were sent to the topic before it started.

    The --bootstrap-server is the address of your Pulsar cluster where the Starlight for Kafka extension is installed. Another way to format the localhost address is PLAINTEXT://IP_ADDRESS:9092.

    Alternatively, you can create the consumer with the Pulsar Client CLI. For example, the following command creates a consumer on the quickstart topic with a subscription named my-subscription. The -n 0 option means the consumer continues running instead of closing the connection after consuming a message.

    pulsar-client consume quickstart -s "my-subscription" -n 0
  5. Send a few messages in the kafka-console-producer terminal to generate more traffic on the tenant.

    As you send messages and the consumer receives them, they appear in the terminal where you started the consumer.

  6. Use the Pulsar Admin CLI to inspect your tenant’s activity, and verify that messages were published and consumed on the specified topic.

  7. To exit the producer and consumer shells, press Ctrl+C in each terminal.
