Integrate Apache Airflow® with Astra DB Serverless
You can use Astra DB Serverless databases in your Apache Airflow® Directed Acyclic Graphs (DAGs).
This is handled through the Apache Cassandra® Python driver and Airflow’s apache-cassandra extra.
Although the Python driver provides built-in support for connections to Astra, the Airflow interface doesn’t accept the cloud_config.secure_connect_bundle parameter from the driver.
To work around this, use cql-proxy to create a compatible, SSL-encrypted connection using your database’s Secure Connect Bundle (SCB).
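For reference, the driver's native Astra connection looks like the following minimal sketch; the bundle path and token are placeholders. Because the Airflow connection form cannot express the cloud_config argument, the rest of this guide connects through cql-proxy instead.
# For reference only: the Cassandra Python driver's built-in Astra support.
# Airflow's Cassandra connection cannot pass the cloud_config argument,
# which is why this guide routes connections through cql-proxy instead.
# The bundle path and APPLICATION_TOKEN are placeholders.
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cluster = Cluster(
    cloud={"secure_connect_bundle": "/path/to/secure-connect-database.zip"},
    auth_provider=PlainTextAuthProvider("token", "APPLICATION_TOKEN"),
)
session = cluster.connect()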
Prerequisites
- An Apache Airflow installation
- Familiarity with proxy server configuration
- An Astra DB Serverless database with a keyspace that you want to use in your DAG
- An application token with a role that permits the actions required by your DAG
- Your database’s Secure Connect Bundle (SCB)
Install the Apache Cassandra extra
Install the apache-airflow-providers-apache-cassandra extra in your Airflow environment to enable support for Cassandra connections, including Astra:
pip install apache-airflow-providers-apache-cassandra
Deploy cql-proxy
Use the instructions in the cql-proxy README to deploy cql-proxy:
- It must be accessible to your Airflow instance.
- Use the SCB and application token for authentication.
- Bind cql-proxy to the listen IP of the server instance.
./cql-proxy \
--bind SERVER_LISTEN_ADDRESS \
--astra-bundle PATH/TO/SCB.zip \
--username token \
--password APPLICATION_TOKEN
The --username must be the literal string token.
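Before configuring Airflow, you can optionally confirm that the proxy is reachable with a short driver script. This is a minimal sketch, not part of the official setup: it assumes the proxy listens on SERVER_LISTEN_ADDRESS:9042 and accepts the same token credentials that you configure in Airflow below; depending on your cql-proxy configuration, client authentication might not be required.
# Minimal connectivity check against cql-proxy (not against Astra directly).
# SERVER_LISTEN_ADDRESS and APPLICATION_TOKEN are placeholders; adjust them
# to match the values used when starting cql-proxy.
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

cluster = Cluster(
    ["SERVER_LISTEN_ADDRESS"],
    port=9042,
    auth_provider=PlainTextAuthProvider(username="token", password="APPLICATION_TOKEN"),
)
session = cluster.connect()
print(session.execute("SELECT cluster_name, release_version FROM system.local").one())
cluster.shutdown()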
Create the cql-proxy connection in Airflow
In Airflow, create a connection to your cql-proxy deployment.
These steps use the Airflow web UI.
For other methods, see Managing connections in Airflow.
- Click Admin, and then select Connections.
- Click Add Connection.
- For Connection ID, enter a name to identify this connection in your DAG code, such as astra_cqlproxy.
- For Connection Type, select Cassandra from the drop-down menu. If this option isn’t available, make sure you installed the Apache Cassandra extra.
- For Host, enter the listen address that cql-proxy is bound to.
- For Schema, enter the name of a keyspace in your Astra DB Serverless database that you want to use in your DAG.
  To use multiple keyspaces, you must create multiple connections.
  To use multiple databases, you need either multiple cql-proxy instances or an instance modified to start with dynamic inputs, because cql-proxy can only use one SCB at a time.
  If you have a multi-region database, you might need a cql-proxy instance and an Airflow connection for each region, because multi-region databases have a unique SCB for each region.
- For Login, enter token.
- For Password, enter your application token.
- For Port, enter the port that cql-proxy is listening on for the CQL native binary protocol. The default is 9042.
- Click Save.
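As an alternative to the web UI, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_<CONN_ID>, expressed as a connection URI. The following is a minimal sketch of that approach; HOST and KEYSPACE are placeholders, the variable must be visible to the Airflow scheduler and workers, and the token must be percent-encoded because application tokens contain colons.
# Hypothetical alternative: define the same connection as a URI in an
# environment variable instead of the web UI. Airflow resolves the
# connection ID astra_cqlproxy from AIRFLOW_CONN_ASTRA_CQLPROXY.
# HOST, KEYSPACE, and APPLICATION_TOKEN are placeholders.
import os
from urllib.parse import quote

token = "APPLICATION_TOKEN"  # Astra tokens contain ':', so percent-encode them
os.environ["AIRFLOW_CONN_ASTRA_CQLPROXY"] = (
    f"cassandra://token:{quote(token, safe='')}@HOST:9042/KEYSPACE"
)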
Test the connection in a DAG
Use the connection in your own DAG, or use the following example code to test the connection.
For guidance on creating and running DAGs, see the Airflow documentation.
The following example DAG references the cql-proxy connection in the line hook = CassandraHook('astra_cqlproxy').
Replace astra_cqlproxy with your connection ID.
This example also sets the DAG’s unique identifier (DAG ID) in with DAG('cass_hooks_tutorial', …).
If you want to use a different DAG ID, replace cass_hooks_tutorial with your preferred ID.
Finally, this example creates two unique tasks: check_table_exists and query_system_local.
from datetime import datetime, timedelta
import pprint

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.cassandra.hooks.cassandra import CassandraHook

default_args = {
    'owner': 'admin',
    'depends_on_past': False,
    'email': ['admin@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# The hook reads the host, port, keyspace, and credentials from the Airflow
# connection. Replace 'astra_cqlproxy' with your connection ID.
hook = CassandraHook('astra_cqlproxy')
pp = pprint.PrettyPrinter(indent=4)

strCQL = "SELECT * FROM system.local;"

def check_table_exists(keyspace_name, table_name):
    print("Checking for the existence of " + keyspace_name + "." + table_name)
    hook.keyspace = keyspace_name
    return hook.table_exists(table_name)

def execute_query(query):
    # get_conn() returns a Cassandra driver Session routed through cql-proxy.
    pp.pprint(hook.get_conn().execute(query).current_rows)

with DAG(
    'cass_hooks_tutorial',
    default_args=default_args,
    description='An example Cassandra DAG using Cassandra hooks',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2022, 2, 7),
    catchup=False,
    tags=['example', 'cassandra'],
) as dag:
    check_table_exists_task = PythonOperator(
        task_id="check_table_exists",
        python_callable=check_table_exists,
        op_args=['system', 'local'],
    )
    query_system_local_task = PythonOperator(
        task_id="query_system_local",
        python_callable=execute_query,
        op_args=[strCQL],
    )
    check_table_exists_task >> query_system_local_task
Save your DAG, and then trigger a test run.
For example, to trigger the DAG with the airflow CLI:
airflow tasks test DAG_ID TASK_ID EXECUTION_DATE
Using the preceding example DAG on November 6, 2022, you would use the following command:
airflow tasks test cass_hooks_tutorial check_table_exists 2022-11-06
This command prints a lot of output; the key line is the final one, which reports whether the task succeeded or failed:
INFO - Marking task as SUCCESS. dag_id=cass_hooks_tutorial, task_id=check_table_exists, execution_date=...