Reactive Requests

In addition to all return types supported by the OSS mapper annotations, the DSE mapper also supports two reactive return types:

  1. MappedReactiveResultSet can be used as a return type for methods annotated with @Select or @Query. It is a Publisher of mapped entities that also implements ReactiveQueryMetadata. This class is specific to the DSE mapper.
  2. The DSE core driver’s ReactiveResultSet can be used as a return type for methods annotated with @Insert, @Update, @Delete and @Query. Returning a ReactiveResultSet from @Insert, @Update and @Delete methods is mostly useful if you need to inspect returned query metadata in a reactive fashion.

Refer to the DSE driver documentation on reactive queries for more information on reactive programming support in the DSE driver.
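Because both return types are Publishers, a caller receives items only in response to the demand it signals, whether or not a library such as Reactor sits in between. The following is a minimal sketch of that subscribe/request/onNext exchange. It makes two assumptions: it uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams interfaces (they have the same four-method shape), and fromList is a hypothetical in-memory publisher that only imitates a stream of mapped entities. Neither fromList nor collectOneByOne is part of the driver or the mapper.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class PublisherSketch {

  // Hypothetical stand-in for a publisher of mapped entities: emits the
  // elements of a list synchronously, never more than the subscriber requested.
  static <T> Flow.Publisher<T> fromList(List<T> source) {
    return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
      private final Iterator<T> it = source.iterator();
      private long demand;
      private boolean emitting;
      private boolean done;

      @Override
      public void request(long n) {
        if (done) {
          return;
        }
        demand += n;
        if (emitting) {
          return; // re-entrant call from onNext; the loop below picks up the new demand
        }
        emitting = true;
        while (demand > 0 && it.hasNext() && !done) {
          demand--;
          subscriber.onNext(it.next());
        }
        emitting = false;
        if (!it.hasNext() && !done) {
          done = true;
          subscriber.onComplete();
        }
      }

      @Override
      public void cancel() {
        done = true;
      }
    });
  }

  // Subscribes to the publisher and pulls items one at a time.
  // Only suitable for the synchronous publisher above, since it does not wait.
  static <T> List<T> collectOneByOne(Flow.Publisher<T> publisher) {
    List<T> items = new ArrayList<>();
    publisher.subscribe(new Flow.Subscriber<T>() {
      private Flow.Subscription subscription;

      @Override
      public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        s.request(1); // signal initial demand for a single item
      }

      @Override
      public void onNext(T item) {
        items.add(item);
        subscription.request(1); // ask for the next item only after handling this one
      }

      @Override
      public void onError(Throwable t) {
        throw new IllegalStateException(t);
      }

      @Override
      public void onComplete() {
        // nothing to do: all items are already collected
      }
    });
    return items;
  }

  public static void main(String[] args) {
    System.out.println(collectOneByOne(fromList(List.of("keyboard", "mouse"))));
  }
}
```

In practice a reactive library handles this bookkeeping; the Reactor operators used in the examples below issue the request calls on the caller's behalf.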

Examples

Consider the following DAO interface:

@Dao
public interface ProductDao {

  @Select
  MappedReactiveResultSet<Product> findById(UUID productId);

  @Insert
  ReactiveResultSet saveReactive(Product product);

  @Update
  ReactiveResultSet updateReactive(Product product);

  @Delete
  ReactiveResultSet deleteReactive(Product product);

  @Query("SELECT * FROM ${qualifiedTableId} WHERE description LIKE :description")
  MappedReactiveResultSet<Product> findByDescription(String description);
}

Assuming that you are using Reactor as your reactive library, the above DAO can be used as follows:

ProductDao dao = ...;
UUID toFind = ...;

// execute the SELECT query, expect at most 1 item
Optional<Product> maybeFound =
    Mono.from(dao.findById(toFind))
        .doOnNext(p -> System.out.println("Found product: " + p))
        .blockOptional();

assert maybeFound.isPresent();

Product product = maybeFound.get();
product.setDescription("new description");

// execute the UPDATE query and block until it returns
ReactiveResultSet rs = dao.updateReactive(product);
Mono.from(rs).block();
// inspect query metadata and print all query warnings, if any
Mono.from(rs.getExecutionInfos())
    .flatMapIterable(ExecutionInfo::getWarnings)
    .doOnNext(warning -> System.err.println("Query produced warning: " + warning))
    .blockLast();

// execute the query, print each entity found, then the total number of entities found
Flux.from(dao.findByDescription("new description"))
    .doOnNext(p -> System.out.println("Found product: " + p))
    .count()
    .doOnSuccess(n -> System.out.println(String.format("Found %d products", n)))
    .block();

// execute the DELETE query and block until it returns
Mono.from(dao.deleteReactive(product)).block();