Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Astra Streaming currently supports Apache Pulsar 2.10, which uses the Kafka 2.7.2 library to interact with Kafka.

Get Started

Set the required variables using any of the methods below.

export TENANT=<replace-me>
export INPUT_TOPIC=<replace-me>
export NAMESPACE=default
export SINK_NAME=kafka-sink
  • Pulsar Admin

  • cURL

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks create \
  --sink-type kafka \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --sink-config '{
      "bootstrapServers": "localhost:6667",
      "topic": "test",
      "acks": "1",
      "batchSize": "16384",
      "maxRequestSize": "1048576",
      "producerConfigProperties": {
         "client.id": "test-pulsar-producer",
         "security.protocol": "SASL_SSL",
         "sasl.mechanism": "PLAIN",
         "sasl.kerberos.service.name": "kafka",
         "sasl.username": "cccccc",
         "sasl.password": "cccccc",
         "acks": "all"
      }
    }'

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/kafka\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 1,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"],
    \"configs\":{
      \"bootstrapServers\": \"localhost:6667\",
      \"topic\": \"test\",
      \"acks\": \"1\",
      \"batchSize\": \"16384\",
      \"maxRequestSize\": \"1048576\",
      \"producerConfigProperties\": {
         \"client.id\": \"test-pulsar-producer\",
         \"security.protocol\": \"SASL_PLAINTEXT\",
         \"sasl.mechanism\": \"GSSAPI\",
         \"sasl.kerberos.service.name\": \"kafka\",
         \"acks\": \"all\"
      }
    }
  }"'
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}

Managing the Connector

Start

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Start all instances of a connector
./bin/pulsar-admin sinks start \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only start an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Start all instances of a connector
curl -sS --fail --location --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/start' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Start an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/start" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"

Stop

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Stop all instances of a connector
./bin/pulsar-admin sinks stop \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only stop an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Stop all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/stop' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Stop an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/stop" \
  --H "Authorization: $ASTRA_STREAMING_TOKEN"

Restart

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Restart all instances of a connector
./bin/pulsar-admin sinks restart \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

# optionally add --instance-id to only restart an individual instance

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Restart all instances of a connector
curl -sS --fail --request POST ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/restart' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Restart an individual instance of a connector
curl -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/restart" \
-H "Authorization: $ASTRA_STREAMING_TOKEN"

Update

  • Pulsar Admin

  • cURL

  • Sample Config Data

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

./bin/pulsar-admin sinks update \
  --sink-type kafka \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$INPUT_TOPIC" \
  --tenant "$TENANT" \
  --parallelism 2

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --request PUT ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'?opt=poweruser' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN" \
  --form 'sinkConfig="{
    \"archive\":\"builtin:\/\/kafka\",
    \"tenant\":\"'$TENANT'\",
    \"namespace\":\"'$NAMESPACE'\",
    \"name\":\"'$SINK_NAME'\",
    \"parallelism\": 2,
    \"inputs\":[\"'$TENANT'\/'$NAMESPACE'\/'$INPUT_TOPIC'\"]
  }"'
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}

Delete

  • Pulsar Admin

  • cURL

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Delete all instances of a connector
./bin/pulsar-admin sinks delete \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Delete all instances of a connector
curl -sS --fail --location --request DELETE ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

Monitoring the Connector

Info

  • Pulsar Admin

  • cURL

  • Response

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Get information about connector
./bin/pulsar-admin sinks get \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"
{
  "tenant": "string",
  "namespace": "string",
  "name": "string",
  "className": "string",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "string"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicsPattern": "string",
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "property1": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    },
    "property2": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "property1": {},
    "property2": {}
  },
  "secrets": {
    "property1": {},
    "property2": {}
  },
  "parallelism": 0,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": true,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0,
    "ram": 0,
    "disk": 0
  },
  "autoAck": true,
  "timeoutMs": 0,
  "negativeAckRedeliveryDelayMs": 0,
  "sinkType": "string",
  "archive": "string",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}

Health

  • Pulsar Admin

  • cURL

  • Response

Refer to the complete pulsar-admin sinks spec for all available options.

Assuming you have downloaded client.conf to the Pulsar folder:

# Check connector status
./bin/pulsar-admin sinks status \
  --instance-id "$SINK_INSTANCEID" \
  --namespace "$NAMESPACE" \
  --name "$SINK_NAME" \
  --tenant "$TENANT"

You’ll need to create an Astra Streaming API token to be used with the REST API. This is different from your Astra tokens.

Navigate to the "Settings" area in the Astra Streaming UI and choose "Create Token".

Retrieve the web service URL from the "Connect" tab in the Astra Streaming UI.

Refer to the complete Pulsar sinks REST API spec for all available options.

export WEB_SERVICE_URL=<replace-me>
export ASTRA_STREAMING_TOKEN=<replace-me>
# Get the status of all connector instances
curl -sS --fail --location ''$WEB_SERVICE_URL'/admin/v3/sinks/'$TENANT'/'$NAMESPACE'/'$SINK_NAME'/status' \
  --header "Authorization: Bearer $ASTRA_STREAMING_TOKEN"

