Elasticsearch
The Elasticsearch sink connector reads messages from Pulsar topics and writes them to Elasticsearch.
Compatibility
Astra Streaming supports Apache Pulsar™ 3.1, whose Elasticsearch sink connector uses the OpenSearch 1.2.4 library.
For information about compatibility between OpenSearch and Elasticsearch, see the OpenSearch FAQ.
Create the connector
-
Optional: If you are using the pulsar-admin CLI or Pulsar Admin API, set the following commonly used environment variables:
export TENANT="TENANT_NAME"
export TOPIC="INPUT_TOPIC_NAME"
export NAMESPACE="NAMESPACE_NAME" # or default
export SINK_NAME="SINK_CONNECTOR_UNIQUE_NAME"
export PULSAR_TOKEN="TENANT_PULSAR_TOKEN" # API only
export WEB_SERVICE_URL="TENANT_PULSAR_WEB_SERVICE_URL" # API only
SINK_NAME is the name for your new sink connector. DataStax recommends a memorable, human-readable name that summarizes the connector’s purpose. For example: elastic_search-sink-prod-us-east-1.
-
Create the connector using JSON-formatted connector configuration data. You can pass the configuration directly or with a configuration file.
pulsar-admin CLI
./bin/pulsar-admin sinks create \
  --sink-type elastic_search \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --sink-config-file configs.json
Pulsar Admin API
curl -sS --fail -L -X POST "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"
Example configuration data structure
{
  "tenant": "${TENANT}",
  "namespace": "${NAMESPACE}",
  "name": "${SINK_NAME}",
  "className": "org.apache.pulsar.io.elasticsearch.ElasticSearchSink",
  "sourceSubscriptionName": "string",
  "sourceSubscriptionPosition": "Latest",
  "inputs": [
    "persistent://${TENANT}/${NAMESPACE}/${TOPIC}"
  ],
  "topicToSerdeClassName": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaType": {
    "property1": "string",
    "property2": "string"
  },
  "topicToSchemaProperties": {
    "property1": "string",
    "property2": "string"
  },
  "inputSpecs": {
    "persistent://${TENANT}/${NAMESPACE}/${TOPIC}": {
      "schemaType": "string",
      "serdeClassName": "string",
      "schemaProperties": {
        "property1": "string",
        "property2": "string"
      },
      "consumerProperties": {
        "property1": "string",
        "property2": "string"
      },
      "receiverQueueSize": 0,
      "cryptoConfig": {
        "cryptoKeyReaderClassName": "string",
        "cryptoKeyReaderConfig": {
          "property1": {},
          "property2": {}
        },
        "encryptionKeys": [
          "string"
        ],
        "producerCryptoFailureAction": "FAIL",
        "consumerCryptoFailureAction": "FAIL"
      },
      "poolMessages": true,
      "regexPattern": true
    }
  },
  "maxMessageRetries": 0,
  "deadLetterTopic": "string",
  "configs": {
    "elasticSearchUrl": "http://localhost:9200",
    "username": "user",
    "password": "${PASSWORD}",
    "indexName": "my_index"
  },
  "secrets": {},
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "retainOrdering": false,
  "retainKeyOrdering": true,
  "resources": {
    "cpu": 0.25,
    "disk": 1000000000,
    "ram": 1000000000
  },
  "autoAck": true,
  "timeoutMs": 5000,
  "negativeAckRedeliveryDelayMs": 0,
  "archive": "builtin://elastic_search",
  "cleanupSubscription": true,
  "runtimeFlags": "string",
  "customRuntimeOptions": "string",
  "transformFunction": "string",
  "transformFunctionClassName": "string",
  "transformFunctionConfig": "string"
}
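The example data structure lists every available field with placeholder values, and pulsar-admin and the Admin API read a configuration file as-is, without substituting shell variables such as ${TENANT}. The following sketch writes a smaller configs.json that contains only the fields the configuration reference marks as required, plus the connection settings from the example. It uses an unquoted heredoc so the shell fills in the variables before the file is written. The fallback values and the ES_URL, ES_INDEX, ES_USER, and ES_PASSWORD variable names are hypothetical placeholders, not part of the connector.

```shell
#!/usr/bin/env bash
# Sketch: write a minimal configs.json for the elastic_search sink.
# Variables exported in the previous step take precedence; the fallbacks
# below are hypothetical placeholders, not real resources.
set -euo pipefail

: "${TENANT:=my-tenant}"
: "${NAMESPACE:=default}"
: "${TOPIC:=my-topic}"
: "${SINK_NAME:=elastic_search-sink-dev}"
# Hypothetical variables holding the Elasticsearch connection settings.
: "${ES_URL:=http://localhost:9200}"
: "${ES_INDEX:=my_index}"
: "${ES_USER:=user}"
: "${ES_PASSWORD:=changeme}"

# Unquoted EOF: the shell expands ${...} before writing the file.
# Values are inserted verbatim, so they must not contain double quotes.
cat > configs.json <<EOF
{
  "tenant": "${TENANT}",
  "namespace": "${NAMESPACE}",
  "name": "${SINK_NAME}",
  "className": "org.apache.pulsar.io.elasticsearch.ElasticSearchSink",
  "archive": "builtin://elastic_search",
  "inputs": ["persistent://${TENANT}/${NAMESPACE}/${TOPIC}"],
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "autoAck": true,
  "configs": {
    "elasticSearchUrl": "${ES_URL}",
    "indexName": "${ES_INDEX}",
    "username": "${ES_USER}",
    "password": "${ES_PASSWORD}"
  }
}
EOF

echo "Wrote configs.json for sink ${SINK_NAME}"
```

The generated file contains the password in plain text, so avoid committing it to version control.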
Edit the connector
To update a connector, pass the new configuration definition to the connector. For example, if you created the connector with a configuration file, you can pass an updated configuration file.
You can include the entire configuration or only the properties that you want to change.
Additionally, some properties can be modified with specific arguments, such as --parallelism.
To get the current configuration, see Get sink connector configuration data.
pulsar-admin CLI
./bin/pulsar-admin sinks update \
  --sink-type elastic_search \
  --name "$SINK_NAME" \
  --inputs "persistent://$TENANT/$NAMESPACE/$TOPIC" \
  --tenant "$TENANT" \
  --namespace "$NAMESPACE" \
  --parallelism 2
Pulsar Admin API
curl -sS --fail -L -X PUT "$WEB_SERVICE_URL/admin/v3/sinks/$TENANT/$NAMESPACE/$SINK_NAME" \
  --header "Authorization: Bearer $PULSAR_TOKEN" \
  --form "sinkConfig=@configs.json;type=application/json"
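Because an update can include only the properties that you want to change, the configuration file for the PUT request can be very small. The following sketch writes a hypothetical update.json that changes only the number of instances. The tenant, namespace, and sink name are supplied by the request URL, so the file does not repeat them.

```shell
#!/usr/bin/env bash
# Sketch: write a partial sink configuration that changes only parallelism.
# update.json is a hypothetical file name.
set -euo pipefail

# Quoted 'EOF': nothing in this body needs shell expansion.
cat > update.json <<'EOF'
{
  "parallelism": 2
}
EOF

echo "Wrote update.json"
```

To send it, use the PUT request shown above with sinkConfig=@update.json in place of sinkConfig=@configs.json. One caution, based on how OSS Apache Pulsar merges sink updates and not confirmed for Astra Streaming: a configs object in an update appears to replace the stored configs map as a whole rather than merge into it. If you change a connector-specific property, include the complete configs object.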
Manage the connector
See Create and manage connectors for details on how to manage connectors after you create them, including:
-
Get connector status
-
Get existing connectors
-
Get connector configuration details
-
Start connectors
-
Stop connectors
-
Restart connectors
-
Delete connectors
Connector configuration reference
To configure Astra Streaming Pulsar connectors, you use a combination of common Pulsar properties and provider-specific properties.
Because Astra Streaming and the Astra Streaming Pulsar connectors are based on a specific version of the open-source Apache Pulsar project and connectors, some properties and values might be unsupported or ignored by Astra Streaming.
Pulsar sink connector properties
Pulsar connectors and functions both use Pulsar function workers. Therefore, some function configuration properties are also used to configure connectors.
The following table lists Astra Streaming Pulsar sink connector configuration properties by JSON field name. For the equivalent command-line arguments and more information about these properties, see the following:
| Field name | Required | Description |
|---|---|---|
| | Yes | The type of built-in Astra Streaming Pulsar connector that you want to deploy. Formatted as |
| | Yes | Whether the framework automatically acknowledges messages. Default: |
| | Yes | The connector type’s class reference beginning with |
| | No | Whether to delete subscriptions that are created or used by a sink when the sink is deleted. Default: |
| | Yes | A JSON-formatted key-value map containing configuration properties specific to the connector type, including provider-specific authentication and integration settings. For available subproperties, see Elasticsearch sink connector properties (configs). Default: |
| | No | A string that encodes options to configure the Apache Pulsar function runtime. |
| | No | The name of the topic that receives unacknowledged messages, such as those that exceed the maximum number of retries or fail to be processed completely. If See also |
| | No | A map of input topics to consumer configuration. By default, most values are |
| | Yes | An array of input topics that the sink consumes messages from, such as To consume all topics matching a naming pattern, use Default: |
| | No | Maximum number of times that a message attempts to be delivered before being sent to the dead letter queue as an unacknowledged message. See also |
| | Yes | The name for the connector. It must start with a lowercase letter, and contain only numbers, hyphens ( |
| | Yes | The namespace in your Pulsar tenant where you want to create the connector. |
| | No | The amount of time, in milliseconds, to wait before attempting redelivery if message delivery times out or fails. See also |
| | Yes | The number of Pulsar function instances to run. Default: |
| | Yes | The messaging delivery semantics to use when writing to topics: Respect for the processing guarantee depends on the connector implementation. For more information, see the Pulsar documentation on Function processing guarantees and Processing guarantees in I/O connectors. Default: |
| | No | A JSON string describing the compute resources to allocate to each Pulsar function instance. For example: |
| | No | Whether the sink consumes and processes messages in key order. Default: |
| | No | Whether the sink consumes and processes messages in the order they were written to the topic. Default: |
| | No | A string that encodes flags to pass to the Apache Pulsar function runtime. Only applicable to process and Kubernetes runtimes. |
| | No | If security is enabled on your function workers, you can provide a map of secret names ( To get the types for the values in this mapping, use the This is separate from connector-specific security settings in |
| | No | The name of a specific Pulsar source subscription, if required by your input topic consumer. |
| | No | The position to begin reading from in the source, if Default: |
| | Yes | The Pulsar tenant where you want to create the connector. |
| | No | The message timeout in milliseconds. Default: |
| | No | A topic naming pattern to select topics to consume from all topics in a namespace. To consume an exact list of topics, use |
| | No | A map of input topics to schema properties specified as a JSON object. |
| | No | A map of input topics to schema types or class names specified as a JSON object. |
| | No | A map of input topics to SerDe class names specified as a JSON object. |
Elasticsearch sink connector properties (configs)
Set these properties in the configs section of the connector configuration.
Generally, all properties provided in the OSS Apache Pulsar Elasticsearch sink connector are supported. Exceptions include properties that aren’t relevant to Astra Streaming and properties that are only present in incompatible versions.
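Beyond connection settings, the OSS connector exposes tuning properties in the same configs section. As an illustration, the following sketch writes a configs.json whose configs section also enables bulk indexing and adjusts request retries, using the OSS property names createIndexIfNotExists, bulkEnabled, bulkActions, bulkFlushIntervalInMs, maxRetries, and retryBackoffInMs. The values are arbitrary examples rather than recommendations, the fallback values are hypothetical, and whether Astra Streaming honors each property is an assumption to confirm against the OSS Elasticsearch sink documentation for Pulsar 3.1.

```shell
#!/usr/bin/env bash
# Sketch: a sink configuration whose configs section adds bulk-indexing and
# retry settings. Property names follow the OSS Apache Pulsar Elasticsearch
# sink; values are illustrative only.
set -euo pipefail

# Hypothetical fallbacks; previously exported values take precedence.
: "${TENANT:=my-tenant}"
: "${NAMESPACE:=default}"
: "${TOPIC:=my-topic}"
: "${SINK_NAME:=elastic_search-sink-dev}"
: "${ES_URL:=http://localhost:9200}"

# username and password are omitted; add them if your cluster requires them.
cat > configs.json <<EOF
{
  "tenant": "${TENANT}",
  "namespace": "${NAMESPACE}",
  "name": "${SINK_NAME}",
  "archive": "builtin://elastic_search",
  "inputs": ["persistent://${TENANT}/${NAMESPACE}/${TOPIC}"],
  "parallelism": 1,
  "processingGuarantees": "ATLEAST_ONCE",
  "configs": {
    "elasticSearchUrl": "${ES_URL}",
    "indexName": "my_index",
    "createIndexIfNotExists": true,
    "bulkEnabled": true,
    "bulkActions": 500,
    "bulkFlushIntervalInMs": 1000,
    "maxRetries": 3,
    "retryBackoffInMs": 200
  }
}
EOF

echo "Wrote configs.json with bulk and retry settings"
```

Here maxRetries is the connector's own retry count for requests to Elasticsearch, which is separate from the top-level maxMessageRetries property that controls redelivery before a message goes to the dead letter topic.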