gRPC Java Client

gRPC Java client setup

Install software

You’ll need to install the following software if it is not already installed:

Add dependencies

To begin, add the required dependency to your project pom.xml:

<dependencies>
  <dependency>
    <groupId>io.stargate.grpc</groupId>
    <artifactId>grpc-proto</artifactId>
    <version>1.0.41</version>
  </dependency>
  <dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.41.0</version>
  </dependency>
</dependencies>

If you do not add it, you will observe the following error:

No functional channel service provider found.
Try adding a dependency on the grpc-okhttp, grpc-netty, or grpc-netty-shaded artifact.

Java connecting

Authentication

This example assumes that you’re running Stargate locally with the default credentials of cassandra/cassandra. For more information regarding authentication please see the Stargate authentication and authorization docs.

The following code snippet can be used to generate the table-based auth token in the client code:

// Stargate OSS configuration for locally hosted docker image
private static final String STARGATE_USERNAME      = "cassandra";
private static final String STARGATE_PASSWORD      = "cassandra";
private static final String STARGATE_HOST          = "localhost";
private static final int    STARGATE_GRPC_PORT     = 8090;
private static final String STARGATE_AUTH_ENDPOINT = "http://" + STARGATE_HOST+ ":8081/v1/auth";

// Authenticate to get a token (http client jdk11)
String token = getTokenFromAuthEndpoint(STARGATE_USERNAME, STARGATE_PASSWORD);

private static String getTokenFromAuthEndpoint(String username, String password) {
    try {
        HttpRequest request = HttpRequest.newBuilder()
                .GET()
                .uri(URI.create(STARGATE_AUTH_ENDPOINT))
                .setHeader("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{"
                        + " \"username\": \"" + STARGATE_USERNAME+ "\",\n"
                        + " \"password\": \"" + STARGATE_PASSWORD + "\"\n"
                        + "}'"))
                .build();
        HttpResponse<String> response = HttpClient.newBuilder().build()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.body().split(":")[1].replaceAll("\"", "").replaceAll("}", "");
    } catch(Exception e) {
        throw new IllegalArgumentException(e);
    }
}

Creating a client

To connect to your Stargate instance, create the client. For a local Stargate instance, for instance, the following client code will fetch an auth token with a REST call:

// Stargate OSS configuration for locally hosted docker image
private static final String STARGATE_USERNAME      = "cassandra";
private static final String STARGATE_PASSWORD      = "cassandra";
private static final String STARGATE_HOST          = "localhost";
private static final int    STARGATE_GRPC_PORT     = 8090;
private static final String STARGATE_AUTH_ENDPOINT = "http://" + STARGATE_HOST+ ":8081/v1/auth";

