Integrate Feast with Astra DB Serverless

30 min

Feast is an Apache-licensed open-source feature store for machine learning.

Starting with version 0.24, the Feast online store for Cassandra supports both Apache Cassandra® and Astra DB Serverless.

Prerequisites

Install Feast

Feast can be installed manually or with the helper CLI.

  • Install Feast manually

  • Install Feast with the helper CLI

  Install Feast manually:

  1. Install Feast with the cassandra extra:

    pip install "feast[cassandra]"
  2. Initialize a new feature repository:

    feast init astraFeatures
    cd astraFeatures/feature_repo
  3. Open the store configuration file feature_store.yaml, and then replace the online_store section with your database’s values:

    feature_store.yaml
    online_store:
        type: cassandra
        secure_bundle_path: PATH_TO_SECURE_CONNECT_BUNDLE_ZIP
        username: token
        password: APPLICATION_TOKEN
        keyspace: DB_NAMESPACE
  4. Run feast apply to initialize Feast with the values in feature_store.yaml. The apply command scans the Python modules in the repository (in this case, example.py) for feature definitions, and then deploys the corresponding infrastructure.

    feast apply

    Feast creates new tables in your namespace:

    Deploying infrastructure for driver_hourly_stats
    Deploying infrastructure for driver_hourly_stats_fresh
  Install Feast with the helper CLI:

  1. To use the helper CLI, first install Feast with the cassandra extra:

    pip install "feast[cassandra]"
  2. Initialize a new feature repository with the cassandra template:

    feast init astraFeatures -t cassandra
  3. When prompted, enter values for the following:

    • Secure Connect Bundle: The path to your SCB zip file

    • Client ID: token

    • Client Secret: Your application token

    • Keyspace: A namespace in your database, such as default_keyspace

      You can press n to accept the default options for the other settings.

      Regular [C]assandra or [A]stra DB? [C]: A
      Enter the full path to your Secure Connect Bundle: /home/mary/downloads/secure-connect.zip
      Enter the Client ID from your Astra DB token: token
      Enter the Client Secret from your Astra DB token: AstraCS:...
      Specify the keyspace to use [feast_keyspace]: default_keyspace
      Specify protocol version? [y/N]: n
      Specify load-balancing? [y/N]: n
      Specify concurrency levels? [y/N]: n
      
      Creating a new Feast repository in /home/mary/coding/feast/astraFeatures
  4. Change directory to the new repository, and then run feast apply to initialize Feast. The apply command scans the Python modules in the repository (in this case, example.py) for feature definitions, and then deploys the corresponding infrastructure.

    cd astraFeatures/feature_repo
    feast apply

    Feast creates new tables in your namespace:

    Deploying infrastructure for driver_hourly_stats
    Deploying infrastructure for driver_hourly_stats_fresh
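
Before you run feast apply, it can help to confirm that the online_store section no longer contains the placeholder values from this guide. The following is a minimal sketch, not part of Feast: check_online_store and check_file are hypothetical helper names, the list of required keys simply mirrors the feature_store.yaml example above, and check_file assumes that PyYAML is available in the environment where Feast is installed.

```python
from typing import Any

# Keys that appear in the online_store example in this guide.
REQUIRED_KEYS = ("type", "secure_bundle_path", "username", "password", "keyspace")
# Placeholder values from this guide that must be replaced with real values.
PLACEHOLDERS = {"PATH_TO_SECURE_CONNECT_BUNDLE_ZIP", "APPLICATION_TOKEN", "DB_NAMESPACE"}


def check_online_store(online_store: dict[str, Any]) -> list[str]:
    """Return a list of problems found; an empty list means none were found."""
    problems = []
    for key in REQUIRED_KEYS:
        value = online_store.get(key)
        if value is None or value == "":
            problems.append(f"missing key: {key}")
        elif isinstance(value, str) and value in PLACEHOLDERS:
            problems.append(f"placeholder not replaced: {key}")
    # This guide only covers the Cassandra online store type.
    if online_store.get("type") not in (None, "cassandra"):
        problems.append("type should be 'cassandra'")
    return problems


def check_file(path: str = "feature_store.yaml") -> list[str]:
    """Load the store configuration file and check its online_store section."""
    import yaml  # PyYAML; assumed to be installed alongside Feast

    with open(path) as handle:
        config = yaml.safe_load(handle) or {}
    return check_online_store(config.get("online_store", {}))
```

Call check_file() from the feature_repo directory and print the returned list. The check only inspects the file contents; it does not connect to the database.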

Generate training data

Use Feast’s get_historical_features store method to scan the offline source data and perform a point-in-time join. This method constructs the features requested up to a specified timestamp.
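
To illustrate what a point-in-time join means, the following sketch pairs each entity row with the newest feature row that is not later than the entity's timestamp. It is a simplified, standard-library illustration of the idea, not Feast's implementation (which, for example, also works on dataframes and applies its own rules to feature view settings). The point_in_time_join function and the sample values are hypothetical.

```python
from bisect import bisect_right
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, key="driver_id", ts="event_timestamp"):
    """For each entity row, attach the newest feature row at or before its timestamp."""
    # Group the feature rows by join key and sort each group by timestamp.
    by_key = {}
    for row in feature_rows:
        by_key.setdefault(row[key], []).append(row)
    for rows in by_key.values():
        rows.sort(key=lambda r: r[ts])

    joined = []
    for entity in entity_rows:
        candidates = by_key.get(entity[key], [])
        timestamps = [r[ts] for r in candidates]
        # Index of the first feature row that is strictly newer than the entity row.
        idx = bisect_right(timestamps, entity[ts])
        match = candidates[idx - 1] if idx > 0 else {}
        features = {k: v for k, v in match.items() if k not in (key, ts)}
        joined.append({**entity, **features})
    return joined


# Hypothetical sample data: three hourly feature rows for one driver.
now = datetime(2024, 7, 1, 15, 0, 0)
feature_rows = [
    {"driver_id": 1001, "event_timestamp": now - timedelta(hours=2), "conv_rate": 0.10},
    {"driver_id": 1001, "event_timestamp": now - timedelta(hours=1), "conv_rate": 0.20},
    {"driver_id": 1001, "event_timestamp": now, "conv_rate": 0.30},
]
entity_rows = [{"driver_id": 1001, "event_timestamp": now - timedelta(minutes=11)}]

result = point_in_time_join(entity_rows, feature_rows)
print(result[0]["conv_rate"])  # 0.2
```

The entity row is timestamped 11 minutes before the newest feature row, so the join picks the value recorded one hour earlier (0.2) rather than the later value (0.3). This prevents feature values from the future leaking into training data.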

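  1. Create a generate.py script that builds an entity dataframe, retrieves the matching historical features, and prints the schema and sample rows of the resulting training dataframe: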

    generate.py
    from datetime import datetime, timedelta
    import pandas as pd
    
    from feast import FeatureStore
    
    # The entity dataframe is the dataframe we want to enrich with feature values
    entity_df = pd.DataFrame.from_dict(
        {
            # entity's join key -> entity values
            "driver_id": [1001, 1002, 1003],
    
            # label name -> label values
            "label_driver_reported_satisfaction": [1, 5, 3],
    
            # "event_timestamp" (reserved key) -> timestamps
            "event_timestamp": [
                datetime.now() - timedelta(minutes=11),
                datetime.now() - timedelta(minutes=36),
                datetime.now() - timedelta(minutes=73),
            ],
        }
    )
    
    store = FeatureStore(repo_path=".")
    
    training_df = store.get_historical_features(
        entity_df=entity_df,
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
        ],
    ).to_df()
    
    print("----- Feature schema -----\n")
    print(training_df.info())
    
    print()
    print("----- Example features -----\n")
    print(training_df.head())
  2. Run the script:

    python generate.py

    The script prints the schema of the training dataframe, followed by example feature rows:

    ----- Feature schema -----
    
    <class 'pandas.core.frame.DataFrame'>
    RangeIndex: 3 entries, 0 to 2
    Data columns (total 6 columns):
     #   Column                              Non-Null Count  Dtype
    ---  ------                              --------------  -----
     0   driver_id                           3 non-null      int64
     1   label_driver_reported_satisfaction  3 non-null      int64
     2   event_timestamp                     3 non-null      datetime64[ns, UTC]
     3   conv_rate                           3 non-null      float32
     4   acc_rate                            3 non-null      float32
     5   avg_daily_trips                     3 non-null      int32
    dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
    memory usage: 236.0 bytes
    None
    
    ----- Example features -----
    
       driver_id  label_driver_reported_satisfaction                  event_timestamp  conv_rate  acc_rate  avg_daily_trips
    0       1002                                   5 2024-07-01 14:27:40.786684+00:00   0.241222  0.193434              634
    1       1001                                   1 2024-07-01 14:52:40.786677+00:00   0.791722  0.200128              583
    2       1003                                   3 2024-07-01 13:50:40.786685+00:00   0.505599  0.828616              976
  3. Set a CURRENT_TIME shell variable to the current UTC time, and then pass it to Feast’s materialize-incremental command in the terminal:

    CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
    feast materialize-incremental $CURRENT_TIME

    Feast carries the latest feature values over to the online store for quick access during feature serving:

    Materializing 2 feature views to 2024-07-01 15:04:32-04:00 into the cassandra online store.
    
    driver_hourly_stats from 2024-06-30 19:04:40-04:00 to 2024-07-01 15:04:32-04:00:
      0%|                                                                         | 0/5 [00:00<?, ?it/s]07/01/2024 03:04:41 PM cassandra.cluster
    6it [00:02,  2.55it/s]
    driver_hourly_stats_fresh from 2024-06-30 19:04:43-04:00 to 2024-07-01 15:04:32-04:00:
    6it [00:00, 38.05it/s]
  4. In the Astra Portal, inspect the Astra DB table to confirm that the rows were added.
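
The materialization step can also be run from Python instead of the terminal. The following is a minimal sketch, assuming it runs from the feature_repo directory: utc_timestamp and materialize_until_now are hypothetical helper names, while FeatureStore.materialize_incremental is the Feast SDK method that corresponds to the materialize-incremental command.

```python
from datetime import datetime, timezone


def utc_timestamp(moment=None):
    """Format a time in UTC the same way as `date -u +"%Y-%m-%dT%H:%M:%S"`."""
    moment = moment or datetime.now(timezone.utc)
    # A naive datetime is interpreted as local time before conversion to UTC.
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")


def materialize_until_now(repo_path="."):
    """Load new feature values into the online store, up to the current time."""
    # Imported here so that utc_timestamp can be used without Feast installed.
    from feast import FeatureStore

    store = FeatureStore(repo_path=repo_path)
    store.materialize_incremental(end_date=datetime.now(timezone.utc))
```

Calling materialize_until_now() requires the same feature_store.yaml and database credentials as the command-line step.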

Fetch feature vectors from the online store

Use the get_online_features store method to query the online store.

  1. Create a fetch_online.py script that returns the feature vectors from the most recent materialize operation:

    fetch_online.py
    from pprint import pprint
    from feast import FeatureStore
    
    store = FeatureStore(repo_path=".")
    
    feature_vector = store.get_online_features(
        features=[
            "driver_hourly_stats:conv_rate",
            "driver_hourly_stats:acc_rate",
            "driver_hourly_stats:avg_daily_trips",
        ],
        entity_rows=[
            # {join_key: entity_value}
            {"driver_id": 1004},
            {"driver_id": 1005},
        ],
    ).to_dict()
    
    pprint(feature_vector)
  2. Run the script:

    python fetch_online.py

    The script returns the latest feature vectors from the online store:

    {'acc_rate': [0.8428952693939209, 0.7182396054267883],
     'avg_daily_trips': [815, 467],
     'conv_rate': [0.5581942796707153, 0.8678529858589172],
     'driver_id': [1004, 1005]}
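
The dictionary returned by to_dict() is column-oriented: each key maps to a list with one value per requested entity. If a model or service expects one record per entity, the columns can be zipped into rows. The following sketch uses a hypothetical to_rows helper and reuses the sample output above as input.

```python
def to_rows(feature_vector):
    """Turn {'col': [v1, v2]} into [{'col': v1}, {'col': v2}]."""
    columns = list(feature_vector)
    # zip(*values) walks the column lists in parallel, one entity at a time.
    return [dict(zip(columns, values)) for values in zip(*feature_vector.values())]


# Sample output copied from the fetch_online.py run above.
feature_vector = {
    "acc_rate": [0.8428952693939209, 0.7182396054267883],
    "avg_daily_trips": [815, 467],
    "conv_rate": [0.5581942796707153, 0.8678529858589172],
    "driver_id": [1004, 1005],
}

for row in to_rows(feature_vector):
    print(row["driver_id"], row["avg_daily_trips"])
```

This prints 1004 815 and then 1005 467, one line per driver.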

© 2024 DataStax