Getting started with the Starlight for Kafka extension

Starlight for Kafka brings native Apache Kafka protocol support to Apache Pulsar by introducing a Kafka protocol handler on Pulsar brokers. By adding the Starlight for Kafka protocol handler to your existing Pulsar cluster, you can migrate your existing Kafka applications and services to Pulsar without modifying their code.

If source code is your thing, visit the project’s repo on GitHub.

Architecture reference

If you would like to dig deeper into how Starlight for Kafka works, read the docs.

Starlight for Kafka Architecture

Establishing the Kafka protocol handler

Before a Kafka client can interact with your Pulsar cluster, you need the Starlight for Kafka protocol handler installed in the cluster. Installation looks a bit different depending on where your Pulsar cluster is running. Choose the option that best fits your needs.

  • Astra Streaming

  • Luna Streaming

  • Self Managed

If you want a working Kafka extension as quickly as possible, this is your best bet. This is also a good option for those who already have a streaming tenant and are looking to extend it.

  1. Sign in to your Astra account and navigate to your streaming tenant.

    Don’t have a streaming tenant? Follow our "Getting started with Astra Streaming" guide.
  2. Go to the "Connect" tab and choose the "Kafka" option.

    Astra Streaming connect kafka

  3. You will see an "Enable Kafka" button. Click to continue.

    Astra Streaming enable kafka

  4. A message will let you know of the additions (and restrictions) that come with using Starlight for Kafka.

    Astra Streaming enable kafka message

  5. Select the "Enable Kafka" button to confirm your understanding.

Your Astra Streaming tenant is ready for prime time! Continue to the next section of the guide to see it in action.

The Starlight for Kafka extension is included in the luna-streaming-all image used to deploy a Luna cluster. The Luna helm chart makes deploying the Kafka extension quite easy. Follow the "A guide to deploying Luna Streaming with the Starlight for Kafka extension" guide to create a simple Pulsar cluster with the Starlight for Kafka extension ready for use.

Already have your own Pulsar cluster? Or maybe you’re using a standalone cluster? Starlight for Kafka can easily be a part of that cluster! Follow the "Installing the Starlight for Kafka extension" guide.
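
As a rough sketch of what that installation involves, assuming the protocol handler .nar file has been copied into a protocols directory on each broker, the broker.conf additions look something like the following (the listener addresses are placeholders; the linked guide has the authoritative settings):

```properties
# Load the Kafka protocol handler from the protocols directory
messagingProtocols=kafka
protocolHandlerDirectory=./protocols

# Address the handler binds to, and the address advertised to Kafka clients
kafkaListeners=PLAINTEXT://0.0.0.0:9092
kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092

# Required so Kafka offsets can be mapped onto Pulsar entries
brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
```

After updating broker.conf, restart the brokers for the handler to load.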

Messaging with Starlight for Kafka

Starlight for Kafka supports quite a few different use cases. With a Pulsar cluster sitting between producers and consumers, you can mix and match Kafka and Pulsar clients on either side to fit your needs.

The below examples use an Astra Streaming tenant as the Kafka bootstrap server. If you are using Luna Streaming or a self-managed cluster, substitute your own bootstrap server URL.

Retrieve Kafka connection properties in Astra Streaming

In the Astra Streaming portal "Connect" tab, the "kafka" area provides important connection information. You will need this connection information to create a working Kafka client or use the CLI.

Astra Streaming kafka settings

Click the clipboard icon to copy the Kafka connection values, as well as a working token, to paste into your code.

Produce and consume a message

  • Kafka CLI

  • Kafka Client (Java)

Download the latest Kafka distribution here. After extracting the tarball, you will find the producer and consumer CLIs in the 'bin' folder.

  1. To get started, let’s set a few variables. If you’ve completed our "Getting started with Astra Streaming" guide, the below values will be a perfect fit for your existing tenant.

    SERVICE_URL="<REPLACE_WITH_BOOTSTRAP_SERVER_URL>"
    TENANT="<REPLACE_WITH_TENANT_NAME>"
    NAMESPACE="<REPLACE_WITH_NAMESPACE>"
    TOPIC="<REPLACE_WITH_TOPIC>"
  2. Now let’s enter those variables in Kafka’s producer shell.

    # cd kafka_2.13-3.3.1
    ./bin/kafka-console-producer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" --bootstrap-server "$SERVICE_URL"
  3. Type in a super memorable message and hit 'enter' to send. Press 'Ctrl-C' to exit the shell.

    > This is my first S4K message.

    A new message has been produced in the provided tenant/namespace/topic and is ready for consumption.

  4. Start the Kafka consumer shell.

    # cd kafka_2.13-3.3.1
    ./bin/kafka-console-consumer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" --from-beginning --bootstrap-server "$SERVICE_URL"
  5. The consumer should immediately find the new message and output its value.

    This is my first S4K message.
  6. Press 'Ctrl-C' to exit the consumer shell.
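
One note on authentication: clusters that require SASL credentials (as Astra Streaming does) expect the console tools to read those settings from a properties file passed via the --producer.config and --consumer.config flags. A minimal sketch, assuming a file named client.properties and the same placeholder values as above:

```shell
# Write a client config with the SASL settings the cluster requires.
# The tenant name is the SASL username; the Pulsar token is the password.
cat > client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<REPLACE_WITH_TENANT_NAME>" password="token:<REPLACE_WITH_PULSAR_TOKEN>";
EOF

# Then pass the file to the console tools, for example:
#   ./bin/kafka-console-producer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" \
#       --bootstrap-server "$SERVICE_URL" --producer.config client.properties
#   ./bin/kafka-console-consumer.sh --topic "$TENANT/$NAMESPACE/$TOPIC" \
#       --from-beginning --bootstrap-server "$SERVICE_URL" --consumer.config client.properties
```

These are the same security.protocol, sasl.mechanism, and sasl.jaas.config values the Java client example below sets programmatically.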

Wow, you did it! A Kafka producer and consumer with an Apache Pulsar cluster. How about trying the Java client now?

This example uses Maven for the project structure. If you prefer Gradle or another tool, this code should still be a good fit.

Visit our examples repo to see the complete source for this example.
  1. Create a new Maven project.

    mvn archetype:generate \
        -DgroupId=org.example \
        -DartifactId=StarlightForKafkaClient \
        -DarchetypeArtifactId=maven-archetype-quickstart \
        -DinteractiveMode=false
    
    cd StarlightForKafkaClient
  2. Open the new project in your favorite IDE or text editor and add the Kafka client dependency to "pom.xml".

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.3.1</version>
    </dependency>
  3. Open the file "src/main/java/org/example/App.java" and replace the entire contents with the below code. Notice there are class variables that need replacing. Apply the values previously retrieved in Astra Streaming.

    package org.example;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.LongSerializer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class App {
      private static String bootstrapServers = "<REPLACE_WITH_BOOTSTRAP_SERVER_URL>";
      private static String pulsarToken = "<REPLACE_WITH_PULSAR_TOKEN>";
      private static String tenantName = "<REPLACE_WITH_TENANT_NAME>";
      private static final String namespace = "<REPLACE_WITH_NAMESPACE>";
      private static final String topicName = "<REPLACE_WITH_TOPIC>";
      private static final String topic = String.format("persistent://%s/%s/%s", tenantName,namespace,topicName);
    
      public static void main(String[] args) {

    Don’t worry if your editor shows errors; this isn’t a complete program yet.
  4. Bring in the following code to build the configuration that will be used by both the producer and consumer.

        Properties config = new Properties();
        config.put("bootstrap.servers",bootstrapServers);
        config.put("security.protocol","SASL_SSL");
        config.put("sasl.jaas.config", String.format("org.apache.kafka.common.security.plain.PlainLoginModule required username='%s' password='token:%s';", tenantName, pulsarToken));
        config.put("sasl.mechanism","PLAIN");
        config.put("session.timeout.ms","45000");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        config.put("group.id", "my-consumer-group");
  5. Now paste the producer code into the file. This is a very simple flow that sends a single message and awaits acknowledgment.

        KafkaProducer<Long, String> producer = new KafkaProducer<>(config);
    
        final ProducerRecord<Long, String> producerRecord = new ProducerRecord<>(topic, System.currentTimeMillis(), "Hello World");
        producer.send(producerRecord, new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception e) {
            if (e != null)
              System.out.println(String.format("Send failed for record, %s. \nRecord data: %s",e.getMessage(), producerRecord));
            else
              System.out.println("Successfully sent message");
          }
        });
    
        producer.flush();
        producer.close();
  6. Paste the consumer code into the file. This creates a basic subscription and retrieves the latest messages on the topic.

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(config);
    
        consumer.subscribe(Collections.singletonList(topic));
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(5000));
    
        System.out.println(String.format("Found %d total record(s)", consumerRecords.count()));
    
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
          System.out.println(consumerRecord);
        }
    
        consumer.commitSync();
        consumer.close();
      }
    }
  7. Now you should have a complete program. Let’s see it in action! Build and run the jar with the following terminal commands.

    mvn clean package assembly:single
    java -jar target/StarlightForKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar
  8. If all goes as it should, your output will be similar to this:

    Successfully sent message
    
    Found 1 total record(s)
    ConsumerRecord(topic = persistent://my-tenant-007/my-namespace/my-topic, partition = 0, leaderEpoch = null, offset = 22, CreateTime = 1673545962124, serialized key size = 8, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key =   xxxxx, value = Hello World)
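
A note on the build in step 7: the quickstart archetype’s pom.xml does not configure the assembly plugin, so the assembly:single goal needs a build section. The examples repo has the complete pom; if you are assembling the project from scratch, a sketch along these lines (assuming org.example.App as the main class) is typically enough:

```xml
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <archive>
          <manifest>
            <mainClass>org.example.App</mainClass>
          </manifest>
        </archive>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
    </plugin>
  </plugins>
</build>
```

The jar-with-dependencies descriptor is what produces the StarlightForKafkaClient-1.0-SNAPSHOT-jar-with-dependencies.jar file run in step 7.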

Congrats! You have just used the Kafka client to send and receive messages in Pulsar. Next stop is the moon!

The Starlight for Kafka documentation covers these and other topics in greater depth. Visit it for more detail.


© 2024 DataStax | Privacy policy | Terms of use
