Integrate Feast with Astra DB Serverless
Feast is an Apache-licensed open-source feature store for machine learning.
Starting with version 0.24, the Feast online store for Apache Cassandra® supports both Cassandra and Astra DB Serverless.
Prerequisites
- An active Serverless (Vector) database
- An application token with the Database Administrator role
- The Secure Connect Bundle (SCB) for your database
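Before wiring Feast to the database, you can optionally sanity-check the SCB and token with the Python Cassandra driver, which is installed as a dependency of feast[cassandra]. This is a minimal sketch; PATH/TO/SCB.zip and APPLICATION_TOKEN are placeholders for your own values.

```python
# Optional sanity check: confirm the SCB and application token can reach Astra DB.
# PATH/TO/SCB.zip and APPLICATION_TOKEN are placeholders; substitute your values.
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

cloud_config = {"secure_connect_bundle": "PATH/TO/SCB.zip"}
auth_provider = PlainTextAuthProvider("token", "APPLICATION_TOKEN")

cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# A trivial query proves authentication and connectivity work.
print(session.execute("SELECT release_version FROM system.local").one())
cluster.shutdown()
```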
Install Feast
Feast can be installed manually or with the helper CLI.
Install Feast manually

- Install Feast with the cassandra extra:

  ```bash
  pip install "feast[cassandra]"
  ```
- Initialize a new feature repository:

  ```bash
  feast init astraFeatures
  cd astraFeatures/feature_repo
  ```
- Open the store configuration file feature_store.yaml, and then replace the online_store section with your database's values:

  ```yaml
  online_store:
    type: cassandra
    secure_bundle_path: PATH/TO/SCB.zip
    username: token
    password: APPLICATION_TOKEN
    keyspace: KEYSPACE_NAME
  ```
- Use the values in feature_store.yaml to initialize Feast. The apply command scans the features defined in Python modules (in this case, example.py) and deploys the corresponding infrastructure:

  ```bash
  feast apply
  ```

  Feast creates new tables in your keyspace:

  ```
  Deploying infrastructure for driver_hourly_stats
  Deploying infrastructure for driver_hourly_stats_fresh
  ```
Install Feast with the helper CLI

- To use the helper CLI, first install Feast with the cassandra extra:

  ```bash
  pip install "feast[cassandra]"
  ```
- Initialize a new feature repository with the cassandra template:

  ```bash
  feast init astraFeatures -t cassandra
  ```
- When prompted, enter values for the following:

  - Secure Connect Bundle: The path to your SCB zip file
  - Client ID: The literal string token
  - Client Secret: Your application token
  - Keyspace: A keyspace in your database, such as default_keyspace

  You can press N to accept the default options for the other settings.

  ```
  Regular [C]assandra or [A]stra DB? [C]: A
  Enter the full path to your Secure Connect Bundle: /home/mary/downloads/secure-connect.zip
  Enter the Client ID from your Astra DB token: token
  Enter the Client Secret from your Astra DB token: AstraCS:...
  Specify the keyspace to use [feast_keyspace]: default_keyspace
  Specify protocol version? [y/N]: n
  Specify load-balancing? [y/N]: n
  Specify concurrency levels? [y/N]: n
  Creating a new Feast repository in /home/mary/coding/feast/astraFeatures
  ```
- Change directory to the new repository, and then initialize Feast. The apply command scans the features defined in Python modules (in this case, example.py; a sketch of this module appears after these installation steps) and deploys the corresponding infrastructure:

  ```bash
  cd astraFeatures/feature_repo
  feast apply
  ```

  Feast creates new tables in your keyspace:

  ```
  Deploying infrastructure for driver_hourly_stats
  Deploying infrastructure for driver_hourly_stats_fresh
  ```
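The example.py module that feast apply reads is generated for you by feast init. As a rough sketch only (the exact file name and contents vary by Feast version, so treat this as illustrative rather than the generated file), it defines a driver entity, an offline parquet source, and the driver_hourly_stats feature view along these lines:

```python
# Illustrative sketch of the feature definitions that `feast init` generates.
# Exact names and contents vary by Feast version; consult the generated file.
from datetime import timedelta

from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Entity: the object that features are keyed on (join key driver_id).
driver = Entity(name="driver", join_keys=["driver_id"])

# Offline source: a parquet file of historical driver statistics.
driver_stats_source = FileSource(
    name="driver_hourly_stats_source",
    path="data/driver_stats.parquet",
    timestamp_field="event_timestamp",
    created_timestamp_column="created",
)

# Feature view: the schema that `feast apply` turns into online store tables.
driver_hourly_stats = FeatureView(
    name="driver_hourly_stats",
    entities=[driver],
    ttl=timedelta(days=1),
    schema=[
        Field(name="conv_rate", dtype=Float32),
        Field(name="acc_rate", dtype=Float32),
        Field(name="avg_daily_trips", dtype=Int64),
    ],
    online=True,
    source=driver_stats_source,
)
```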
Generate training data
Use Feast’s get_historical_features store method to scan the offline source data and perform a point-in-time join.
For each entity row, this method returns the requested feature values as they existed at that row’s timestamp, so the training data never includes information from the future.
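To make the point-in-time semantics concrete, here is a purely illustrative sketch using pandas.merge_asof. This is not how Feast implements the join; it only mirrors the behavior: for each entity row, pick the most recent feature value at or before that row's event_timestamp.

```python
# Illustrative only: approximates a point-in-time join with pandas.
# Feast's get_historical_features does this (and more) against the offline store.
import pandas as pd

entity_df = pd.DataFrame(
    {
        "driver_id": [1001, 1001],
        "event_timestamp": pd.to_datetime(["2024-07-01 10:00", "2024-07-01 12:00"]),
    }
)

feature_df = pd.DataFrame(
    {
        "driver_id": [1001, 1001, 1001],
        "event_timestamp": pd.to_datetime(
            ["2024-07-01 09:00", "2024-07-01 11:00", "2024-07-01 13:00"]
        ),
        "conv_rate": [0.1, 0.2, 0.3],
    }
)

# For each entity row, take the latest feature row at or before its timestamp.
joined = pd.merge_asof(
    entity_df.sort_values("event_timestamp"),
    feature_df.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
    direction="backward",
)
print(joined)  # The 10:00 row gets conv_rate 0.1; the 12:00 row gets 0.2, never the future 0.3
```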
- Create a generate.py script that generates a schema for the feature:

  ```python
  from datetime import datetime, timedelta

  import pandas as pd

  from feast import FeatureStore

  # The entity dataframe is the dataframe we want to enrich with feature values
  entity_df = pd.DataFrame.from_dict(
      {
          # entity's join key -> entity values
          "driver_id": [1001, 1002, 1003],
          # label name -> label values
          "label_driver_reported_satisfaction": [1, 5, 3],
          # "event_timestamp" (reserved key) -> timestamps
          "event_timestamp": [
              datetime.now() - timedelta(minutes=11),
              datetime.now() - timedelta(minutes=36),
              datetime.now() - timedelta(minutes=73),
          ],
      }
  )

  store = FeatureStore(repo_path=".")

  training_df = store.get_historical_features(
      entity_df=entity_df,
      features=[
          "driver_hourly_stats:conv_rate",
          "driver_hourly_stats:acc_rate",
          "driver_hourly_stats:avg_daily_trips",
      ],
  ).to_df()

  print("----- Feature schema -----\n")
  print(training_df.info())
  print()
  print("----- Example features -----\n")
  print(training_df.head())
  ```
- Run the script:

  ```bash
  python generate.py
  ```

  The script prints the feature schema and example rows:

  ```
  ----- Feature schema -----

  <class 'pandas.core.frame.DataFrame'>
  RangeIndex: 3 entries, 0 to 2
  Data columns (total 6 columns):
   #   Column                              Non-Null Count  Dtype
  ---  ------                              --------------  -----
   0   driver_id                           3 non-null      int64
   1   label_driver_reported_satisfaction  3 non-null      int64
   2   event_timestamp                     3 non-null      datetime64[ns, UTC]
   3   conv_rate                           3 non-null      float32
   4   acc_rate                            3 non-null      float32
   5   avg_daily_trips                     3 non-null      int32
  dtypes: datetime64[ns, UTC](1), float32(2), int32(1), int64(2)
  memory usage: 236.0 bytes
  None

  ----- Example features -----

     driver_id  label_driver_reported_satisfaction                  event_timestamp  conv_rate  acc_rate  avg_daily_trips
  0       1002                                   5 2024-07-01 14:27:40.786684+00:00   0.241222  0.193434              634
  1       1001                                   1 2024-07-01 14:52:40.786677+00:00   0.791722  0.200128              583
  2       1003                                   3 2024-07-01 13:50:40.786685+00:00   0.505599  0.828616              976
  ```
- Declare a CURRENT_TIME environment variable, and then reference it with Feast’s materialize-incremental command in the terminal (an equivalent Python sketch appears after these steps):

  ```bash
  CURRENT_TIME=$(date -u +"%Y-%m-%dT%H:%M:%S")
  feast materialize-incremental $CURRENT_TIME
  ```

  Feast copies the latest feature values to the online store for quick access during feature serving:

  ```
  Materializing 2 feature views to 2024-07-01 15:04:32-04:00 into the cassandra online store.

  driver_hourly_stats from 2024-06-30 19:04:40-04:00 to 2024-07-01 15:04:32-04:00:
    0%|          | 0/5 [00:00<?, ?it/s]07/01/2024 03:04:41 PM cassandra.cluster
  6it [00:02,  2.55it/s]
  driver_hourly_stats_fresh from 2024-06-30 19:04:43-04:00 to 2024-07-01 15:04:32-04:00:
  6it [00:00, 38.05it/s]
  ```
- In the Astra Portal, inspect the Astra DB table to confirm that the rows were added.
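If you prefer to trigger materialization from Python rather than the CLI, the FeatureStore object exposes a materialize_incremental method. This is a minimal sketch, assuming it runs from the feature_repo directory created above; it is the programmatic counterpart of the feast materialize-incremental command shown earlier.

```python
# Minimal sketch: programmatic equivalent of `feast materialize-incremental`.
# Assumes it runs from the feature_repo directory created above.
from datetime import datetime, timezone

from feast import FeatureStore

store = FeatureStore(repo_path=".")

# Materialize all registered feature views from their last materialization
# point up to the current time into the Cassandra/Astra DB online store.
store.materialize_incremental(end_date=datetime.now(timezone.utc))
```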
Fetch feature vectors from the online store
Use the get_online_features store method to query the online store.
- Create a fetch_online.py script that returns the feature vectors from the most recent materialize operation:

  ```python
  from pprint import pprint

  from feast import FeatureStore

  store = FeatureStore(repo_path=".")

  feature_vector = store.get_online_features(
      features=[
          "driver_hourly_stats:conv_rate",
          "driver_hourly_stats:acc_rate",
          "driver_hourly_stats:avg_daily_trips",
      ],
      entity_rows=[
          # {join_key: entity_value}
          {"driver_id": 1004},
          {"driver_id": 1005},
      ],
  ).to_dict()

  pprint(feature_vector)
  ```
- Run the script:

  ```bash
  python fetch_online.py
  ```

  The script returns the latest feature vectors from the online store:

  ```
  {'acc_rate': [0.8428952693939209, 0.7182396054267883],
   'avg_daily_trips': [815, 467],
   'conv_rate': [0.5581942796707153, 0.8678529858589172],
   'driver_id': [1004, 1005]}
  ```
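The returned dictionary maps each feature name to a list of values, one per entity row. As a small usage sketch, you can shape it into a pandas DataFrame before handing it to your serving code; the feature_vector literal below mirrors the output above, and the model call is hypothetical.

```python
# Usage sketch: shape the online feature vectors for model inference.
# `feature_vector` mirrors the dict returned by get_online_features().to_dict();
# the commented-out `model.predict` call is a hypothetical placeholder.
import pandas as pd

feature_vector = {
    "driver_id": [1004, 1005],
    "conv_rate": [0.5581942796707153, 0.8678529858589172],
    "acc_rate": [0.8428952693939209, 0.7182396054267883],
    "avg_daily_trips": [815, 467],
}

features_df = pd.DataFrame.from_dict(feature_vector)

# Keep only the columns the model was trained on, in a stable order.
X = features_df[["conv_rate", "acc_rate", "avg_daily_trips"]]

# predictions = model.predict(X)  # hypothetical, already-trained model
print(X)
```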