Integrate Airbyte with Astra DB Serverless
Airbyte builds extract, transform, and load (ETL) pipelines from data sources to destinations.
This tutorial uses an Airbyte connector to extract data from a GNews source, and then load that data into an Astra DB Serverless destination.
Airbyte is available as a self-hosted or cloud-hosted service. This tutorial uses Airbyte Cloud.
Prerequisites
This tutorial requires the following:
- An active Astra account
- An active Serverless (Vector) database
- An application token with the Database Administrator role
- An active Airbyte account
Create the Airbyte pipeline
In your Airbyte Cloud workspace, create a Source, Destination, and Connection for your Airbyte pipeline.
Add a GNews source in Airbyte
A Source is an application that extracts data from an underlying data store. This can be an API, a file, a database, or a data warehouse.
- Add a source to pull general articles from the GNews API.
- Test and verify the source. If the source is valid, the message All connection tests passed appears.
See the Airbyte documentation for more details about how to create a source.
Add an Astra DB destination in Airbyte
In Airbyte, a destination is a target that receives and loads data into an underlying data store. This can be a data warehouse, a data lake, another database, or an analytics tool. For more information about adding destinations, see the Airbyte documentation.
- Add a destination to send data to Astra DB with the following settings:
  - Destination name: Astra DB
  - Chunk size: 512
  - OpenAI API key: Your OpenAI API key
  - Astra DB application token: Your application token
  - Astra DB endpoint: Your database's API endpoint
  - Astra DB keyspace: default_keyspace
  - Astra DB collection: airbyte
- Test and verify the destination. If the destination is valid, the message All connection tests passed appears.
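The Chunk size setting controls how the connector splits long article text into pieces before generating OpenAI embeddings. As a rough illustration only (the connector's actual chunking is token-based and its internals differ), fixed-size chunking can be sketched as:

```python
def chunk_text(text: str, chunk_size: int = 512) -> list[str]:
    """Split text into fixed-size chunks: a simplified stand-in for the
    token-based chunking the Airbyte connector performs before embedding."""
    return [text[i : i + chunk_size] for i in range(0, len(text), chunk_size)]


# A 1500-character article splits into three chunks of at most 512 characters.
article = "word " * 300
chunks = chunk_text(article, chunk_size=512)
print(len(chunks))     # 3
print(len(chunks[0]))  # 512
```

Each chunk is embedded separately, so one long article can produce several documents in the `airbyte` collection.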
Connect GNews and Astra DB
A connection is an Airbyte component that pulls data from a source and pushes data to a destination. For more information about creating connections, see the Airbyte documentation.
- Set up a connection between the GNews source and the Astra DB destination:
  - Leave the Configuration values at their defaults. Airbyte syncs to the Astra DB destination every 24 hours.
  - Enable the top_headlines and search streams.
  - Select the Incremental | Append + Deduped Sync mode. Each sync appends only modified data to your existing tables and keeps only the most recent version of each record.
- Wait for the first sync job to complete. If the sync job doesn't start automatically, run it manually.
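The Append + Deduped mode described above keeps only the latest version of each record across syncs. The field names below (`_ab_record_id` as the primary key, `updated_at` as the cursor) are illustrative, not the connector's internals, but the semantics can be sketched as:

```python
def dedupe_append(existing: dict, incoming: list[dict]) -> dict:
    """Merge incoming records into the existing store, keeping only the most
    recent version of each record, keyed by its primary key."""
    for record in incoming:
        key = record["_ab_record_id"]
        current = existing.get(key)
        # Keep whichever version has the later cursor value.
        if current is None or record["updated_at"] >= current["updated_at"]:
            existing[key] = record
    return existing


store = {}
dedupe_append(store, [{"_ab_record_id": "a", "updated_at": 1, "text": "v1"}])
dedupe_append(store, [
    {"_ab_record_id": "a", "updated_at": 2, "text": "v2"},
    {"_ab_record_id": "b", "updated_at": 1, "text": "new"},
])
print(store["a"]["text"])  # v2 -- the older version was replaced, not appended
print(len(store))          # 2
```

This is why repeated syncs don't accumulate duplicate copies of unchanged articles in the destination.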
Verify the integration
Connect to your Astra DB Serverless database to verify that the airbyte collection is receiving the GNews articles.

- In the root of your project, create a .env file with the following environment variables:

  .env
  ```
  ASTRA_DB_APPLICATION_TOKEN="APPLICATION_TOKEN"
  ASTRA_DB_API_ENDPOINT="ASTRA_DB_API_ENDPOINT"
  ASTRA_DB_KEYSPACE="default_keyspace"
  ASTRA_DB_COLLECTION_NAME="airbyte"
  ```
- Install dependencies:

  ```
  pip install astrapy python-dotenv
  ```
- Create a Python program to connect to your Astra DB Serverless database and print one article from the airbyte collection:

  airbyte-integration.py
  ```python
  import os

  from astrapy import DataAPIClient
  from dotenv import load_dotenv

  load_dotenv()

  client = DataAPIClient()
  database = client.get_database(
      os.environ["ASTRA_DB_API_ENDPOINT"],
      token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
      keyspace=os.environ["ASTRA_DB_KEYSPACE"],
  )
  collection = database.get_collection(os.environ["ASTRA_DB_COLLECTION_NAME"])

  print(collection.find_one(projection={"$vector": True}))
  ```
- Run the integration program:

  ```
  python3 airbyte-integration.py
  ```
Result
The output contains source metadata, vector embeddings, and the article text. The following example output is truncated and reformatted for clarity:
```json
{
  "data": {
    "document": {
      "_id": "5ac002a7-c051-41e0-9628-b35b4549acee",
      "$vector": [-0.022076305689849227, ..., -0.03238973221677352],
      "source": {
        "name": "ABC News",
        "url": "https://www.abc.net.au"
      },
      "_ab_stream": "top_headlines",
      "_ab_record_id": "top_headlines_https://www.abc.net.au/news/.../10349491",
      "text": "content: On a warm late summer's day in Dunkley yesterday, ..."
    }
  }
}
```
- Confirm that Airbyte is streaming data from the search stream. Replace the last line in the Python script with the following code, and then rerun the script:

  airbyte-integration.py
  ```python
  print(
      collection.find_one(
          {"_ab_stream": "search"},
          projection={"$vector": True},
      )
  )
  ```
Result
The following example output is truncated and reformatted for clarity:
```json
{
  "data": {
    "documents": [
      {
        "_id": "6e1316d7-cccd-438b-9316-d7cccdd38bf4",
        "$vector": [0.063985432166200121, ..., -0.000311508654776436],
        "source": {
          "name": "SooToday",
          "url": "https://www.sootoday.com"
        },
        "_ab_stream": "search",
        "_ab_record_id": "search_https://www.sootoday.com/.../1862684",
        "text": "content: Description • Develop recreation therapy programs ..."
      }
    ]
  }
}
```
You have confirmed your Airbyte pipeline is streaming data from the GNews API to your Astra DB Serverless database.
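Each document's $vector field holds the OpenAI embedding of one text chunk, which is what Astra DB uses to rank vector search results. Assuming the collection uses cosine similarity (a common default, but verify for your setup), the comparison can be sketched as:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two embedding vectors: 1.0 means the
    vectors point in the same direction, 0.0 means they are orthogonal."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)


print(cosine_similarity([1.0, 0.0], [1.0, 0.0]))  # 1.0
print(cosine_similarity([1.0, 0.0], [0.0, 1.0]))  # 0.0
```

In practice you would not compute this client-side: passing a query embedding to the collection's vector search (for example via a `sort` on `$vector`) lets Astra DB perform the ranking server-side.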