Reactive Requests
In addition to all return types supported by the OSS mapper annotations, the DSE mapper also supports two reactive return types:
- MappedReactiveResultSet can be used as a return type for methods annotated with @Select or @Query. It is a Publisher of mapped entities that also implements ReactiveQueryMetadata. This class is specific to the DSE mapper.
- The DSE core driver’s ReactiveResultSet can be used as a return type for methods annotated with @Insert, @Update, @Delete and @Query. Returning a ReactiveResultSet from @Insert, @Update and @Delete methods is mostly useful if you need to inspect returned query metadata in a reactive fashion.
Refer to the DSE driver documentation on reactive queries for more information on reactive programming support in the DSE driver.
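Because both return types are publishers, nothing is delivered until a subscriber signals demand. The following is a minimal sketch of that subscribe-and-request cycle. It does not use the driver: it assumes the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams interfaces, and the class name PublisherSketch, the helper collect and the string items are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class PublisherSketch {

  /**
   * Subscribes to the publisher, requests items one at a time, and completes
   * the returned future with every item received once the stream ends.
   */
  static <T> CompletableFuture<List<T>> collect(Flow.Publisher<T> publisher) {
    CompletableFuture<List<T>> result = new CompletableFuture<>();
    publisher.subscribe(
        new Flow.Subscriber<T>() {
          private final List<T> items = new ArrayList<>();
          private Flow.Subscription subscription;

          @Override
          public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // nothing is delivered until demand is signalled
          }

          @Override
          public void onNext(T item) {
            items.add(item);
            subscription.request(1); // ask for the next item
          }

          @Override
          public void onError(Throwable t) {
            result.completeExceptionally(t);
          }

          @Override
          public void onComplete() {
            result.complete(items);
          }
        });
    return result;
  }

  public static void main(String[] args) throws Exception {
    // SubmissionPublisher stands in for a result set publisher (assumption).
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    CompletableFuture<List<String>> found = collect(publisher);
    publisher.submit("first product");
    publisher.submit("second product");
    publisher.close(); // signals onComplete after pending items are delivered
    System.out.println(found.get());
  }
}
```

In practice a reactive library such as Reactor hides this plumbing, which is why the examples below wrap the returned publishers in Mono or Flux instead of subscribing by hand.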
Examples
Consider the following DAO interface:
@Dao
public interface ProductDao {

  @Select
  MappedReactiveResultSet<Product> findById(UUID productId);

  @Insert
  ReactiveResultSet saveReactive(Product product);

  @Update
  ReactiveResultSet updateReactive(Product product);

  @Delete
  ReactiveResultSet deleteReactive(Product product);

  @Query("SELECT * FROM ${qualifiedTableId} WHERE description LIKE :description")
  MappedReactiveResultSet<Product> findByDescription(String description);
}
Assuming that you are using Reactor as your reactive library, the above DAO can be used as follows:
ProductDao dao = ...;
UUID toFind = ...;
// execute the SELECT query, expect at most 1 item
Optional<Product> maybeFound =
    Mono.from(dao.findById(toFind))
        .doOnNext(p -> System.out.println("Found product: " + p))
        .blockOptional();
assert maybeFound.isPresent();
Product product = maybeFound.get();
product.setDescription("new description");
// execute the UPDATE query and block until it returns
ReactiveResultSet rs = dao.updateReactive(product);
Mono.from(rs).block();
// inspect query metadata and print all query warnings, if any
Mono.from(rs.getExecutionInfos())
    .flatMapIterable(ExecutionInfo::getWarnings)
    .doOnNext(warning -> System.err.println("Query produced warning: " + warning))
    .blockLast();
// execute the query, print each entity found, then the total number of entities found
Flux.from(dao.findByDescription("new description"))
    .doOnNext(p -> System.out.println("Found product: " + p))
    .count()
    .doOnSuccess(n -> System.out.println(String.format("Found %d products", n)))
    .block();
// execute the DELETE query and block until it returns
Mono.from(dao.deleteReactive(product)).block();
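The first snippet above wraps the result of findById in Mono.from and expects at most one item. As a rough illustration of what "at most one item" means for a publisher, the sketch below takes the first item and cancels the subscription, or yields an empty Optional if the stream completes without emitting. It again assumes the JDK Flow interfaces as a stand-in for Reactive Streams; the class FirstItemSketch and the helper first are hypothetical, and this is not how Reactor itself is implemented.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FirstItemSketch {

  /** Completes with the first item, or with an empty Optional if none is emitted. */
  static <T> CompletableFuture<Optional<T>> first(Flow.Publisher<T> publisher) {
    CompletableFuture<Optional<T>> result = new CompletableFuture<>();
    publisher.subscribe(
        new Flow.Subscriber<T>() {
          private Flow.Subscription subscription;

          @Override
          public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // only one item is ever needed
          }

          @Override
          public void onNext(T item) {
            subscription.cancel(); // stop the stream after the first item
            result.complete(Optional.of(item));
          }

          @Override
          public void onError(Throwable t) {
            result.completeExceptionally(t);
          }

          @Override
          public void onComplete() {
            // no-op if an item already completed the future
            result.complete(Optional.empty());
          }
        });
    return result;
  }

  public static void main(String[] args) throws Exception {
    // SubmissionPublisher stands in for a result set publisher (assumption).
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
    CompletableFuture<Optional<String>> maybeFound = first(publisher);
    publisher.submit("only the first item is kept");
    publisher.submit("this one is dropped after cancellation");
    publisher.close();
    System.out.println(maybeFound.get().isPresent());
  }
}
```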