Reactive Style Programming
The driver provides built-in support for reactive queries. The CqlSession interface extends ReactiveSession, which adds specialized methods to execute requests expressed in reactive streams.
Notes:
- reactive capabilities require the Reactive Streams API to be present on the classpath. The driver has a dependency on that library, but if your application does not use reactive queries at all, it is possible to exclude it to minimize the number of runtime dependencies. If the library cannot be found at runtime, reactive queries won’t be available, and a warning will be logged, but the driver will otherwise operate normally (this is also valid for OSGi deployments).
- for historical reasons, reactive-related driver types reside in a package prefixed with dse; however, reactive queries also work with regular Cassandra.
Overview
ReactiveSession exposes two public methods:
ReactiveResultSet executeReactive(String query);
ReactiveResultSet executeReactive(Statement<?> statement);
Both methods return a ReactiveResultSet, which is the reactive streams version of a regular ResultSet. In other words, a ReactiveResultSet is a Publisher for query results.
When subscribing to and consuming from a ReactiveResultSet, there are two important caveats to bear in mind:
- By default, all ReactiveResultSet implementations returned by the driver are cold, unicast, single-subscription-only publishers. In other words, they do not support multiple subscribers; consider caching the results produced by such publishers if you need to consume them from more than one downstream subscriber. We provide a few examples of caching further in this document.
- Also, note that reactive result sets may emit items to their subscribers on an internal driver IO thread. Subscriber implementors are encouraged to abide by Reactive Streams Specification rule 2.2 and avoid performing heavy computations or blocking calls inside onNext calls, as doing so could slow down the driver and impact performance. Instead, they should asynchronously dispatch received signals to their processing logic.
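The second caveat can be illustrated without the driver. The following is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces (which mirror the Reactive Streams ones) as a stand-in; OffloadingSubscriber and synchronousPublisher are hypothetical names, not driver API. The onNext method only hands each item to a worker thread, so the emitting thread is released immediately:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;

/** Hypothetical subscriber that keeps onNext cheap by delegating work to its own thread. */
final class OffloadingSubscriber<T> implements Flow.Subscriber<T> {
  private final ExecutorService worker = Executors.newSingleThreadExecutor();
  private final CountDownLatch done = new CountDownLatch(1);
  final List<T> processed = new CopyOnWriteArrayList<>();
  final List<String> workerThreads = new CopyOnWriteArrayList<>();

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    subscription.request(Long.MAX_VALUE); // unbounded demand, for brevity
  }

  @Override
  public void onNext(T item) {
    // Hand the item off and return immediately; the slow part runs on the worker.
    worker.execute(() -> {
      workerThreads.add(Thread.currentThread().getName());
      processed.add(item); // stands in for heavy or blocking processing
    });
  }

  @Override
  public void onError(Throwable error) {
    finish();
  }

  @Override
  public void onComplete() {
    finish();
  }

  private void finish() {
    // Queued behind all pending items, so the latch opens once they are processed.
    worker.execute(done::countDown);
    worker.shutdown();
  }

  /** Waits until every handed-off item has been processed (or the timeout elapses). */
  boolean awaitDone(long timeout, TimeUnit unit) {
    try {
      return done.await(timeout, unit);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  /** Emits all items synchronously on the calling thread, as an IO thread might. */
  static <T> Flow.Publisher<T> synchronousPublisher(List<T> items) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private boolean emitted;

      @Override
      public void request(long n) {
        if (emitted) return;
        emitted = true;
        items.forEach(subscriber::onNext);
        subscriber.onComplete();
      }

      @Override
      public void cancel() {
        emitted = true;
      }
    });
  }
}
```

Reactive libraries usually offer an operator for this kind of hand-off (in Reactor, publishOn), so a hand-written subscriber like this one is rarely needed in practice.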
Basic usage
The examples on this page make use of Reactor, a popular reactive library, but they should be easily adaptable to any other library implementing the concepts of reactive streams.
Reading in reactive style
The following example reads from a table and prints all the returned rows to the console. In case of error, a DriverException is thrown and its stack trace is printed to standard error:
try (CqlSession session = ...) {
  Flux.from(session.executeReactive("SELECT ..."))
      .doOnNext(System.out::println)
      .blockLast();
} catch (DriverException e) {
  e.printStackTrace();
}
Writing in reactive style
The following example inserts rows into a table after printing the queries to the console, stopping at the first error, if any. Again, in case of error, a DriverException is thrown:
try (CqlSession session = ...) {
  Flux.just("INSERT ...", "INSERT ...", "INSERT ...", ...)
      .doOnNext(System.out::println)
      .flatMap(session::executeReactive)
      .blockLast();
} catch (DriverException e) {
  e.printStackTrace();
}
Note that when a statement is executed reactively, the actual request is only triggered when the ReactiveResultSet is subscribed to; in other words, when the executeReactive method returns, nothing has been executed yet. This is why the write example above uses a flatMap operator, which takes care of subscribing to each ReactiveResultSet returned by successive calls to session.executeReactive. A common pitfall is to use an operator that silently ignores the returned ReactiveResultSet; for example, the code below seems correct, but will not execute any query:
// DON'T DO THIS
Flux.just("INSERT INTO ...")
    // The returned ReactiveResultSet is not subscribed to
    .doOnNext(session::executeReactive)
    .blockLast();
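The laziness described above can be reproduced in isolation. The following is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces; LazySession is a hypothetical stand-in for a session and is not part of the driver. Its executeReactive method only builds a publisher, and the counter that represents "requests sent" moves only when that publisher is subscribed to:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a session whose requests start only upon subscription. */
final class LazySession {
  private final AtomicInteger executed = new AtomicInteger();

  /** Builds a publisher for the query; nothing is sent until someone subscribes. */
  Flow.Publisher<String> executeReactive(String query) {
    return subscriber -> {
      executed.incrementAndGet(); // the "request" is sent here, not in executeReactive
      subscriber.onSubscribe(new Flow.Subscription() {
        @Override
        public void request(long n) {}

        @Override
        public void cancel() {}
      });
      subscriber.onComplete(); // a write returns no rows
    };
  }

  /** Number of queries that were actually executed so far. */
  int executedCount() {
    return executed.get();
  }

  /** Subscribes with a subscriber that ignores every signal. */
  static <T> void subscribeAndIgnore(Flow.Publisher<T> publisher) {
    publisher.subscribe(new Flow.Subscriber<T>() {
      @Override
      public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
      }

      @Override
      public void onNext(T item) {}

      @Override
      public void onError(Throwable error) {}

      @Override
      public void onComplete() {}
    });
  }
}
```

Calling executeReactive and discarding the result, which is what doOnNext does in the snippet above, leaves executedCount() at zero; only the call to subscribeAndIgnore increments it.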
Since a write query does not return any rows, it may appear difficult to count the number of write statements executed against the database. Fortunately, most reactive libraries have operators that are useful in these scenarios. The following example demonstrates how to achieve this goal with Reactor:
Flux<Statement<?>> stmts = ...;
long count =
    stmts
        .flatMap(
            stmt ->
                Flux.from(session.executeReactive(stmt))
                    // dummy cast, since result sets are always empty for write queries
                    .cast(Integer.class)
                    // flow will always be empty, so '1' will be emitted for each query
                    .defaultIfEmpty(1))
        .count()
        .block();
System.out.printf("Executed %d write statements%n", count);
Accessing query metadata
ReactiveResultSet exposes useful information about request execution and query metadata:
Publisher<? extends ColumnDefinitions> getColumnDefinitions();
Publisher<? extends ExecutionInfo> getExecutionInfos();
Publisher<Boolean> wasApplied();
Refer to the javadocs of getColumnDefinitions, getExecutionInfos and wasApplied for more information on these methods.
To inspect the contents of the above publishers, simply subscribe to them. Note that these publishers cannot complete before the query itself completes; if the query fails, then these publishers will fail with the same error.
The following example executes a query, then prints all the available metadata to the console:
ReactiveResultSet rs = session.executeReactive("SELECT ...");
// execute the query first
Flux.from(rs).blockLast();
// then retrieve query metadata
System.out.println("Column definitions: ");
Mono.from(rs.getColumnDefinitions()).doOnNext(System.out::println).block();
System.out.println("Execution infos: ");
Flux.from(rs.getExecutionInfos()).doOnNext(System.out::println).blockLast();
System.out.println("Was applied: ");
Mono.from(rs.wasApplied()).doOnNext(System.out::println).block();
Note that it is also possible to inspect query metadata at row level. Each row returned by a reactive query execution implements ReactiveRow, the reactive equivalent of a Row.
ReactiveRow exposes the same kind of query metadata and execution info found in ReactiveResultSet, but for each individual row:
ColumnDefinitions getColumnDefinitions();
ExecutionInfo getExecutionInfo();
boolean wasApplied();
Refer to the javadocs of getColumnDefinitions, getExecutionInfo and wasApplied for more information on these methods.
The following example executes a query and, for each row returned, prints the coordinator that served that row, then retrieves all the coordinators that were contacted to fulfill the query and prints them to the console:
Iterable<Node> coordinators = Flux.from(session.executeReactive("SELECT ..."))
    .doOnNext(
        row ->
            System.out.printf(
                "Row %s was obtained from coordinator %s%n",
                row,
                row.getExecutionInfo().getCoordinator()))
    .map(ReactiveRow::getExecutionInfo)
    // dedup by coordinator (note: this is dangerous on a large result set)
    .groupBy(ExecutionInfo::getCoordinator)
    .map(GroupedFlux::key)
    .toIterable();
System.out.println("Contacted coordinators: " + coordinators);
Advanced topics
Applying backpressure
One of the key features of reactive programming is backpressure.
Unfortunately, the Cassandra native protocol does not offer proper support for exchanging backpressure information between client and server over the network. Cassandra has been able to throttle clients since version 3.10, but at the time of writing there is no proper client-facing backpressure mechanism available.
When reading from Cassandra, however, this should not be a problem for most applications. Indeed, in a read scenario, Cassandra acts as a producer and the driver as a consumer; in such a setup, if a downstream subscriber is not able to cope with the throughput, the driver progressively adjusts the rate at which it requests more pages from the server, thus effectively regulating the server throughput to match the subscriber’s. The only caveat is a subscriber so slow that it eventually triggers a query timeout, be it on the client side (DriverTimeoutException) or on the server side (ReadTimeoutException).
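How subscriber demand regulates a producer can be sketched without the driver. The following is a minimal, single-threaded illustration built on the JDK's java.util.concurrent.Flow interfaces; range and PullingSubscriber are hypothetical names, and the driver's actual paging logic is not shown. The publisher emits only as many items as were requested, so a consumer that requests slowly is served slowly:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class DemandDrivenRead {
  /** Hypothetical single-threaded publisher of 1..total that emits only what was requested. */
  static Flow.Publisher<Integer> range(int total) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private int next = 1;
      private boolean done;

      @Override
      public void request(long n) {
        // Emit at most n items, never more than the subscriber asked for.
        for (long i = 0; i < n && next <= total && !done; i++) {
          subscriber.onNext(next++);
        }
        if (next > total && !done) {
          done = true;
          subscriber.onComplete();
        }
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
  }

  /** Subscriber that pulls explicitly, so its owner decides the pace. */
  static final class PullingSubscriber implements Flow.Subscriber<Integer> {
    final List<Integer> received = new ArrayList<>();
    boolean completed;
    Throwable error;
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      this.subscription = subscription; // no demand yet: nothing is emitted
    }

    @Override
    public void onNext(Integer item) {
      received.add(item);
    }

    @Override
    public void onError(Throwable error) {
      this.error = error;
    }

    @Override
    public void onComplete() {
      completed = true;
    }

    /** Signals demand for n more items. */
    void pull(long n) {
      subscription.request(n);
    }
  }
}
```

After subscribing a PullingSubscriber to range(5), nothing is received until pull is called; pull(2) delivers 1 and 2, and a later pull(10) delivers the remaining three items followed by completion.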
When writing to Cassandra, the lack of backpressure communication between client and server is more problematic. Indeed, in a write scenario, the driver acts as a producer and Cassandra as a consumer; in such a setup, if an upstream producer generates too much data, the driver blindly sends the write statements to the server as quickly as possible, eventually causing the cluster to become overloaded or even crash. This usually manifests itself with errors like WriteTimeoutException or OverloadedException.
Users are strongly advised to limit the concurrency at which write statements are executed in write-intensive scenarios. A simple way to achieve this is to use the flatMap operator, which, in most reactive libraries, has an overloaded form that takes a parameter controlling the desired amount of concurrency. The following example executes a flow of statements with a maximum concurrency of 10, leveraging the concurrency parameter of Reactor’s flatMap operator:
Flux<Statement<?>> stmts = ...;
stmts.flatMap(session::executeReactive, 10).blockLast();
In the example above, the flatMap operator will subscribe to at most 10 ReactiveResultSet instances simultaneously, effectively limiting the number of concurrent in-flight requests to 10. This is usually enough to prevent data from being written too fast. More sophisticated operators are capable of rate-limiting or throttling the execution of a flow; for example, Reactor offers a delayElements operator that rate-limits the throughput of its upstream publisher. Consult the documentation of your reactive library for more information.
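The effect of a concurrency limit can be shown outside of any reactive library. The following is a minimal sketch, assuming asynchronous writes are represented as CompletableFuture instances; BoundedConcurrency, executeAll and peakInFlight are hypothetical names, and the blocking semaphore is used only for illustration (flatMap achieves the same bound without blocking a thread):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class BoundedConcurrency {
  /** Starts one asynchronous write per statement, with at most maxInFlight pending at once. */
  static <S> void executeAll(
      List<S> statements, Function<S, CompletableFuture<Void>> asyncWrite, int maxInFlight) {
    Semaphore permits = new Semaphore(maxInFlight);
    List<CompletableFuture<Void>> pending = new ArrayList<>();
    for (S statement : statements) {
      // The producer waits here while maxInFlight writes are still pending.
      permits.acquireUninterruptibly();
      CompletableFuture<Void> write = asyncWrite.apply(statement);
      pending.add(write.whenComplete((result, error) -> permits.release()));
    }
    // Wait for the tail of the flow; throws CompletionException if any write failed.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
  }

  /** Runs the given number of fake writes and reports the highest concurrency observed. */
  static int peakInFlight(int statementCount, int maxInFlight) {
    ExecutorService pool = Executors.newFixedThreadPool(8);
    AtomicInteger inFlight = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    List<Integer> statements = new ArrayList<>();
    for (int i = 0; i < statementCount; i++) {
      statements.add(i);
    }
    try {
      executeAll(
          statements,
          statement -> {
            // Record how many fake writes are pending when this one starts.
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            return CompletableFuture.runAsync(inFlight::decrementAndGet, pool);
          },
          maxInFlight);
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }
}
```

With a limit of 3, peakInFlight never reports more than 3 pending writes, however many statements are submitted.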
As a last resort, it is also possible to limit concurrency at driver level, for example using the driver’s built-in request throttling mechanism, although this is usually not required in reactive applications. See “Managing concurrency in asynchronous query execution” in the Developer Guide for a few examples.
Caching query results
As stated above, a ReactiveResultSet can only be subscribed to once. This is an intentional design decision, because otherwise users could inadvertently trigger a spurious execution of the same query when subscribing for the second time to the same ReactiveResultSet.
Let’s suppose that we want to compute both the average and the sum of all values from a table column. The most naive approach would be to create two flows and subscribe to both:
// DON'T DO THIS
ReactiveResultSet rs = session.executeReactive("SELECT n FROM ...");
double avg = Flux.from(rs)
    .map(row -> row.getLong(0))
    // java.util.stream.Collectors
    .collect(Collectors.averagingLong(Long::longValue))
    .block();
// will fail with IllegalStateException
long sum = Flux.from(rs)
    .map(row -> row.getLong(0))
    .reduce(0L, (a, b) -> a + b)
    .block();
Unfortunately, the second Flux above will terminate immediately with an onError signal encapsulating an IllegalStateException, since rs was already subscribed to.
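The single-subscription behavior can be mimicked with a few lines of plain Java. The following is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces; unicast and Recorder are hypothetical names and the error message is illustrative, not the driver's. The first subscriber receives the items, while any later subscriber receives only an onError signal carrying an IllegalStateException:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

final class SingleSubscriptionDemo {
  /** Hypothetical unicast publisher: only the first subscriber is served. */
  static <T> Flow.Publisher<T> unicast(List<T> source) {
    AtomicBoolean alreadySubscribed = new AtomicBoolean();
    return subscriber -> {
      if (alreadySubscribed.getAndSet(true)) {
        // Late subscribers still get onSubscribe first, then the error signal.
        subscriber.onSubscribe(new Flow.Subscription() {
          @Override
          public void request(long n) {}

          @Override
          public void cancel() {}
        });
        subscriber.onError(new IllegalStateException("multiple subscriptions not supported"));
        return;
      }
      subscriber.onSubscribe(new Flow.Subscription() {
        private boolean emitted;

        @Override
        public void request(long n) {
          if (emitted) return;
          emitted = true;
          source.forEach(subscriber::onNext); // ignores the requested amount, for brevity
          subscriber.onComplete();
        }

        @Override
        public void cancel() {
          emitted = true;
        }
      });
    };
  }

  /** Subscriber that records whatever it is sent. */
  static final class Recorder<T> implements Flow.Subscriber<T> {
    final List<T> items = new ArrayList<>();
    Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
      subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(T item) {
      items.add(item);
    }

    @Override
    public void onError(Throwable error) {
      this.error = error;
    }

    @Override
    public void onComplete() {}
  }
}
```

Subscribing two Recorder instances to the same unicast publisher leaves the second one with no items and a non-null error, which mirrors what happens to the second Flux in the example above.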
To circumvent this limitation without querying the table twice, the easiest technique is to use the cache operator that most reactive libraries offer:
Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .cache();
double avg = rs
    // java.util.stream.Collectors
    .collect(Collectors.averagingLong(Long::longValue))
    .block();
long sum = rs
    .reduce(0L, (a, b) -> a + b)
    .block();
The above example works just fine.
The cache operator will subscribe at most once to the ReactiveResultSet, cache the results, and serve the cached results to downstream subscribers. This is obviously only possible if your result set is small and can fit entirely in memory.
If caching is not an option, most reactive libraries also offer operators that multicast their upstream subscription to many subscribers on the fly.
The above example could be rewritten with a different approach as follows:
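Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .publish() // multicast upstream to all downstream subscribers
    .autoConnect(2); // wait until two subscribers subscribe
Mono<Long> sum = rs.reduce(0L, (a, b) -> a + b);
// java.util.stream.Collectors
Mono<Double> avg = rs.collect(Collectors.averagingLong(Long::longValue));
// subscribe to both at once: blocking on each Mono in turn would hang,
// because the query only starts once the second subscriber arrives
Tuple2<Long, Double> sumAndAvg = Mono.zip(sum, avg).block();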
In the above example, the publish operator multicasts every onNext signal to all of its subscribers, and the autoConnect(2) operator instructs publish to wait until it gets 2 subscriptions before subscribing to its upstream source (and triggering the actual query execution).
This approach should be the preferred one for large result sets since it does not involve caching results in memory.
Resuming from and retrying after failed queries
When executing a flow of statements, any failed query execution triggers an onError signal and terminates the subscription immediately, potentially preventing subsequent queries from being executed at all.
If this behavior is not desired, it is possible to mimic the behavior of a fail-safe system. This usually involves operators such as onErrorReturn or onErrorResume. Consult your reactive library documentation to find out which operators allow you to intercept failures.
The following example executes a flow of statements; for each failed execution, the stack trace is printed to standard error and, thanks to the onErrorResume operator, the error is then ignored and the flow execution resumes normally:
Flux<Statement<?>> stmts = ...;
stmts.flatMap(
        statement ->
            Flux.from(session.executeReactive(statement))
                .doOnError(Throwable::printStackTrace)
                .onErrorResume(error -> Mono.empty()))
    .blockLast();
The following example expands on the previous one: for each failed execution, at most 3 retries are attempted if the error was an UnavailableException; then, if the query wasn’t successful after retrying, a message is logged. Finally, all the errors are collected and the total number of failed queries is printed to the console:
Flux<Statement<?>> statements = ...;
long failed = statements.flatMap(
        stmt ->
            Flux.defer(() -> session.executeReactive(stmt))
                // retry at most 3 times on Unavailable
                // (newer Reactor versions express this as retryWhen(Retry.max(3).filter(...)))
                .retry(3, UnavailableException.class::isInstance)
                // handle errors
                .doOnError(
                    error -> {
                      System.err.println("Statement failed: " + stmt);
                      error.printStackTrace();
                    })
                // Collect errors and discard all returned rows
                .ignoreElements()
                .cast(Long.class)
                .onErrorReturn(1L))
    // add up the 1s emitted for failed statements
    .reduce(0L, Long::sum)
    .block();
System.out.println("Total failed queries: " + failed);
The example above uses Flux.defer() to wrap the call to session.executeReactive(). This is required because, as mentioned above, the driver always creates single-subscription-only publishers. Such publishers are not compatible with operators like retry because these operators sometimes subscribe more than once to the upstream publisher, thus causing the driver to throw an exception. Fortunately, this issue is easy to solve, and that’s exactly what the defer operator is designed for: each subscription to the defer operator triggers a distinct call to session.executeReactive(), thus causing the session to re-execute the query and return a brand-new publisher at every retry.
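The idea behind defer (ask a factory for a fresh, single-use source on every attempt) can be sketched with plain CompletableFuture instances, which, like a ReactiveResultSet, cannot be run again once they have failed. The following is a minimal illustration; DeferredRetry, withRetries and flaky are hypothetical names, and IllegalStateException merely stands in for a retryable error:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class DeferredRetry {
  /** Asks the factory for a fresh attempt each time; a failed future is never reused. */
  static <T> CompletableFuture<T> withRetries(
      Supplier<CompletableFuture<T>> attemptFactory,
      int maxRetries,
      Predicate<Throwable> retryable) {
    CompletableFuture<T> result = new CompletableFuture<>();
    attempt(attemptFactory, maxRetries, retryable, result);
    return result;
  }

  private static <T> void attempt(
      Supplier<CompletableFuture<T>> attemptFactory,
      int retriesLeft,
      Predicate<Throwable> retryable,
      CompletableFuture<T> result) {
    attemptFactory.get().whenComplete((value, error) -> {
      // Unwrap the wrapper that dependent stages may add around the real failure.
      Throwable cause =
          error instanceof CompletionException && error.getCause() != null
              ? error.getCause()
              : error;
      if (cause == null) {
        result.complete(value);
      } else if (retriesLeft > 0 && retryable.test(cause)) {
        // Brand-new attempt: the factory is invoked again.
        attempt(attemptFactory, retriesLeft - 1, retryable, result);
      } else {
        result.completeExceptionally(cause);
      }
    });
  }

  /** Hypothetical flaky operation: fails the given number of times, then succeeds. */
  static Supplier<CompletableFuture<String>> flaky(int failures, AtomicInteger attempts) {
    return () -> {
      if (attempts.incrementAndGet() <= failures) {
        return CompletableFuture.<String>failedFuture(new IllegalStateException("unavailable"));
      }
      return CompletableFuture.completedFuture("ok");
    };
  }
}
```

With an operation that fails twice and a budget of 3 retries, the factory is invoked three times and the third attempt succeeds; with an operation that always fails, the result completes exceptionally after four invocations (the initial attempt plus 3 retries).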
Note that the driver already has a built-in retry mechanism that can transparently retry failed queries; the above example should be seen as a demonstration of application-level retries, when a more fine-grained control of what should be retried, and how, is required.