Compute
The compute
transform function computes field values based on an expression
evaluated at runtime.
If the field already exists, it will be overwritten.
The step name is compute
and the function’s UserConfig
is controlled in this step:
{"steps": [{"type": "compute", "fields":[
{"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
{"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]}
]}
Parameters:
Parameter | Description |
---|---|
fields |
An array of JSON objects describing how to calculate the field values. The JSON object represents a |
Field parameters
Name (field) | Description |
---|---|
name |
The name of the field to be computed. Prefix with |
expression |
Supports the Expression language syntax. It is evaluated at runtime and the result of the evaluation is assigned to the field. |
type |
The type of the computed field. this will translate to the schema type of the new field in the transformed message. The following types are currently supported [ |
optional (default: true) |
If true, it marks the field as optional in the schema of the transformed message. This is useful when |
Type parameters
Name (field.type) |
Description |
Expression Examples |
|
represents 32-bit integer. |
expression1: "2147483647", expression2: "1 + 1" |
|
represents 64-bit integer. |
expression1: "9223372036854775807", expression2: "1 + 1" |
|
represents 32-bit floating point. |
expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1" |
|
represents 64-bit floating point. |
expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1" |
|
true or false |
expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'" |
|
a date without a time-zone in the RFC3339 format |
expression1: "2021-12-03" |
|
a time without a time-zone in the RFC3339 format |
expression1: "20:15:45" |
|
a date-time with an offset from UTC in the RFC3339 format |
expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()" |
Expression language
Pulsar transforms use a simple expression language to evaluate conditional steps and compute steps.
The expression language uses dot notation to access field properties or map keys, and supports the following operations and functions:
Operators
The Expression Language supports the following operators:
Category | Suppoerted operators |
---|---|
Arithmetic |
+, - (binary), *, / and div, % and mod, - (unary) |
Logical |
and, &&, or ||, not, ! |
Relational |
==, eq, !=, ne, <, lt, >, gt, ⇐, ge, >=, le. |
Functions
Utility methods are available under the fn
namespace. For example, to calculate the current date, use fn:now()
.
The expression language supports the following functions:
Name (field) | Description |
---|---|
uppercase(input) |
Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
lowercase(input) |
Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null. |
contains(input, value) |
Returns true if value exists in input. It attempts string conversion on both input and value if either is not a string. If input or value is null, ir returns false. |
trim(input) |
Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion. |
concat(input1, input2) |
Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string. |
coalesce(value, valueIfNull) |
Returns value if it is not null, otherwise returns valueIfNull. |
now() |
Returns the current epoch millis. |
dateadd(input, delta, unit) |
Performs date/time arithmetic operations on the input date/time. |
Conditional Steps
Each step
accepts an optional when
configuration that is evaluated at step execution time against current records (the current step in the transform pipeline).
The when
condition supports the <expression language syntax>, which provides access to the record attributes as follows:
Name (field) | Description |
---|---|
key: |
the key portion of the record in a KeyValue schema. |
value: |
the value portion of the record in a KeyValue schema, or the message payload itself. |
messageKey: |
the optional key messages are tagged with (aka. Partition Key). |
topicName: |
the optional name of the topic which the record originated from (aka. Input Topic). |
destinationTopic: |
the name of the topic on which the transformed record will be sent (aka. Output Topic). |
eventTime: |
the optional timestamp attached to the record from its source. For example, the original timestamp attached to the pulsar message. |
properties: |
the optional user-defined properties attached to record. |
You can use the .
operator to access top level or nested properties on a schema-full key or value.
For example, key.keyField1
or value.valueFiled1.nestedValueField
.
When
example
For this KeyValue record:
{
"key": {
"compound": {
"uuid": "uuidValue",
"timestamp": 1663616014
},
"value" : {
"first" : "f1",
"last" : "l1",
"rank" : 1,
"address" : {
"zipcode" : "abc-def"
}
}
}}
These statements would evaluate in a when
statement:
when statement |
Evaluates to: |
---|---|
key.compound.uuid == 'uuidValue' |
True |
key.compound.timestamp <= 10 |
False |
value.first == 'f1' && value.last.toUpperCase() == 'L1' |
True |
value.rank <= 1 && value.address.substring(0, 3) == 'abc' |
True |
Multiply and concatenate example
-
Create a
compute
transform function with the Pulsar admin CLI:./bin/pulsar-admin functions create \ --tenant ${TENANT} \ --namespace ${NAMESPACE} \ --name transform-function \ --inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \ --output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \ --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \ --jar functions/pulsar-transformations-2.0.1.nar \ --transform-function-config '{"steps": [{"type": "compute", "fields":[ {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"}, {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]} ]}'
-
Produce an AVRO message with the payload:
-
AVRO
-
Result
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
-
-
The function applies preprocessing to outgoing messages, in this case performing multiplication and concatenation operations to output
{key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
to your output topic.
Message routing example
-
Create a
compute
transform function with the Pulsar admin CLI:./bin/pulsar-admin functions create \ --tenant ${TENANT} \ --namespace ${NAMESPACE} \ --name transform-function \ --inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \ --output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \ --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \ --jar functions/pulsar-transformations-2.0.1.nar \ --transform-function-config `{"steps": [{"type": "compute", "fields":[ {"name": "destinationTopic", "expression" : "'routed'", "type": "STRING"}, {"name": "properties.k1", "expression" : "'overwritten'", "type": "STRING"}, {"name": "properties.k2", "expression" : "'new'", "type": "STRING"}]} ]}`
-
Produce an AVRO message with the following payload:
-
AVRO
-
Result
key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic: out1, properties: {k1:v1}
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
-
-
The function applies preprocessing to outgoing messages, in this case re-routing the destination topics with the output
{key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
.
What’s next?
For more, see Transform Functions or the Pulsar documentation.