Integrate Airbyte with Astra DB Serverless

Airbyte builds extract, transform, and load (ETL) pipelines from data sources to destinations.

This tutorial uses an Airbyte connector to extract data from a GNews source, and then load that data into an Astra DB Serverless destination.

Airbyte is available as a self-hosted or cloud-hosted service. This tutorial uses Airbyte Cloud.

Prerequisites

This tutorial requires the following:

  • An Airbyte Cloud account.

  • An active Astra DB Serverless database, along with its API endpoint and an application token.

  • An OpenAI API key. The Astra DB destination uses it to generate embeddings.

  • A GNews API key for the source.

  • Python 3 and pip to run the verification script.

Create the Airbyte pipeline

In your Airbyte Cloud workspace, create a Source, Destination, and Connection for your Airbyte pipeline.

Add a GNews source in Airbyte

A Source is an application that extracts data from an underlying data store. This can be an API, a file, a database, or a data warehouse.

  1. Add a source to pull general articles from the GNews API.

  2. Test and verify the source. If the source is valid, "All connection tests passed" appears.

See the Airbyte documentation for more details about how to create a source.
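
Before you build the pipeline, it can help to preview the data that the source will extract. The following sketch queries the GNews API directly with the requests library. The preview-gnews.py filename and the GNEWS_API_KEY environment variable are illustrative placeholders; the v4 top-headlines endpoint and its category, lang, and apikey parameters are part of the public GNews API.

    preview-gnews.py
    import os

    import requests

    # Fetch the same general headlines that the Airbyte top_headlines
    # stream extracts.
    response = requests.get(
        "https://gnews.io/api/v4/top-headlines",
        params={
            "category": "general",
            "lang": "en",
            "apikey": os.environ["GNEWS_API_KEY"],
        },
        timeout=30,
    )
    response.raise_for_status()

    for article in response.json()["articles"]:
        print(article["source"]["name"], "|", article["title"])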

Add an Astra DB destination in Airbyte

In Airbyte, a destination is a target that receives and loads data into an underlying data store. This can be a data warehouse, a data lake, another database, or an analytics tool. For more information about adding destinations, see the Airbyte documentation.

  1. Add a destination to send data to Astra DB with the following settings (a sketch after these steps shows how the chunking and embedding settings are used):

    • Destination name: Astra DB

    • Chunk size: 512

    • OpenAI API key: Your OpenAI API key

    • Astra DB application token: Your application token

    • Astra DB endpoint: Your database’s API endpoint

    • Astra DB keyspace: default_keyspace

    • Astra DB collection: airbyte

  2. Test and verify the destination. If the destination is valid, "All connection tests passed" appears.
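
The Chunk size and OpenAI API key settings control how the destination prepares each record: the article text is split into chunks, each chunk is embedded with OpenAI, and each chunk is stored as a document with a $vector field. The following sketch approximates that flow; the whitespace chunker and the text-embedding-ada-002 model are assumptions for illustration, not Airbyte's exact implementation.

    from openai import OpenAI

    openai_client = OpenAI()  # reads OPENAI_API_KEY from the environment

    def chunk_text(text: str, chunk_size: int = 512) -> list[str]:
        # Naive whitespace chunker for illustration; the connector's
        # actual chunking strategy may differ.
        words = text.split()
        return [
            " ".join(words[i : i + chunk_size])
            for i in range(0, len(words), chunk_size)
        ]

    article_text = "On a warm late summer's day in Dunkley yesterday ..."
    chunks = chunk_text(article_text)

    # One embedding per chunk; the model here is an assumption.
    embeddings = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=chunks,
    )

    # One document per chunk, shaped like the records Airbyte writes.
    documents = [
        {"text": chunk, "$vector": item.embedding}
        for chunk, item in zip(chunks, embeddings.data)
    ]
    print(documents[0]["text"], len(documents[0]["$vector"]))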

Connect GNews and Astra DB

A connection is an Airbyte component that pulls data from a source and pushes data to a destination. For more information about creating connections, see the Airbyte documentation.

  1. Set up a connection between the GNews source and the Astra DB destination:

    • Leave the Configuration values at their defaults. Airbyte syncs to the Astra DB destination every 24 hours.

    • Enable the top_headlines and search streams.

    • Select the Incremental | Append + Deduped sync mode. Each sync appends only new or changed records to the destination and keeps only the most recent version of each record (the sketch after these steps illustrates the deduplication).

  2. Wait for the first sync job to complete. You may need to manually run the sync job if it doesn’t start automatically.
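
To make the sync mode concrete, here is a minimal sketch of Append + Deduped behavior in plain Python: records are keyed by their Airbyte record ID, and a newer version of a record replaces the older one instead of accumulating. The _ab_record_id field mirrors the metadata shown later in this tutorial; the published_at cursor field is an assumption for illustration.

    # Two syncs deliver two versions of the same record.
    incoming_batches = [
        [{"_ab_record_id": "rec-1", "published_at": "2024-03-01", "text": "v1"}],
        [{"_ab_record_id": "rec-1", "published_at": "2024-03-02", "text": "v2, edited"}],
    ]

    deduped: dict[str, dict] = {}
    for batch in incoming_batches:  # one batch per scheduled sync
        for record in batch:
            current = deduped.get(record["_ab_record_id"])
            # Keep whichever version has the most recent cursor value.
            if current is None or record["published_at"] > current["published_at"]:
                deduped[record["_ab_record_id"]] = record

    print(deduped)  # only the "v2, edited" record survives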

Verify the integration

Connect to your Astra DB Serverless database to verify that the airbyte collection is receiving the GNews articles.

  1. In the root of your project, create a .env file with the following environment variables, replacing APPLICATION_TOKEN and ASTRA_DB_API_ENDPOINT with your own values:

    .env
    ASTRA_DB_APPLICATION_TOKEN="APPLICATION_TOKEN"
    ASTRA_DB_API_ENDPOINT="ASTRA_DB_API_ENDPOINT"
    ASTRA_DB_KEYSPACE="default_keyspace"
    ASTRA_DB_COLLECTION_NAME="airbyte"
  2. Install dependencies:

    pip install astrapy python-dotenv
  3. Create a Python program to connect to your Astra DB Serverless database and print one article from the airbyte collection:

    airbyte-integration.py
    import os

    from astrapy import DataAPIClient
    from dotenv import load_dotenv

    # Load the Astra DB credentials from the .env file.
    load_dotenv()

    # Connect to the database through the Data API.
    client = DataAPIClient()
    database = client.get_database(
        os.environ["ASTRA_DB_API_ENDPOINT"],
        token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
        keyspace=os.environ["ASTRA_DB_KEYSPACE"],
    )
    collection = database.get_collection(os.environ["ASTRA_DB_COLLECTION_NAME"])

    # Fetch one document, including its embedding, to confirm that
    # Airbyte is writing vectorized articles to the collection.
    print(collection.find_one(projection={"$vector": True}))
  4. Run the integration program:

    python3 airbyte-integration.py
    Result

    The output contains source metadata, vector embeddings, and the article text. The following example output is truncated and reformatted for clarity:

    {
      "data": {
        "document": {
          "_id": "5ac002a7-c051-41e0-9628-b35b4549acee",
          "$vector": [
            -0.022076305689849227,
            ...,
            -0.03238973221677352
          ],
          "source": {
            "name": "ABC News",
            "url": "https://www.abc.net.au"
          },
          "_ab_stream": "top_headlines",
          "_ab_record_id": "top_headlines_https://www.abc.net.au/news/.../10349491",
          "text": "content: On a warm late summer's day in Dunkley yesterday, ..."
        }
      }
    }
  5. Confirm that Airbyte is streaming data from the search stream. Replace the last line in the Python script with the following code, and then rerun the script.

    airbyte-integration.py
    # Fetch one document from the search stream to confirm that both
    # enabled streams are syncing to the collection.
    print(
        collection.find_one(
            {"_ab_stream": "search"},
            projection={"$vector": True},
        )
    )
    Result

    The following example output is truncated and reformatted for clarity:

    {
      "data": {
        "documents": [
          {
            "_id": "6e1316d7-cccd-438b-9316-d7cccdd38bf4",
            "$vector": [
              0.063985432166200121,
              ...,
              -0.000311508654776436
            ],
            "source": {
              "name": "SooToday",
              "url": "https://www.sootoday.com"
            },
            "_ab_stream": "search",
            "_ab_record_id": "search_https://www.sootoday.com/.../1862684",
            "text": "content: Description • Develop recreation therapy programs ..."
          }
        ]
      }
    }

You have confirmed that your Airbyte pipeline is streaming data from the GNews API to your Astra DB Serverless database.
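
With embeddings flowing into the collection, you can run a vector similarity search over the synced articles. The following sketch embeds a query with OpenAI and sorts the collection by $vector; the text-embedding-ada-002 model and the "local community news" query are assumptions for illustration, and the embedding model must match the one your Airbyte destination uses.

    import os

    from astrapy import DataAPIClient
    from dotenv import load_dotenv
    from openai import OpenAI

    load_dotenv()

    # Embed the query with the same model the destination uses
    # (text-embedding-ada-002 is assumed here).
    openai_client = OpenAI()
    query_vector = (
        openai_client.embeddings.create(
            model="text-embedding-ada-002",
            input="local community news",
        )
        .data[0]
        .embedding
    )

    collection = (
        DataAPIClient()
        .get_database(
            os.environ["ASTRA_DB_API_ENDPOINT"],
            token=os.environ["ASTRA_DB_APPLICATION_TOKEN"],
            keyspace=os.environ["ASTRA_DB_KEYSPACE"],
        )
        .get_collection(os.environ["ASTRA_DB_COLLECTION_NAME"])
    )

    # Return the three articles whose embeddings are closest to the query.
    for doc in collection.find(
        sort={"$vector": query_vector},
        limit=3,
        projection={"text": True, "_ab_stream": True},
    ):
        print(doc["_ab_stream"], "|", doc["text"][:80])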
