Starlight for Kafka security

This page covers enabling authentication features for Starlight for Kafka.

Starlight for Kafka supports SASL/PLAIN authentication, JWT authorization, and SSL encryption.

Starlight for Kafka is installed with no authentication, authorization, or encryption, but these can be enabled to improve security.

Currently, these features are only available in Java.

Authentication

Starlight for Kafka authentication uses the Kafka SASL/PLAIN mechanisms and Pulsar token-based mechanisms.

OAUTHBEARER authentication is also available.

Enabling authentication for Starlight for Kafka requires enabling authentication for the following components:

  • Pulsar brokers

  • Starlight for Kafka (some configurations of S4K rely on the configurations of Pulsar brokers)

  • Kafka clients

Currently, Starlight for Kafka only supports the PLAIN SASL mechanism, but there are more to come.

PLAIN authentication

Authenticate Starlight for Kafka with the PLAIN SASL mechanism, follow the steps below.

  1. Enable authentication on the Pulsar broker. For the PLAIN mechanism, Kafka authentication is forwarded to the JWT authentication of Pulsar, so you need to configure the JWT authentication by setting the following properties in conf/broker.conf.

    Modify conf/standalone.conf if you’re using a standalone Pulsar deployment.

    1. Enable authentication for the Pulsar broker.

      authenticationEnabled=true
      authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
    2. Enable authentication between the Pulsar broker and Starlight for Kafka.

      brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.AuthenticationToken
      brokerClientAuthenticationParameters=token:<token-of-super-user-role>
      superUserRoles=<super-user-roles>
    3. Specify the key. For more information, see Enable Token Authentication on Brokers.

      • If using a secret key, set the property as below.

        tokenSecretKey=file:///path/to/secret.key
      • To pass the key inline, set the property as below.

        tokenSecretKey=data:;base64,FLFyW0oLJ2Fi22KKCm21J18mbAdztfSHN/lAT5ucEKU=
      • If using a public/private key, set the property as below.

        tokenPublicKey=file:///path/to/public.key
  2. Enable authentication on Starlight for Kafka. Set the following property in the conf/broker.conf:

    saslAllowedMechanisms=PLAIN
  3. Enable authentication on the Kafka client. To forward your credentials, SASL-PLAIN is used on the Kafka client side. To enable SASL-PLAIN, set the following properties through Kafka JAAS.

Property Description Example Value

username

username

The username of Kafka JAAS is tenant/namespace or tenant, where Kafka’s topics are stored in Pulsar.

password

password must be your token authentication parameters from Pulsar.
The token can be created by Pulsar token tools. The role is the subject for the token. It is embedded in the created token and the broker can get role by parsing this token.

token:xxx

security.protocol=SASL_PLAINTEXT
# or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule \
required username="public/default" password="token:xxx";

For more on passing credentials with JAAS, see Configuring SASL/PLAIN.

OAUTHBEARER authentication

