About Change Data Capture (CDC) workflows with Apache Cassandra® and Apache Pulsar™

Change Data Capture (CDC) is a design pattern used in software development to capture and propagate changes made to data in a system. The CDC pattern is commonly used in real-time data streaming applications to enable near-real-time processing of data changes. In a typical CDC implementation, a change to a row of data (insert, update, delete) is detected and recorded. The change (or mutation) is made available to downstream systems as an event for further processing. This allows applications to react quickly to changes in the data while not adding unneeded load on the data store, enabling real-time data processing and analytics.
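As a minimal sketch of the pattern (the event shape below is illustrative, not any particular product's format), a change event might carry the operation type, the affected table, the row's key, and a timestamp:

```python
import time
from dataclasses import dataclass, field

@dataclass
class ChangeEvent:
    """A hypothetical CDC event describing one row mutation."""
    operation: str    # "insert", "update", or "delete"
    table: str        # fully qualified table name
    row_key: dict     # primary-key columns identifying the changed row
    timestamp_ms: int = field(default_factory=lambda: int(time.time() * 1000))

# Downstream systems react to events like this instead of polling the data store
event = ChangeEvent("update", "ks1.tbl1", {"id": 42})
```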

Before learning the specifics of CDC, you must first understand the components needed to complete a CDC workflow.

Apache Pulsar source connectors

Source connectors in Apache Pulsar are responsible for ingesting data from external sources into the Pulsar system. They can collect data from a variety of sources, including databases, message queues, and file systems. When the source connector "sees" data, it streams the data to a Pulsar topic. This enables users to easily integrate data from disparate sources into their Pulsar-based applications and to ingest, process, and analyze large volumes of data in Pulsar.

Pulsar offers extensible APIs where developers can use a defined interface to develop their own connector. The interface takes much of the boilerplate burden away from a developer and lets them focus on the purpose of the connector. Creating a connector means adding the know-how to work with data from the source and adapting it to produce a compliant message with the Pulsar client.

As explained later in this guide, among the processes needed to capture change data, the DataStax Cassandra Source Connector (CSC) for Apache Pulsar™ is critical. This is one of many available source connectors for Pulsar, but is specifically designed to work with Cassandra and its CDC features.

To run a source connector, you provide configuration about what data is selected, how to connect with the upstream system, and the destination topic for the new message. The source connector takes care of producing the message. Pulsar source connectors run as Pulsar functions within the cluster, so many of the features of functions apply, such as the number of instances to run and how to configure the function instance running environment. Metrics and logs for a source connector are automatically made a part of the cluster.
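To make that concrete, a source connector configuration generally reduces to a handful of settings like the following. The key names here are hypothetical placeholders for illustration, not the actual CSC option names:

```python
# Hypothetical connector settings: what data is selected, how to connect to
# the upstream system, and the destination topic for produced messages.
# Key names are illustrative only.
source_config = {
    "name": "cassandra-source-tbl1",
    "destination-topic": "persistent://mytenant/mynamespace/data-ks1.tbl1",
    "parallelism": 1,  # number of function instances, as with any Pulsar function
    "configs": {
        "events-topic": "persistent://mytenant/mynamespace/events-ks1.tbl1",
        "keyspace": "ks1",
        "table": "tbl1",
        "contact-points": "cassandra-host:9042",
    },
}
```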

Monitor source connectors

Monitoring a source connector covers two areas: health and performance. Every connector in Pulsar emits basic metrics about its health, including stats like the number of records received from the source and the number of messages written to the destination topic. Connectors also emit debugging metrics like the number of exceptions thrown by the source. Performance metrics build on the health metrics and add measurements specific to the source. Refer to the Pulsar connectors metrics documentation for a complete list and explanation of metrics.

Source connector logs

Most Pulsar source connectors emit logs that show lifecycle events as well as custom events specific to the connector type. All logs are handled the same way core cluster logs are handled. By default, they are written to the console and collected by log4j destinations. If you are using function workers, you can access log files on their disk. Refer to the Apache Pulsar connector debugging guide for more information.

Pulsar schemas and the schema registry

The Pulsar schema registry is a feature of a Pulsar cluster that manages the schemas of messages sent and received on Pulsar topics. In Pulsar, messages are stored as bytes. Schemas provide a way to serialize and deserialize messages with a particular structure or type, allowing for interoperability between different systems.

The schema registry in Pulsar stores and manages schema definitions for all message types sent and received in Pulsar. The schema registry enforces schema compatibility rules, such as requiring a producer to send messages that conform to a certain schema, or rejecting messages that don’t match the schema.

Schemas are either primitive or complex. Primitive schemas are simple data types like bool, int, string, and float. Because Pulsar is written in Java, the primitive types are based on Java's; when a different client runtime is used, a conversion might need to occur. Refer to the Pulsar primitive types table for a full reference.

Complex schemas introduce a more structured way of messaging. The two complex schema types are KeyValue and Struct. KeyValue pairs a message key with a message payload, each carrying its own schema. Struct is a custom class definition serialized as Avro, JSON, or Protobuf.

KeyValue offers an interesting encoding option called "Separated", which stores the message key apart from the message payload. This makes it possible to store key information as a different data type than the payload, and it also offers special compression capabilities. CDC takes advantage of separated KeyValue messages when it produces to both the event and data topics.
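The difference can be sketched in plain Python (the byte layout below is illustrative, not Pulsar's wire format): an inline encoding packs key and value together into the payload, while the "Separated" option carries the key in the message's key field and only the value in the payload:

```python
def encode_inline(key: bytes, value: bytes) -> dict:
    # Key and value are serialized together into a single payload
    return {"key": None,
            "payload": len(key).to_bytes(4, "big") + key + value}

def encode_separated(key: bytes, value: bytes) -> dict:
    # Key travels in the message key field; the payload holds only the value,
    # so the two parts can use different schemas (and be handled independently)
    return {"key": key, "payload": value}

msg = encode_separated(b"\x01\x02", b'{"col": "new"}')
```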

Namespace schema configurations

In the context of CDC there are a few schema configurations of note. All of these are specific to the namespace where the event and data topics are logically located.

schema-compatibility-strategy

This setting tells the Pulsar broker how to handle new schemas that producers introduce to existing topics. This is relevant to CDC when a table’s design is changed. For example, if a new column is added, the registered schema changes to include that new value, and the chosen schema-compatibility-strategy decides whether the namespace allows this. The Pulsar default strategy is FULL, which means existing optional table columns can be modified. For more information, see the Apache Pulsar schema compatibility check strategies.

allow-auto-update-schema

Given the compatibility strategy, this setting is a flag that determines if an update to the schema is generally allowed. CDC sets this to true so changes in a table’s design can automatically propagate to the topic.

schema-autoupdate-strategy

When auto update is enabled (set to true), this setting directs the broker how to ensure consumers of a topic are able to process messages. If a consumer attempts to connect with a schema that doesn’t match the current schema, this strategy decides whether it is allowed to receive messages. CDC sets this to BACKWARD_TRANSITIVE, which means the old schema is still allowed if optional table columns have been added or a column has been removed.

schema-validation-enforce

This flag limits how producers and consumers are allowed to be configured. When enabled (true) producer and consumer clients must have a schema set before sending the message. When disabled (false) Pulsar allows producers and consumers without a set schema to send or receive messages. CDC disables (false) this option, so producers and consumers don’t have to know the message schema ahead of time.
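Taken together, the namespace policies a CDC flow relies on can be summarized in a small map (values as described above):

```python
# Namespace schema policies as configured for a CDC flow
cdc_namespace_policies = {
    "allow-auto-update-schema": True,                     # table changes propagate to the topic
    "schema-autoupdate-strategy": "BACKWARD_TRANSITIVE",  # older schemas remain readable
    "schema-validation-enforce": False,                   # clients need not declare a schema
}
```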

The DataStax Change Agent for Apache Cassandra® and DataStax Cassandra Source Connector (CSC) for Apache Pulsar™

The Change Agent for Cassandra is a process running on each node in a Cassandra cluster that watches for data changes on tables that have the CDC feature enabled. Based on Cassandra’s commitlog_sync option, commit log segments for CDC-enabled tables are periodically synced to a special "cdc_raw" directory, where each log entry represents a CDC event. The Change Agent for Cassandra creates a new event message containing the row coordinates of the changed data and produces the message to a downstream Pulsar cluster.

In Pulsar, each table that has CDC enabled also has a corresponding DataStax CSC for Pulsar. This is unlike the Change Agent for Cassandra where the process runs on each Cassandra node, keeping a log of all table changes. Each table-specific Cassandra source connector subscribes to the events topic the agent is producing messages to. When the connector "sees" a message for its table, it uses the row coordinates within the message to retrieve the mutated data from Cassandra and create a new message with the specifics. That new message is written to a data topic where others can subscribe and receive CDC messages.

Event deduplication

A particular advantage of the DataStax CSC for Pulsar is its deduplication feature. Pulsar's built-in deduplication capabilities aren’t used in the message flow because CDC needs finer-grained control to detect duplicates. As the Change Agent for Cassandra discovers a new commit log entry, a deterministic identifier is created from the mutation using the MD5 hash algorithm, and that key identifier is added to the event message.

When message consumers, like the DataStax CSC for Pulsar, connect to the event topic, they establish a subscription type. Pulsar has four subscription types: exclusive, shared, failover, and key_shared. In a typical CDC flow, the Cassandra source connector has multiple instances running in parallel. When multiple consumers are part of a key_shared subscription, Pulsar delivers every message with a given hash key to the same consumer, no matter how many times that key is sent.

When a Cassandra cluster has multiple hosts (with multiple commit logs), and they all use the same mutation to calculate the same hash key, then the same consumer always receives it. Each Cassandra source connector keeps a cache of hashes it has seen and ensures duplicates are dropped before producing the data message.
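The deduplication mechanics can be sketched in a few lines of Python: hash the mutation with MD5, model key_shared delivery as routing equal keys to the same consumer, and have each consumer drop digests it has already seen. This is an illustrative model, not the connector's actual implementation:

```python
import hashlib

def mutation_digest(mutation: bytes) -> str:
    # The change agent derives a deterministic identifier from the mutation with MD5
    return hashlib.md5(mutation).hexdigest()

def route(digest: str, num_consumers: int) -> int:
    # Modeling key_shared delivery: equal keys always land on the same consumer
    return int(digest, 16) % num_consumers

class DedupConsumer:
    def __init__(self):
        self.seen = set()  # cache of digests already processed

    def accept(self, digest: str) -> bool:
        # Drop the event if this digest was already handled
        if digest in self.seen:
            return False
        self.seen.add(digest)
        return True

# The same mutation reported by two Cassandra hosts hashes identically,
# routes to the same consumer, and is processed only once
d = mutation_digest(b"INSERT ks1.tbl1 id=42")
consumer = DedupConsumer()
```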

Learn more about the Pulsar key_shared subscription type in the Apache Pulsar documentation.

Understand the CDC workflow

Now that you understand the different components used in the CDC pattern, the following steps summarize a generic CDC workflow, including configuring the workflow and producing messages:

  1. Create a Pulsar tenant to hold CDC messages with the following namespace and topics:

    1. Create a namespace or use the default namespace.

    2. Create a topic for event messages.

    3. Create a topic for data messages.

  2. Start the Cassandra source connector in Pulsar by setting the destination topic (the data messages topic), the event topic, and Cassandra connection info, along with other settings.

  3. Configure the Cassandra change agent with a working directory, Pulsar service URL, and other settings in the Cassandra node. A restart is required.

  4. Create a Cassandra table and enable CDC.

  5. Insert a row of data into the table to produce messages.

    From here the following sequence occurs:

    1. The change agent detects a mutation to the table, and then it writes a log.

    2. The log is converted to an event message, and then it is written to the events topic.

    3. The source connector completes the flow by producing a final change message to the data topic.
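The sequence in step 5 can be sketched end to end with in-memory stand-ins for the topics and the table (all names are illustrative):

```python
# In-memory stand-ins for the two Pulsar topics and the Cassandra table
events_topic, data_topic = [], []
table = {("id", 42): {"id": 42, "name": "new-row"}}

def change_agent_detects(row_key):
    # 5.1/5.2: the agent logs the mutation and produces an event message
    # containing only the row coordinates, not the data itself
    events_topic.append({"table": "ks1.tbl1", "row_key": row_key})

def source_connector_poll():
    # 5.3: the connector reads each event, fetches the mutated row from
    # the table, and produces the full change message to the data topic
    for event in events_topic:
        row = table[event["row_key"]]
        data_topic.append({"key": event["row_key"], "value": row})

change_agent_detects(("id", 42))
source_connector_poll()
```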

Cassandra table schema evolution with CDC

This section describes how table schema changes are handled in the DataStax Cassandra Source Connector (CSC) for Apache Pulsar™.

Message schema translation

The message schema is of particular importance in completing the CDC pattern. Initially, it is set to match the Cassandra table’s schema as closely as possible, but some data types aren’t known in Pulsar (or more accurately, not known in Avro). To overcome this, there are adaptations performed when the DataStax CSC for Pulsar builds the Pulsar message. Some types aren’t compatible and cannot be adapted. In this case, those columns of data are dropped while creating the Pulsar message.

To better understand how exactly the Change Agent for Cassandra constructs the event message, here is the pseudo code of how the schema is created:

// key: an Avro (SchemaType.AVRO) GenericRecord built from all key fields in the Cassandra table
// value: an org.apache.cassandra.schema.TableMetadata obtained by converting a log entry to a mutation instance

Schema<KeyValue<byte[], MutationValue>> keyValueSchema = Schema.KeyValue(
  (convert the key GenericRecord to byte[]),
  (wrap the TableMetadata as SchemaType.AVRO),
  KeyValueEncodingType.SEPARATED
);

Notice the two types used in the KeyValue. The byte array is an Avro-encoded record that captures the table’s primary key(s). The MutationValue is an extended Avro record that describes what changed and how to retrieve its specifics.

CDC sets the initial topic schema on the first change it detects. Once the initial topic schema has been set, an ideal path has been established to create change data events in Pulsar.

Inevitably, table designs change: Columns are added, updated, or removed. When these changes occur, the components that are part of the CDC flow must adapt to preserve the ideal path of event data. This can become quite a complex set of decisions and as such, there are specific changes a CDC flow can tolerate before the flow needs to be re-created entirely.

Here is a summary of how the data message schema is created:

  1. Receive GenericRecord event message of type KeyValue.

  2. Use a Cassandra client to query for row data.

  3. Convert data as GenericRecord of type KeyValue and return.

  4. The connector interface produces a new message to the destination topic.

Add a table column

This is the easiest table design change to handle. Assuming the new column’s data type is compatible with the source connector, a new schema replaces the existing one and message compatibility is retained. Note that because the schema auto-update compatibility strategy is set to BACKWARD_TRANSITIVE, the new column must be optional, which is the default for any non-primary-key column.

For example:

ALTER TABLE [keyspace_name.]table_name ADD some_column text;
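A small sketch of why the added column must be optional: under a backward-compatible strategy, records written before the ALTER TABLE must still decode against the new schema, which works only if the new field has a default:

```python
def read_with_schema(record: dict, schema_fields: dict) -> dict:
    # Backward-compatible read: fields missing from an old record are
    # filled with the new schema's defaults (here, None for optional columns)
    return {name: record.get(name, default)
            for name, default in schema_fields.items()}

new_schema = {"id": None, "name": None, "some_column": None}  # column just added
old_record = {"id": 1, "name": "row-1"}  # written before the ALTER TABLE

decoded = read_with_schema(old_record, new_schema)
```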

Update a table column

Altering a table column includes renaming a column or changing a column’s type. Assuming the new column’s data type is compatible with the source connector, a new schema replaces the existing schema and message compatibility is retained. Once a table has been created, its primary key(s) cannot be modified, which fits well with the CDC pattern.

While technically updating columns is possible when CDC is enabled, it isn’t recommended. Instead, changes to a Cassandra table should be additive only. If you are familiar with data migrations, this concept is the same.

To change the name or type of table column, add a new column. The resulting event messages have a reference to both columns, and you can handle this migration downstream.

Note that this recommendation assumes a schema compatibility strategy of BACKWARD_TRANSITIVE. If you are using a different schema compatibility strategy, table updates are handled differently.
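Downstream consumers can handle such an additive rename by preferring the new column and falling back to the old one. The column names below are hypothetical:

```python
def effective_email(row: dict):
    # Prefer the replacement column; fall back to the legacy one during migration
    if row.get("email_v2") is not None:
        return row["email_v2"]
    return row.get("email")

old_row = {"email": "a@example.com"}  # written before the new column existed
new_row = {"email": "a@example.com", "email_v2": "a@new.example.com"}
```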

Remove a table column

Removing a table column is a standard command in CQL. The resulting CDC data messages won’t include that data anymore.

For example:

ALTER TABLE [keyspace_name.]table_name DROP some_column;

Consume change data with Apache Pulsar

This section describes how to consume change data with Apache Pulsar.

Pulsar clients

Each client handles message consumption a little differently, but there is one overall pattern to follow. As covered in the previous sections, a CDC message arrives as an Avro GenericRecord of type KeyValue. Typically, the first step is to separate the key and value portions of the message: the Cassandra table’s key fields are in the key portion of the record and the change data is in the value portion, both of which are Avro records themselves. From there, deserialize each Avro record and extract the information you need.

The following example projects demonstrate implementations for each runtime consuming messages from a CDC data topic. Make sure to set the connection details according to your cluster configuration.

  • Go

  • Java

  • Python

  • C#

  • Node.js

main.go
package main

import (
	"context"
	"encoding/base64"
	"log"
	"regexp"
	"strings"

	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/tidwall/gjson"
)

const (
	TOKEN             = "YOUR PULSAR TOKEN"
	PULSAR_BROKER     = "pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651"
	PULSAR_WEB        = "https://pulsar-aws-useast2.api.streaming.datastax.com"
	TOPIC_NAME        = "njcdcawsuseast2/astracdc/data-6ee78bd3-78af-4ddd-be73-093f38d094bd-ks1.tbl1"
	SUBSCRIPTION_NAME = "my-subscription22"
)

func main() {

	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	log.Println("Astra Streaming CDC Consumer")

	// Get the Astra CDC Topic Schema From the Schema registry
	keyavroSchema, valueavroSchema := getSchema()

	// Configuration variables pertaining to this consumer

	token := pulsar.NewAuthenticationToken(TOKEN)

	// Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            PULSAR_BROKER,
		Authentication: token,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumerOptions := pulsar.ConsumerOptions{
		Topic:                       "persistent://" + TOPIC_NAME,
		SubscriptionName:            SUBSCRIPTION_NAME,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	}

	consumer, err := client.Subscribe(consumerOptions)

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	ctx := context.Background()

	var keyMap map[string]interface{}
	var valueMap map[string]interface{}

	// infinite loop to receive messages
	for {

		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// The key is Base64 encoded, so it needs to be decoded
		keyAsBytes, err := base64.StdEncoding.DecodeString(msg.Key())
		if err != nil {
			log.Fatal(err)
		}

		// Use the key Avro schema to decode the key
		if err := keyavroSchema.Decode(keyAsBytes, &keyMap); err != nil {
			log.Fatal(err)
		}

		// Use the value Avro schema to decode the value
		if err := valueavroSchema.Decode(msg.Payload(), &valueMap); err != nil {
			log.Fatal(err)
		}

		log.Printf("Received key: %v message: %v", keyMap, valueMap)

		consumer.Ack(msg)
	}

}

type PulsarSchema struct {
	Version    int               `json:"version"`
	Type       string            `json:"type"`
	Timestamp  int64             `json:"timestamp"`
	Properties map[string]string `json:"properties"`
	Data       string            `json:"data"`
}

func getSchema() (*pulsar.AvroSchema, *pulsar.AvroSchema) {

	url := fmt.Sprintf("%s/admin/v2/schemas/%s/schema", PULSAR_WEB, TOPIC_NAME)

	method := "GET"
	client := &http.Client{}
	req, err := http.NewRequest(method, url, nil)

	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	req.Header.Add("Authorization", "Bearer "+TOKEN)

	res, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	jsonString := (string(body))

	//This isn't great
	//the data part of the json has extra back slashes
	jsonString = strings.Replace(jsonString, "\\\\", "", -1)

	var schemaResponse PulsarSchema

	json.Unmarshal([]byte(jsonString), &schemaResponse)

	keySchema := gjson.Get(schemaResponse.Data, "key").String()
	log.Println(keySchema)
	valueSchema := gjson.Get(schemaResponse.Data, "value").String()
	log.Println(valueSchema)

	//the namespaces start with numbers and AVRO doesn't like it
	//so strip them out for now
	var re = regexp.MustCompile(`\"namespace\":\"[[:alnum:]]*_`)
	keySchema = re.ReplaceAllString(keySchema, "\"namespace\":\"")
	log.Println(keySchema)
	keyavroSchema := pulsar.NewAvroSchema(keySchema, nil)
	valueSchema = re.ReplaceAllString(valueSchema, "\"namespace\":\"")
	log.Println(valueSchema)
	valueavroSchema := pulsar.NewAvroSchema(valueSchema, nil)

	return keyavroSchema, valueavroSchema
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.datastax.astrastreaming.javaexamples</groupId>
  <artifactId>astra-streaming-java-examples</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>astra-streaming-java-examples</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <pulsar.version>2.8.0</pulsar.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.13.2.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.10.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
CDCConsumer.java
package com.datastax.astrastreaming.javaexamples.consumers;

import org.apache.pulsar.client.api.*;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.schema.KeyValue;

public class CDCConsumer {

        private static final String SERVICE_URL = "pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651";
        private static final String TOPIC_NAME = "njcdcawsuseast2/astracdc/data-6ee78bd3-78af-4ddd-be73-093f38d094bd-ks1.tbl1";
        private static final String PULSAR_TOKEN = "YOUR PULSAR TOKEN";
        private static final String SUBSCRIPTION_NAME = "my-subscription";


        public static void main(String[] args) throws IOException
        {

            // Create client object
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl(SERVICE_URL)
                    .authentication(
                        AuthenticationFactory.token(PULSAR_TOKEN)
                    )
                    .build();

            // Create consumer on a topic with a subscription
            Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                    .topic(TOPIC_NAME)
                    .subscriptionName(SUBSCRIPTION_NAME)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe();

            boolean receivedMsg = false;
            // Loop until a message is received
            do {
                // Block for up to 1 second for a message
                Message<GenericRecord> msg = consumer.receive(1, TimeUnit.SECONDS);

                if(msg != null){

                    GenericRecord input = msg.getValue();
                    //CDC Uses KeyValue Schema
                    KeyValue<GenericRecord, GenericRecord> keyValue = (KeyValue<GenericRecord, GenericRecord>) input.getNativeObject();

                    GenericRecord keyGenRecord = keyValue.getKey();
                    displayGenericRecordFields("key",keyGenRecord);

                    GenericRecord valGenRecord = keyValue.getValue();
                    displayGenericRecordFields("value",valGenRecord);

                    // Acknowledge the message to remove it from the message backlog
                    consumer.acknowledge(msg);

                    receivedMsg = true;
                }

            } while (!receivedMsg);

            //Close the consumer
            consumer.close();

            // Close the client
            client.close();

        }

        private static void displayGenericRecordFields(String recordName,GenericRecord genericRecord) {
            System.out.printf("---Fields in %s: ", recordName);
            genericRecord.getFields().stream().forEach((fieldtmp) ->
                System.out.printf("%s=%s ", fieldtmp.getName(), genericRecord.getField(fieldtmp)));
            System.out.println(" ---");
        }

}
requirements.txt
avro==1.11.0
pulsar-client==2.10.0
cdc_consumer.py
import base64
import io
import json
import re
import time
from urllib.request import Request, urlopen

import avro.schema
import pulsar
from avro.io import BinaryDecoder, DatumReader

import logging

logging.basicConfig(
    format='%(asctime)s.%(msecs)05d %(levelname)-8s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S')

service_url = "pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651"
admin_url = "https://pulsar-aws-useast2.api.streaming.datastax.com"
token = "YOUR PULSAR TOKEN"
topic_name = "njcdcawsuseast2/astracdc/data-6ee78bd3-78af-4ddd-be73-093f38d094bd-ks1.tbl1"
subscription_name = "my-subscription22"


def http_get(url):
    req = Request(url)
    req.add_header("Accept", "application/json")
    req.add_header("Authorization", "Bearer " + token)
    return urlopen(req).read()


def getSchema():
    schema_url = "%s/admin/v2/schemas/%s/schema" % (admin_url, topic_name)
    topic_schema = http_get(schema_url).decode("utf-8")
    # This isn't great
    # the data part of the json has extra back slashes
    topic_schema = topic_schema.replace("\\", "")
    topic_schema = topic_schema.replace('data":"', 'data":')
    topic_schema = topic_schema.replace('}","properties', '},"properties')

    logging.info("Topic'{}' Schema='{}'".format(topic_name, topic_schema))

    schema_json = json.loads(topic_schema)

    data_schema = schema_json["data"]

    keyschema_json = data_schema["key"]
    valueschema_json = data_schema["value"]

    # the namespaces start with numbers and AVRO doesn't like it
    # so strip them out for now
    key_namespace = keyschema_json["namespace"]
    key_namespace = re.sub(r"\d.*_", "", key_namespace)
    keyschema_json["namespace"] = key_namespace

    value_namespace = valueschema_json["namespace"]
    value_namespace = re.sub(r"\d.*_", "", value_namespace)
    valueschema_json["namespace"] = value_namespace

    keyAvroSchema = avro.schema.Parse(json.dumps(keyschema_json))
    valueAvroSchema = avro.schema.Parse(json.dumps(valueschema_json))

    return keyAvroSchema, valueAvroSchema


keyAvroSchema, valueAvroSchema = getSchema()

keyAvroReader = DatumReader(keyAvroSchema)
valueAvroReader = DatumReader(valueAvroSchema)

# Create the Pulsar client and subscribe to the CDC data topic
client = pulsar.Client(service_url, authentication=pulsar.AuthenticationToken(token))

consumer = client.subscribe(
    topic_name,
    subscription_name,
    initial_position=pulsar.InitialPosition.Earliest)

waitingForMsg = True
while waitingForMsg:
    try:
        msg = consumer.receive()

        # The PartitionKey is Base64 Encoded, so it needs to be decoded
        msgKey = msg.partition_key()
        msgKey_decoded = base64.b64decode(msgKey)

        messageKey_bytes = io.BytesIO(msgKey_decoded)
        keydecoder = BinaryDecoder(messageKey_bytes)
        msgKey = keyAvroReader.read(keydecoder)

        message_bytes = io.BytesIO(msg.data())
        decoder = BinaryDecoder(message_bytes)
        msgvalue = valueAvroReader.read(decoder)

        logging.info("Received message key='{}' value='{}'".format(msgKey, msgvalue))

        # logging.info("Received message")

        # Acknowledging the message to remove from message backlog
        consumer.acknowledge(msg)

        # waitingForMsg = False
    except Exception:
        logging.info("Still waiting for a message...")



client.close()
dotnet-pulsar-astracdc.csproj
<Project Sdk="Microsoft.NET.Sdk">

  <PropertyGroup>
    <OutputType>Exe</OutputType>
    <TargetFramework>net6.0</TargetFramework>
    <RootNamespace>dotnet_pulsar_astracdc</RootNamespace>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>

  <ItemGroup>
    <PackageReference Include="Pulsar.Client" Version="2.10.3" />
  </ItemGroup>

</Project>
Program.cs
// See https://aka.ms/new-console-template for more information

using System.Net.Http.Headers;
using System.Net.Http.Json;
using System.Text.Json;
using Pulsar.Client.Api;
using Pulsar.Client.Common;

const string SERVICE_URL = "pulsar+ssl://pulsar-gcp-uscentral1.streaming.datastax.com:6651";
const string ADMIN_URL = "https://pulsar-gcp-uscentral1.api.streaming.datastax.com";
const string TOPIC_NAME = "<my-super-cool-tenant>/astracdc/data-6109b1d9-0217-4783-809e-ed19d1299cbf-xxxxxx.yyyyyyy";
const string PULSAR_TOKEN = "YOUR PULSAR TOKEN";
const string SUBSCRIPTION_NAME = "my-subscription";

//Retrieve the schemas
var (keyAvroSchema, valueAvroSchema) = await GetSchema($"{ADMIN_URL}/admin/v2/schemas/{TOPIC_NAME}/schema");

//Build the avro readers with schema info
var keyAvroReader = new Avro.Generic.GenericDatumReader<Avro.Generic.GenericRecord>(keyAvroSchema, keyAvroSchema);
var valueAvroReader = new Avro.Generic.GenericDatumReader<Avro.Generic.GenericRecord>(valueAvroSchema, valueAvroSchema);

/*
 *
 * NOTE: We are using the F# Pulsar client below, but you could easily switch to the DotPulsar client. This is because the client isn't doing any decoding.
 *   It's simply listening for a (binary) message and making the bytes available. All the decoding work is being done by the Avro libs.
 */

// Create client object
var client = await new PulsarClientBuilder()
	.ServiceUrl(SERVICE_URL)
	.Authentication(
		AuthenticationFactory.Token(PULSAR_TOKEN)
	)
	.BuildAsync();

// Create consumer on a topic with a subscription
var consumer = await client.NewConsumer()
	.Topic(TOPIC_NAME)
	.SubscriptionName(SUBSCRIPTION_NAME)
	.SubscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
	.SubscribeAsync();

var receivedMsg = false;

while (!receivedMsg)
{
	var msg = await consumer.ReceiveAsync();

	if (msg is null)
	{
		continue;
	}

	//The Key is Base64 Encoded, so it needs to be decoded
	var msgKeyDecoded = Convert.FromBase64String(msg.Key);

	// Stream Pulsar's binary bytes to an avro decoder and load into a reader
	using (var keyBytes = new MemoryStream(msgKeyDecoded))
	{
		var keyDecoder = new Avro.IO.BinaryDecoder(keyBytes);
		var messageKey = keyAvroReader.Read(null!, keyDecoder);
		Console.WriteLine(messageKey);
	}

	// Stream Pulsar's binary bytes to an avro decoder and load into a reader
	using (var valueBytes = new MemoryStream(msg.Data))
	{
		var valueDecoder = new Avro.IO.BinaryDecoder(valueBytes);
		var messageValue = valueAvroReader.Read(null!, valueDecoder);
		Console.WriteLine(messageValue);
	}

	// Acknowledge the message to remove it from the message backlog
	await consumer.AcknowledgeAsync(msg.MessageId);

	receivedMsg = true;
}

//Close the consumer
await consumer.DisposeAsync();

// Close the client
await client.CloseAsync();

async Task<(Avro.Schema, Avro.Schema)> GetSchema(string schemaUrl)
{
	using HttpClient httpClient = new();
	httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", PULSAR_TOKEN);
	using var response = await httpClient.GetAsync(schemaUrl);

	// Throw an exception if request was unsuccessful
	response.EnsureSuccessStatusCode();

	// Parse the response
	var responseBody = await response.Content.ReadFromJsonAsync<JsonElement>();
	var responseData = responseBody.GetProperty("data").GetString() ?? throw new Exception("Could not find a data property in json response");

	// The contents of the data element in the response is a string, so we deserialize it as an object
	var dataElement = JsonSerializer.Deserialize<JsonElement>(responseData);

	var keySchema = dataElement.GetProperty("key");
	var valueSchema = dataElement.GetProperty("value");

	try
	{
		var keyAvroSchema = Avro.Schema.Parse(keySchema.GetRawText());
		var valueAvroSchema = Avro.Schema.Parse(valueSchema.GetRawText());

		return (keyAvroSchema, valueAvroSchema);
	}
	catch (Exception)
	{
		Console.WriteLine($"An error occurred trying to parse json schemas to avro");
		throw;
	}
}
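A quirk worth noting in `GetSchema`: the schema registry's REST response wraps the key and value schemas in a `data` property that is itself a JSON-encoded string, so it must be parsed twice. A minimal sketch of that double decoding (the response body below is a hand-built stand-in, not a live registry response):

```python
import json

# Stand-in for a response from GET /admin/v2/schemas/<topic>/schema.
# The "data" property is a STRING holding JSON, so it has to be parsed twice.
response_body = json.dumps({
    "version": 1,
    "type": "KEY_VALUE",
    "data": json.dumps({
        "key": {"type": "record", "name": "k", "fields": [{"name": "id", "type": "string"}]},
        "value": {"type": "record", "name": "v", "fields": [{"name": "body", "type": "string"}]},
    }),
})

outer = json.loads(response_body)     # first pass: the response envelope
inner = json.loads(outer["data"])     # second pass: the embedded schema string
key_schema, value_schema = inner["key"], inner["value"]

print(key_schema["fields"][0]["name"])    # -> id
print(value_schema["fields"][0]["name"])  # -> body
```

This is the same shape the JavaScript and Go examples below work around with string replacement; parsing the `data` string as JSON in a second pass avoids those regex hacks.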
package.json
{
  "dependencies": {
    "avro-js": "^1.11.0",
    "pulsar-client": "^1.6.2"
  }
}
consumer.js
const Pulsar = require("pulsar-client");
const https = require('https');
const avro = require('avro-js');


(async () => {
  // Token based on authentication
  const tokenStr = "YOUR PULSAR TOKEN";
  const pulsarUri =
    "pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651";
  const adminUrl = "https://pulsar-aws-useast2.api.streaming.datastax.com";
  const topicName =
    "njcdcawsuseast2/astracdc/data-6ee78bd3-78af-4ddd-be73-093f38d094bd-ks1.tbl1";
  const subscriptionName = "my-subscription21";


  // Uncomment the trust store path that matches your OS.
  // CentOS/RHEL:
  // const trustStore = "/etc/ssl/certs/ca-bundle.crt";
  // Debian/Ubuntu:
  // const trustStore = "/etc/ssl/certs/ca-certificates.crt";
  // macOS:
  // const trustStore = "/System/Library/Keychains/SystemCACertificates.keychain";

  const auth = new Pulsar.AuthenticationToken({ token: tokenStr });

  let url = adminUrl + "/admin/v2/schemas/" + topicName + "/schema"
  var options = {
      'headers' : {
        'Authorization': 'Bearer ' + tokenStr
      }
  }

  var topicSchema;
  var keyAvroSchema;
  var valAvroSchema;



  https.get(url,options, (res) => {
    var chunks = [];

    res.on("data", function (chunk) {
      chunks.push(chunk);
    });

    res.on("end", function (chunk) {
      var body = Buffer.concat(chunks);
      var bodyString = body.toString()
      console.log(bodyString);
      var replace = bodyString.replace(/\\/g,'');

      replace = replace.replace(/"{/g,'{');
      replace = replace.replace(/}"/g,'}');
      console.log(replace);
      topicSchema = JSON.parse(replace);

      var keySchema = topicSchema.data.key;
      var ns = keySchema.namespace.replace(/\d.*_/,'');
      keySchema.namespace=ns;
      keyAvroSchema = avro.parse(keySchema);

      var valSchema = topicSchema.data.value;
      var valns = valSchema.namespace.replace(/\d.*_/,'');
      valSchema.namespace=valns;
      valAvroSchema = avro.parse(valSchema);



    });

    res.on("error", function (error) {
      console.error(error);
    });
  });

  // Create a client
  const client = new Pulsar.Client({
    serviceUrl: pulsarUri,
    authentication: auth,
    // tlsTrustCertsFilePath: trustStore,
    operationTimeoutSeconds: 30,
  });

  // Create consumer
  const consumer = await client.subscribe({
    topic: "persistent://"+topicName,
    subscription: subscriptionName,
    subscriptionType: "KeyShared",
    subscriptionInitialPosition: "Earliest",
    ackTimeoutMs: 10000,
  });

  // Receive and acknowledge messages
  for (let i = 0; i < 100; i += 1) {
    const msg = await consumer.receive();

    // The PartitionKey is Base64 Encoded, so it needs to be decoded
    let partitionKey = msg.getPartitionKey()
    let buff = Buffer.from(partitionKey, 'base64')
    let decodedPartitionKey = buff.toString('ascii')
    console.log(decodedPartitionKey);

    let decodedKey = keyAvroSchema.fromBuffer(buff)
    console.log(decodedKey);

    let decodedVal = valAvroSchema.fromBuffer(msg.getData())
    console.log(decodedVal);

    consumer.acknowledge(msg);
  }

  await consumer.close();
  await client.close();
})();

Pulsar functions

It is common to have a Pulsar function consume CDC data, perform additional processing, and pass the result to another topic. Like a client consumer, a function must deserialize the message data.

The following example functions consume messages from the CDC data topic.

  • Go

  • Java

  • Python

main.go
package main

import (
	"context"
	"encoding/base64"
	"log"
	"regexp"
	"strings"

	"encoding/json"
	"fmt"
	"io/ioutil"
	"net/http"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/tidwall/gjson"
)

const (
	TOKEN             = "YOUR PULSAR TOKEN"
	PULSAR_BROKER     = "pulsar+ssl://pulsar-aws-useast2.streaming.datastax.com:6651"
	PULSAR_WEB        = "https://pulsar-aws-useast2.api.streaming.datastax.com"
	TOPIC_NAME        = "njcdcawsuseast2/astracdc/data-6ee78bd3-78af-4ddd-be73-093f38d094bd-ks1.tbl1"
	SUBSCRIPTION_NAME = "my-subscription22"
)

func main() {

	log.SetFlags(log.LstdFlags | log.Lmicroseconds)

	log.Println("Astra Streaming CDC Consumer")

	// Get the Astra CDC Topic Schema From the Schema registry
	keyavroSchema, valueavroSchema := getSchema()

	// Configuration variables pertaining to this consumer

	token := pulsar.NewAuthenticationToken(TOKEN)

	// Pulsar client
	client, err := pulsar.NewClient(pulsar.ClientOptions{
		URL:            PULSAR_BROKER,
		Authentication: token,
	})

	if err != nil {
		log.Fatal(err)
	}

	defer client.Close()

	consumerOptions := pulsar.ConsumerOptions{
		Topic:                       "persistent://" + TOPIC_NAME,
		SubscriptionName:            SUBSCRIPTION_NAME,
		SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
	}

	consumer, err := client.Subscribe(consumerOptions)

	if err != nil {
		log.Fatal(err)
	}

	defer consumer.Close()

	ctx := context.Background()

	var keyMap map[string]interface{}
	var valueMap map[string]interface{}

	// infinite loop to receive messages
	for {

		msg, err := consumer.Receive(ctx)
		if err != nil {
			log.Fatal(err)
		}

		// The key is Base64 encoded, so it needs to be decoded
		keyAsBytes, err := base64.StdEncoding.DecodeString(msg.Key())
		if err != nil {
			log.Fatal(err)
		}

		// Use the key Avro schema to decode the key
		if err := keyavroSchema.Decode(keyAsBytes, &keyMap); err != nil {
			log.Fatal(err)
		}

		// Use the value Avro schema to decode the value
		if err := valueavroSchema.Decode(msg.Payload(), &valueMap); err != nil {
			log.Fatal(err)
		}

		log.Printf("Received key: %v message: %v", keyMap, valueMap)

		consumer.Ack(msg)
	}

}

type PulsarSchema struct {
	Version    int               `json:"version"`
	Type       string            `json:"type"`
	Timestamp  int64             `json:"timestamp"`
	Properties map[string]string `json:"properties"`
	Data       string            `json:"data"`
}

func getSchema() (*pulsar.AvroSchema, *pulsar.AvroSchema) {

	url := fmt.Sprintf("%s/admin/v2/schemas/%s/schema", PULSAR_WEB, TOPIC_NAME)

	method := "GET"
	client := &http.Client{}
	req, err := http.NewRequest(method, url, nil)

	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	req.Header.Add("Authorization", "Bearer "+TOKEN)

	res, err := client.Do(req)
	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	defer res.Body.Close()

	body, err := ioutil.ReadAll(res.Body)
	if err != nil {
		log.Fatal(err)
		return nil, nil
	}
	jsonString := string(body)

	// The "data" property is itself a JSON-encoded string, so it arrives with
	// escaped quotes; strip the backslashes so gjson can query it directly.
	jsonString = strings.Replace(jsonString, "\\\\", "", -1)

	var schemaResponse PulsarSchema

	json.Unmarshal([]byte(jsonString), &schemaResponse)

	keySchema := gjson.Get(schemaResponse.Data, "key").String()
	log.Println(keySchema)
	valueSchema := gjson.Get(schemaResponse.Data, "value").String()
	log.Println(valueSchema)

	//the namespaces start with numbers and AVRO doesn't like it
	//so strip them out for now
	var re = regexp.MustCompile(`\"namespace\":\"[[:alnum:]]*_`)
	keySchema = re.ReplaceAllString(keySchema, "\"namespace\":\"")
	log.Println(keySchema)
	keyavroSchema := pulsar.NewAvroSchema(keySchema, nil)
	valueSchema = re.ReplaceAllString(valueSchema, "\"namespace\":\"")
	log.Println(valueSchema)
	valueavroSchema := pulsar.NewAvroSchema(valueSchema, nil)

	return keyavroSchema, valueavroSchema
}
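As the comments in `getSchema` note, the generated Avro namespaces for CDC topics begin with digits, which Avro names do not allow, so the code strips the leading alphanumeric prefix and its underscore with a regex. The same cleanup can be sketched as follows; the sample schema string is a simplified stand-in for what the registry returns:

```python
import re

# Hypothetical CDC-style namespace: a generated prefix starting with a digit,
# joined to the keyspace name with an underscore.
raw_schema = '{"type":"record","name":"tbl1","namespace":"6ee78bd3_ks1","fields":[]}'

# Drop the generated "<alnum>_" prefix so the namespace is a valid Avro name,
# mirroring the regexp.MustCompile pattern used in the Go example above.
cleaned = re.sub(r'"namespace":"[0-9A-Za-z]*_', '"namespace":"', raw_schema)

print(cleaned)  # namespace becomes "ks1"
```

The JavaScript example performs the equivalent cleanup on the parsed object with `namespace.replace(/\d.*_/, '')`.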
pom.xml
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.datastax.astrastreaming.javaexamples</groupId>
  <artifactId>astra-streaming-java-examples</artifactId>
  <version>1.0-SNAPSHOT</version>

  <name>astra-streaming-java-examples</name>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <pulsar.version>2.8.0</pulsar.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-client</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.pulsar</groupId>
      <artifactId>pulsar-functions-api</artifactId>
      <version>${pulsar.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.13.2.1</version>
    </dependency>

    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.10.2</version>
    </dependency>
  </dependencies>

  <build>
    <pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
      <plugins>
        <!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
        <plugin>
          <artifactId>maven-clean-plugin</artifactId>
          <version>3.1.0</version>
        </plugin>
        <!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
        <plugin>
          <artifactId>maven-resources-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.8.0</version>
        </plugin>
        <plugin>
          <artifactId>maven-surefire-plugin</artifactId>
          <version>2.22.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-jar-plugin</artifactId>
          <version>3.0.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-install-plugin</artifactId>
          <version>2.5.2</version>
        </plugin>
        <plugin>
          <artifactId>maven-deploy-plugin</artifactId>
          <version>2.8.2</version>
        </plugin>
        <!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
        <plugin>
          <artifactId>maven-site-plugin</artifactId>
          <version>3.7.1</version>
        </plugin>
        <plugin>
          <artifactId>maven-project-info-reports-plugin</artifactId>
          <version>3.0.0</version>
        </plugin>
      </plugins>
    </pluginManagement>
  </build>
</project>
/utils/JsonConverter.java
package com.datastax.astrastreaming.javaexamples.utils;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;

import java.util.*;


/**
 * Convert an AVRO GenericRecord to a JsonNode.
 */
public class JsonConverter {

    private static Map<String, LogicalTypeConverter> logicalTypeConverters = new HashMap<>();
    private static final JsonNodeFactory jsonNodeFactory = JsonNodeFactory.withExactBigDecimals(true);

    public static JsonNode topLevelMerge(JsonNode n1, JsonNode n2) {
        ObjectNode objectNode = jsonNodeFactory.objectNode();
        n1.fieldNames().forEachRemaining(f -> objectNode.put(f, n1.get(f)));
        n2.fieldNames().forEachRemaining(f -> objectNode.put(f, n2.get(f)));
        return objectNode;
    }

    public static ObjectNode toJson(GenericRecord genericRecord) {
        if (genericRecord == null) {
            return null;
        }
        ObjectNode objectNode = jsonNodeFactory.objectNode();
        for(Schema.Field field: genericRecord.getSchema().getFields()) {
            objectNode.set(field.name(), toJson(field.schema(), genericRecord.get(field.name())));
        }
        return objectNode;
    }

    public static JsonNode toJson(Schema schema, Object value) {
        if (value == null) {
            return jsonNodeFactory.nullNode();
        }
        if (schema.getLogicalType() != null) {
            if (logicalTypeConverters.containsKey(schema.getLogicalType().getName())) {
                return logicalTypeConverters.get(schema.getLogicalType().getName()).toJson(schema, value);
            }
        }
        try {
            switch (schema.getType()) {
                case NULL: // this should not happen
                    return jsonNodeFactory.nullNode();
                case INT:
                    return jsonNodeFactory.numberNode((Integer) value);
                case LONG:
                    return jsonNodeFactory.numberNode((Long) value);
                case DOUBLE:
                    return jsonNodeFactory.numberNode((Double) value);
                case FLOAT:
                    return jsonNodeFactory.numberNode((Float) value);
                case BOOLEAN:
                    return jsonNodeFactory.booleanNode((Boolean) value);
                case BYTES:
                    return jsonNodeFactory.binaryNode((byte[]) value);
                case FIXED:
                    return jsonNodeFactory.binaryNode(((GenericFixed) value).bytes());
                case ARRAY: {
                    Schema elementSchema = schema.getElementType();
                    ArrayNode arrayNode = jsonNodeFactory.arrayNode();
                    Object[] iterable;
                    if (value instanceof GenericData.Array) {
                        iterable = ((GenericData.Array) value).toArray();
                    } else {
                        iterable = (Object[]) value;
                    }
                    for (Object elem : iterable) {
                        JsonNode fieldValue = toJson(elementSchema, elem);
                        arrayNode.add(fieldValue);
                    }
                    return arrayNode;
                }
                case MAP: {
                    Map<Object, Object> map = (Map<Object, Object>) value;
                    ObjectNode objectNode = jsonNodeFactory.objectNode();
                    for (Map.Entry<Object, Object> entry : map.entrySet()) {
                        JsonNode jsonNode = toJson(schema.getValueType(), entry.getValue());
                        // can be a String or org.apache.avro.util.Utf8
                        final String entryKey = entry.getKey() == null ? null : entry.getKey().toString();
                        objectNode.set(entryKey, jsonNode);
                    }
                    return objectNode;
                }
                case RECORD:
                    return toJson( (GenericRecord) value);
                case UNION:
                    for (Schema s : schema.getTypes()) {
                        if (s.getType() == Schema.Type.NULL)
                            continue;
                        return toJson( s, value);
                    }
                    // this case should not happen
                    return jsonNodeFactory.textNode(value.toString());
                case ENUM: // GenericEnumSymbol
                case STRING:  // can be a String or org.apache.avro.util.Utf8
                    return jsonNodeFactory.textNode(value.toString());
                default:
                    return jsonNodeFactory.nullNode();

            }
        } catch (ClassCastException error) {
            throw new IllegalArgumentException("Error while converting a value of type " + value.getClass() + " to a " + schema.getType()
                    + ": " + error, error);
        }
    }

    abstract static class LogicalTypeConverter {
        abstract JsonNode toJson(Schema schema, Object value);
    }

    private static void checkType(Object value, String name, Class expected) {
        if (value == null) {
            throw new IllegalArgumentException("Invalid type for " + name + ", expected " + expected.getName() + " but was NULL");
        }
        if (!expected.isInstance(value)) {
            throw new IllegalArgumentException("Invalid type for " + name + ", expected " + expected.getName() + " but was " + value.getClass());
        }
    }

    static {
        logicalTypeConverters.put("date", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                checkType(value, "date", Integer.class);
                Integer daysFromEpoch = (Integer)value;
                return jsonNodeFactory.numberNode(daysFromEpoch);
            }
        });
        logicalTypeConverters.put("time-millis", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                checkType(value, "time-millis", Integer.class);
                Integer timeMillis = (Integer)value;
                return jsonNodeFactory.numberNode(timeMillis);
            }
        });
        logicalTypeConverters.put("time-micros", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                checkType(value, "time-micros", Long.class);
                Long timeMicro = (Long)value;
                return jsonNodeFactory.numberNode(timeMicro);
            }
        });
        logicalTypeConverters.put("timestamp-millis", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                checkType(value, "timestamp-millis", Long.class);
                Long epochMillis = (Long)value;
                return jsonNodeFactory.numberNode(epochMillis);
            }
        });
        logicalTypeConverters.put("timestamp-micros", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                checkType(value, "timestamp-micros", Long.class);
                Long epochMicros = (Long)value;
                return jsonNodeFactory.numberNode(epochMicros);
            }
        });
        logicalTypeConverters.put("uuid", new JsonConverter.LogicalTypeConverter() {
            @Override
            JsonNode toJson(Schema schema, Object value) {
                return jsonNodeFactory.textNode(value == null ? null : value.toString());
            }
        });
    }

    public static ArrayNode toJsonArray(JsonNode jsonNode, List<String> fields) {
        ArrayNode arrayNode = jsonNodeFactory.arrayNode();
        Iterator<String> it = jsonNode.fieldNames();
        while (it.hasNext()) {
            String fieldName = it.next();
            if (fields.contains(fieldName)) {
                arrayNode.add(jsonNode.get(fieldName));
            }
        }
        return arrayNode;
    }
}
CDCFunction.java
package com.datastax.astrastreaming.javaexamples.functions;


import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.common.schema.KeyValue;
import com.datastax.astrastreaming.javaexamples.utils.JsonConverter;


/**
 * Function that consumes a CDC data topic and outputs it as a JSON string.
 */
public class CDCFunction implements Function<GenericObject, String> {



    @Override
    public String process(GenericObject input, Context context) {
        try {
            KeyValue<GenericRecord, GenericRecord> keyValue = (KeyValue<GenericRecord, GenericRecord>) input.getNativeObject();

            GenericRecord keyGenObject = keyValue.getKey();
            GenericRecord valGenObject = keyValue.getValue();


            ObjectNode keyNode = JsonConverter.toJson((org.apache.avro.generic.GenericRecord) keyGenObject.getNativeObject());
            ObjectNode valNode = JsonConverter.toJson((org.apache.avro.generic.GenericRecord) valGenObject.getNativeObject());

            keyNode.setAll(valNode);

            String json = keyNode.toPrettyString();
            return json;
        } catch (Throwable t) {
            throw new RuntimeException(t);
        }
    }


}

The following function decodes the content of CDC events as plain JSON. For the full context of this example, see the Decoding CDC Astra Streaming functions repository.

deschemaer.py
import base64
import json
import io
import avro.schema
from avro.io import BinaryDecoder, DatumReader
#
from pulsar import Function


## helpers
keySchemaDict = {
    "type": "record",
    "name": "reviews",
    "fields": [
        {
            "name": "hotel",
            "type": "string",
        },
        {
            "name": "id",
            "type": [
                "null",
                {
                    "type": "string",
                    "logicalType": "uuid",
                },
            ],
            "default": None,
        },
    ],
}

valueSchemaDict = {
    "type": "record",
    "name": "reviews",
    "fields": [
        {
            "name": "body",
            "type": [
                "null",
                "string"
            ],
            "default": None
        },
        {
            "name": "reviewer",
            "type": [
                "null",
                "string"
            ],
            "default": None
        },
        {
            "name": "is_valid",
            "type": [
                "null",
                "boolean"
            ],
            "default": None
        },
        {
            "name": "score",
            "type": [
                "null",
                "int"
            ],
            "default": None
        }
    ]
}


def createAvroReader(schemaDict):
    return DatumReader(avro.schema.make_avsc_object(schemaDict))


def bytesToReadDict(by, avroReader):
    binDecoded = BinaryDecoder(io.BytesIO(by))
    return avroReader.read(binDecoded)


def b64ToReadDict(b64string, avroReader):
    b64Decoded = base64.b64decode(b64string)
    return bytesToReadDict(b64Decoded, avroReader)


def cdcMessageToDictPF(pk, body, keyReader, valueReader):
    # Body can be a 'str' or 'bytes' already depending on the length
    # of the input fields in the table insert.
    #       ¯\_(ツ)_/¯
    # Take care of this:
    encodedBody = body if isinstance(body, bytes) else body.encode()
    #
    return {
        **bytesToReadDict(base64.b64decode(pk), keyReader),
        **bytesToReadDict(encodedBody, valueReader),
    }


class Deschemaer(Function):

    def __init__(self):
        self.keyReader = createAvroReader(keySchemaDict)
        self.valueReader = createAvroReader(valueSchemaDict)

    def process(self, msgBody, context):
        msgPK = context.get_partition_key()
        msgDict = cdcMessageToDictPF(msgPK, msgBody, self.keyReader, self.valueReader)
        return json.dumps(msgDict)
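To make the decoding in `bytesToReadDict` less opaque, the sketch below hand-decodes an Avro binary record shaped like `keySchemaDict` using only the standard library: an Avro string is a zigzag-varint length followed by UTF-8 bytes, and a union is a zigzag-varint branch index. The sample bytes are constructed here rather than captured from a real CDC topic:

```python
import base64
import io

def read_varint(buf: io.BytesIO) -> int:
    """Read a zigzag-encoded Avro long from the stream."""
    shift, acc = 0, 0
    while True:
        b = buf.read(1)[0]
        acc |= (b & 0x7F) << shift
        if not (b & 0x80):
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # undo zigzag encoding

def read_string(buf: io.BytesIO) -> str:
    """Avro string: zigzag-varint length followed by UTF-8 bytes."""
    return buf.read(read_varint(buf)).decode("utf-8")

# Hand-built Avro binary matching keySchemaDict: hotel="Ritz", id=null.
# 0x08 is zigzag(4), the length of "Ritz"; 0x00 selects the null union branch.
raw = bytes([0x08]) + b"Ritz" + bytes([0x00])
pk = base64.b64encode(raw).decode()  # CDC partition keys arrive base64-encoded

buf = io.BytesIO(base64.b64decode(pk))
hotel = read_string(buf)       # -> "Ritz"
id_branch = read_varint(buf)   # union index: 0 means the "null" branch

print(hotel, id_branch)
```

In the function above, `avro`'s `BinaryDecoder` and `DatumReader` perform exactly this walk, driven by the schema, for every field in the record.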

© Copyright IBM Corporation 2026