# Get the status of an individual connector instance
curl "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME/$SINK_INSTANCEID/status" \
  -H "accept: application/json" \
  -H "Authorization: $ASTRA_STREAMING_TOKEN"

Status response for all connector instances

{
 "numInstances": 0,
 "numRunning": 0,
 "instances": [
   {
     "instanceId": 0,
     "status": {
       "running": true,
       "error": "string",
       "numRestarts": 0,
       "numReadFromPulsar": 0,
       "numSystemExceptions": 0,
       "latestSystemExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numSinkExceptions": 0,
       "latestSinkExceptions": [
         {
           "exceptionString": "string",
           "timestampMs": 0
         }
       ],
       "numWrittenToSink": 0,
       "lastReceivedTime": 0,
       "workerId": "string"
     }
   }
 ]
}

Status response for individual connector instance

{
 "running": true,
 "error": "string",
 "numRestarts": 0,
 "numReadFromPulsar": 0,
 "numSystemExceptions": 0,
 "latestSystemExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numSinkExceptions": 0,
 "latestSinkExceptions": [
   {
     "exceptionString": "string",
     "timestampMs": 0
   }
 ],
 "numWrittenToSink": 0,
 "lastReceivedTime": 0,
 "workerId": "string"
}

Metrics

Astra Streaming exposes Prometheus formatted metrics for every connector. Refer to the scrape metrics with Prometheus page for more detail.

Connector Reference

There are two sets of parameters that support sink connectors.

Astra Streaming

Name Required Default Description

archive

true

The connector type, like 'builtin://elastic_search'

autoAck

true

false

Boolean denotes whether or not the framework will automatically acknowledge messages

className

true

The connector type’s class reference, like 'org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource'

cleanupSubscription

false

false

Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions is deleted

configs

false

{}

A key/value map of config properties specific to the type of connector. See the reference table below for values.

customRuntimeOptions

false

A string that encodes options to customize the runtime, see Apache Pulsar docs for configured runtime for details

deadLetterTopic

false

Name of the dead topic where the failing messages will be sent

inputSpecs

false

The map of input topics to its consumer configuration, each configuration has schema of {"schemaType": "type-x", "serdeClassName": "name-x", "isRegexPattern": true, "receiverQueueSize": 5}

inputs

true

[]

The input topic or topics of the Sink (specified as a JSON array)

maxMessageRetries

false

Maximum number of times that a message will be redelivered before being sent to the dead letter queue

name

true

Give your sink a good name for later reference. The name must start with a lowercase alphabetic character. It can only contain lowercase alphanumeric characters, and hyphens(kebab-case).

namespace

true

The namespace you’d like the sink created under

negativeAckRedeliveryDelayMs

false

The negative ack message redelivery delay in milliseconds

parallelism

true

1

The number of sink instances to run

processingGuarantees

true

ATLEAST_ONCE

The delivery semantics applied to the Pulsar Sink. Values are 'ATLEAST_ONCE', 'ATMOST_ONCE', 'EFFECTIVELY_ONCE'

resources

false

The compute resources that need to be allocated per instance (applicable only to the process)(as a JSON string). Example: {"cpu": 0.25,"disk":1000000000,"ram":500000000}

retainKeyOrdering

false

true

Sink consumes and processes messages in key order

retainOrdering

false

false

Boolean denotes whether the Pulsar Sink consumes and processes messages in order

runtimeFlags

false

A string that encodes options to customize the runtime, see Apache Pulsar docs for configured runtime for details

secrets

false

This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that encapsulates how the secret is fetched by the underlying secrets provider. The type of an value here can be found by the SecretProviderConfigurator.getSecretObjectType() method

sourceSubscriptionName

false

Pulsar source subscription name if user wants a specific subscription-name for input-topic consumer

sourceSubscriptionPosition

false

Earliest

The position to begin reading from the source

tenant

true

The tenant you’d like the sink created under

timeoutMs

false

5000

Denotes the message timeout in milliseconds

topicToSchemaProperties

false

topicToSchemaType

false

The map of input topics to Schema types or class names (specified as a JSON object)

topicToSerdeClassName

false

The map of input topics to SerDe class names (specified as a JSON object)

topicsPattern

false

TopicsPattern to consume from list of topics under a namespace that match the pattern. [input] and [topicsPattern] are mutually exclusive. Add SerDe class name for a pattern in customSerdeInputs (supported for java fun only)

Kafka (configs)

These values are provided in the "configs" area.

The Astra Streaming Kafka sink connector supports all configuration properties provided by Apache Pulsar. Please refer to the connector properties for a complete list.

What’s next?

Learn more about Kafka’s features and capabilities on their site.

Learn more about Apache Pulsar’s Kafka sink connector here.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com