Getting started with the Starlight for Kafka extension

Starlight for Kafka brings native Apache Kafka® protocol support to Apache Pulsar™ by introducing a Kafka protocol handler on Pulsar brokers. By adding the Starlight for Kafka protocol handler to your existing Pulsar cluster, you can migrate your existing Kafka applications and services to Pulsar without modifying the code.

If source code is your thing, visit the project’s repo on GitHub.

Architecture reference

If you would like to dig deeper into how Starlight for Kafka works, read the docs.

(Diagram: Starlight for Kafka architecture)

Establishing the Kafka protocol handler

Before a Kafka client can interact with your Pulsar cluster, you need the Starlight for Kafka protocol handler installed in the cluster. Installation looks a bit different depending on where your Pulsar cluster is running. Choose the option that best fits your needs.

  • Astra Streaming

  • Luna Streaming

  • Self Managed

Astra Streaming

If you want a working Kafka extension as quickly as possible, this is your best bet. This is also a good option for those who already have a streaming tenant and are looking to extend it.

  1. Sign in to your Astra account and navigate to your streaming tenant.

    Don’t have a streaming tenant? Follow our "Astra Streaming quickstart" guide.
  2. Go to the "Connect" tab and choose the "Kafka" option.

  3. Click "Enable Kafka".

  4. A message describes the additions (and restrictions) that come with using Starlight for Kafka.

  5. Select "Enable Kafka" again to confirm your understanding.

Your Astra Streaming tenant is ready for prime time! Continue to the next section of the guide to see it in action.

Luna Streaming

The Starlight for Kafka extension is included in the luna-streaming-all image used to deploy a Luna cluster. The Luna helm chart makes deploying the Kafka extension quite easy. Follow the "Using Starlight for Kafka with Luna Streaming" guide to create a simple Pulsar cluster with the Starlight for Kafka extension ready for use.

Self Managed

Already have your own Pulsar cluster? Or maybe you’re using a standalone cluster? Starlight for Kafka can easily be part of that cluster! Follow the "Installing the Starlight for Kafka extension" guide.
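For orientation, enabling the handler on a self-managed broker generally comes down to a few broker.conf entries that load the protocol handler and open a Kafka listener. The fragment below is a sketch based on the Kafka-on-Pulsar-style settings the extension uses; treat the installation guide as the authoritative source for names and values.

```properties
# Illustrative broker.conf fragment; confirm exact settings in the install guide
messagingProtocols=kafka
protocolHandlerDirectory=./protocols
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://<broker-hostname>:9092
```

After updating the configuration, restart the broker so the protocol handler is loaded.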

Messaging with Starlight for Kafka

Starlight for Kafka supports quite a few use cases. With a Pulsar cluster between producers and consumers, you can mix and match Kafka and Pulsar clients on either side to fit your needs.

The examples below use an Astra Streaming tenant as the Kafka bootstrap server. If you are using Luna Streaming or a self-managed cluster, substitute your own bootstrap server URL.

Retrieve Kafka connection properties in Astra Streaming

In the Astra Streaming portal "Connect" tab, the "Kafka" area provides the connection information you will need to create a working Kafka client or use the CLI.

(Screenshot: Astra Streaming Kafka settings)

Click the clipboard icon to copy the Kafka connection values, as well as a working token to paste in code.

Produce and consume a message

  • Kafka CLI

  • Kafka Client (Java)

Download the latest Kafka distribution from the Apache Kafka downloads page. With the tarball extracted, the producer and consumer CLIs are in the 'bin' folder.

  1. To get started, let’s set a few variables. If you’ve completed our "Getting started with Astra Streaming" guide, the values below will be a perfect fit for your existing tenant.

    SERVICE_URL="<REPLACE_WITH_BOOTSTRAP_SERVER_URL>"
    TENANT="<REPLACE_WITH_TENANT_NAME>"
    NAMESPACE="<REPLACE_WITH_NAMESPACE>"
    TOPIC="<REPLACE_WITH_TOPIC>"
  2. Now let’s enter those variables in Kafka’s producer shell.

    # cd kafka_2.13-3.3.1
    ./bin/kafka-console-producer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" --bootstrap-server "$SERVICE_URL"
  3. Type in a super memorable message and hit 'enter' to send. Press 'Ctrl-C' to exit the shell.

    > This is my first S4K message.

    A new message has been produced in the provided tenant/namespace/topic and is ready for consumption.

  4. Start the Kafka consumer shell.

    # cd kafka_2.13-3.3.1
    ./bin/kafka-console-consumer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" --from-beginning --bootstrap-server "$SERVICE_URL"
  5. The consumer should immediately find the new message and output its value.

    This is my first S4K message.
  6. Press 'Ctrl-C' to exit the consumer shell.
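One caveat for Astra: the tenant requires TLS and token authentication, which the console tools read from a client config file rather than from command-line flags. The sketch below assumes a file named client.properties (the file name is illustrative, and the placeholders must be replaced with the values you copied earlier):

```shell
# Write a client config with the SASL/TLS settings an Astra tenant expects.
# Placeholder values below are illustrative and must be replaced.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<REPLACE_WITH_TENANT_NAME>" password="token:<REPLACE_WITH_PULSAR_TOKEN>";
EOF

# The console tools accept the file via their config flags, for example:
#   ./bin/kafka-console-producer.sh ... --producer.config client.properties
#   ./bin/kafka-console-consumer.sh ... --consumer.config client.properties
echo "wrote client.properties"
```

Pass the file to the producer shell with --producer.config and to the consumer shell with --consumer.config.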

This example uses Maven for the project structure. If you prefer Gradle or another tool, this code should still be a good fit.

For complete source code examples, see the Astra Streaming examples repository.

  1. Create a new Maven project.

    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=StarlightForKafkaClient \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
    
    cd StarlightForKafkaClient
  2. Open the new project in your IDE or text editor, and then add the Kafka client dependency to pom.xml:

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.3.1</version>
    </dependency>
  3. Open the file src/main/java/org/example/App.java, and then enter the following code. If you cloned the example repo, replace the entire contents of App.java with the following code. Your editor will report an error because this isn’t a complete script yet.

    Replace placeholders with the values you previously retrieved from Astra Streaming.

    package org.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class App {
      private static final String bootstrapServers = "<REPLACE_WITH_BOOTSTRAP_SERVER_URL>";
      private static final String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
      private static final String tenantName = "<REPLACE_WITH_TENANT_NAME>";
      private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
      private static final String topicName = "<REPLACE_WITH_TOPIC>";
      private static final String topic = String.format("persistent://%s/%s/%s", tenantName, namespace, topicName);

      public static void main(String[] args) {
  4. Add the following code that builds the configuration used by both the producer and the consumer. Note that the keys are serialized as longs, so the matching deserializer is LongDeserializer:

        Properties config = new Properties();
        config.put("bootstrap.servers", bootstrapServers);
        config.put("security.protocol", "SASL_SSL");
        config.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='token:%s';", tenantName, pulsarToken));
        config.put("sasl.mechanism", "PLAIN");
        config.put("session.timeout.ms", "45000");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put("group.id", "my-consumer-group");
  5. Add the producer code, which is a simple flow that sends a single message and awaits acknowledgment:

        KafkaProducer<Long, String> producer = new KafkaProducer<>(config);
    
        final ProducerRecord<Long, String> producerRecord = new ProducerRecord<>(topic, System.currentTimeMillis(), "Hello World");
        producer.send(producerRecord, new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
              System.out.println(String.format("Send failed for record, %s. \nRecord data: %s",e.getMessage(), producerRecord));
            else
              System.out.println("Successfully sent message");
          }
        });
    
        producer.flush();
        producer.close();
  6. Add the consumer code, which creates a basic subscription and retrieves the latest messages on the topic:

        final KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(config);

        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<Long, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));

        System.out.println(String.format("Found %d total record(s)", consumerRecords.count()));

        for (ConsumerRecord<Long, String> consumerRecord : consumerRecords) {
          System.out.println(consumerRecord);
        }
    
        consumer.commitSync();
        consumer.close();
      }
    }
  7. Build and run a JAR file for the complete program:

    mvn clean package assembly:single
    java -jar target/StarlightForKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar
    Result:
    Successfully sent message
    
    Found 1 total record(s)
    ConsumerRecord(topic = persistent://my-tenant-007/my-namespace/my-topic, partition = 0, leaderEpoch = null, offset = 22, CreateTime = 1673545962124, serialized key size = 8, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key =   xxxxx, value = Hello World)
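Note that the pom.xml generated by the quickstart archetype does not configure the assembly plugin, so `assembly:single` can fail out of the box. A plugin entry along these lines enables the jar-with-dependencies build; this is a sketch (the main class matches this example), and the example repository shows the full build configuration:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>
```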

© 2025 DataStax, an IBM Company