Elasticsearch sink connector
The Elasticsearch sink connector reads messages from Pulsar topics and persists them to Elasticsearch indexes.
Configuration
The configuration of the Elasticsearch sink connector has the following properties.
Name | Type | Required | Default | Description |
| String | false | " " (empty string) | The apiKey used by the connector to connect to the Elasticsearch cluster. Configure only one of the basic, token, or apiKey authentication modes. |
| Integer | false | 1000 | The maximum number of actions per Elasticsearch bulk request. Use -1 to disable it. |
| Integer | false | 0 | The maximum number of in-flight Elasticsearch bulk requests. The default 0 allows the execution of a single request. A value of 1 means 1 concurrent request is allowed to be executed while accumulating new bulk requests. |
| Boolean | false | false | Enable the Elasticsearch bulk processor to flush write requests based on the number or size of requests, or after a given period. |
| Long | false | 1000 | The maximum period of time to wait for flushing pending writes when bulk writes are enabled. -1 or zero means the scheduled flushing is disabled. |
| Integer | false | 5 | The maximum size in megabytes of Elasticsearch bulk requests. Use -1 to disable it. |
| Boolean | false | false | Whether to sort the key fields for JSON and Avro or not. If it is set to |
| enum (AUTO,ELASTICSEARCH,ELASTICSEARCH_7,OPENSEARCH) | false | AUTO | Specify compatibility mode with the Elasticsearch cluster. |
| Boolean | false | false | Enable Elasticsearch request compression. |
| Integer | false | 5 | Idle connection timeout to prevent a read timeout. |
| Integer | false | 1000 | The time in milliseconds for getting a connection from the Elasticsearch connection pool. |
| Integer | false | 5000 | The Elasticsearch client connection timeout in milliseconds. |
| Boolean | false | false | Manage index if missing. |
| String | true | " " (empty string) | The URL of the Elasticsearch cluster to which the connector connects. |
| enum(NONE,SHA256,SHA512) | false | NONE | Hashing algorithm to use for the document id. This is useful in order to be compliant with the Elasticsearch _id hard limit of 512 bytes. |
| String | false | " " (empty string) | The index name to which the connector writes messages. The default value is the topic name. It accepts date formats in the name to support event time based index with the pattern |
| int | false | 1 | The number of replicas of the index. |
| int | false | 1 | The number of shards of the index. |
| Boolean | false | true | Whether to ignore the record key to build the Elasticsearch document |
| enum (IGNORE,WARN,FAIL) | false | FAIL | How to handle documents that Elasticsearch rejects as malformed. Possible options are IGNORE, WARN, or FAIL. The default is FAIL. |
| Integer | false | 1 | The maximum number of retries for Elasticsearch requests. Use -1 to disable it. |
| Integer | false | 86400 | The maximum retry time interval in seconds for retrying an Elasticsearch request. |
| enum (IGNORE,DELETE,FAIL) | false | IGNORE | How to handle records with null values. Possible options are IGNORE, DELETE, or FAIL. The default is IGNORE, which ignores the message. |
| String | false | " " (empty string) | The password used by the connector to connect to the Elasticsearch cluster. If |
| String | false | "id" | The comma separated ordered list of field names used to build the Elasticsearch document |
| Integer | false | 100 | The base time to wait when retrying an Elasticsearch request (in milliseconds). |
| Boolean | false | false | Turn on the Schema Aware mode. |
| Integer | false | 60000 | The socket timeout in milliseconds waiting to read the Elasticsearch response. |
| ElasticSearchSslConfig | false | string | Configuration for TLS encrypted communication. See ElasticSearchSslConfig structure. |
| Boolean | false | true | Whether to remove all non-printable characters from the document or not. If it is set to true, all non-printable characters are removed from the document. |
| Boolean | false | true | If stripNulls is false, Elasticsearch _source includes 'null' for empty fields (for example {"foo": null}), otherwise null fields are stripped. |
| String | false | " " (empty string) | The token used by the connector to connect to the Elasticsearch cluster. Configure only one of the basic, token, or apiKey authentication modes. |
| String | false | "_doc" | The type name to which the connector writes messages. The value should be set explicitly to a valid type name other than "_doc" for Elasticsearch versions before 6.2, and left at the default otherwise. |
| String | false | " " (empty string) | The username used by the connector to connect to the Elasticsearch cluster. If |
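The descriptions above state that only one of the basic, token, or apiKey authentication modes may be configured. A minimal Python sketch of a pre-deployment check follows. It assumes the sink configuration is held as a plain dict, that basic authentication is expressed through the `username` and `password` keys used in the examples on this page, and that the other two modes use keys named `token` and `apiKey` (inferred from the descriptions, not confirmed here). `configured_auth_modes` and `check_auth` are hypothetical helpers, not part of the connector.

```python
from typing import List


def configured_auth_modes(config: dict) -> List[str]:
    """Return the authentication modes that have a non-empty value in the config."""
    modes = []
    # Assumption: basic authentication counts as configured when a username is set.
    if config.get("username"):
        modes.append("basic")
    # Assumption: the token and API key modes use the keys "token" and "apiKey".
    if config.get("token"):
        modes.append("token")
    if config.get("apiKey"):
        modes.append("apiKey")
    return modes


def check_auth(config: dict) -> None:
    """Raise ValueError if more than one authentication mode is configured."""
    modes = configured_auth_modes(config)
    if len(modes) > 1:
        raise ValueError(f"Configure only one authentication mode, found: {modes}")


if __name__ == "__main__":
    check_auth({
        "elasticSearchUrl": "http://localhost:9200",
        "username": "username",
        "password": "password",
    })
    print("authentication settings look consistent")
```

Running a check like this before submitting the configuration catches a conflicting setup early, instead of relying on the connector to reject it at startup.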
ElasticSearchSslConfig structure
Name | Type | Required | Default | Description |
| String | false | " " (empty string) | SSL/TLS cipher suites. |
| Boolean | false | true | Whether or not to disable the node certificate validation. Changing this value is highly insecure and you should not use it in production environment. |
| Boolean | false | false | Enable SSL/TLS. |
| Boolean | false | true | Whether or not to validate node hostnames when using SSL. |
| String | false | " " (empty string) | Keystore password. |
| String | false | " " (empty string) | The path to the keystore file. |
| String | false | "TLSv1.2" | Comma separated list of enabled SSL/TLS protocols. |
| String | false | " " (empty string) | Truststore password. |
| String | false | " " (empty string) | The path to the truststore file. |
Example
Requirements
To deploy an Elasticsearch sink connector, the following are required:
- Elasticsearch 7 (Elasticsearch 8 will be supported in the future)
- OpenSearch 1.x
Usage
- Create a JSON or YAML configuration file.
JSON
{
  "configs": {
    "elasticSearchUrl": "http://localhost:9200",
    "indexName": "my_index",
    "username": "username",
    "password": "password"
  }
}
YAML
configs:
  elasticSearchUrl: "http://localhost:9200"
  indexName: "my_index"
  username: "username"
  password: "password"
For Elasticsearch versions before 6.2, the value of typeName is required, and should be set explicitly to a valid type name other than "_doc".
- Start a single node Elasticsearch cluster.
$ docker run -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.13.3
- Start a Pulsar service locally in standalone mode.
$ bin/pulsar standalone
- Make sure the connector NAR file is available at connectors/pulsar-io-elastic-search-@pulsar:version@.nar.
- Start the Pulsar Elasticsearch connector in local run mode using the JSON or YAML configuration file.
JSON
$ bin/pulsar-admin sinks localrun \
    --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
    --tenant public \
    --namespace default \
    --name elasticsearch-test-sink \
    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password"}' \
    --inputs elasticsearch_test
YAML
$ bin/pulsar-admin sinks localrun \
    --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
    --tenant public \
    --namespace default \
    --name elasticsearch-test-sink \
    --sink-config-file elasticsearch-sink.yml \
    --inputs elasticsearch_test
- Publish records to the topic.
$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
- Check documents in Elasticsearch.
  - Refresh the index.
$ curl -s http://localhost:9200/my_index/_refresh
  - Search documents.
$ curl -s http://localhost:9200/my_index/_search
  - The response shows that the record published earlier has been written to Elasticsearch.
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
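To check the search result programmatically rather than by eye, the response body can be parsed as JSON. The following is a minimal Python sketch, assuming the body has the shape shown above (`hits.total.value` for the count and `hits.hits[]._source` for the documents). `extract_sources` is a hypothetical helper name, and the sample string is the response from this example.

```python
import json


def extract_sources(response_body: str):
    """Return (total hit count, list of _source documents) from a _search response body."""
    body = json.loads(response_body)
    hits = body["hits"]
    total = hits["total"]["value"]
    sources = [hit["_source"] for hit in hits["hits"]]
    return total, sources


if __name__ == "__main__":
    # Sample body copied from the search response in this example.
    sample = (
        '{"took":2,"timed_out":false,'
        '"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},'
        '"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,'
        '"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ",'
        '"_score":1.0,"_source":{"a":1}}]}}'
    )
    total, sources = extract_sources(sample)
    print(total, sources)
```

In practice the string would come from the output of the curl search command; the sketch deliberately leaves the HTTP call out so it can run without a cluster.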
Example with TLS enabled
Enable Transport Layer Security (TLS) on your Elasticsearch cluster to encrypt network traffic and protect it from eavesdropping.
- Create a JSON or YAML configuration file with TLS/SSL enabled.
JSON
{
  "configs": {
    "elasticSearchUrl": "http://localhost:9200",
    "indexName": "my_index",
    "username": "username",
    "password": "password",
    "ssl": {
      "enabled": true,
      "truststorePath": "/pulsar/security/truststore.jks",
      "truststorePassword": "truststorepass",
      "keystorePath": "/pulsar/security/keystore.jks",
      "keystorePassword": "keystorepass"
    }
  }
}
YAML
configs:
  elasticSearchUrl: "http://localhost:9200"
  indexName: "my_index"
  username: "username"
  password: "password"
  ssl:
    enabled: true
    truststorePath: "/pulsar/security/truststore.jks"
    truststorePassword: "truststorepass"
    keystorePath: "/pulsar/security/keystore.jks"
    keystorePassword: "keystorepass"
- Start a single node Elasticsearch cluster.
$ docker run -p 9200:9200 -p 9300:9300 \
    -e "discovery.type=single-node" \
    docker.elastic.co/elasticsearch/elasticsearch:7.13.3
- Start a Pulsar service locally in standalone mode.
$ bin/pulsar standalone
- Make sure the connector NAR file is available at connectors/pulsar-io-elastic-search-@pulsar:version@.nar.
- Start the Pulsar Elasticsearch connector in local run mode using the JSON or YAML configuration file.
JSON
$ bin/pulsar-admin sinks localrun \
    --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
    --tenant public \
    --namespace default \
    --name elasticsearch-test-sink \
    --sink-config '{"elasticSearchUrl":"http://localhost:9200","indexName": "my_index","username": "username","password": "password","ssl": {"enabled": true,"truststorePath": "/pulsar/security/truststore.jks","truststorePassword": "truststorepass","keystorePath": "/pulsar/security/keystore.jks","keystorePassword": "keystorepass"}}' \
    --inputs elasticsearch_test
YAML
$ bin/pulsar-admin sinks localrun \
    --archive connectors/pulsar-io-elastic-search-@pulsar:version@.nar \
    --tenant public \
    --namespace default \
    --name elasticsearch-test-sink \
    --sink-config-file elasticsearch-sink.yml \
    --inputs elasticsearch_test
- Publish records to the topic.
$ bin/pulsar-client produce elasticsearch_test --messages "{\"a\":1}"
- Check documents in Elasticsearch.
  - Refresh the index.
$ curl -s http://localhost:9200/my_index/_refresh
  - Search documents.
$ curl -s http://localhost:9200/my_index/_search
  - The response shows that the record published earlier has been written to Elasticsearch.
{"took":2,"timed_out":false,"_shards":{"total":1,"successful":1,"skipped":0,"failed":0},"hits":{"total":{"value":1,"relation":"eq"},"max_score":1.0,"hits":[{"_index":"my_index","_type":"_doc","_id":"FSxemm8BLjG_iC0EeTYJ","_score":1.0,"_source":{"a":1}}]}}
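Inline JSON passed to --sink-config is easy to get wrong by hand, since a single missing comma or unquoted key makes the whole argument invalid. One way to avoid that is to generate the string from a data structure. The following is a minimal Python sketch using the same keys and values as the TLS example in this section. `build_sink_config_arg` is a hypothetical helper, not part of Pulsar.

```python
import json
import shlex


def build_sink_config_arg(url, index, username, password, ssl=None):
    """Return a shell-quoted JSON string suitable as the value of --sink-config."""
    config = {
        "elasticSearchUrl": url,
        "indexName": index,
        "username": username,
        "password": password,
    }
    if ssl is not None:
        config["ssl"] = ssl
    # json.dumps always emits valid JSON; shlex.quote keeps it a single shell argument.
    return shlex.quote(json.dumps(config))


if __name__ == "__main__":
    ssl_settings = {
        "enabled": True,
        "truststorePath": "/pulsar/security/truststore.jks",
        "truststorePassword": "truststorepass",
        "keystorePath": "/pulsar/security/keystore.jks",
        "keystorePassword": "keystorepass",
    }
    arg = build_sink_config_arg(
        "http://localhost:9200", "my_index", "username", "password", ssl_settings
    )
    print("--sink-config " + arg)
```

The printed value can be pasted into the localrun command in place of the hand-written JSON. Passing a configuration file with --sink-config-file avoids the quoting question entirely.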
What’s next?
For more sink connectors, see Luna Streaming sink connectors. For more source connectors, see Luna Streaming source connectors.