Messaging implementation of Starlight for Kafka

This document describes how the Starlight for Kafka protocol handler is implemented.

Topic

In Kafka, all topics are stored in one flat namespace, but in Pulsar, topics are organized in hierarchical multi-tenant namespaces. Starlight for Kafka introduces the kafkaNamespace setting in the broker configuration, which lets administrators map Kafka topics to Pulsar topics.

To let Kafka users leverage the multi-tenancy of Apache Pulsar, a Kafka user can prefix the topic name with a Pulsar tenant and namespace, as in Pulsar topic names:

| Kafka topic name | Tenant | Namespace | Short topic name |
| --- | --- | --- | --- |
| my-topic | <kafkaTenant> | <kafkaNamespace> | my-topic |
| my-tenant/my-ns/my-topic | my-tenant | my-ns | my-topic |
| persistent://my-tenant/my-ns/my-topic | my-tenant | my-ns | my-topic |

When kafkaEnableMultiTenantMetadata is enabled, <kafkaTenant> is the Pulsar tenant defined by the username passed during login.
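
The naming rules in the table above can be sketched as a small function. This is an illustration only, not the handler's actual code: the function name is hypothetical, and the default values "public" and "default" are assumed placeholders for whatever kafkaTenant and kafkaNamespace are set to in the broker configuration.

```python
def resolve_pulsar_topic(kafka_topic, kafka_tenant="public", kafka_namespace="default"):
    """Map a Kafka topic name to a fully qualified Pulsar topic name.

    kafka_tenant and kafka_namespace stand in for the broker settings;
    their default values here are assumptions for the example.
    """
    prefix = "persistent://"
    # Drop the optional domain prefix; what remains is either a short
    # topic name or tenant/namespace/short-name.
    name = kafka_topic[len(prefix):] if kafka_topic.startswith(prefix) else kafka_topic
    parts = name.split("/")
    if len(parts) == 1:
        # Short name only: fall back to the configured tenant and namespace.
        tenant, namespace, short_name = kafka_tenant, kafka_namespace, parts[0]
    elif len(parts) == 3:
        tenant, namespace, short_name = parts
    else:
        raise ValueError(f"unsupported topic name: {kafka_topic!r}")
    return f"{prefix}{tenant}/{namespace}/{short_name}"
```

All three rows of the table resolve to a persistent:// name; only the first row depends on the configured tenant and namespace.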

Topic lookup

Starlight for Kafka uses the same topic lookup approach for the Kafka request handler and the Pulsar request handler. The request handler performs topic discovery to look up the owner of each requested topic partition, and returns the ownership information as part of the Kafka TopicMetadata sent to Kafka clients.

When you use the Proxy module, the proxy returns its own advertised address as the owner of every topic. The proxy automatically dispatches every request to the broker that owns the topic, so Kafka clients need no knowledge of the internal topology of the cluster.
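
The difference between the two modes can be sketched as follows. The function and its arguments are hypothetical and only model which address a Kafka client would see as the leader of each partition.

```python
def advertised_leaders(partition_owners, proxy_address=None):
    """Return the leader address reported to Kafka clients for each partition.

    partition_owners maps a partition index to the address of the broker
    that owns it, as found by topic lookup. When proxy_address is given,
    it replaces every owner, hiding the cluster topology from clients.
    """
    if proxy_address is not None:
        return {partition: proxy_address for partition in partition_owners}
    return dict(partition_owners)
```

Without a proxy, clients connect to each owner broker directly; with a proxy, every partition appears to be led by the proxy.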

Message

Both Kafka and Pulsar messages have keys, values, timestamps, and headers, although in Pulsar the headers are called properties. Starlight for Kafka converts these fields automatically between Kafka messages and Pulsar messages. The mappings are described below.

Pulsar to Kafka message field mapping relationships

| Pulsar | Kafka | Note |
| --- | --- | --- |
| Ordering key, partition key | Key | The ordering key has higher priority: if a Pulsar message has an ordering key, the ordering key is converted to the key; otherwise, the partition key is converted to the key. For example, if a Pulsar message has an ordering key (xxx) and a partition key (yyy), the key of the converted Kafka message is xxx rather than yyy. If it has no ordering key but has a partition key (yyy), the key of the converted Kafka message is yyy. |
| Event time, publish time | Timestamp | The event time has higher priority: if a Pulsar message has an event time, the event time is converted to the timestamp; otherwise, the publish time is converted to the timestamp. For example, if a Pulsar message has an event time (1628826964820) and a publish time (1628826964821), the timestamp of the converted Kafka message is 1628826964820 rather than 1628826964821. If it has no event time but has a publish time (1628826964821), the timestamp of the converted Kafka message is 1628826964821. |
| NULL value | NULL value | If a field of a Pulsar message is NULL, the corresponding field of the converted Kafka message is NULL. |
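
The priority rules for the Pulsar to Kafka direction can be sketched as below. The PulsarMessage and KafkaRecord classes are hypothetical stand-ins, not client library types, and the sketch assumes an unset field is represented as None.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class PulsarMessage:  # hypothetical stand-in for a Pulsar message
    value: Optional[bytes] = None
    ordering_key: Optional[str] = None
    partition_key: Optional[str] = None
    event_time: Optional[int] = None
    publish_time: Optional[int] = None
    properties: Dict[str, str] = field(default_factory=dict)


@dataclass
class KafkaRecord:  # hypothetical stand-in for a Kafka record
    key: Optional[str]
    value: Optional[bytes]
    timestamp: Optional[int]
    headers: Dict[str, str]


def pulsar_to_kafka(msg: PulsarMessage) -> KafkaRecord:
    # The ordering key wins over the partition key.
    key = msg.ordering_key if msg.ordering_key is not None else msg.partition_key
    # The event time wins over the publish time.
    timestamp = msg.event_time if msg.event_time is not None else msg.publish_time
    # Properties become headers; a None value stays None.
    return KafkaRecord(key=key, value=msg.value, timestamp=timestamp,
                       headers=dict(msg.properties))
```

With the values from the table, a message with ordering key xxx, partition key yyy, event time 1628826964820, and publish time 1628826964821 converts to a record with key xxx and timestamp 1628826964820.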

Kafka to Pulsar message field mapping relationships

| Kafka | Pulsar | Note |
| --- | --- | --- |
| Key | Ordering key and partition key | If a Kafka message has a key, both the ordering key and the partition key are set for the converted Pulsar message. If a Kafka message does not have a key, neither the ordering key nor the partition key is set for the converted Pulsar message. |
| Timestamp | Event time and publish time | If a Kafka message has a timestamp, both the event time and the publish time are set for the converted Pulsar message. If a Kafka message does not have a timestamp, neither the event time nor the publish time is set for the converted Pulsar message. |
| NULL value | NULL value | If a field of a Kafka message is NULL, the corresponding field of the converted Pulsar message is NULL. |
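
The Kafka to Pulsar direction fans one Kafka field out to two Pulsar fields. A minimal sketch, using a plain dictionary as a hypothetical representation of the converted Pulsar message, where an absent dictionary entry means the field is not set:

```python
def kafka_to_pulsar(key=None, value=None, timestamp=None, headers=None):
    """Convert Kafka record fields to a dict describing a Pulsar message."""
    # Headers become properties; a None value stays None.
    message = {"value": value, "properties": dict(headers or {})}
    if key is not None:
        # One Kafka key populates both Pulsar key fields.
        message["ordering_key"] = key
        message["partition_key"] = key
    if timestamp is not None:
        # One Kafka timestamp populates both Pulsar time fields.
        message["event_time"] = timestamp
        message["publish_time"] = timestamp
    return message
```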

Message ID and offset

In Kafka, each message is assigned an offset once it is successfully produced to a topic partition.
In Pulsar, each message is assigned a MessageID, which consists of ledger-id, entry-id, and batch-index components.
Starlight for Kafka uses the same approach used in the Pulsar-Kafka wrapper to convert a Pulsar MessageID to an offset, and vice versa.
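
One plausible way to fold the three MessageID components into a single 64-bit offset is bit packing. The sketch below is an assumption for illustration: the bit widths (20, 32, and 12) and the function names are not taken from this document and may differ from the actual conversion.

```python
# Assumed bit widths for illustration only; together they fill 64 bits.
LEDGER_BITS = 20
ENTRY_BITS = 32
BATCH_BITS = 12


def message_id_to_offset(ledger_id, entry_id, batch_index=0):
    """Pack (ledger-id, entry-id, batch-index) into one integer offset."""
    if not 0 <= ledger_id < (1 << LEDGER_BITS):
        raise ValueError("ledger_id out of range")
    if not 0 <= entry_id < (1 << ENTRY_BITS):
        raise ValueError("entry_id out of range")
    if not 0 <= batch_index < (1 << BATCH_BITS):
        raise ValueError("batch_index out of range")
    return (ledger_id << (ENTRY_BITS + BATCH_BITS)) | (entry_id << BATCH_BITS) | batch_index


def offset_to_message_id(offset):
    """Unpack an offset back into (ledger-id, entry-id, batch-index)."""
    batch_index = offset & ((1 << BATCH_BITS) - 1)
    entry_id = (offset >> BATCH_BITS) & ((1 << ENTRY_BITS) - 1)
    ledger_id = offset >> (ENTRY_BITS + BATCH_BITS)
    return ledger_id, entry_id, batch_index
```

Because the ledger ID occupies the highest bits, offsets produced this way sort in the same order as the message IDs they encode, although they are not consecutive integers.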

Produce messages

When the Kafka request handler receives produced messages from a Kafka client, it converts Kafka messages to Pulsar messages by mapping the fields (such as the key, value, timestamp and headers) one by one, and uses the ManagedLedger append API to append those converted Pulsar messages to BookKeeper. Converting Kafka messages to Pulsar messages allows existing Pulsar applications to consume messages produced by Kafka clients.

Consume messages

When the Kafka request handler receives a consumer request from a Kafka client, it opens a non-durable cursor to read the entries starting from the requested offset. The Kafka request handler converts the Pulsar messages back to Kafka messages to allow existing Kafka applications to consume the messages produced by Pulsar clients.

Group coordinator & offset management

Pulsar does not have a centralized group coordinator for assigning partitions to the consumers of a consumer group or for managing offsets for each consumer group. In Pulsar, partition assignment is managed by the broker on a per-partition basis, and offsets are managed by the owner broker of each partition, which stores the acknowledgements in cursors.

To be fully compatible with Kafka clients, Starlight for Kafka implements the Kafka group coordinator by storing the coordinator group changes and offsets in a system topic called kafkaTenant/kafka/consumer_offsets in Pulsar.

This bridges the gap between Pulsar and Kafka and allows use of existing Pulsar tools and policies to manage subscriptions and monitor Kafka consumers. Starlight for Kafka adds a background thread in the implemented group coordinator to periodically synchronize offset updates from the system topic to Pulsar cursors. Therefore, a Kafka consumer group is effectively treated as a Pulsar subscription to allow existing Pulsar tools to be used for managing Kafka consumer groups.
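
The synchronization step can be sketched as a function that a background thread would call periodically. Everything here is a simplified model under stated assumptions: the commit log is a hypothetical in-memory list standing in for the system topic, and the cursors dictionary stands in for Pulsar subscription cursors.

```python
def latest_committed_offsets(commit_log):
    """Reduce commit records to the latest offset per (group, partition).

    commit_log is an iterable of (group, topic_partition, offset) tuples
    in commit order; later records override earlier ones.
    """
    latest = {}
    for group, topic_partition, offset in commit_log:
        latest[(group, topic_partition)] = offset
    return latest


def sync_to_cursors(commit_log, cursors):
    """Move each cursor to the latest committed offset.

    Returns the number of cursors that changed, so a periodic caller
    can tell whether the pass did any work.
    """
    updated = 0
    for key, offset in latest_committed_offsets(commit_log).items():
        if cursors.get(key) != offset:
            cursors[key] = offset
            updated += 1
    return updated
```

Running the function twice over the same log changes nothing on the second pass, which is the property a periodic synchronizer needs.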