public static void main(String[] args)
throws Exception {

    // Create Grpc Channel
    ManagedChannel channel = ManagedChannelBuilder
            .forAddress(STARGATE_HOST, STARGATE_GRPC_PORT).usePlaintext().build();

    // blocking stub version
    StargateGrpc.StargateBlockingStub blockingStub =
        StargateGrpc.newBlockingStub(channel)
            .withDeadlineAfter(10, TimeUnit.SECONDS)
            .withCallCredentials(new StargateBearerToken(token));

Java querying

A simple query can be performed by passing a CQL query to the client using the ExecuteQuery() function for standard query execution:

QueryOuterClass.Response queryString = blockingStub.executeQuery(QueryOuterClass
        .Query.newBuilder()
        .setCql("SELECT firstname, lastname FROM test.users")
        .build());

Data definition (DDL) queries are supported in the same manner:

blockingStub.executeQuery(
        QueryOuterClass.Query.newBuilder()
            .setCql(""
                + "CREATE KEYSPACE IF NOT EXISTS test "
                + "WITH REPLICATION = {"
                + " 'class' : 'SimpleStrategy', "
                + " 'replication_factor' : 1"
                + "};")
            .build());
System.out.println("Keyspace 'test' has been created.");

blockingStub.executeQuery(
        QueryOuterClass.Query.newBuilder()
            .setCql("CREATE TABLE IF NOT EXISTS test.users (firstname text PRIMARY KEY, lastname text);")
            .build());
System.out.println("Table 'users' has been created.");

In general, users will create a keyspace and table first.

If you would like to use a batch statement, the client also provides an ExecuteBatch() function to execute a batch query:

blockingStub.executeBatch(
        QueryOuterClass.Batch.newBuilder()
            .addQueries(
                QueryOuterClass.BatchQuery.newBuilder()
                    .setCql("INSERT INTO test.users (firstname, lastname) VALUES('Jane', 'Doe')")
                    .build())
            .addQueries(
                QueryOuterClass.BatchQuery.newBuilder()
                    .setCql("INSERT INTO test.users (firstname, lastname) VALUES('Serge', 'Provencio')")
                    .build())
            .build());
System.out.println("2 rows have been inserted in table users.");

The ExecuteQuery function can be used to execute a single query.

This example inserts two values into the keyspace table test.users. Only INSERT, UPDATE, and DELETE operations can be used in a batch query.

Java gRPC processing result set

After executing a query, a response returns rows that match the SELECT statement. If there are no rows, the returned payload is unset. The convenience function getResultSet() is provided to help transform this response into a result set that’s easier to work with.

QueryOuterClass.ResultSet rs = queryString.getResultSet();

for (Row row : rs.getRowsList()) {
    System.out.println(""
            + "FirstName=" + row.getValues(0).getString() + ", "
            + "lastname=" + row.getValues(1).getString());
}

Since the result type is known, the getString function transforms the value into a native string. Additional functions also exist for other types such as int, map, and blob. The full list can be found in Values.java.

Java full sample script

To put all the pieces together, here is a sample script that combines all the pieces shown above:

  • Sample script

  • Result

package com.datastax.tutorial;

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.TimeUnit;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.stargate.grpc.StargateBearerToken;
import io.stargate.proto.QueryOuterClass;
import io.stargate.proto.QueryOuterClass.Row;
import io.stargate.proto.StargateGrpc;

public class ConnectStargate {

    // Stargate OSS configuration for locally hosted docker image
    private static final String STARGATE_USERNAME      = "cassandra";
    private static final String STARGATE_PASSWORD      = "cassandra";
    private static final String STARGATE_HOST          = "localhost";
    private static final int    STARGATE_GRPC_PORT     = 8090;
    private static final String STARGATE_AUTH_ENDPOINT = "http://" + STARGATE_HOST+ ":8081/v1/auth";

    public static void main(String[] args)
    throws Exception {

        //-------------------------------------
        // 1. Initializing Connectivity
        //-------------------------------------

        // Authenticate to get a token (http client jdk11)
        String token = getTokenFromAuthEndpoint(STARGATE_USERNAME, STARGATE_PASSWORD);

        // Create Grpc Channel
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress(STARGATE_HOST, STARGATE_GRPC_PORT).usePlaintext().build();

        // blocking stub version
        StargateGrpc.StargateBlockingStub blockingStub =
            StargateGrpc.newBlockingStub(channel)
                .withDeadlineAfter(10, TimeUnit.SECONDS)
                .withCallCredentials(new StargateBearerToken(token));

        //-------------------------------------
        // 2. Create Schema
        //-------------------------------------

        blockingStub.executeQuery(
                QueryOuterClass.Query.newBuilder()
                    .setCql(""
                        + "CREATE KEYSPACE IF NOT EXISTS test "
                        + "WITH REPLICATION = {"
                        + " 'class' : 'SimpleStrategy', "
                        + " 'replication_factor' : 1"
                        + "};")
                    .build());
        System.out.println("Keyspace 'test' has been created.");

        blockingStub.executeQuery(
                QueryOuterClass.Query.newBuilder()
                    .setCql("CREATE TABLE IF NOT EXISTS test.users (firstname text PRIMARY KEY, lastname text);")
                    .build());
        System.out.println("Table 'users' has been created.");

        //-------------------------------------
        // 3. Insert 2 rows with Batch
        //-------------------------------------

        blockingStub.executeBatch(
                QueryOuterClass.Batch.newBuilder()
                    .addQueries(
                        QueryOuterClass.BatchQuery.newBuilder()
                            .setCql("INSERT INTO test.users (firstname, lastname) VALUES('Jane', 'Doe')")
                            .build())
                    .addQueries(
                        QueryOuterClass.BatchQuery.newBuilder()
                            .setCql("INSERT INTO test.users (firstname, lastname) VALUES('Serge', 'Provencio')")
                            .build())
                    .build());
        System.out.println("2 rows have been inserted in table users.");

        //-------------------------------------
        // 4. Retrieving result.
        //-------------------------------------

        QueryOuterClass.Response queryString = blockingStub.executeQuery(QueryOuterClass
                .Query.newBuilder()
                .setCql("SELECT firstname, lastname FROM test.users")
                .build());
        QueryOuterClass.ResultSet rs = queryString.getResultSet();
        for (Row row : rs.getRowsList()) {
            System.out.println(""
                    + "FirstName=" + row.getValues(0).getString() + ", "
                    + "lastname=" + row.getValues(1).getString());
        }

        System.out.println("Everything worked!");
        System.exit(0);
    }

    private static String getTokenFromAuthEndpoint(String username, String password) {
        try {
            HttpRequest request = HttpRequest.newBuilder()
                    .GET()
                    .uri(URI.create(STARGATE_AUTH_ENDPOINT))
                    .setHeader("Content-Type", "application/json")
                    .POST(HttpRequest.BodyPublishers.ofString("{"
                            + " \"username\": \"" + STARGATE_USERNAME+ "\",\n"
                            + " \"password\": \"" + STARGATE_PASSWORD + "\"\n"
                            + "}'"))
                    .build();
            HttpResponse<String> response = HttpClient.newBuilder().build()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            return response.body().split(":")[1].replaceAll("\"", "").replaceAll("}", "");
        } catch(Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

}
Keyspace 'test' has been created.
Table 'users' has been created.
2 rows have been inserted in table users.
FirstName=Serge, lastname=Provencio
FirstName=Jane, lastname=Doe
Everything worked!

Java developing

Generating gRPC code stubs

To see a guide how the Java code is compiled from the proto files see the gRPC setup project dependencies. To update the protobuf files being used, add the new files to the top level proto directory and then run make proto from the root of the project.

More notes on the asynchronous use of the gRPC API

Up to this point, we were using the blocking version of the generated stub. We can also interact with the Stargate API using the async version of the stub. To do so, we need to pass the StreamObserver that will be called asynchronously when the results are available.

Every StreamObserver needs to implement 3 methods: onNext(), onError() and onComplete(). For example:

StreamObserver<QueryOuterClass.Response> streamObserver = new StreamObserver<QueryOuterClass.Response>() {
           @Override
           public void onNext(QueryOuterClass.Response response) {
               try {
                   System.out.println("response:" + response.getResultSet());
               } catch (InvalidProtocolBufferException e) {
                   throw new RuntimeException(e);
               }
           }
           @Override
           public void onError(Throwable throwable) {
               System.out.println("Error: " + throwable);
           }
           @Override
           public void onCompleted() {
               // close resources, finish processing
               System.out.println("completed");
           }
       };

Please note that this is a very simplified version only for demonstration purposes and should not be used on production.

Once we have the Observer, we can pass it to the executeQuery method on the async stub:

stub.executeQuery(QueryOuterClass.Query
  .newBuilder()
  .setCql("SELECT k, v FROM ks.test")
  .build(), streamObserver);

This query will return immediately because it is non-blocking. If your program (or test) is progressing to the end, you may not be able to see the results. Your program may exit before the data arrives. After some time, when the data arrives, the streamObserver will be called.

The output of our program will look like this:

response:columns {
  type {
    basic: VARCHAR
  }
  name: "k"
}
columns {
  type {
    basic: INT
  }
  name: "v"
}
rows {
  values {
    string: "a"
  }
  values {
    int: 1
  }
}

completed

Please note, that at the end we have a completed emitted. This is called by the onCompleted method.

The Stargate gRPC Java Client repository is located at https://github.com/stargate/stargate-grpc-java-client.