Integrate Apache Flink® with Astra DB Serverless

query_builder 15 min

You can use an Astra DB Serverless database as a sink for results computed by Apache Flink®. This is handled through the Apache Cassandra® Java driver and Flink’s CassandraSink connector.

Prerequisites

  • Java 11 or later

  • Maven or Gradle

  • An Apache Flink deployment

  • A Java project where you want to use an Astra sink with Flink

Prepare Astra credentials

  1. Create an Astra DB Serverless database where you want to store the results of your Flink jobs.

  2. Generate an application token with a role that can read and write to your database, such as the Database Administrator role.

  3. Download your database’s Secure Connect Bundle (SCB), and store in a location where it can be accessed by your Java project, such as in app/src/main/resources.

  4. In app/src/main/resources, create an app.properties file with the following contents:

    astra.clientid=token
    astra.secret=APPLICATION_TOKEN
    astra.scb=PATH/TO/SCB.zip

    Replace the following:

    • APPLICATION_TOKEN: A secure reference to your Astra application token.

    • PATH/TO/SCB.zip: The absolute or relative path to your database’s SCB zip file.

Example script with an Astra sink

Once you set the Astra connection properties, you can use the CassandraSink connector with your Flink jobs.

The following code is for example purposes only. It isn’t working code, and it doesn’t represent a complete Java project.

The portion of the code that defines the Flink job (runFlink()) is a combination of the Cassandra sink tuple streaming example in the Flink documentation and the SocketWindowWordCount example in the Flink GitHub repository.

package com.datastax.flink.astra;

// Note the driver and connector.cassandra imports
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PlainTextAuthProvider;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.util.Collector;

import java.util.Properties;

public class App {

    // Astra credentials imported from app.properties
    private static final String ASTRA_SCB = "astra.scb";
    private static final String ASTRA_CLIENTID = "astra.clientid";
    private static final String ASTRA_SECRET = "astra.secret";

    private static Cluster buildCluster(Cluster.Builder builder, Properties props) {

        return builder
                .withCloudSecureConnectBundle(ClassLoader.getSystemClassLoader().getResourceAsStream(props.getProperty(ASTRA_SCB)))
                .withAuthProvider(
                        new PlainTextAuthProvider(
                                props.getProperty(ASTRA_CLIENTID),
                                props.getProperty(ASTRA_SECRET)))
                .build();
    }

    private static void setupSchema(Properties props) {

        // This code uses a keyspace named 'example' and a table named 'wordcount'
        Cluster cluster = buildCluster(Cluster.builder(), props);
        Session session = cluster.connect();
        session.execute("drop table if exists example.wordcount");
        session.execute("CREATE TABLE IF NOT EXISTS example.wordcount (\n" +
                "word text,\n" +
                "count bigint,\n" +
                "PRIMARY KEY(word))\n");
        session.close();
        cluster.close();
    }

    private static void runFlink(Properties props)
    throws Exception {

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> text = env.fromElements("the quick brown fox", "jumped over", "the lazy dog", "foxes are just", "lazier than dogs", "or at least our dog");

        DataStream<Tuple2<String, Long>> result = text
                .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                        // normalize and split the line
                        String[] words = value.toLowerCase().split("\\s");

                        // emit the pairs
                        for (String word : words) {
                            //Do not accept empty word, since word is defined as primary key in C* table
                            if (!word.isEmpty()) {
                                out.collect(new Tuple2<String, Long>(word, 1L));
                            }
                        }
                    }
                })
                .keyBy(value -> value.f0)
                // Omit the following method call while testing the connection.
                // Including this time window causes no data to be recorded,
                // and time windowing isn't relevant to demonstrating connectivity.
                //.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(1);

        CassandraSink.addSink(result)
                .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
                // ClusterBuilder is an abstract class rather than interface so you can't swap in a closure here.
                // You must use the anonymous inner class or another equivalent.
                .setClusterBuilder(new ClusterBuilder() {
                    @Override
                    protected Cluster buildCluster(Cluster.Builder builder) {
                        return App.buildCluster(builder, props);
                    }
                })
                .setFailureHandler((failure) -> { log.error("Exception in C* ops", failure); })
                .build();

        result.print().setParallelism(1);

        env.execute("Flink Astra Test");
    }

    public static void main(String[] args) throws Exception {

        // This example loads the Astra credentials from a properties file.
        Properties props = new Properties();
        props.load(ClassLoader.getSystemClassLoader().getResourceAsStream("app.properties"));

        setupSchema(props);
        runFlink(props);
    }
}

Test the connection

  1. Build and run your project.

  2. Generate some activity to trigger your Flink job.

  3. Use cqlsh to verify that data is being written to your database:

    SELECT * FROM KEYSPACE_NAME.TABLE_NAME;

    Replace KEYSPACE_NAME and TABLE_NAME with the keyspace and table you used in your script.

Next steps

This guide used simplified code to demonstrate how to configure a Flink job that interacts with Astra. There are many ways to optimize this code or modify it for your use cases. For example, Flink’s CassandraSink opens a new Session on each open() call even though these Session objects are thread-safe. A more robust implementation would minimize the number of open sessions for multiple operations on the same JVM. For more information, see Performance tuning for Flink.

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax, an IBM Company | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com