Elasticsearch sink connector

The Elasticsearch sink connector reads messages from Pulsar topics and persists them to Elasticsearch indexes.

Configuration

The configuration of the Elasticsearch sink connector has the following properties.

Name

Type

Required

Default

Description

apiKey

String

false

" " (empty string)

The apiKey used by the connector to connect to the Elasticsearch cluster. Configure only one of the basic, token, or apiKey authentication modes.

bulkActions

Integer

false

1000

The maximum number of actions per elasticsearch bulk request. Use -1 to disable it.

bulkConcurrentRequests

Integer

false

0

The maximum number of in-flight Elasticsearch bulk requests. The default of 0 allows a single request to execute at a time. A value of 1 allows one concurrent request to execute while new bulk requests accumulate.

bulkEnabled

Boolean

false

false

Enable the elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period.

bulkFlushIntervalInMs

Long

false

1000

The maximum time in milliseconds to wait before flushing pending writes when bulk writes are enabled. A value of -1 or 0 disables scheduled flushing.

bulkSizeInMb

Integer

false

5

The maximum size in megabytes of elasticsearch bulk requests. Use -1 to disable it.

canonicalKeyFields

Boolean

false

false

Whether to sort the key fields for JSON and Avro or not. If it is set to true and the record key schema is JSON or AVRO, the serialized object does not consider the order of properties.

compatibilityMode

enum (AUTO,ELASTICSEARCH,ELASTICSEARCH_7,OPENSEARCH)

false

AUTO

The compatibility mode to use with the Elasticsearch cluster. AUTO tries to detect the correct compatibility mode automatically. Use ELASTICSEARCH_7 if the target cluster runs Elasticsearch 7 or earlier, ELASTICSEARCH if it runs Elasticsearch 8 or later, and OPENSEARCH if it runs OpenSearch.

compressionEnabled

Boolean

false

false

Enable elasticsearch request compression.

connectionIdleTimeoutInMs

Integer

false

5

Idle connection timeout to prevent a read timeout.

connectionRequestTimeoutInMs

Integer

false

1000

The time in milliseconds for getting a connection from the elasticsearch connection pool.

connectTimeoutInMs

Integer

false

5000

The elasticsearch client connection timeout in milliseconds.

createIndexIfNeeded

Boolean

false

false

Create the index if it is missing.

elasticSearchUrl

String

true

" " (empty string)

The URL of the Elasticsearch cluster to which the connector connects.

idHashingAlgorithm

enum (NONE,SHA256,SHA512)

false

NONE

The hashing algorithm to use for the document id. Hashing helps keep the id within the Elasticsearch _id hard limit of 512 bytes.

indexName

String

false

" " (empty string)

The index name to which the connector writes messages. If it is not set, the topic name is used. The name accepts date formats to support event-time-based indexes, using the pattern %{+<date-format>}. For example, if the event time of the record is 1645182000000L and the indexName is logs-%{+yyyy-MM-dd}, the formatted index name is logs-2022-02-18.

indexNumberOfReplicas

int

false

1

The number of replicas of the index.

indexNumberOfShards

int

false

1

The number of shards of the index.

keyIgnore

Boolean

false

true

Whether to ignore the record key when building the Elasticsearch document _id. If primaryFields is defined, the connector extracts the primary fields from the payload to build the document _id. If no primaryFields are provided, Elasticsearch auto-generates a random document _id.

malformedDocAction

enum (IGNORE,WARN,FAIL)

false

FAIL

How to handle documents that Elasticsearch rejects as malformed. Possible options are IGNORE, WARN, or FAIL. The default is FAIL.

maxRetries

Integer

false

1

The maximum number of retries for elasticsearch requests. Use -1 to disable it.

maxRetryTimeInSec

Integer

false

86400

The maximum retry time interval in seconds for retrying an elasticsearch request.

nullValueAction

enum (IGNORE,DELETE,FAIL)

false

IGNORE

How to handle records with null values. Possible options are IGNORE, DELETE, or FAIL. The default is IGNORE, which ignores the message.

password

String

false

" " (empty string)

The password used by the connector to connect to the Elasticsearch cluster. If username is set, then password should also be provided.

primaryFields

String

false

"id"

The comma-separated, ordered list of field names used to build the Elasticsearch document _id from the record value. If this list has a single field, the field is converted to a string. If this list has two or more fields, the generated _id is a string representation of a JSON array of the field values.

retryBackoffInMs

Integer

false

100

The base time to wait when retrying an Elasticsearch request (in milliseconds).

schemaEnable

Boolean

false

false

Turn on the Schema Aware mode.

socketTimeoutInMs

Integer

false

60000

The socket timeout in milliseconds waiting to read the elasticsearch response.

ssl

ElasticSearchSslConfig

false

string

Configuration for TLS encrypted communication. See ElasticSearchSslConfig structure.

stripNonPrintableCharacters

Boolean

false

true

Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document.

stripNulls

Boolean

false

true

If stripNulls is false, the Elasticsearch _source includes null for empty fields (for example {"foo": null}). Otherwise, null fields are stripped.

token

String

false

" " (empty string)

The token used by the connector to connect to the Elasticsearch cluster. Configure only one of the basic, token, or apiKey authentication modes.

typeName

String

false

"_doc"

The type name to which the connector writes messages. Set the value explicitly to a valid type name other than "_doc" for Elasticsearch versions before 6.2, and leave it at the default otherwise.

username

String

false

" " (empty string)

The username used by the connector to connect to the Elasticsearch cluster. If username is set, then password should also be provided.
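
To make the bulk and time-based index properties above more concrete, the following sketch writes a sink configuration that enables bulk writes and routes records to a daily index, then computes the index name for the example event time given in the indexName description. The file name elasticsearch-sink-bulk.yml and the chosen values are illustrative assumptions, not recommendations. The date call assumes GNU date and assumes that the connector formats the event time in UTC.

```shell
# Hypothetical file name; the property names come from the table above.
cat > elasticsearch-sink-bulk.yml <<'EOF'
configs:
    elasticSearchUrl: "http://localhost:9200"
    indexName: "logs-%{+yyyy-MM-dd}"
    bulkEnabled: true
    bulkActions: 500
    bulkSizeInMb: 5
    bulkFlushIntervalInMs: 1000
    maxRetries: 3
    retryBackoffInMs: 100
EOF

# Event time from the indexName example, in milliseconds since the epoch.
event_time_ms=1645182000000

# Convert to seconds and format the day in UTC (assumption: GNU date).
index_name="logs-$(date -u -d "@$((event_time_ms / 1000))" +%Y-%m-%d)"
echo "$index_name"
```

With the event time above, the script prints logs-2022-02-18, which matches the example in the indexName description.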

ElasticSearchSslConfig structure

Name

Type

Required

Default

Description

cipherSuites

String

false

" " (empty string)

SSL/TLS cipher suites.

disableCertificateValidation

Boolean

false

true

Whether to disable node certificate validation. Disabling certificate validation is highly insecure; do not use it in a production environment.

enabled

Boolean

false

false

Enable SSL/TLS.

hostnameVerification

Boolean

false

true

Whether or not to validate node hostnames when using SSL.

keystorePassword

String

false

" " (empty string)

Keystore password.

keystorePath

String

false

" " (empty string)

The path to the keystore file.

protocols

String

false

"TLSv1.2"

Comma separated list of enabled SSL/TLS protocols.

truststorePassword

String

false

" " (empty string)

Truststore password.

truststorePath

String

false

" " (empty string)

The path to the truststore file.
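
As an illustration of how these SSL properties combine, the sketch below writes a configuration that keeps certificate and hostname validation on and sets the protocol list explicitly. The file name elasticsearch-sink-tls.yml, the https URL scheme, and the store path and password are placeholders assumed for this example.

```shell
# Hypothetical file name, URL, path, and password; property names are from the table above.
cat > elasticsearch-sink-tls.yml <<'EOF'
configs:
    elasticSearchUrl: "https://localhost:9200"
    indexName: "my_index"
    ssl:
        enabled: true
        # The listed default is true (validation disabled), so set it explicitly.
        disableCertificateValidation: false
        hostnameVerification: true
        protocols: "TLSv1.2"
        truststorePath: "/pulsar/security/truststore.jks"
        truststorePassword: "truststorepass"
EOF

# Print the ssl block that was written, for a quick visual check.
sed -n '/ssl:/,$p' elasticsearch-sink-tls.yml
```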

Example

Requirements

To deploy an Elasticsearch sink connector, you need one of the following:

  • Elasticsearch 7 (Elasticsearch 8 will be supported in the future)

  • OpenSearch 1.x

Usage

  1. Create a JSON or YAML configuration file.

    • JSON

    {
        "configs": {
            "elasticSearchUrl": "http://localhost:9200",
            "indexName": "my_index",
            "username": "username",
            "password": "password"
        }
    }

    • YAML

    configs:
        elasticSearchUrl: "http://localhost:9200"
        indexName: "my_index"
        username: "username"
        password: "password"

    For Elasticsearch versions before 6.2, the value of typeName is required, and should be set explicitly to a valid type name other than "_doc".

  2. Start a single node Elasticsearch cluster.

    $ docker run -p 9200:9200 -p 9300:9300 \
        -e "discovery.type=single-node" \
        docker.elastic.co/elasticsearch/elasticsearch:7.13.3
  3. Start a Pulsar service locally in standalone mode.

    $ bin/pulsar standalone
  4. Make sure the connector NAR file is available at connectors/pulsar-io-elastic-search-@pulsar:version@.nar.

  5. Start the Pulsar Elasticsearch connector in local run mode using the JSON or YAML configuration file.

    • JSON

    $ bin/pulsar-admin sinks localrun \
        --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
        --tenant public \
        --namespace default \
        --name elasticsearch-test-sink \
        --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password"}' \
        --inputs elasticsearch_test

    • YAML

    $ bin/pulsar-admin sinks localrun \
        --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
        --tenant public \
        --namespace default \
        --name elasticsearch-test-sink \
        --sink-config-file elasticsearch-sink.yml \
        --inputs elasticsearch_test
  6. Publish records to the topic.

    $ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
  7. Check documents in Elasticsearch.

    1. Refresh the index.

      $ curl -s http://localhost:9200/my_index/_refresh
    2. Search documents.

      $ curl -s http://localhost:9200/my_index/_search
    3. You can see that the record published earlier has been successfully written to Elasticsearch.

      {"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
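
The check in step 7 can also be scripted. The following is a minimal sketch that extracts the total hit count from a search response using only sed. It runs against the sample response shown above rather than a live cluster, and the variable names are illustrative.

```shell
# Sample search response copied from step 7. With a live cluster, this could be
# replaced by: response=$(curl -s http://localhost:9200/my_index/_search)
response='{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}'

# Pull N out of "hits":{"total":{"value":N,...}; the _shards total has a
# different shape ("total":1) and is not matched by this pattern.
hit_count=$(printf '%s' "$response" | sed -n 's/.*"total":{"value":\([0-9][0-9]*\).*/\1/p')

if [ "${hit_count:-0}" -ge 1 ]; then
    echo "found $hit_count document(s)"
else
    echo "no documents indexed yet"
fi
```

For anything beyond a quick check, a real JSON parser is a safer choice than a regular expression.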

Example with TLS enabled

Enable Transport Layer Security (TLS) on your Elasticsearch cluster to encrypt network traffic and protect it from eavesdropping.

  1. Create a JSON or YAML configuration file with TLS/SSL enabled.

    • JSON

    {
        "configs": {
            "elasticSearchUrl": "http://localhost:9200",
            "indexName": "my_index",
            "username": "username",
            "password": "password",
            "ssl": {
                "enabled": true,
                "truststorePath": "/pulsar/security/truststore.jks",
                "truststorePassword": "truststorepass",
                "keystorePath": "/pulsar/security/keystore.jks",
                "keystorePassword": "keystorepass"
            }
        }
    }

    • YAML

    configs:
        elasticSearchUrl: "http://localhost:9200"
        indexName: "my_index"
        username: "username"
        password: "password"
        ssl:
            enabled: true
            truststorePath: "/pulsar/security/truststore.jks"
            truststorePassword: "truststorepass"
            keystorePath: "/pulsar/security/keystore.jks"
            keystorePassword: "keystorepass"
  2. Start a single node Elasticsearch cluster.

    $ docker run -p 9200:9200 -p 9300:9300 \
        -e "discovery.type=single-node" \
        docker.elastic.co/elasticsearch/elasticsearch:7.13.3
  3. Start a Pulsar service locally in standalone mode.

    $ bin/pulsar standalone
  4. Make sure the connector NAR file is available at connectors/pulsar-io-elastic-search-@pulsar:version@.nar.

  5. Start the Pulsar Elasticsearch connector in local run mode using the JSON or YAML configuration file.

    • JSON

    $ bin/pulsar-admin sinks localrun \
        --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
        --tenant public \
        --namespace default \
        --name elasticsearch-test-sink \
        --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password","ssl": {"enabled": true,"truststorePath": "/pulsar/security/truststore.jks","truststorePassword": "truststorepass","keystorePath": "/pulsar/security/keystore.jks","keystorePassword": "keystorepass"}}' \
        --inputs elasticsearch_test

    • YAML

    $ bin/pulsar-admin sinks localrun \
        --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
        --tenant public \
        --namespace default \
        --name elasticsearch-test-sink \
        --sink-config-file elasticsearch-sink.yml \
        --inputs elasticsearch_test
  6. Publish records to the topic.

    $ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
  7. Check documents in Elasticsearch.

    1. Refresh the index.

      $ curl -s http://localhost:9200/my_index/_refresh
    2. Search documents.

      $ curl -s http://localhost:9200/my_index/_search
    3. You can see that the record published earlier has been successfully written to Elasticsearch.

      {"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
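
Before starting the connector with TLS enabled, it can help to confirm that the keystore and truststore files referenced in the configuration are readable on the machine that runs the connector. The following is a minimal sketch of such a pre-flight check. The check_store helper is hypothetical, and the paths are the placeholder paths from step 1.

```shell
# Hypothetical helper: succeed if the given file exists and is readable.
check_store() {
    if [ -r "$1" ]; then
        echo "ok: $1"
    else
        echo "missing or unreadable: $1" >&2
        return 1
    fi
}

# Paths taken from the TLS configuration in step 1.
missing=0
for store in /pulsar/security/truststore.jks /pulsar/security/keystore.jks; do
    check_store "$store" || missing=$((missing + 1))
done
echo "stores missing: $missing"
```

The check only covers file access. It does not verify the store passwords or the certificates inside the stores.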

What’s next?

For more sink connectors, see Luna Streaming sink connectors. For more source connectors, see Luna Streaming source connectors.
