Integrate Google Dataflow with Astra DB Serverless
Google Dataflow is a managed service for batch and streaming data processing pipelines based on Apache Beam.
Prerequisites
-
Create an Astra account.
-
Create a Serverless (Vector) database.
-
Create an application token with the Database Administrator role.
-
Download your database’s Secure Connect Bundle (SCB).
-
Create an OpenAI API key.
-
Install the Astra CLI.
-
Install Git, Maven, and Java 11 or later.
Build a sample project
Clone the astra-dataflow-starter sample repository, and then build the project with Maven.
-
Clone the repository containing the sample flows:
git clone https://github.com/DataStax-Examples/astra-dataflow-starter.git
-
Change to the repo directory, and then build the project with Maven:
cd astra-dataflow-starter
mvn clean install -D maven.test.skip=true
The sample repository contains two modules, each with several flows:
-
samples-beam contains flows executed with Apache Beam runners. Run these flows to test the integration between Astra DB and Apache Beam.
-
samples-dataflow contains flows executed with Google Dataflow. Run these flows to test the integration between Astra DB and Google Dataflow.
Beam samples
In the sample repo, the samples-beam directory contains flows that you can run with Apache Beam runners in a local environment.
Set up the local environment
-
Change to the samples-beam directory:
cd samples-beam
pwd
-
In the Astra Portal, get your database’s name, target keyspace, and application token.
-
Download the SCB for your database.
You can use the Astra CLI to download the SCB:
astra db download-scb DATABASE_ID -f /tmp/secure-connect-bundle-DATABASE_NAME.zip
Replace DATABASE_ID with your database’s UUID, and DATABASE_NAME with your database’s name.
-
Set the following environment variables:
-
ASTRA_DB: Database name.
-
ASTRA_DB_KEYSPACE: A keyspace in your database. The default keyspace for Serverless (Vector) databases is default_keyspace.
-
ASTRA_SCB_PATH: The path to the SCB zip file.
-
ASTRA_TOKEN: Your application token.
export ASTRA_DB=DATABASE_NAME
export ASTRA_DB_KEYSPACE=default_keyspace
export ASTRA_SCB_PATH=/tmp/secure-connect-bundle-DATABASE_NAME.zip
export ASTRA_TOKEN=AstraCS:...
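The Beam flows read their connection settings from these four variables, so a missing one usually shows up only as a failed Maven run. A minimal pre-flight check, sketched in Java, might look like the following. The class name AstraEnvCheck and its missing helper are hypothetical and are not part of the sample repository; only the four variable names come from the steps above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of astra-dataflow-starter.
final class AstraEnvCheck {

    // Variable names taken from the setup steps above.
    static final String[] REQUIRED = {
        "ASTRA_DB", "ASTRA_DB_KEYSPACE", "ASTRA_SCB_PATH", "ASTRA_TOKEN"
    };

    // Returns the required variables that are absent or blank in the given environment.
    static List<String> missing(Map<String, String> env) {
        List<String> unset = new ArrayList<>();
        for (String name : REQUIRED) {
            String value = env.get(name);
            if (value == null || value.trim().isEmpty()) {
                unset.add(name);
            }
        }
        return unset;
    }

    public static void main(String[] args) {
        List<String> unset = missing(System.getenv());
        if (unset.isEmpty()) {
            System.out.println("All Astra DB variables are set.");
        } else {
            System.err.println("Missing variables: " + String.join(", ", unset));
            System.exit(1);
        }
    }
}
```

Passing the environment in as a map, rather than calling System.getenv inside the helper, keeps the check easy to exercise with sample values.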
Run Beam flows
After you set your environment variables, you can run the following flows:
-
Import a CSV File
-
Export a CQL table as CSV
-
Import a Cassandra table into Astra DB
-
Generate embeddings
Import a CSV File
This flow parses a CSV file to populate an Astra DB table with the same structure. The mapping from CSV to table is defined in the flow. The sample dataset is a list of languages and language codes.
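To make the CSV-to-table mapping concrete, here is a minimal sketch of turning one line of the language dataset into a row object. It is an illustration under stated assumptions, not the code used by Csv_to_AstraDb: the LanguageRow class and its fromCsvLine method are hypothetical, and the sketch assumes each line has the form code,language and splits on the first comma only, leaving any surrounding quotes in place.

```java
import java.io.Serializable;

// Hypothetical stand-in for the entity that a CSV import flow writes to the table.
final class LanguageRow implements Serializable {
    private static final long serialVersionUID = 1L;

    final String code;
    final String language;

    LanguageRow(String code, String language) {
        this.code = code;
        this.language = language;
    }

    // Splits on the first comma only, so commas inside the language name survive.
    // Surrounding quotes are kept as-is (an assumption of this sketch).
    static LanguageRow fromCsvLine(String line) {
        int comma = line.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("Expected 'code,language' but got: " + line);
        }
        return new LanguageRow(line.substring(0, comma).trim(),
                               line.substring(comma + 1).trim());
    }

    public static void main(String[] args) {
        LanguageRow row = fromCsvLine("ru,Russian");
        System.out.println(row.code + " -> " + row.language);
    }
}
```

A production mapping would more likely use a CSV parser that handles quoting and escaping; the first-comma split is only enough for a two-column file like this one.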
-
Run the pipeline:
mvn clean compile exec:java \
  -Dexec.mainClass=com.datastax.astra.beam.Csv_to_AstraDb \
  -Dexec.args="\
  --astraToken=${ASTRA_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --csvInput=`pwd`/src/test/resources/language-codes.csv"
-
Verify the Astra DB table data:
astra db cqlsh ${ASTRA_DB} \
  -k ${ASTRA_DB_KEYSPACE} \
  -e "SELECT * FROM languages LIMIT 10;"
Result:
 code | language
------+---------------------------------------
   ru | Russian
   mt | Maltese
   ur | Urdu
   bm | Bambara
   es | Spanish; Castilian
   sd | Sindhi
   nb | "Bokmål, Norwegian; Norwegian Bokmål"
   gd | Gaelic; Scottish Gaelic
   st | "Sotho, Southern"
   ml | Malayalam
Export a CQL table as CSV
This flow exports a CQL table from Astra DB to a CSV file. The mapping from table to CSV is defined in the flow.
Before you run this flow, make sure that you import the required data.
-
Run the pipeline:
mvn clean compile exec:java \
  -Dexec.mainClass=com.datastax.astra.beam.AstraDb_To_Csv \
  -Dexec.args="\
  --astraToken=${ASTRA_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --table=languages \
  --csvOutput=`pwd`/src/test/resources/out/language"
-
Verify the output:
ls -l `pwd`/src/test/resources/out
cat `pwd`/src/test/resources/out/language-00001-of-00004
Result
gu,Gujarati
ve,Venda
ss,Swati
ch,Chamorro
si,Sinhala; Sinhalese
fa,Persian
no,Norwegian
sv,Swedish
...
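The file name in the cat command follows the sharded naming that Beam file sinks use by default, prefix-SSSSS-of-NNNNN, with shard numbers starting at 0. As a rough sketch, the following Java helper lists the file names to expect for a given prefix and shard count. The ShardNames class is hypothetical, and it assumes that the flow keeps the default shard template with no file suffix; the actual number of shards depends on the runner.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper that mirrors the default "-SSSSS-of-NNNNN" shard naming.
final class ShardNames {

    // Formats one shard file name; shard is zero-based.
    static String shardName(String prefix, int shard, int total) {
        if (shard < 0 || shard >= total) {
            throw new IllegalArgumentException("shard must be in [0, " + total + ")");
        }
        return String.format(Locale.ROOT, "%s-%05d-of-%05d", prefix, shard, total);
    }

    // Lists every shard file name for the given prefix and shard count.
    static List<String> all(String prefix, int total) {
        List<String> names = new ArrayList<>();
        for (int shard = 0; shard < total; shard++) {
            names.add(shardName(prefix, shard, total));
        }
        return names;
    }

    public static void main(String[] args) {
        // Four shards, matching the example output directory above.
        all("language", 4).forEach(System.out::println);
    }
}
```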
Import a Cassandra table into Astra DB
This flow creates a table in an Apache Cassandra® database, and then imports the table data to Astra DB.
The dataset is a list of languages, and the data mapping is defined in the flow.
The flow uses CassandraIO (driver 3.x) to read the Cassandra data and AstraDbIO (driver 4.x) to load the data into Astra DB.
-
Build the Cassandra Docker image and start the container:
docker-compose -f ./src/main/docker/docker-compose.yml up -d
-
Wait for Cassandra to start. You can use the following command to check the container status:
docker ps -f "name=cassandra"
-
To verify that Cassandra is ready, connect with cqlsh and get the current data center:
docker exec -it `docker ps | \
  grep cassandra:4.1.1 | \
  cut -b 1-12` cqlsh -e "SELECT data_center FROM system.local;"
-
Create environment variables in your terminal:
export ASTRA_DB=DATABASE_NAME
export ASTRA_DB_KEYSPACE=default_keyspace
export ASTRA_SCB_PATH=/tmp/secure-connect-bundle-DATABASE_NAME.zip
export ASTRA_TOKEN=AstraCS:...
-
Run the pipeline. The flow creates a keyspace and table in the local Cassandra database before copying the data to Astra DB.
mvn clean compile exec:java \
  -Dexec.mainClass=com.datastax.astra.beam.Cassandra_To_AstraDb \
  -Dexec.args="\
  --astraToken=${ASTRA_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --cassandraHost=localhost \
  --cassandraKeyspace=demo \
  --cassandraTableName=languages \
  --cassandraPort=9042 \
  --tableName=languages"
-
Use cqlsh to verify the data in the local Cassandra database:
docker exec -it `docker ps \
  | grep cassandra:4.1.1 \
  | cut -b 1-12` \
  cqlsh -e "SELECT * FROM demo.languages LIMIT 10;"
Result
 code | language
------+---------------------------------------
   ru | Russian
   mt | Maltese
   ur | Urdu
   bm | Bambara
   es | Spanish; Castilian
   sd | Sindhi
   nb | "Bokmål, Norwegian; Norwegian Bokmål"
   gd | Gaelic; Scottish Gaelic
   st | "Sotho, Southern"
   ml | Malayalam
-
Use cqlsh to verify that the Astra DB table has the same data as the Cassandra database:
astra db cqlsh ${ASTRA_DB} -k ${ASTRA_DB_KEYSPACE} -e "SELECT * FROM languages LIMIT 10;"
Generate embeddings
This example uses two flows. The first flow imports a CSV file and maps it to an Astra DB table. The second flow generates embeddings from the OpenAI Embedding API, and then stores the vectors in the table.
-
Import the CSV data:
mvn clean compile exec:java \
  -Dexec.mainClass=com.datastax.astra.beam.genai.GenAI_01_ImportData \
  -Dexec.args="\
  --astraToken=${ASTRA_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --csvInput=`pwd`/src/main/resources/fables_of_fontaine.csv"
-
Verify the data in Astra DB:
astra db cqlsh ${ASTRA_DB} -k ${ASTRA_DB_KEYSPACE} -e "SELECT * FROM fable LIMIT 10;"
Result
 document_id | document                              | title
-------------+---------------------------------------+----------------------
           6 | Of certain gormandizing cits.         | THE FLY AND THE GAME
          16 | The lady thought the creatures prime, | THE FLY AND THE GAME
          32 | Such game as this may suit the dogs.' | THE FLY AND THE GAME
          26 | Was anything but fit to eat.          | THE FLY AND THE GAME
          19 | For dinner she could hardly wait.     | THE FLY AND THE GAME
          13 | Pronounced it racy, rich, and rare,   | THE FLY AND THE GAME
          34 | His soul possess'd of this surmise,   | THE FLY AND THE GAME
          21 | Just as the buyer drew his purse,     | THE FLY AND THE GAME
           7 | With merry heart the fellow went      | THE FLY AND THE GAME
          17 | And for their dinner just in time     | THE FLY AND THE GAME
(10 rows)
-
Set the following two environment variables:
export ASTRA_TABLE=fable
export OPENAI_KEY=OPENAI_API_KEY
-
Generate and load the embeddings:
mvn clean compile exec:java \
  -Dexec.mainClass=com.datastax.astra.beam.genai.GenAI_02_CreateEmbeddings \
  -Dexec.args="\
  --astraToken=${ASTRA_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --openAiKey=${OPENAI_KEY} \
  --table=${ASTRA_TABLE}"
If the flow ran successfully, the output contains the embeddings that were added to the table.
Google Dataflow samples
In the sample repo, the samples-dataflow directory contains flows executed with Google Dataflow.
Set up the gcloud CLI
-
In the Google Cloud console, select or create a Google Cloud project.
-
Enable the Billing API.
-
Set your Google Cloud project values as environment variables:
export GCP_PROJECT_ID=GCP_PROJECT_ID
export GCP_PROJECT_CODE=GCP_PROJECT_CODE
export GCP_USER=GCP_EMAIL
export GCP_COMPUTE_ENGINE=${GCP_PROJECT_CODE}-compute@developer.gserviceaccount.com
-
Sign in and authenticate with Google Cloud:
gcloud auth login
-
Set your project ID, and then print your project to confirm you’re connected:
gcloud config set project ${GCP_PROJECT_ID}
gcloud projects describe ${GCP_PROJECT_ID}
Result
createTime: '2024-07-12T14:53:48.447964Z'
lifecycleState: ACTIVE
name: my-quickstart
projectId: my-quickstart-429214
projectNumber: '186311514751'
-
Enable the required APIs, and then wait for the process to complete:
gcloud services enable dataflow compute_component \
  logging storage_component storage_api \
  bigquery pubsub datastore.googleapis.com \
  cloudresourcemanager.googleapis.com
-
Add roles to your project to allow your user and service accounts to access Dataflow.
Your user account must have the Dataflow Admin role and the Service Account User role. The Compute Engine default service account must have the Dataflow Worker role.
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
  --member="user:${GCP_USER}" \
  --role=roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
  --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
  --role=roles/dataflow.admin
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
  --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
  --role=roles/dataflow.worker
gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
  --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
  --role=roles/storage.objectAdmin
-
Create a Google Cloud Storage bucket:
export GCP_BUCKET_INPUT=gs://astra_dataflow_inputs
gsutil mb -c STANDARD -l US ${GCP_BUCKET_INPUT}
-
Change directory to the samples-dataflow folder:
cd samples-dataflow
pwd
-
Copy the local file src/test/resources/language-codes.csv to the GCP bucket:
gsutil cp src/test/resources/language-codes.csv ${GCP_BUCKET_INPUT}/csv/
gsutil ls
-
Set environment variables for the names of the token secret and the SCB secret, and then create secrets for these values in Google Cloud Secret Manager. Secret names can contain only letters, numbers, hyphens, and underscores, so use a name for the SCB secret rather than a file path:
export GCP_SECRET_TOKEN=token
export GCP_SECRET_SECURE_BUNDLE=secure-connect-bundle-DATABASE_NAME
gcloud secrets create ${GCP_SECRET_TOKEN} \
  --data-file <(echo -n "${ASTRA_TOKEN}") \
  --replication-policy="automatic"
gcloud secrets add-iam-policy-binding ${GCP_SECRET_TOKEN} \
  --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
  --role='roles/secretmanager.secretAccessor'
gcloud secrets create ${GCP_SECRET_SECURE_BUNDLE} \
  --data-file ${ASTRA_SCB_PATH} \
  --replication-policy="automatic"
gcloud secrets add-iam-policy-binding ${GCP_SECRET_SECURE_BUNDLE} \
  --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
  --role='roles/secretmanager.secretAccessor'
gcloud secrets list
-
Create a keyspace in Astra DB:
astra db create-keyspace DATABASE_NAME \
  -k KEYSPACE_NAME \
  --if-not-exist
-
Export environment variables:
export ASTRA_DB=DATABASE_NAME
export ASTRA_DB_KEYSPACE=KEYSPACE_NAME
export ASTRA_TABLE=TABLE_NAME
export ASTRA_SECRET_TOKEN=projects/${GCP_PROJECT_CODE}/secrets/${GCP_SECRET_TOKEN}/versions/1
export ASTRA_SECRET_SECURE_BUNDLE=projects/${GCP_PROJECT_CODE}/secrets/${GCP_SECRET_SECURE_BUNDLE}/versions/1
export GCP_PROJECT_ID=<your-gcp-project-id>
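The two ASTRA_SECRET_* values follow the Secret Manager resource format projects/PROJECT/secrets/SECRET_ID/versions/VERSION. The following Java sketch builds such a name and rejects secret IDs that Secret Manager would not accept (only letters, digits, hyphens, and underscores, up to 255 characters). The SecretVersionName class is hypothetical and exists only to illustrate the format; it does not call any Google Cloud API.

```java
import java.util.regex.Pattern;

// Hypothetical helper that assembles a Secret Manager version resource name.
final class SecretVersionName {

    // Secret IDs: letters, digits, hyphens, and underscores, 1 to 255 characters.
    private static final Pattern SECRET_ID = Pattern.compile("[A-Za-z0-9_-]{1,255}");

    static boolean isValidSecretId(String secretId) {
        return secretId != null && SECRET_ID.matcher(secretId).matches();
    }

    // Builds projects/<project>/secrets/<secretId>/versions/<version>.
    static String of(String project, String secretId, String version) {
        if (!isValidSecretId(secretId)) {
            throw new IllegalArgumentException("Invalid secret ID: " + secretId);
        }
        return "projects/" + project + "/secrets/" + secretId + "/versions/" + version;
    }

    public static void main(String[] args) {
        // Project number taken from the sample "gcloud projects describe" output.
        System.out.println(of("186311514751", "token", "1"));
    }
}
```

The validity check makes it visible why a file path cannot serve as a secret name: slashes and dots are not allowed in a secret ID.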
Run Google Dataflow flows
Before you run these flows, make sure that you complete the steps in Set up the gcloud CLI.
To run the flows in samples-dataflow, use the following commands:
-
Import a CSV from Google Cloud Storage
-
Export a table to Google Cloud Storage
-
Export an Astra DB table to BigQuery
Import a CSV from Google Cloud Storage
Use this flow to import a CSV from Google Cloud Storage to an Astra DB table.
-
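Run the pipeline. The command reads the CSV location from GCP_INPUT_CSV, so first set that variable to the file that you copied to the input bucket:
export GCP_INPUT_CSV=${GCP_BUCKET_INPUT}/csv/language-codes.csv
mvn compile exec:java \
  -Dexec.mainClass=com.datastax.astra.dataflow.Gcs_To_AstraDb \
  -Dexec.args="\
  --astraToken=${ASTRA_SECRET_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --csvInput=${GCP_INPUT_CSV} \
  --project=${GCP_PROJECT_ID} \
  --runner=DataflowRunner \
  --region=us-central1"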
-
Confirm that the Astra DB table is populated:
astra db cqlsh ${ASTRA_DB} \
  -k ${ASTRA_DB_KEYSPACE} \
  -e "SELECT * FROM languages LIMIT 10;"
Export a table to Google Cloud Storage
Use this flow to export data from an Astra DB table to Google Cloud Storage.
-
Create a GCP output bucket:
export GCP_OUTPUT_CSV=gs://astra_dataflow_outputs
gsutil mb -c STANDARD -l US ${GCP_OUTPUT_CSV}
-
Run the pipeline:
mvn compile exec:java \
  -Dexec.mainClass=com.datastax.astra.dataflow.AstraDb_To_Gcs \
  -Dexec.args="\
  --astraToken=${ASTRA_SECRET_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --table=${ASTRA_TABLE} \
  --outputFolder=${GCP_OUTPUT_CSV} \
  --project=${GCP_PROJECT_ID} \
  --runner=DataflowRunner \
  --region=us-central1"
Export an Astra DB table to BigQuery
Use this flow to export table data from Astra DB to Google BigQuery.
-
Create a dataset named dataflow_input_us in BigQuery:
export GCP_BIGQUERY_DATASET=dataflow_input_us
bq mk ${GCP_BIGQUERY_DATASET}
bq ls --format=pretty
-
Create a BigQuery table using the schema_language_codes.json file in samples-dataflow/src/main/resources/schema_language_codes.json:
export GCP_BIGQUERY_TABLE=destination
bq mk --table --schema src/main/resources/schema_language_codes.json ${GCP_BIGQUERY_DATASET}.${GCP_BIGQUERY_TABLE}
-
Run the pipeline:
mvn compile exec:java \
  -Dexec.mainClass=com.datastax.astra.dataflow.AstraDb_To_BigQuery \
  -Dexec.args="\
  --astraToken=${ASTRA_SECRET_TOKEN} \
  --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
  --astraKeyspace=${ASTRA_DB_KEYSPACE} \
  --table=languages \
  --bigQueryDataset=${GCP_BIGQUERY_DATASET} \
  --bigQueryTable=${GCP_BIGQUERY_TABLE} \
  --runner=DataflowRunner \
  --project=${GCP_PROJECT_ID} \
  --region=us-central1"
-
Verify that the BigQuery table contains the data from Astra DB:
bq head -n 10 ${GCP_BIGQUERY_DATASET}.${GCP_BIGQUERY_TABLE}
Astra DB I/O connectors
Flows use Astra DB I/O connectors to run read, write, delete, and read all operations in pipelines.
To add the dependency to your Maven project, find the latest version on Maven Central, and set it in pom.xml:
<dependency>
<groupId>com.datastax.astra</groupId>
<artifactId>beam-sdks-java-io-astra</artifactId>
<version>${latest-version}</version>
</dependency>
To read data from Astra DB, use AstraDbIO.Read<Entity>.
The entity must be a Serializable object.
The following example uses the Cassandra object mapping of Driver 4x:
byte[] scbZip = ...
AstraDbIO.Read<LanguageCode> read = AstraDbIO.<LanguageCode>read()
.withToken("token")
.withKeyspace("keyspace")
.withSecureConnectBundle(scbZip)
.withTable("table")
.withMinNumberOfSplits(20)
.withCoder(SerializableCoder.of(LanguageCode.class))
.withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
.withEntity(LanguageCode.class);
The mapperFactoryFn implements SerializableFunction<CqlSession, AstraDbMapper<Entity>>.
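The read example above leaves the scbZip value elided. One way to fill it in, assuming the Secure Connect Bundle is a local file such as the one referenced by ASTRA_SCB_PATH, is to read the file into a byte array with the JDK. The ScbBytes class below is a hypothetical sketch; the looksLikeZip check is only a sanity test that the bytes start with the ZIP signature, not a validation of the bundle contents.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for loading a Secure Connect Bundle from disk.
final class ScbBytes {

    // Reads the whole bundle file into memory.
    static byte[] load(String path) {
        Path file = Paths.get(path);
        if (!Files.isRegularFile(file)) {
            throw new IllegalArgumentException("SCB file not found: " + path);
        }
        try {
            return Files.readAllBytes(file);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // ZIP archives begin with the two ASCII bytes 'P' and 'K'.
    static boolean looksLikeZip(byte[] bytes) {
        return bytes.length >= 2 && bytes[0] == 'P' && bytes[1] == 'K';
    }

    public static void main(String[] args) {
        String path = System.getenv("ASTRA_SCB_PATH");
        if (path == null) {
            System.err.println("Set ASTRA_SCB_PATH to the bundle location first.");
            return;
        }
        byte[] scbZip = load(path);
        System.out.println("Read " + scbZip.length + " bytes, zip header: " + looksLikeZip(scbZip));
    }
}
```

The resulting array can then be passed to withSecureConnectBundle as in the example above.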
Alternatively, you can specify a query. The table name is not mandatory.
AstraDbIO.Read<LanguageCode> read2 = AstraDbIO.<LanguageCode>read()
.withToken("token")
.withKeyspace("keyspace")
.withSecureConnectBundle(scbZip)
.withQuery("select * from table where ...")
.withMinNumberOfSplits(20)
.withCoder(SerializableCoder.of(LanguageCode.class))
.withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
.withEntity(LanguageCode.class);
To write data to Astra DB, use AstraDbIO.Write<Entity>:
AstraDbIO.Write<LanguageCode> write = AstraDbIO.<LanguageCode>write()
.withToken("token")
.withKeyspace("keyspace")
.withSecureConnectBundle(scbZip)
.withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
.withEntity(LanguageCode.class);
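Both the read and write examples depend on an entity class. Because the entity must be Serializable, and the read example encodes it with SerializableCoder, which relies on standard Java serialization, a quick round-trip check can catch a non-serializable field early. The sketch below uses a hypothetical LanguageCodeSketch class with assumed code and language fields; the driver mapping annotations that a real entity needs are omitted so that the example runs with the JDK alone.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical entity; a real one would also carry the driver's mapping annotations.
final class LanguageCodeSketch implements Serializable {
    private static final long serialVersionUID = 1L;

    private String code;
    private String language;

    // No-argument constructor, which object mappers commonly expect.
    LanguageCodeSketch() { }

    LanguageCodeSketch(String code, String language) {
        this.code = code;
        this.language = language;
    }

    String getCode() { return code; }

    String getLanguage() { return language; }

    // Serializes and deserializes the entity in memory using Java serialization.
    static LanguageCodeSketch roundTrip(LanguageCodeSketch entity) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(entity);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                return (LanguageCodeSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Entity is not serializable", e);
        }
    }

    public static void main(String[] args) {
        LanguageCodeSketch copy = roundTrip(new LanguageCodeSketch("ru", "Russian"));
        System.out.println(copy.getCode() + " -> " + copy.getLanguage());
    }
}
```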