Compute

The compute transform function computes field values based on an expression evaluated at runtime.
If the field already exists, it will be overwritten.
The step name is compute and the function’s UserConfig is controlled in this step:

{"steps": [{"type": "compute", "fields":[
                {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
                {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]}
]}

Parameters:

Parameter Description

fields

An array of JSON objects describing how to calculate the field values. The JSON object represents a field. See Field parameters.

Field parameters

Name (field) Description

name

The name of the field to be computed. Prefix with key. or value. to compute the fields in the key or value parts of the message.
You can also compute values on the following message headers: [destinationTopic, messageKey, properties.].
Please note that properties is a map of key/value pairs that are referenced by the dot notation, for example properties.key0.

expression

Supports the Expression language syntax. It is evaluated at runtime and the result of the evaluation is assigned to the field.

type

The type of the computed field. this will translate to the schema type of the new field in the transformed message. The following types are currently supported [STRING, INT32, INT64, FLOAT, DOUBLE, BOOLEAN, DATE, TIME, DATETIME]. See Type parameters.

optional (default: true)

If true, it marks the field as optional in the schema of the transformed message. This is useful when null is a possible value of the compute expression.

Type parameters

Name (field.type)

Description

Expression Examples

INT32

represents 32-bit integer.

expression1: "2147483647", expression2: "1 + 1"

INT64

represents 64-bit integer.

expression1: "9223372036854775807", expression2: "1 + 1"

FLOAT

represents 32-bit floating point.

expression1: "340282346638528859999999999999999999999.999999", expression2: "1.1 + 1.1"

DOUBLE

represents 64-bit floating point.

expression1: "1.79769313486231570e+308", expression2: "1.1 + 1.1"

BOOLEAN

true or false

expression1: "true", expression2: "1 == 1", expression3: "value.stringField == 'matching string'"

DATE

a date without a time-zone in the RFC3339 format

expression1: "2021-12-03"

TIME

a time without a time-zone in the RFC3339 format

expression1: "20:15:45"

DATETIME

a date-time with an offset from UTC in the RFC3339 format

expression1: "2022-10-02T01:02:03+02:00", expression2: "2019-10-02T01:02:03Z", expression3: "fn:now()"

Expression language

Pulsar transforms use a simple expression language to evaluate conditional steps and compute steps.
The expression language uses dot notation to access field properties or map keys, and supports the following operations and functions:

Operators

The Expression Language supports the following operators:

Category Suppoerted operators

Arithmetic

+, - (binary), *, / and div, % and mod, - (unary)

Logical

and, &&, or ||, not, !

Relational

==, eq, !=, ne, <, lt, >, gt, ⇐, ge, >=, le.

Functions

Utility methods are available under the fn namespace. For example, to calculate the current date, use fn:now().
The expression language supports the following functions:

Name (field) Description

uppercase(input)

Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null.

lowercase(input)

Changes the capitalization of a string. If input is not a string, it attempts a string conversion. If the input is null, it returns null.

contains(input, value)

Returns true if value exists in input. It attempts string conversion on both input and value if either is not a string. If input or value is null, ir returns false.

trim(input)

Returns the input string with all leading and trailing spaces removed. If the input is not a string, it attempts a string conversion.

concat(input1, input2)

Returns a string concatenation of input1 and input2. If either input is null, it is treated as an empty string.

coalesce(value, valueIfNull)

Returns value if it is not null, otherwise returns valueIfNull.

now()

Returns the current epoch millis.

dateadd(input, delta, unit)

Performs date/time arithmetic operations on the input date/time.
input can be either epoch millis or an RFC3339 format like "2022-10-14T10:15:30+01:00"
delta is the amount of unit to add to input. Can be a negative value to perform subtraction. unit is the unit of time to add or subtract. Can be one of [years, months, days, hours, minutes, seconds, millis].

Conditional Steps

Each step accepts an optional when configuration that is evaluated at step execution time against current records (the current step in the transform pipeline).
The when condition supports the <expression language syntax>, which provides access to the record attributes as follows:

Name (field) Description

key:

the key portion of the record in a KeyValue schema.

value:

the value portion of the record in a KeyValue schema, or the message payload itself.

messageKey:

the optional key messages are tagged with (aka. Partition Key).

topicName:

the optional name of the topic which the record originated from (aka. Input Topic).

destinationTopic:

the name of the topic on which the transformed record will be sent (aka. Output Topic).

eventTime:

the optional timestamp attached to the record from its source. For example, the original timestamp attached to the pulsar message.

properties:

the optional user-defined properties attached to record.

You can use the . operator to access top level or nested properties on a schema-full key or value.
For example, key.keyField1 or value.valueFiled1.nestedValueField.

When example

For this KeyValue record:

{
  "key": {
    "compound": {
      "uuid": "uuidValue",
      "timestamp": 1663616014
    },
    "value" : {
      "first" : "f1",
      "last" : "l1",
      "rank" : 1,
      "address" : {
        "zipcode" : "abc-def"
      }
    }
  }}

These statements would evaluate in a when statement:

when statement Evaluates to:

key.compound.uuid == 'uuidValue'

True

key.compound.timestamp <= 10

False

value.first == 'f1' && value.last.toUpperCase() == 'L1'

True

value.rank <= 1 && value.address.substring(0, 3) == 'abc'

True

Multiply and concatenate example

  1. Create a compute transform function with the Pulsar admin CLI:

    ./bin/pulsar-admin functions create \
    --tenant ${TENANT} \
    --namespace ${NAMESPACE} \
    --name transform-function \
    --inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
    --output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --jar functions/pulsar-transformations-2.0.1.nar \
    --transform-function-config '{"steps": [{"type": "compute", "fields":[
                    {"name": "key.newKeyField", "expression" : "5*3", "type": "INT32"},
                    {"name": "value.valueField", "expression" : "fn:concat(value.valueField, '_suffix')", "type": "STRING"}]}
    ]}'
  2. Produce an AVRO message with the payload:

    • AVRO

    • Result

    {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>)
    {key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>)
  3. The function applies preprocessing to outgoing messages, in this case performing multiplication and concatenation operations to output {key={keyField: key, newKeyField: 15}, value={valueField: value_suffix}} (KeyValue<AVRO, AVRO>) to your output topic.

Message routing example

  1. Create a compute transform function with the Pulsar admin CLI:

    ./bin/pulsar-admin functions create \
    --tenant ${TENANT} \
    --namespace ${NAMESPACE} \
    --name transform-function \
    --inputs persistent://${TENANT}/${NAMESPACE}/${INPUT_TOPIC} \
    --output persistent://${TENANT}/${NAMESPACE}/${OUTPUT_TOPIC} \
    --classname com.datastax.oss.pulsar.functions.transforms.TransformFunction \
    --jar functions/pulsar-transformations-2.0.1.nar \
    --transform-function-config `{"steps": [{"type": "compute", "fields":[
                    {"name": "destinationTopic", "expression" : "'routed'", "type": "STRING"},
                    {"name": "properties.k1", "expression" : "'overwritten'", "type": "STRING"},
                    {"name": "properties.k2", "expression" : "'new'", "type": "STRING"}]}
                 ]}`
  2. Produce an AVRO message with the following payload:

    • AVRO

    • Result

    key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic: out1, properties: {k1:v1}
    {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}
  3. The function applies preprocessing to outgoing messages, in this case re-routing the destination topics with the output {key={keyField: key}, value={valueField: value}} (KeyValue<AVRO, AVRO>), headers=destinationTopic:routed, properties: {k1:overwritten, k2:new}.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2024 DataStax | Privacy policy | Terms of use

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com