Integrate Feast with Astra DB Serverless
Feast is an Apache-licensed open-source feature store for machine learning.
Starting with version 0.24, the Feast online store for Cassandra supports both Apache Cassandra® and Astra DB Serverless.
Prerequisites
-
You have an active Astra account.
-
You have created a Serverless (Vector) database.
-
You have created an application token with the Database Administrator role.
-
You have downloaded the Secure Connect Bundle (SCB) for your database and noted the path to the downloaded secure-connect-DATABASE_NAME.zip file.
Install Feast
Feast can be installed manually or with the helper CLI.
Install Feast manually
-
Install Feast with the cassandra extra:
pip install "feast[cassandra]"
-
Initialize a new feature repository:
feast init astraFeatures
cd astraFeatures/feature_repo
-
Open the store configuration file feature_store.yaml, and then replace the online_store section with your database’s values:
feature_store.yaml
online_store:
  type: cassandra
  secure_bundle_path: PATH_TO_SECURE_CONNECT_BUNDLE_ZIP
  username: token
  password: APPLICATION_TOKEN
  keyspace: DB_NAMESPACE
-
Use the values in feature_store.yaml to initialize Feast. The apply command scans the features defined in Python modules (in this case, example.py) and deploys the corresponding infrastructure:
feast apply
Feast creates new tables in your namespace:
Deploying infrastructure for driver_hourly_stats
Deploying infrastructure for driver_hourly_stats_fresh
Install Feast with the helper CLI
-
To use the helper CLI, first install Feast with the cassandra extra:
pip install "feast[cassandra]"
-
Initialize a new feature repository with the cassandra template:
feast init astraFeatures -t cassandra
-
When prompted, enter values for the following:
-
Secure Connect Bundle: The path to your SCB zip file
-
Client ID: token
-
Client Secret: Your application token
-
keyspace: A namespace in your database, such as default_keyspace
You can press n to use the default options for the other settings.
Regular [C]assandra or [A]stra DB? [C]: A
Enter the full path to your Secure Connect Bundle: /home/mary/downloads/secure-connect.zip
Enter the Client ID from your Astra DB token: token
Enter the Client Secret from your Astra DB token: AstraCS:...
Specify the keyspace to use [feast_keyspace]: default_keyspace
Specify protocol version? [y/N]: n
Specify load-balancing? [y/N]: n
Specify concurrency levels? [y/N]: n
Creating a new Feast repository in /home/mary/coding/feast/astraFeatures
-
Change directory to the new repository, and then initialize Feast. The apply command scans the features defined in Python modules (in this case, example.py) and deploys the corresponding infrastructure:
cd astraFeatures/feature_repo
feast apply
Feast creates new tables in your namespace:
Deploying infrastructure for driver_hourly_stats
Deploying infrastructure for driver_hourly_stats_fresh
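If feast apply cannot reach the database, one thing worth checking is whether the online_store section of feature_store.yaml is complete. The following is a minimal sketch of such a check, not part of Feast: the helper name find_config_problems is hypothetical, and it assumes the section has already been loaded into a plain dictionary (for example, with a YAML parser). The key names and placeholder values are the ones used in this guide.

```python
# Hypothetical pre-flight check for the online_store section of feature_store.yaml.
# This helper is an illustration only; it is not a Feast API.

REQUIRED_KEYS = ("type", "secure_bundle_path", "username", "password", "keyspace")

# Placeholder values used in this guide that must be replaced with real values.
PLACEHOLDERS = {
    "PATH_TO_SECURE_CONNECT_BUNDLE_ZIP",
    "APPLICATION_TOKEN",
    "DB_NAMESPACE",
}


def find_config_problems(online_store: dict) -> list[str]:
    """Return human-readable problems; an empty list means none were found."""
    problems = []
    for key in REQUIRED_KEYS:
        value = online_store.get(key)
        if value is None or value == "":
            problems.append(f"missing key: {key}")
        elif isinstance(value, str) and value in PLACEHOLDERS:
            problems.append(f"placeholder not replaced: {key}")
    # The Cassandra online store is also the one used for Astra DB.
    if online_store.get("type") not in (None, "", "cassandra"):
        problems.append("type must be 'cassandra'")
    return problems


if __name__ == "__main__":
    example = {
        "type": "cassandra",
        "secure_bundle_path": "/home/mary/downloads/secure-connect.zip",
        "username": "token",
        "password": "APPLICATION_TOKEN",  # still the placeholder
        "keyspace": "default_keyspace",
    }
    for problem in find_config_problems(example):
        print(problem)
```

The check only looks at the shape of the configuration. It does not verify that the token is valid or that the bundle file exists.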
Generate training data
Use Feast’s get_historical_features store method to scan the offline source data and perform a point-in-time join.
For each entity row, this method returns the requested feature values as they were at that row’s timestamp.
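To make the idea of a point-in-time join concrete, here is a simplified sketch in plain Python. It is not how Feast implements the join (for instance, it ignores feature view TTLs), and the function name point_in_time_join and the sample values are made up for illustration. For each entity row, it picks the newest feature row for the same key whose timestamp is not later than the entity row's event_timestamp.

```python
from datetime import datetime, timedelta


def point_in_time_join(entity_rows, feature_rows, join_key, feature_names):
    """Attach to each entity row the newest feature values at or before its timestamp."""
    enriched = []
    for entity in entity_rows:
        # Keep only feature rows for this entity that are not from the future.
        candidates = [
            f
            for f in feature_rows
            if f[join_key] == entity[join_key]
            and f["event_timestamp"] <= entity["event_timestamp"]
        ]
        latest = max(candidates, key=lambda f: f["event_timestamp"], default=None)
        row = dict(entity)
        for name in feature_names:
            # Features are None when no earlier feature row exists.
            row[name] = latest[name] if latest is not None else None
        enriched.append(row)
    return enriched


if __name__ == "__main__":
    now = datetime(2024, 7, 1, 15, 0, 0)
    # Made-up feature history for one driver (illustrative values only).
    feature_rows = [
        {"driver_id": 1001, "event_timestamp": now - timedelta(hours=2), "conv_rate": 0.10},
        {"driver_id": 1001, "event_timestamp": now - timedelta(hours=1), "conv_rate": 0.20},
        {"driver_id": 1001, "event_timestamp": now + timedelta(hours=1), "conv_rate": 0.90},
    ]
    entity_rows = [{"driver_id": 1001, "event_timestamp": now}]
    print(point_in_time_join(entity_rows, feature_rows, "driver_id", ["conv_rate"]))
```

In this example the feature row from one hour earlier is selected, and the row stamped one hour in the future is ignored, which is what prevents future data from leaking into training data.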
-
Create a generate.py script that generates a schema for the feature:
generate.py
from datetime import datetime, timedelta

import pandas as pd
from feast import FeatureStore

# The entity dataframe is the dataframe we want to enrich with feature values
entity_df = pd.DataFrame.from_dict(
    {
        # entity's join key -> entity values
        "driver_id": [1001, 1002, 1003],
        # label name -> label values
        "label_driver_reported_satisfaction": [1, 5, 3],
        # "event_timestamp" (reserved key) -> timestamps
        "event_timestamp": [
            datetime.now() - timedelta(minutes=11),
            datetime.now() - timedelta(minutes=36),
            datetime.now() - timedelta(minutes=73),
        ],
    }
)

store = FeatureStore(repo_path=".")

training_df = store.get_historical_features(
    entity_df=entity_df,
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
).to_df()

print("----- Feature schema -----\n")
print(training_df.info())

print()
print("----- Example features -----\n")
print(training_df.head())
-
Run the script:
python generate.py
The script prints the feature schema and some example rows:
----- Feature schema -----

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3 entries, 0 to 2
Data columns (total 6 columns):
 #   Column                              Non-Null Count  Dtype
---  ------                              --------------  -----
 0   driver_id                           3 non-null      int64
 1   label_driver_reported_satisfaction  3 non-null      int64
 2   event_timestamp                     3 non-null      datetime64[ns, UTC]
 3   conv_rate                           3 non-null      float32
 4   acc_rate                            3 non-null      float32
 5   avg_daily_trips                     3 non-null      int32
dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
memory usage: 236.0 bytes
None

----- Example features -----

   driver_id  label_driver_reported_satisfaction                   event_timestamp  conv_rate  acc_rate  avg_daily_trips
0       1002                                   5  2024-07-01 14:27:40.786684+00:00   0.241222  0.193434              634
1       1001                                   1  2024-07-01 14:52:40.786677+00:00   0.791722  0.200128              583
2       1003                                   3  2024-07-01 13:50:40.786685+00:00   0.505599  0.828616              976
-
Declare a CURRENT_TIME shell variable, and then pass it to Feast’s materialize-incremental command in the terminal:
CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
feast materialize-incremental $CURRENT_TIME
Feast carries the latest feature values over to the online store for quick access during feature serving:
Materializing 2 feature views to 2024-07-01 15:04:32-04:00 into the cassandra online store.

driver_hourly_stats from 2024-06-30 19:04:40-04:00 to 2024-07-01 15:04:32-04:00:
0%| | 0/5 [00:00<?, ?it/s]07/01/2024 03:04:41 PM cassandra.cluster
6it [00:02, 2.55it/s]
driver_hourly_stats_fresh from 2024-06-30 19:04:43-04:00 to 2024-07-01 15:04:32-04:00:
6it [00:00, 38.05it/s]
-
In the Astra Portal, inspect the Astra DB table to confirm that the rows were added.
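The materialization step can also be run from Python instead of the terminal. The following is a sketch under the assumption that your installed Feast version exposes FeatureStore.materialize_incremental with an end_date argument, as current Feast releases do. The helper names utc_timestamp and materialize_until_now are hypothetical and exist only for this example.

```python
from datetime import datetime, timezone


def utc_timestamp(now: datetime) -> str:
    """Format a timezone-aware datetime the way `date -u +"%Y-%m-%dT%H:%M:%S"` does."""
    return now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S")


def materialize_until_now(repo_path: str = ".") -> None:
    """Load feature values into the online store, up to the current time."""
    # Imported lazily so that utc_timestamp can be used without Feast installed.
    from feast import FeatureStore

    store = FeatureStore(repo_path=repo_path)
    # Assumption: materialize_incremental(end_date=...) is available in your Feast version.
    store.materialize_incremental(end_date=datetime.now(timezone.utc))
```

To use it, call materialize_until_now() from the feature_repo directory, or pass the path to that directory as repo_path. The utc_timestamp helper is only there to show how the CURRENT_TIME value from the shell command maps to a Python datetime.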
Fetch feature vectors from the online store
Use the get_online_features store method to query the online store.
-
Create a fetch_online.py script that returns the feature vectors from the most recent materialize operation:
fetch_online.py
from pprint import pprint

from feast import FeatureStore

store = FeatureStore(repo_path=".")

feature_vector = store.get_online_features(
    features=[
        "driver_hourly_stats:conv_rate",
        "driver_hourly_stats:acc_rate",
        "driver_hourly_stats:avg_daily_trips",
    ],
    entity_rows=[
        # {join_key: entity_value}
        {"driver_id": 1004},
        {"driver_id": 1005},
    ],
).to_dict()

pprint(feature_vector)
-
Run the script:
python fetch_online.py
The script returns the latest feature vectors from the online store:
{'acc_rate': [0.8428952693939209, 0.7182396054267883],
 'avg_daily_trips': [815, 467],
 'conv_rate': [0.5581942796707153, 0.8678529858589172],
 'driver_id': [1004, 1005]}
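The dictionary returned by to_dict() is column-oriented: each key maps to a list with one value per requested entity, in the same order as entity_rows. If you prefer one record per driver, a small helper can pivot it. This is a minimal sketch in plain Python; the function name to_rows is hypothetical and not part of Feast.

```python
def to_rows(feature_vector: dict) -> list[dict]:
    """Turn {name: [v0, v1, ...]} into [{name: v0, ...}, {name: v1, ...}]."""
    names = list(feature_vector)
    # zip(*values) walks the lists in parallel, yielding one tuple per entity.
    return [dict(zip(names, values)) for values in zip(*feature_vector.values())]


if __name__ == "__main__":
    # The output shown above, copied in as sample input.
    feature_vector = {
        "acc_rate": [0.8428952693939209, 0.7182396054267883],
        "avg_daily_trips": [815, 467],
        "conv_rate": [0.5581942796707153, 0.8678529858589172],
        "driver_id": [1004, 1005],
    }
    for row in to_rows(feature_vector):
        print(row)
```

Each resulting record holds the features for a single driver_id, which is often a more convenient shape to pass to a model's prediction call.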