Integrate Google Dataflow with Astra DB Serverless


Google Dataflow is a managed service for batch and streaming data processing pipelines based on Apache Beam.

Prerequisites

Build a sample project

Clone the astra-dataflow-starter sample repository, and then build the project with Maven.

  1. Clone the repository containing the sample flows:

    git clone https://github.com/DataStax-Examples/astra-dataflow-starter.git
  2. Change to the repo directory, and then build the project with Maven:

    cd astra-dataflow-starter
    mvn clean install -Dmaven.test.skip=true

The sample repository contains two modules with two flows each:

  • samples-beam contains flows executed with Apache Beam runners. Run these flows to test the integration between Astra and Apache Beam.

  • samples-dataflow contains flows executed with Google Dataflow. Run these flows to test the integration between Astra and Google Dataflow.

Beam samples

In the sample repo, the samples-beam directory contains flows that you can run with Apache Beam runners in a local environment.

Set up the local environment

  1. Change to the samples-beam directory:

    cd samples-beam
    pwd
  2. Get the values for your Astra DB name, keyspace, and token from the Astra UI.

  3. Download the secure connect bundle for your Astra DB.

    You can use the Astra CLI to download the SCB:

    astra db download-scb DATABASE_ID -f /tmp/secure-connect-bundle-DATABASE_NAME.zip

    Replace DATABASE_ID with your database’s UUID, and DATABASE_NAME with your database’s name.

  4. Set the following environment variables:

    • ASTRA_DB: Database name.

    • ASTRA_KEYSPACE: A keyspace in your database. The default keyspace for Serverless (Vector) databases is default_keyspace.

    • ASTRA_SCB_PATH: The path to the SCB zip file.

    • ASTRA_TOKEN: Your application token.

      export ASTRA_DB=DATABASE_NAME
      export ASTRA_KEYSPACE=default_keyspace
      export ASTRA_SCB_PATH=/tmp/secure-connect-bundle-db-demo.zip
      export ASTRA_TOKEN=AstraCS:...

Run Beam flows

After you set your environment variables, you can run the following flows:

  • Import a CSV File

  • Export a CQL table as CSV

  • Import a Cassandra table into Astra

  • Generate embeddings

Import a CSV File

This flow parses a CSV file to populate an Astra DB table with the same structure. The mapping from CSV to table is defined in the flow. The sample dataset is a list of languages and language codes.
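
Conceptually, the flow reads CSV lines, maps each line to an entity, and writes the entities with the AstraDbIO connector. The following is a minimal sketch of that shape; the LanguageCode entity, the field mapping, and the variable names are illustrative, so refer to Csv_to_AstraDb in the repository for the actual code.

// Illustrative sketch only; see Csv_to_AstraDb in the sample repository.
// "options" holds the parsed pipeline options; csvInput, token, keyspace, and
// scbZip (the secure connect bundle content as byte[]) come from those options.
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read raw CSV lines, for example "fr,French"
    .apply("Read CSV", TextIO.read().from(csvInput))
    // Map each line to a Serializable entity (assumed fields: code, language)
    .apply("Map to entity", MapElements
        .into(TypeDescriptor.of(LanguageCode.class))
        .via((String line) -> {
          String[] cols = line.split(",", 2);
          return new LanguageCode(cols[0], cols[1]);
        }))
    // Write the entities to the Astra DB table through the connector
    .apply("Write to Astra", AstraDbIO.<LanguageCode>write()
        .withToken(token)
        .withSecureConnectBundle(scbZip)
        .withKeyspace(keyspace)
        .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
        .withEntity(LanguageCode.class));

pipeline.run().waitUntilFinish();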

  1. Run the pipeline:

    mvn clean compile exec:java \
    -Dexec.mainClass=com.datastax.astra.beam.Csv_to_AstraDb \
    -Dexec.args="\
    --astraToken=${ASTRA_TOKEN} \
    --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
    --astraKeyspace=${ASTRA_KEYSPACE} \
    --csvInput=`pwd`/src/test/resources/language-codes.csv"
  2. Verify the Astra DB table data:

    astra db cqlsh ${ASTRA_DB} \
       -k ${ASTRA_KEYSPACE} \
       -e "SELECT * FROM languages LIMIT 10;"
    Result:
     code | language
    ------+---------------------------------------
       ru |                               Russian
       mt |                               Maltese
       ur |                                  Urdu
       bm |                               Bambara
       es |                    Spanish; Castilian
       sd |                                Sindhi
       nb | "Bokmål, Norwegian; Norwegian Bokmål"
       gd |               Gaelic; Scottish Gaelic
       st |                     "Sotho, Southern"
       ml |                             Malayalam

Export a CQL table as CSV

This flow exports a CQL table from Astra DB to a CSV file. The mapping from table to CSV is defined in the flow.

Before you run this flow, make sure that you have imported the required data, for example with the CSV import flow above.
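
Conceptually, this flow is the inverse of the import: it reads entities from the table with AstraDbIO and writes one CSV line per row with TextIO. A minimal sketch under the same illustrative names as above; see AstraDb_To_Csv in the repository for the actual code.

// Illustrative sketch only; see AstraDb_To_Csv in the sample repository.
// token, scbZip, keyspace, and csvOutput come from the pipeline options.
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read LanguageCode entities from the Astra DB table
    .apply("Read from Astra", AstraDbIO.<LanguageCode>read()
        .withToken(token)
        .withSecureConnectBundle(scbZip)
        .withKeyspace(keyspace)
        .withTable("languages")
        .withCoder(SerializableCoder.of(LanguageCode.class))
        .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
        .withEntity(LanguageCode.class))
    // Format each entity as a CSV line, for example "fr,French"
    .apply("To CSV line", MapElements
        .into(TypeDescriptors.strings())
        .via((LanguageCode lc) -> lc.getCode() + "," + lc.getLanguage()))
    // Write sharded CSV files to the output folder
    .apply("Write CSV", TextIO.write().to(csvOutput));

pipeline.run().waitUntilFinish();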

  1. Run the pipeline:

    mvn clean compile exec:java \
    -Dexec.mainClass=com.datastax.astra.beam.AstraDb_To_Csv \
    -Dexec.args="\
    --astraToken=${ASTRA_TOKEN} \
    --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
    --astraKeyspace=${ASTRA_KEYSPACE} \
    --table=languages \
    --csvOutput=`pwd`/src/test/resources/out/language"
  2. Verify the output:

    ls -l `pwd`/src/test/resources/out
    cat `pwd`/src/test/resources/out/language-00001-of-00004
    Result:
    gu,Gujarati
    ve,Venda
    ss,Swati
    ch,Chamorro
    si,Sinhala; Sinhalese
    fa,Persian
    no,Norwegian
    sv,Swedish
    ...

Import a Cassandra table into Astra

This flow creates a table in a local Cassandra database, and then imports the table data into Astra DB. The dataset is a list of languages, and the data mapping is defined in the flow. The flow uses CassandraIO (Cassandra driver 3.x) to read the Cassandra data and AstraDbIO (Cassandra driver 4.x) to write the data to Astra DB.
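
The core of the flow is a read with CassandraIO followed by a write with AstraDbIO. A minimal sketch with illustrative names; see Cassandra_To_AstraDb in the repository for the actual code, including the keyspace and table creation.

// Illustrative sketch only; see Cassandra_To_AstraDb in the sample repository.
// token, scbZip, and keyspace come from the pipeline options.
Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read from the local Cassandra cluster (Beam CassandraIO, driver 3.x)
    .apply("Read from Cassandra", CassandraIO.<LanguageCode>read()
        .withHosts(Collections.singletonList("localhost"))
        .withPort(9042)
        .withKeyspace("demo")
        .withTable("languages")
        .withEntity(LanguageCode.class)
        .withCoder(SerializableCoder.of(LanguageCode.class)))
    // Write the same entities to Astra DB (AstraDbIO, driver 4.x)
    .apply("Write to Astra", AstraDbIO.<LanguageCode>write()
        .withToken(token)
        .withSecureConnectBundle(scbZip)
        .withKeyspace(keyspace)
        .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
        .withEntity(LanguageCode.class));

pipeline.run().waitUntilFinish();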

  1. Start a local Cassandra container with Docker Compose:

    docker-compose -f ./src/main/docker/docker-compose.yml up -d
  2. Wait for Cassandra to start. You can use the following command to check the container status:

    docker ps -f "name=cassandra"
  3. To verify that Cassandra is ready, connect with cqlsh and get the current data center:

    docker exec -it `docker ps | \
      grep cassandra:4.1.1 | \
      cut -b 1-12` cqlsh -e "SELECT data_center FROM system.local;"
  4. Create environment variables in your terminal:

    export ASTRA_DB=DATABASE_NAME
    export ASTRA_KEYSPACE=default_keyspace
    export ASTRA_SCB_PATH=/tmp/secure-connect-bundle-db-demo.zip
    export ASTRA_TOKEN=AstraCS:...
  5. Run the pipeline. The flow creates a keyspace and table in the local Cassandra database before copying the data to Astra DB.

    mvn clean compile exec:java \
     -Dexec.mainClass=com.datastax.astra.beam.Cassandra_To_AstraDb \
     -Dexec.args="\
     --astraToken=${ASTRA_TOKEN} \
     --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
     --astraKeyspace=${ASTRA_KEYSPACE} \
     --cassandraHost=localhost \
     --cassandraKeyspace=demo \
     --cassandraTableName=languages \
     --cassandraPort=9042 \
     --tableName=languages"
  6. Use cqlsh to verify the data in the local Cassandra database:

    docker exec -it `docker ps \
      | grep cassandra:4.1.1 \
      | cut -b 1-12` \
      cqlsh -e "SELECT * FROM demo.languages LIMIT 10;"
    Result:
     code | language
    ------+---------------------------------------
       ru |                               Russian
       mt |                               Maltese
       ur |                                  Urdu
       bm |                               Bambara
       es |                    Spanish; Castilian
       sd |                                Sindhi
       nb | "Bokmål, Norwegian; Norwegian Bokmål"
       gd |               Gaelic; Scottish Gaelic
       st |                     "Sotho, Southern"
       ml |                             Malayalam
  7. Use cqlsh to verify that the Astra DB table has the same data as the Cassandra database:

    astra db cqlsh ${ASTRA_DB} \
       -k ${ASTRA_KEYSPACE} \
       -e "SELECT * FROM languages LIMIT 10;"

Generate embeddings

This example uses two flows. The first flow imports a CSV file and maps it to an Astra DB table. The second flow generates embeddings with the OpenAI Embedding API, and then stores the vectors in the table.
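
At a high level, the second flow reads each row, calls the OpenAI Embedding API for the document text, and writes the resulting vector back to the table. The following DoFn sketch shows only the embedding step; the Fable entity, its withEmbeddings method, and the embedOpenAi helper are hypothetical placeholders, so see GenAI_02_CreateEmbeddings in the repository for the actual code.

// Illustrative sketch only; see GenAI_02_CreateEmbeddings in the sample repository.
static class CreateEmbeddingFn extends DoFn<Fable, Fable> {

  private final String openAiKey;

  CreateEmbeddingFn(String openAiKey) {
    this.openAiKey = openAiKey;
  }

  @ProcessElement
  public void processElement(@Element Fable in, OutputReceiver<Fable> out) {
    // embedOpenAi is a hypothetical helper that wraps the HTTP call to the
    // OpenAI Embedding API (POST https://api.openai.com/v1/embeddings) and
    // returns the vector for the document text.
    List<Float> vector = embedOpenAi(openAiKey, in.getDocument());
    out.output(in.withEmbeddings(vector));
  }
}

Downstream, the enriched entities are written back with AstraDbIO.<Fable>write(), which populates the vector column of the fable table.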

  1. Import the CSV data:

     mvn clean compile exec:java \
     -Dexec.mainClass=com.datastax.astra.beam.genai.GenAI_01_ImportData \
     -Dexec.args="\
     --astraToken=${ASTRA_TOKEN} \
     --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
     --astraKeyspace=${ASTRA_KEYSPACE} \
     --csvInput=`pwd`/src/main/resources/fables_of_fontaine.csv"
  2. Verify the data in Astra DB:

    astra db cqlsh ${ASTRA_DB} \
       -k ${ASTRA_KEYSPACE} \
       -e "SELECT * FROM fable LIMIT 10;"
    Result:
     document_id | document                              | title
    -------------+---------------------------------------+----------------------
               6 |         Of certain gormandizing cits. | THE FLY AND THE GAME
              16 | The lady thought the creatures prime, | THE FLY AND THE GAME
              32 | Such game as this may suit the dogs.' | THE FLY AND THE GAME
              26 |          Was anything but fit to eat. | THE FLY AND THE GAME
              19 |     For dinner she could hardly wait. | THE FLY AND THE GAME
              13 |   Pronounced it racy, rich, and rare, | THE FLY AND THE GAME
              34 |   His soul possess'd of this surmise, | THE FLY AND THE GAME
              21 |     Just as the buyer drew his purse, | THE FLY AND THE GAME
               7 |      With merry heart the fellow went | THE FLY AND THE GAME
              17 |     And for their dinner just in time | THE FLY AND THE GAME
    
    (10 rows)
  3. Set the following two environment variables:

    export ASTRA_TABLE=fable
    export OPENAI_KEY=OPENAI_API_KEY
  4. Generate and load the embeddings:

    mvn clean compile exec:java \
    -Dexec.mainClass=com.datastax.astra.beam.genai.GenAI_02_CreateEmbeddings \
    -Dexec.args="\
    --astraToken=${ASTRA_TOKEN} \
    --astraSecureConnectBundle=${ASTRA_SCB_PATH} \
    --astraKeyspace=${ASTRA_KEYSPACE} \
    --openAiKey=${OPENAI_KEY} \
    --table=${ASTRA_TABLE}"

If the flow ran successfully, the output contains the embeddings that were added to the table.

Google Dataflow samples

In the sample repo, the samples-dataflow directory contains flows executed with Google Dataflow.

Set up the gcloud CLI

  1. In the Google Cloud console, select or create a Google Cloud project.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Set your Google Cloud project values as environment variables:

    export GCP_PROJECT_ID=<your-gcp-project-id>
    export GCP_PROJECT_CODE=<your-gcp-project-number>
    export GCP_USER=<your-gcp-email>
    export GCP_COMPUTE_ENGINE=${GCP_PROJECT_CODE}-compute@developer.gserviceaccount.com
  4. Install the gcloud CLI.

  5. Log in and authenticate with Google Cloud:

    gcloud auth login
  6. Set your project ID, and then print your project to confirm you’re connected:

    gcloud config set project ${GCP_PROJECT_ID}
    gcloud projects describe ${GCP_PROJECT_ID}
    Result:
    createTime: '2024-07-12T14:53:48.447964Z'
    lifecycleState: ACTIVE
    name: my-quickstart
    projectId: my-quickstart-429214
    projectNumber: '186311514751'
  7. Enable the required APIs, and then wait for the process to complete:

    gcloud services enable dataflow compute_component \
       logging storage_component storage_api \
       bigquery pubsub datastore.googleapis.com \
       cloudresourcemanager.googleapis.com
  8. Add roles to your project to allow your user and service accounts to access Dataflow.

    Your user account must have the Dataflow Admin role and the Service Account User role. The Compute Engine default service account must have the Dataflow Admin, Dataflow Worker, and Storage Object Admin roles.

    gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} \
        --member="user:${GCP_USER}" \
        --role=roles/iam.serviceAccountUser
    gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
        --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
        --role=roles/dataflow.admin
    gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
        --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
        --role=roles/dataflow.worker
    gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID}  \
        --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
        --role=roles/storage.objectAdmin
  9. Create a Google Cloud Storage bucket:

    export GCP_BUCKET_INPUT=gs://astra_dataflow_inputs
    gsutil mb -c STANDARD -l US ${GCP_BUCKET_INPUT}
  10. Change directory to the samples-dataflow folder:

    cd samples-dataflow
    pwd
  11. Copy the local file src/test/resources/language-codes.csv to the GCP bucket, and then set an environment variable with the file's bucket path:

    gsutil cp src/test/resources/language-codes.csv ${GCP_BUCKET_INPUT}/csv/
    gsutil ls
    export GCP_INPUT_CSV=${GCP_BUCKET_INPUT}/csv/language-codes.csv
  12. Set environment variables with names for the secrets, and then create secrets for the token and the SCB in Google Cloud Secret Manager:

    export GCP_SECRET_TOKEN=token
    export GCP_SECRET_SECURE_BUNDLE=secure-connect-bundle
    
    gcloud secrets create ${GCP_SECRET_TOKEN} \
      --data-file <(echo -n "${ASTRA_TOKEN}") \
      --replication-policy="automatic"
    
    gcloud secrets add-iam-policy-binding ${GCP_SECRET_TOKEN} \
       --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
       --role='roles/secretmanager.secretAccessor'
    
    gcloud secrets create ${GCP_SECRET_SECURE_BUNDLE} \
      --data-file ${ASTRA_SCB_PATH} \
      --replication-policy="automatic"
    
    gcloud secrets add-iam-policy-binding ${GCP_SECRET_SECURE_BUNDLE} \
       --member="serviceAccount:${GCP_COMPUTE_ENGINE}" \
       --role='roles/secretmanager.secretAccessor'
    
    gcloud secrets list
  13. Create a keyspace in Astra DB:

    astra db create-keyspace DATABASE_NAME \
       -k KEYSPACE_NAME \
       --if-not-exist
  14. Export environment variables:

    export ASTRA_DB=DATABASE_NAME
    export ASTRA_KEYSPACE=KEYSPACE_NAME
    export ASTRA_TABLE=TABLE_NAME
    export ASTRA_SECRET_TOKEN=projects/${GCP_PROJECT_CODE}/secrets/${GCP_SECRET_TOKEN}/versions/1
    export ASTRA_SECRET_SECURE_BUNDLE=projects/${GCP_PROJECT_CODE}/secrets/${GCP_SECRET_SECURE_BUNDLE}/versions/1
    export GCP_PROJECT_ID=<your-gcp-project-id>

Run Google Dataflow flows

Before you run these flows, make sure that you Set up the gcloud CLI.

To run the flows in samples-dataflow, use the following commands:

  • Import a CSV from Google Cloud Storage

  • Export a table to Google Cloud Storage

  • Export an Astra table to BigQuery

Import a CSV from Google Cloud Storage

Use this flow to import a CSV file from Google Cloud Storage into an Astra DB table.

  1. Run the pipeline:

     mvn compile exec:java \
     -Dexec.mainClass=com.datastax.astra.dataflow.Gcs_To_AstraDb \
     -Dexec.args="\
     --astraToken=${ASTRA_SECRET_TOKEN} \
     --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
     --astraKeyspace=${ASTRA_KEYSPACE} \
     --csvInput=${GCP_INPUT_CSV} \
     --project=${GCP_PROJECT_ID} \
     --runner=DataflowRunner \
     --region=us-central1"
  2. Confirm that the Astra DB table is populated:

    astra db cqlsh ${ASTRA_DB} \
       -k ${ASTRA_KEYSPACE} \
       -e "SELECT * FROM languages LIMIT 10;"

Export a table to Google Cloud Storage

Use this flow to export data from an Astra DB table to Google Cloud Storage.

  1. Create a GCP output bucket:

    export GCP_OUTPUT_CSV=gs://astra_dataflow_outputs
    gsutil mb -c STANDARD -l US ${GCP_OUTPUT_CSV}
  2. Run the pipeline:

    mvn compile exec:java \
    -Dexec.mainClass=com.datastax.astra.dataflow.AstraDb_To_Gcs \
    -Dexec.args="\
    --astraToken=${ASTRA_SECRET_TOKEN} \
    --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
    --astraKeyspace=${ASTRA_KEYSPACE} \
    --table=${ASTRA_TABLE} \
    --outputFolder=${GCP_OUTPUT_CSV} \
    --project=${GCP_PROJECT_ID} \
    --runner=DataflowRunner \
    --region=us-central1"

Export an Astra table to BigQuery

Use this flow to export table data from Astra DB to Google BigQuery.
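
Internally, the flow reads entities with AstraDbIO, converts each row to a BigQuery TableRow, and writes with BigQueryIO. A minimal sketch with illustrative names; see AstraDb_To_BigQuery in the repository for the actual code.

// Illustrative sketch only; see AstraDb_To_BigQuery in the sample repository.
// "read" is an AstraDbIO.Read<LanguageCode> configured as in the Beam samples;
// projectId, dataset, and table come from the pipeline options.
Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply("Read from Astra", read)
    // Map each entity to a BigQuery TableRow matching schema_language_codes.json
    .apply("To TableRow", MapElements
        .into(TypeDescriptor.of(TableRow.class))
        .via((LanguageCode lc) -> new TableRow()
            .set("code", lc.getCode())
            .set("language", lc.getLanguage())))
    // Append the rows to the existing destination table
    .apply("Write to BigQuery", BigQueryIO.writeTableRows()
        .to(projectId + ":" + dataset + "." + table)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

pipeline.run().waitUntilFinish();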

  1. Create a BigQuery dataset named dataflow_input_us:

    export GCP_BIGQUERY_DATASET=dataflow_input_us
    bq mk ${GCP_BIGQUERY_DATASET}
    bq ls --format=pretty
  2. Create a BigQuery table using the schema file at samples-dataflow/src/main/resources/schema_language_codes.json:

    export GCP_BIGQUERY_TABLE=destination
    bq mk --table --schema src/main/resources/schema_language_codes.json ${GCP_BIGQUERY_DATASET}.${GCP_BIGQUERY_TABLE}
  3. Run the pipeline:

    mvn compile exec:java \
     -Dexec.mainClass=com.datastax.astra.dataflow.AstraDb_To_BigQuery \
     -Dexec.args="\
     --astraToken=${ASTRA_SECRET_TOKEN} \
     --astraSecureConnectBundle=${ASTRA_SECRET_SECURE_BUNDLE} \
     --astraKeyspace=${ASTRA_KEYSPACE} \
     --table=languages \
     --bigQueryDataset=${GCP_BIGQUERY_DATASET} \
     --bigQueryTable=${GCP_BIGQUERY_TABLE} \
     --runner=DataflowRunner \
     --project=${GCP_PROJECT_ID} \
     --region=us-central1"
  4. Verify that the BigQuery table contains the data from Astra DB:

    bq head -n 10 ${GCP_BIGQUERY_DATASET}.${GCP_BIGQUERY_TABLE}

AstraDbIO connectors

Flows use the AstraDbIO connector to run read, readAll, write, and delete operations in pipelines.

To add the dependency to your Maven project, find the latest version on Maven Central, and set it in pom.xml:

<dependency>
   <groupId>com.datastax.astra</groupId>
   <artifactId>beam-sdks-java-io-astra</artifactId>
   <version>${latest-version}</version>
</dependency>

To read data from Astra DB, use AstraDbIO.Read<Entity>. The entity must be a Serializable object.

The following example uses the Cassandra object mapping from the Java driver 4.x:

byte[] scbZip = ...

AstraDbIO.Read<LanguageCode> read = AstraDbIO.<LanguageCode>read()
  .withToken("token")
  .withKeyspace("keyspace")
  .withSecureConnectBundle(scbZip)
  .withTable("table")
  .withMinNumberOfSplits(20)
  .withCoder(SerializableCoder.of(LanguageCode.class))
  .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
  .withEntity(LanguageCode.class);

The mapperFactoryFn implements SerializableFunction<CqlSession, AstraDbMapper<Entity>>.
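
For reference, the entity itself is a plain Serializable POJO, and the factory function builds an AstraDbMapper for that entity from the CqlSession that the connector manages. The following is a hypothetical skeleton, not the exact code from the samples; the methods that AstraDbMapper requires are defined by the connector, so check its Javadoc.

// A Serializable entity with illustrative field names.
public class LanguageCode implements Serializable {
  private String code;
  private String language;
  // constructors, getters, and setters omitted
}

// Hypothetical factory skeleton. It receives the CqlSession opened by the
// connector and returns an AstraDbMapper<LanguageCode>, typically backed by
// the driver 4.x object mapper (the mapper builder and DAO names below are
// illustrative).
public class LanguageCodeDaoMapperFactoryFn
    implements SerializableFunction<CqlSession, AstraDbMapper<LanguageCode>> {

  @Override
  public AstraDbMapper<LanguageCode> apply(CqlSession cqlSession) {
    return new LanguageCodeMapperBuilder(cqlSession).build().getLanguageCodeDao();
  }
}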

Alternatively, you can specify a query instead of a table name:

 AstraDbIO.Read<LanguageCode> read2 = AstraDbIO.<LanguageCode>read()
  .withToken("token")
  .withKeyspace("keyspace")
  .withSecureConnectBundle(scbZip)
  .withQuery("select * from table where ...")
  .withMinNumberOfSplits(20)
  .withCoder(SerializableCoder.of(LanguageCode.class))
  .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
  .withEntity(LanguageCode.class);

To write data to Astra DB, use AstraDbIO.Write<Entity>:

AstraDbIO.Write<LanguageCode> write = AstraDbIO.<LanguageCode>write()
  .withToken("token")
  .withKeyspace("keyspace")
  .withSecureConnectBundle(scbZip)
  .withMapperFactoryFn(new LanguageCodeDaoMapperFactoryFn())
  .withEntity(LanguageCode.class);
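
Both transforms plug into a standard Beam pipeline: the read produces a PCollection of entities and the write consumes one. A minimal sketch, assuming the read and write objects defined above:

// Copy a table onto itself, purely to show how the transforms compose.
Pipeline pipeline = Pipeline.create(options);

pipeline
    .apply("Read from Astra", read)     // yields PCollection<LanguageCode>
    .apply("Write to Astra", write);    // persists each entity

pipeline.run().waitUntilFinish();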