Authenticate Starlight for Kafka using the OAUTHBEARER mechanism by following the steps below.

  1. Enable authentication on the Pulsar broker. Set the following properties in the conf/broker.conf or conf/standalone.conf file.

    The properties are the same as PLAIN authentication except for brokerClientAuthenticationPlugin and brokerClientAuthenticationParameters.

    Property Description Required or optional Example value

    audience

    An OAuth 2.0 "resource server" identifier for the Pulsar cluster

    Optional

    https://broker.example.com

    issuerUrl

    URL of the authentication provider which allows the Pulsar client to obtain an access token

    Required

    https://accounts.google.com

    privateKey

    URL to a JSON credential file
    The following pattern formats are supported:
    * file:///path/to/file
    * file:/path/to/file
    * data:application/json;base64,<base64-encoded value>

    Required

    file:///path/to/credentials_file.json

    scope

    The scope of the access request that is expressed as a list of space-delimited, case-sensitive strings

    Optional

    api://pulsar-cluster-1/.default

    type

    OAuth 2.0 authentication type
    The default value is client_credentials

    Optional

    client_credentials

    Here is an example conf/broker.conf file.

    authenticationEnabled=true
    authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
    superUserRoles=<super-user-roles>
    brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
    brokerClientAuthenticationParameters={"type":"client_credentials","privateKey":"file:///path/to/credentials_file.json""issuerUrl":"<issuer-url>","audience":"<audience>"}
    tokenPublicKey=<token-public-key>
  2. Enable authentication on S4K. Set the following property in the conf/broker.conf or conf/standalone.conf file.

    saslAllowedMechanisms=OAUTHBEARER
  3. Specify the Kafka server callback handler. For the OAUTHBEARER mechanism, use AuthenticationProviderToken or customize your authentication provider to process the access tokens from an OAuth 2.0 server. S4K provides a built-in AuthenticateCallbackHandler that uses the authentication provider of Pulsar for authentication. You need to configure the following properties in the conf/kop-handler.properties file.

    # Use S4K's built-in handler:
    kopOauth2AuthenticateCallbackHandler=io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler
    # Java property configuration file of OauthValidatorCallbackHandler
    kopOauth2ConfigFile=conf/kop-handler.properties
  4. Specify the authentication method name of the provider (oauth.validate.method) in the conf/kop-handler.properties file. By default, it uses the token authentication method.

    • If you have configured the token authentication method, you do not need to specify the authentication method name.

    • If you use AuthenticationProviderToken, since AuthenticationProviderToken#getAuthMethodName() returns token, set the oauth.validate.method as the token.

    • If you use other providers, set the oauth.validate.method as the result of getAuthMethodName().

      oauth.validate.method=token
  5. Enable authentication on Kafka client. Install the S4K built-in callback handler to your local Maven repository.

  6. To retrieve an access token from an OAuth 2.0 server, use the S4K built-in callback handler instead of the Kafka login callback handler.

    mvn clean install -pl oauth-client -DskipTests
  7. Add the following dependencies to the pom.xml file. For stable releases, the pulsar.version is the same as the kop.version.

    <dependency>
        <groupId>com.datastax.oss</groupId>
        <artifactId>oauth-client</artifactId>
        <version>${protocol_version}</version>
    </dependency>
  8. Configure the producer or consumer with the following required properties.

    Property Description Example value Note

    sasl.login.callback.handler.class

    Class of SASL login callback handler

    com.datastax.oss.kafka.oauth.OauthLoginCallbackHandler

    Set this value to the value in the example properties configuration below.

    security.protocol

    Security protocol

    SASL_PLAINTEXT

    sasl.mechanism

    SASL mechanism

    OAUTHBEARER

    sasl.jaas.config

    JAAS configuration

    org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule

    oauth.issuer.url

    URL of the authentication provider which allows the Pulsar client to obtain an access token.

    https://accounts.google.com

    This property is the same to the issuerUrl property as Pulsar client credentials

    oauth.credentials.url

    URL to a JSON credentials file.

    The following pattern formats are supported:
    * file:///path/to/file
    * file:/path/to/file
    * data:application/json;base64,<base64-encoded value>
    * file:///path/to/credentials_file.json

    This property is the same as the privateKey property in Pulsar client credentials

    oauth.audience

    OAuth 2.0 "resource server" identifier for the Pulsar cluster.

    https://broker.example.com

    This property is the same as the audience property in <a href="http://pulsar.apache.org/docs/en/security-oauth2/#authentication-types">Pulsar client credentials

    oauth.scope

    The scope of the access request that is expressed as a list of space-delimited, case-sensitive strings.

    api://pulsar-cluster-1/.default

    This property is the same as the scope property in Pulsar client credentials

Here is an example properties configuration.

sasl.login.callback.handler.class=com.datastax.oss.kafka.oauth.OauthLoginCallbackHandler
security.protocol=SASL_PLAINTEXT # or security.protocol=SASL_SSL if SSL connection is used
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule \
   required oauth.issuer.url="https://accounts.google.com"\
   oauth.credentials.url="file:///path/to/credentials_file.json"\
   oauth.audience="https://broker.example.com";

Authorization

To enable authorization on Starlight for Kafka, ensure that authentication is enabled first.

For more on authorization in Pulsar, see Pulsar Authorization.

  1. Enable authorization and assign superusers for the Pulsar broker.

    authorizationEnabled=true
  2. Generate JWT tokens. A token is the credential associated with a user. The association is done through the principal or role. In the case of JWT tokens, this field is typically referred as subject, though they are exactly the same concept.

    Use this command to require the generated token to have a subject field set:

    bin/pulsar tokens create --secret-key file:///path/to/secret.key \
     --subject <user-role>

    This command will print the token string on stdout.

  3. Grant permission to a specific role. The token itself does not have any permissions associated with it. The authorization engine determines whether the token should have permissions or not. Once you have created the token, you can grant permission for this token.
    For example, to grant a role to the user-role created above:

    bin/pulsar-admin --auth-plugin "org.apache.pulsar.client.impl.auth.AuthenticationToken" --auth-params "token:<token-of-super-user-role>" \
        namespaces grant-permission <tenant>/<namespace> \
        --role <user-role> \
        --actions produce,consume

