Reactive Style Programming

The driver provides built-in support for reactive queries. The DseSession interface extends ReactiveSession, which adds specialized methods to execute requests expressed in reactive streams.

Overview

ReactiveSession exposes two public methods:

ReactiveResultSet executeReactive(String query);
ReactiveResultSet executeReactive(Statement<?> statement);

Both methods return a ReactiveResultSet, which is the reactive streams version of a regular ResultSet. In other words, a ReactiveResultSet is a Publisher for query results.

When subscribing to and consuming from a ReactiveResultSet, there are two important caveats to bear in mind:

  1. By default, all ReactiveResultSet implementations returned by the driver are cold, unicast, single-subscription-only publishers. In other words, they do not support multiple subscribers; if more than one downstream subscriber needs to consume the results, consider caching them. A few examples of caching appear later in this document.
  2. Reactive result sets may emit items to their subscribers on an internal driver IO thread. Subscriber implementors are encouraged to abide by Reactive Streams Specification rule 2.2 and avoid performing heavy computations or blocking calls inside onNext calls, as doing so could slow down the driver and impact performance. Instead, they should asynchronously dispatch received signals to their processing logic.
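The second caveat can be illustrated with a minimal sketch that does not involve the driver at all. It uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams ones, and SubmissionPublisher as a stand-in for a result set; OffloadingSubscriber and OffloadingDemo are hypothetical names introduced for this illustration only. The point is that onNext merely hands each item to an application-owned executor and returns, so the emitting thread is never held up by the processing itself:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical subscriber: onNext only enqueues work and returns immediately.
class OffloadingSubscriber<T> implements Flow.Subscriber<T> {
  final List<T> processed = new CopyOnWriteArrayList<>();
  final List<String> threadNames = new CopyOnWriteArrayList<>();
  final CountDownLatch done = new CountDownLatch(1);
  private final ExecutorService worker;

  OffloadingSubscriber(ExecutorService worker) {
    this.worker = worker;
  }

  @Override
  public void onSubscribe(Flow.Subscription subscription) {
    subscription.request(Long.MAX_VALUE); // unbounded demand, for brevity only
  }

  @Override
  public void onNext(T item) {
    // The emitting thread is released right away; processing runs on the worker.
    worker.execute(() -> {
      processed.add(item);
      threadNames.add(Thread.currentThread().getName());
    });
  }

  @Override
  public void onError(Throwable error) {
    worker.execute(done::countDown);
  }

  @Override
  public void onComplete() {
    // Queued after all pending items, so the latch opens once they are processed.
    worker.execute(done::countDown);
  }
}

class OffloadingDemo {
  static OffloadingSubscriber<Integer> run() {
    ExecutorService worker =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "app-worker"));
    OffloadingSubscriber<Integer> subscriber = new OffloadingSubscriber<>(worker);
    try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
      publisher.subscribe(subscriber);
      for (int i = 1; i <= 3; i++) {
        publisher.submit(i);
      }
    } // close() sends onComplete once buffered items are delivered
    try {
      if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("timed out waiting for completion");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      worker.shutdown();
    }
    return subscriber;
  }
}
```

With a reactive library, the same effect is usually obtained with a dedicated operator rather than a hand-written subscriber; in Reactor, for instance, publishOn moves downstream processing to a scheduler of your choice.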

Basic usage

The examples on this page use Reactor, a popular reactive library, but they should be easy to adapt to any other library that implements the Reactive Streams concepts.

Reading in reactive style

The following example reads from a table and prints all the returned rows to the console. In case of error, a DriverException is thrown and its stack trace is printed to standard error:

try (DseSession session = ...) {
  Flux.from(session.executeReactive("SELECT ..."))
      .doOnNext(System.out::println)
      .blockLast();
} catch (DriverException e) {
  e.printStackTrace();
}

Writing in reactive style

The following example inserts rows into a table after printing the queries to the console, stopping at the first error, if any. Again, in case of error, a DriverException is thrown:

try (DseSession session = ...) {
  Flux.just("INSERT ...", "INSERT ...", "INSERT ...", ...)
      .doOnNext(System.out::println)
      .flatMap(session::executeReactive)
      .blockLast();
} catch (DriverException e) {
  e.printStackTrace();
}

Note that when a statement is executed reactively, the actual request is only triggered when the ReactiveResultSet is subscribed to; in other words, when the executeReactive method returns, nothing has been executed yet. This is why the write example above uses a flatMap operator, which takes care of subscribing to each ReactiveResultSet returned by successive calls to session.executeReactive. A common pitfall is to use an operator that silently ignores the returned ReactiveResultSet; for example, the code below seems correct, but will not execute any query:

// DON'T DO THIS
Flux.just("INSERT INTO ...")
    // The returned ReactiveResultSet is never subscribed to
    .doOnNext(session::executeReactive)
    .blockLast();
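The laziness described above can be observed with a small driver-free sketch. ColdQuery and LazyExecutionDemo are hypothetical classes written for this illustration, built on the JDK's java.util.concurrent.Flow interfaces rather than on the driver's types; the counter stands in for "the request was sent". Creating the publisher is comparable to calling executeReactive, and nothing runs until a subscriber arrives:

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a cold publisher: the "query" starts on subscription.
class ColdQuery implements Flow.Publisher<String> {
  final AtomicInteger executions = new AtomicInteger();

  @Override
  public void subscribe(Flow.Subscriber<? super String> subscriber) {
    executions.incrementAndGet(); // the request is only triggered here
    subscriber.onSubscribe(new Flow.Subscription() {
      private boolean done;

      @Override
      public void request(long n) {
        if (done) {
          return;
        }
        done = true;
        subscriber.onNext("row"); // a single-row result, for brevity
        subscriber.onComplete();
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
  }
}

class LazyExecutionDemo {
  // Returns the execution count before and after subscribing.
  static int[] run() {
    ColdQuery query = new ColdQuery(); // comparable to calling executeReactive
    int before = query.executions.get();
    query.subscribe(new Flow.Subscriber<String>() {
      @Override public void onSubscribe(Flow.Subscription s) { s.request(1); }
      @Override public void onNext(String row) {}
      @Override public void onError(Throwable error) {}
      @Override public void onComplete() {}
    });
    return new int[] {before, query.executions.get()};
  }
}
```

An operator such as doOnNext corresponds to creating the ColdQuery and dropping it: the counter would stay at zero.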

Accessing query metadata

ReactiveResultSet exposes useful information about request execution and query metadata:

Publisher<? extends ColumnDefinitions> getColumnDefinitions();
Publisher<? extends ExecutionInfo> getExecutionInfos();
Publisher<Boolean> wasApplied(); 

Refer to the javadocs of getColumnDefinitions, getExecutionInfos and wasApplied for more information on these methods.

To inspect the contents of the above publishers, simply subscribe to them. Note that these publishers cannot complete before the query itself completes; if the query fails, then these publishers will fail with the same error.

The following example executes a query, then prints all the available metadata to the console:

ReactiveResultSet rs = session.executeReactive("SELECT ...");
// execute the query first
Flux.from(rs).blockLast();
// then retrieve query metadata
System.out.println("Column definitions: ");
Mono.from(rs.getColumnDefinitions()).doOnNext(System.out::println).block();
System.out.println("Execution infos: ");
Flux.from(rs.getExecutionInfos()).doOnNext(System.out::println).blockLast();
System.out.println("Was applied: ");
Mono.from(rs.wasApplied()).doOnNext(System.out::println).block();

Note that it is also possible to inspect query metadata at row level. Each row returned by a reactive query execution implements ReactiveRow, the reactive equivalent of a Row.

ReactiveRow exposes the same kind of query metadata and execution info found in ReactiveResultSet, but for each individual row:

ColumnDefinitions getColumnDefinitions();
ExecutionInfo getExecutionInfo();
boolean wasApplied();

Refer to the javadocs of getColumnDefinitions, getExecutionInfo and wasApplied for more information on these methods.

The following example executes a query and, for each row returned, prints the coordinator that served that row, then retrieves all the coordinators that were contacted to fulfill the query and prints them to the console:

List<Node> coordinators = Flux.from(session.executeReactive("SELECT ..."))
    .doOnNext(
        row ->
            System.out.printf(
                "Row %s was obtained from coordinator %s%n",
                row,
                row.getExecutionInfo().getCoordinator()))
    .map(row -> row.getExecutionInfo().getCoordinator())
    // dedup by coordinator; distinct() only retains the distinct nodes seen so far
    .distinct()
    // wait for the query to complete and gather the coordinators in a list
    .collectList()
    .block();
System.out.println("Contacted coordinators: " + coordinators);

Advanced topics

Caching query results

As stated above, a ReactiveResultSet can only be subscribed to once. This is an intentional design decision: otherwise, subscribing a second time to the same ReactiveResultSet could inadvertently execute the same query again.
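To make the single-subscription contract concrete, here is a rough, driver-free sketch of a publisher that serves its first subscriber and rejects any later one with an IllegalStateException. SingleUsePublisher, CollectingSubscriber and SingleUseDemo are hypothetical names, and the JDK's java.util.concurrent.Flow interfaces stand in for the Reactive Streams ones; this is not the driver's implementation, only an illustration of the behavior described above:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical publisher that accepts one subscriber and rejects any later one.
class SingleUsePublisher implements Flow.Publisher<Long> {
  private final AtomicBoolean subscribed = new AtomicBoolean();
  private final List<Long> rows;

  SingleUsePublisher(List<Long> rows) {
    this.rows = rows;
  }

  @Override
  public void subscribe(Flow.Subscriber<? super Long> subscriber) {
    boolean first = subscribed.compareAndSet(false, true);
    // onSubscribe must precede any other signal, even for a rejected subscriber.
    subscriber.onSubscribe(new Flow.Subscription() {
      private boolean done;

      @Override
      public void request(long n) {
        if (done || !first) {
          return;
        }
        done = true;
        for (Long row : rows) {
          subscriber.onNext(row); // ignores n: demand handling omitted for brevity
        }
        subscriber.onComplete();
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
    if (!first) {
      // Reject instead of silently running the query a second time.
      subscriber.onError(new IllegalStateException("already subscribed"));
    }
  }
}

class CollectingSubscriber implements Flow.Subscriber<Long> {
  final List<Long> items = new ArrayList<>();
  Throwable error;

  @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
  @Override public void onNext(Long item) { items.add(item); }
  @Override public void onError(Throwable t) { error = t; }
  @Override public void onComplete() {}
}

class SingleUseDemo {
  static CollectingSubscriber[] run() {
    SingleUsePublisher rs = new SingleUsePublisher(List.of(1L, 2L, 3L));
    CollectingSubscriber first = new CollectingSubscriber();
    CollectingSubscriber second = new CollectingSubscriber();
    rs.subscribe(first);  // receives 1, 2, 3
    rs.subscribe(second); // receives only an IllegalStateException
    return new CollectingSubscriber[] {first, second};
  }
}
```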

Let’s suppose that we want to compute both the average and the sum of all values from a table column. The most naive approach would be to create two flows and subscribe to both:

// DON'T DO THIS
ReactiveResultSet rs = session.executeReactive("SELECT n FROM ...");
double avg = Flux.from(rs)
    .map(row -> row.getLong(0))
    .collect(Collectors.averagingLong(Long::longValue)) // java.util.stream.Collectors
    .block();
// will fail with IllegalStateException
long sum = Flux.from(rs)
    .map(row -> row.getLong(0))
    .reduce(0L, (a, b) -> a + b)
    .block();

Unfortunately, the second Flux above will terminate immediately with an onError signal encapsulating an IllegalStateException, since rs was already subscribed to.

To circumvent this limitation without querying the table twice, the easiest technique is to use the cache operator that most reactive libraries offer:

Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .cache();
double avg = rs
    .collect(Collectors.averagingLong(Long::longValue)) // java.util.stream.Collectors
    .block();
long sum = rs
    .reduce(0L, (a, b) -> a + b)
    .block();

The above example works just fine.

The cache operator will subscribe at most once to the ReactiveResultSet, cache the results, and serve the cached results to downstream subscribers. This only works if the whole result set is small enough to fit in memory.

If caching is not an option, most reactive libraries also offer operators that multicast their upstream subscription to many subscribers on the fly.

The above example could be rewritten with a different approach as follows:

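Flux<Long> rs = Flux.from(session.executeReactive("SELECT n FROM ..."))
    .map(row -> row.getLong(0))
    .publish()       // multicast upstream to all downstream subscribers
    .autoConnect(2); // wait until two subscribers subscribe
Mono<Long> sumMono = rs.reduce(0L, (a, b) -> a + b);
Mono<Double> avgMono = rs.collect(Collectors.averagingLong(Long::longValue));
// Subscribe to both flows together: blocking on the first one alone would
// wait forever, since the query only starts once the second one subscribes.
Tuple2<Long, Double> result = Mono.zip(sumMono, avgMono).block();
long sum = result.getT1();
double avg = result.getT2();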

In the above example, the publish operator multicasts every onNext signal to all of its subscribers; and the autoConnect(2) operator instructs publish to wait until it gets 2 subscriptions before subscribing to its upstream source (and triggering the actual query execution).

This approach should be the preferred one for large result sets since it does not involve caching results in memory.

Resuming from and retrying after failed queries

When executing a flow of statements, any failed query execution triggers an onError signal and terminates the subscription immediately, potentially preventing subsequent queries from being executed at all.

If this behavior is not desired, it is possible to mimic the behavior of a fail-safe system, usually with operators such as onErrorReturn or onErrorResume. Consult your reactive library documentation to find out which operators allow you to intercept failures.

The following example executes a flow of statements; for each failed execution, the stack trace is printed to standard error and, thanks to the onErrorResume operator, the error is completely ignored and the flow execution resumes normally:

Flux<Statement<?>> stmts = ...;
stmts.flatMap(
    statement ->
        Flux.from(session.executeReactive(statement))
            .doOnError(Throwable::printStackTrace)
            .onErrorResume(error -> Mono.empty()))
    .blockLast();

The following example expands on the previous one: for each failed execution, at most 3 retries are attempted if the error was an UnavailableException, then, if the query wasn’t successful after retrying, a message is logged. Finally, all the errors are collected and the total number of failed queries is printed to the console:

Flux<Statement<?>> statements = ...;
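long failed = statements.flatMap(
    stmt ->
        // defer so that each retry subscribes to a new ReactiveResultSet,
        // since a ReactiveResultSet can only be subscribed to once
        Flux.defer(() -> session.executeReactive(stmt))
            // retry at most 3 times on Unavailable
            .retry(3, UnavailableException.class::isInstance)
            // handle errors
            .doOnError(
                error -> {
                  System.err.println("Statement failed: " + stmt);
                  error.printStackTrace();
                })
            // Collect errors and discard all returned rows
            .ignoreElements()
            .cast(Long.class)
            .onErrorReturn(1L))
    // add up the 1L markers emitted for each failed statement
    .reduce(0L, Long::sum)
    .block();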
System.out.println("Total failed queries: " + failed);

Note that the driver already has a built-in retry mechanism that can transparently retry failed queries; the above example should be seen as a demonstration of application-level retries, when a more fine-grained control of what should be retried, and how, is required.
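The retry policy used in the last example (one initial attempt, then at most N retries, and only for a given kind of error) can also be spelled out without any reactive library. The sketch below is a plain, blocking illustration under that assumption; Retries and withRetries are hypothetical names, not part of the driver or of Reactor. Note that the supplier is invoked again on every attempt, in the same way that each reactive retry needs a new ReactiveResultSet to subscribe to:

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

class Retries {
  // Runs the attempt once, then retries up to maxRetries times
  // as long as the failure matches the retryable predicate.
  static <T> T withRetries(
      Supplier<T> attempt, int maxRetries, Predicate<Throwable> retryable) {
    int retries = 0;
    while (true) {
      try {
        return attempt.get(); // a fresh execution on every attempt
      } catch (RuntimeException e) {
        if (retries >= maxRetries || !retryable.test(e)) {
          throw e; // retries exhausted or error not retryable: propagate
        }
        retries++;
      }
    }
  }
}
```

A caller would pass something like a lambda that executes the statement, the value 3, and a predicate such as UnavailableException.class::isInstance; errors of any other type are propagated after the first attempt.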