Custom Payloads

The native protocol version 4 introduces a new feature called Custom Payloads.

According to the protocol V4 specification, custom payloads are generic key-value maps where keys are strings and each value is an arbitrary sequence of bytes. Currently payloads can be sent by clients along with all QUERY, PREPARE, EXECUTE and BATCH requests.

Custom payloads can be used to convey user-specific information from clients to servers and vice versa. Please note that this is an advanced feature that requires specific server-side configuration (see below).

Enabling custom payloads on C* nodes

What the server is supposed to do with a payload depends on the QueryHandler used server-side to decode requests. Unsurprisingly, the default QueryHandler implementation used by Cassandra simply ignores all payloads. However, users can replace it with their own implementation, thus being able to to process user-specific payloads; the payload format as well as the serialization and deserialization logic is entirely left for clients to implement. Needless to say, the same encoding and decoding logic should be applied on both client and server endpoints.

This section is a small how-to to help driver users getting started with custom payload processing server-side. For more detailed information, please refer to the Cassandra documentation itself.

To enable a custom QueryHandler for a Cassandra node, its JVM should be started with the system property cassandra.custom_query_handler_class set to a fully qualified name of a class implementing org.apache.cassandra.cql3.QueryHandler. This class should be of course available on the node’s classpath.

This can be achieved with the following steps:

  1. Add your implementation jar to the CLASSPATH environment variable;
  2. Add the following to $CASSANDRA_CONF/cassandra-env.sh:
JVM_EXTRA_OPTS="-Dcassandra.custom_query_handler_class=fully.qualified.name.of.MyQueryHandler"

Of course, all nodes in the cluster must be started with the same QueryHandler, otherwise payloads sent by the driver could get lost.

Implementation Notes

Payloads in the Java driver are represented as Map<String,ByteBuffer> instances. It is safe to use any Map implementation, including unsynchronized implementations such as java.util.HashMap; the driver will create defensive, thread-safe copies of user-supplied maps. However, ByteBuffer instances are inherently mutable, so callers should take care not to modify the ByteBuffer values in a payload map after submitting it to the driver as it could lead to unexpected results.

Null values

Note that, for thread safety reasons, the Java driver does not permit null keys nor null values in a payload map; including a null in your payload will result in a NullPointerException being immediately thrown.

However, the protocol specification does allow null values. If you need to include a null value in your payload map, this can be achieved with the Java driver by using the special value Statement.NULL_PAYLOAD_VALUE.

Payload length limitations
  1. Payload maps cannot have more than 65,535 entries.
  2. Payload map values are limited to a maximum length of Integer.MAX_VALUE (231 − 1) bytes each.

Sending custom payloads

A custom payload can be attached to any Statement via Statement.setOutgoingPayload().

Once the payload is set, you can either prepare the statement or execute it; in both cases, the payload will be sent along with the PREPARE or QUERY request respectively:

Statement statement = new SimpleStatement("SELECT foo FROM bar WHERE qix = 1");
statement.setOutgoingPayload(myCustomPayload);
session.execute(statement); // myCustomPayload will be sent with QUERY request
session.prepare(statement); // myCustomPayload will be sent with PREPARE request

Naturally, you can also set a payload when using the QueryBuilder API:

Statement statement = QueryBuilder.select("c2").from("t1").where(eq("c1", 1)).setOutgoingPayload(myCustomPayload);
session.execute(statement); // myCustomPayload will be sent with EXECUTE request

When sending payloads with batches, please note that payloads must be attached to the BatchStatement itself, not to internal statements:

Statement statement = new SimpleStatement("INSERT INTO t1 (c1, c2) values ('foo', 'bar')")
// correct
Statement batch = new BatchStatement().add(statement);
batch.setOutgoingPayload(myCustomPayload);
session.execute(batch);  // myCustomPayload will be sent with BATCH request
// wrong
statement.setOutgoingPayload(myCustomPayload);
Statement batch = new BatchStatement().add(statement);
session.execute(batch); // myCustomPayload will NOT be sent with BATCH request

One important note about paging requests: if your query needs to paginate, any outgoing payload set on the executed statement will be transparently sent over and over for every new page request.

Statement statement = new SimpleStatement("...");
statement.setOutgoingPayload(myCustomPayload);
ResultSet rows = session.execute(...); // myCustomPayload will be sent with first QUERY request
for (Row row : rows) {
    // if additional QUERY requests are sent, all of them will send the same payload
}