SSL encryption

This section explains connecting and configuring Starlight for Kafka with SSL. There are two methods available:

  • (Recommended) Use the TLS certificates already configured for the Pulsar TLS endpoint to also connect the Kafka client. This is a new feature of Starlight for Kafka designed to make TLS connection fast and painless. Proceed to Create SSL connection with Pulsar.

  • Create dedicated TLS certificates and modify the Pulsar broker and Kafka client. Instructions for this method are here.

Create SSL configuration with Pulsar

Starlight for Kafka features fast, painless TLS configuration between Pulsar and Kafka. Instead of creating different certificates for the Pulsar broker and Kafka client, simply modify broker.conf to pick up the existing Pulsar TLS certifications and configurations.

  1. Configure TLS on the Pulsar broker. For more, see Create TLS certificates.

  2. Expose TLS endpoints by adding the following configurations to broker.conf:

    kopTlsEnabledWithBroker=true
    kafkaListeners=PLAINTEXT://127.0.0.1:9092, SSL://PRIVATE-HOSTNAME-OF-THE-BROKER:9093
    kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092, SSL://PUBLIC-HOSTNAME-OF-THE-BROKER:9093

    These settings will enable TLS on the broker, and Starlight for Kafka will pick up the TLS certificates automatically.

  3. Proceed to Test Starlight for Kafka to test your configuration.

Create SSL configuration manually

The following example shows how to manually configure Starlight for Kafka with SSL.

Starlight for Kafka supports PLAINTEXT and SSL configuration types for Kafka listeners. SSL listeners are added to the comma-separated list of URIs in kafkaListeners, as below.

kafkalisteners=PLAINTEXT://localhost:9092,SSL://localhost:9093

For more on generating SSL keys for Kafka brokers, see Kafka SSL.

  1. Create SSL related keys. This example creates the related CA and JKS files.

    # Input a password, for example "server-keystore".
    keytool -keystore server.keystore.jks -alias localhost -validity 365 -keyalg RSA -genkey
    # Input a password, for example "server".
    openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
    # Input a password, for example "server-truststore"
    keytool -keystore server.truststore.jks -alias CARoot -import -file ca-cert
    # Input a password, for example "client-truststore"
    keytool -keystore client.truststore.jks -alias CARoot -import -file ca-cert
    # Input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias localhost -certreq -file cert-file
    # The password followed by `-passin pass:` is the password of ca-cert: "server"
    openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:server
    # You must input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias CARoot -import -file ca-cert
    # You must input the password of server.keystore.jks: "server-keystore"
    keytool -keystore server.keystore.jks -alias localhost -import -file cert-signed

    In the above example, we have input four passwords:

    • server-keystore for server.keystore.jks

    • server for ca-cert and ca-key

    • server-truststore for server.truststore.jks

    • client-truststore for client.truststore.jks

  2. Configure the Pulsar broker. In conf/broker.conf, add the related configurations using the JKS configurations created in the previous step.

    listeners=PLAINTEXT://localhost:9092,SSL://localhost:9093
    
    # You need to use the full path of server.keystore.jks
    kopSslKeystoreLocation=server.keystore.jks
    kopSslKeystorePassword=server-keystore
    kopSslKeyPassword=server-keystore
    # You need to use the full path of server.truststore.jks
    kopSslTruststoreLocation=server.truststore.jks
    kopSslTruststorePassword=server-truststore
  3. Configure the Kafka client. Create a file named client-ssl.properties in kafka/config with the following configuration:

    security.protocol=SSL
    # Include the full path of client.truststore.jks
    ssl.truststore.location=client.truststore.jks
    ssl.truststore.password=client-truststore
    # The identification algorithm must be empty
    ssl.endpoint.identification.algorithm=
  4. Verify the console-producer and the console-consumer send messages when started with client-ssl.properties:

kafka-console-producer.sh --broker-list localhost:9093 --topic test --producer.config client-ssl.properties
kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --consumer.config client-ssl.properties

For more on configuring Kafka, see Configure Kafka client.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com