And finally, note that custom payloads can only be used with protocol versions >= 4; trying to set a payload under lower protocol versions will result in an UnsupportedFeatureException (wrapped in a NoHostAvailableException) when the request is encoded.

If you want to defensively protect your code against these errors, you can either:

1) Force the protocol version when creating your Cluster instance:

Cluster cluster = Cluster.builder()
    .addContactPoint("...")
    .withProtocolVersion(ProtocolVersion.V4)
    .build();

2) Inspect the current negotiated protocol version, and only send payloads if version is equal to or greater than 4:

ProtocolVersion myCurrentVersion = cluster.getConfiguration()
    .getProtocolOptions()
    .getProtocolVersion();
if (myCurrentVersion.compareTo(ProtocolVersion.V4) >= 0) {
    // only send custom payloads if current protocol version supports it
    statement.setOutgoingPayload(myCustomPayload);
}

Receiving custom payloads

Custom payloads sent back by the server can be retrieved in the following ways:

1) From a ResultSet: use the ExecutionInfo class to retrieve the payload sent by the server.

Statement statement = new SimpleStatement("...");
ResultSet rows = session.execute(...);
// last payload sent by the server
Map<String,ByteBuffer> serverPayload = rows.getExecutionInfo().getIncomingPayload();

If your query required pagination (multiple QUERY requests), the above method would only return the last payload sent by server with its last RESULT response. If you need to retrieve all the payloads for all RESULT responses, use the following method instead:

Statement statement = new SimpleStatement("...");
ResultSet rows = session.execute(...);
//all payloads sent by the server
for (ExecutionInfo info : rows.getAllExecutionInfo()) {
    Map<String,ByteBuffer> serverPayload = info.getIncomingPayload();
}

2) From a PreparedStatement: to retrieve the payload that the server sent back with its PREPARED response, use the following method:

PreparedStatement ps = session.prepare(...);
Map<String,ByteBuffer> serverPayload = ps.getIncomingPayload();

Custom Payloads and Prepared Statements

If you use either Session.prepare(RegularStatement) or Session.prepareAsync(RegularStatement) to prepare a statement, as with all other properties in the given statement, its outgoing payload, if any, will become the default outgoing payload for the prepared statement.

Statement statement = new SimpleStatement("...");
statement.setOutgoingPayload(myCustomPayload)
PreparedStatement ps = session.prepare(...); // myCustomPayload will be sent with PREPARE request 
                                             // AND become the default outgoing payload

Naturally, you can also set a default outgoing payload on the PreparedStatement instance itself. The code below is roughly equivalent to the one above - except that the payload is not sent with the PREPARE request:

Statement statement = new SimpleStatement("...");
PreparedStatement ps = session.prepare(...); // no payload sent with PREPARE request
ps.setOutgoingPayload(myCustomPayload) // default outgoing payload is now myCustomPayload

But PreparedStatement has also a getIncomingPayload() method, which, as we already know, can be used to retrieve incoming payloads sent back by the server that prepared the request.

When binding a statement - with either PreparedStatement.bind() or PreparedStatement.bind(Object...)) - BoundStatement instances will also inherit payloads.

To determine which payload is going to be used as outgoing payload for a bound statement, the following rules apply:

  1. If the prepared statement has a non-null outgoing payload, all instances of BoundStatement created out of it will inherit that payload as their outgoing payload;
  2. Otherwise, if the prepared statement has a non-null incoming payload, all instances of BoundStatement created out of it will inherit that payload as their outgoing payload;
  3. Otherwise, all instances of BoundStatement created out of it won’t have any outgoing payload set by default; users can however set an outgoing payload on individual BoundStatement instances by calling their setOutgoingPayload() method, like in the example below:
Statement statement = new SimpleStatement("...");
PreparedStatement ps = session.prepare(...); // no payload sent with PREPARE request
BoundStatement bs = ps.bind(1); // no outgoing payload by default
bs.setOutgoingPayload(myCustomPayload); // outgoing payload for this specific statement
session.execute(statement); // myCustomPayload will be sent with EXECUTE request for this bound statement only

Debugging custom payloads

When debugging custom payloads, set the com.datastax.driver.core.Message logger to the TRACE level, e.g. with Log4J:

<logger name="com.datastax.driver.core.Message">
  <level value="TRACE"/>
</logger>

This will log a message for every request and every response that contains a non-null payload. The log message contains a pretty-printed version of the payload itself, and its total length in bytes.