Develop applications with FastAPI and Astra DB Classic
FastAPI is a modern, performant web framework for building APIs with Python.
This guide provides best practices and examples for using Astra databases in your FastAPI applications. The Apache Cassandra® Python driver handles the database connection and CQL query execution.
This guide uses only plain CQL statements. It doesn’t use object mappers.
For more information about storage engines in FastAPI applications, see the FastAPI databases documentation.
FastAPI and the Cassandra Python driver are designed for use with fixed-schema tables.
Prerequisites
- Familiarity with FastAPI and Python
- An Astra DB Classic database with at least one keyspace
  You can manually create the tables for your FastAPI application, or you can run a script to automatically create the required tables before starting your application for the first time. For an example, see the sample application.
- Your database’s Secure Connect Bundle (SCB)
Try the sample application
If you prefer to try FastAPI with Astra in the context of a sample application, download and extract the sample Astra FastAPI application.
For setup and usage instructions, see the sample application’s README.md.
Snippets of this sample application are used throughout this guide to illustrate key concepts.
Connect and use an Astra database in a FastAPI application
The following sections explain requirements and best practices for configuring an Astra connection in your FastAPI application.
Install dependencies
FastAPI applications backed by Astra require the following packages at minimum:
- FastAPI
- The Cassandra Python driver (cassandra-driver)
- uvicorn or another ASGI server program
In addition to the required dependencies, your application likely has additional dependencies.
For example, the sample application’s requirements.txt file also declares the python-dotenv package, which is used by that project to read secrets from a .env file.
Set environment variables
Set connection parameters in environment variables, using secure references and following industry best practices for credential management:
export ASTRA_DB_CLIENT_ID="token"
export ASTRA_DB_CLIENT_SECRET="AstraCS:..."
export ASTRA_DB_SECURE_BUNDLE_PATH="/path/to/scb.zip"
export ASTRA_DB_KEYSPACE="default_keyspace"
Provide the following:
- ASTRA_DB_CLIENT_ID: The literal string token
- ASTRA_DB_CLIENT_SECRET: An Astra application token with access to the database that you want to use in your FastAPI application
- ASTRA_DB_SECURE_BUNDLE_PATH: The full path to your database’s Secure Connect Bundle (SCB)
- ASTRA_DB_KEYSPACE: The name of the keyspace within your database that you want to use in your FastAPI application
For more information about these values, see the Prerequisites.
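The following is a minimal sketch of checking these variables at startup, so that a missing value fails with one clear message. The read_astra_settings() helper and its error text are hypothetical and are not part of the sample application.

```python
import os

REQUIRED_VARS = (
    "ASTRA_DB_CLIENT_ID",
    "ASTRA_DB_CLIENT_SECRET",
    "ASTRA_DB_SECURE_BUNDLE_PATH",
    "ASTRA_DB_KEYSPACE",
)


def read_astra_settings(env=None):
    """Return the four connection settings, or raise if any is missing or empty.

    Hypothetical helper: `env` defaults to os.environ and accepts any mapping,
    which makes the function easy to exercise without touching the real environment.
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}
```

Calling read_astra_settings() once, before the first connection attempt, reports every missing variable together instead of one KeyError at a time.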
Create the Session object
The following connection script from the sample application shows how to connect to your Astra database using the Cassandra Python driver.
The cassandra.cluster.Session object represents the driver’s connection to the database.
Once established, the Session is used throughout the application to execute CQL queries.
Following best practices for drivers, the same Session object is reused throughout the application.
The Cluster and PlainTextAuthProvider imports at the top of the script are necessary to create the Session.
import os
import atexit
from dotenv import load_dotenv, find_dotenv
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
# read .env file for connection params
dotenv_file = find_dotenv('.env')
load_dotenv(dotenv_file)
ASTRA_DB_CLIENT_ID = os.environ['ASTRA_DB_CLIENT_ID']
ASTRA_DB_CLIENT_SECRET = os.environ['ASTRA_DB_CLIENT_SECRET']
ASTRA_DB_SECURE_BUNDLE_PATH = os.environ['ASTRA_DB_SECURE_BUNDLE_PATH']
ASTRA_DB_KEYSPACE = os.environ['ASTRA_DB_KEYSPACE']
# global cache variables to re-use a single Session
cluster = None
session = None
def get_session():
    """
    Return the database Session, always the same.
    If no Session has been created yet, create it and store it for later calls.
    """
    global session
    global cluster
    if session is None:
        print('[get_session] Creating session')
        cluster = Cluster(
            cloud={
                'secure_connect_bundle': ASTRA_DB_SECURE_BUNDLE_PATH,
            },
            auth_provider=PlainTextAuthProvider(
                ASTRA_DB_CLIENT_ID,
                ASTRA_DB_CLIENT_SECRET,
            ),
        )
        session = cluster.connect(ASTRA_DB_KEYSPACE)
    else:
        print('[get_session] Reusing session')
    return session
@atexit.register
def shutdown_driver():
    if session is not None:
        print('[shutdown_driver] Closing connection')
        cluster.shutdown()
        session.shutdown()
The last lines of the db_connect.py script illustrate how to close the session and cluster to properly clean up the driver resources when the application is shutting down.
Although this doesn’t strictly apply to FastAPI applications that run continuously, it is demonstrated here for the sake of completeness.
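The create-once, reuse-afterwards pattern behind get_session() can be sketched in isolation. The version below adds a lock, which is an assumption and not something the sample application does; it only matters if the function can be reached from more than one thread. The LazySingleton class and its connect argument are hypothetical, with connect standing in for the code that builds the Cluster and calls connect().

```python
import threading


class LazySingleton:
    """Create a resource on first use and hand back the same object afterwards."""

    def __init__(self, connect):
        # `connect` is a hypothetical zero-argument factory, standing in for
        # the Cluster(...).connect(...) call in db_connect.py.
        self._connect = connect
        self._lock = threading.Lock()
        self._resource = None

    def get(self):
        # Double-checked locking: skip the lock once the resource exists.
        if self._resource is None:
            with self._lock:
                if self._resource is None:
                    self._resource = self._connect()
        return self._resource
```

With this shape, the first call to get() pays the connection cost and every later call returns the cached object.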
Make the Session available to FastAPI endpoints
As the application’s endpoints interact with the database, they reuse the Session object through an adaptation of the get_session() function declared in the db_connect.py script.
This function provides the globally-cached Session object when called.
When first invoked, the function creates the Session object idiomatically using the necessary connection parameters.
To make the Session available to FastAPI endpoint functions, you can use FastAPI’s Depends dependency-injection feature in your endpoint functions.
For example, in the sample application, the definition of each endpoint function includes the argument session=Depends(g_get_session).
The dependency is automatically resolved by FastAPI when the endpoint is called, making the Session available to the endpoint function.
@app.get('/animal/{genus}')
async def get_animals(genus, session=Depends(g_get_session)):
    animals = retrieve_animals_by_genus(session, genus)
    return animals
Depends takes a callable as an argument.
In the sample application, the callable passed to Depends is an async generator function that yields the Session object to the endpoint function that triggered Depends.
It is a thin wrapper around the get_session() function that converts it to a generator (yield) with the desired signature:
from storage.db_connect import get_session
async def g_get_session():
    yield get_session()
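Be sure to import Depends from fastapi, along with get_session, in the module that defines your endpoint functions (api.py in the sample application).
Additional imports are needed for other parts of the code.
The preceding example shows only the imports for passing the Session to the endpoint functions.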
The get_session() function and the supporting use of Depends(g_get_session) are written specifically for the sample application, but you can adapt them to your own FastAPI applications as needed.
For more examples of the Depends pattern, see the sample application.
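To see what the wrapper yields, you can drive an equivalent async generator by hand. This sketch does not involve FastAPI, so it only illustrates the generator's behavior. The get_session() stub below is a stand-in for the real function in db_connect.py, and first_yielded_value() is a hypothetical helper.

```python
import asyncio

_session = None


def get_session():
    # Stub standing in for storage.db_connect.get_session(): it creates a
    # placeholder object once, then returns the same one on later calls.
    global _session
    if _session is None:
        _session = object()
    return _session


async def g_get_session():
    yield get_session()


async def first_yielded_value():
    # Pull the single value the async generator yields, then close it.
    gen = g_get_session()
    try:
        return await gen.__anext__()
    finally:
        await gen.aclose()


first = asyncio.run(first_yielded_value())
second = asyncio.run(first_yielded_value())
```

Each run creates a new generator object, but both runs yield the same cached session object.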
Recommendations for FastAPI applications with Astra
The following recommendations can help avoid errors and improve performance of CQL queries in your FastAPI applications.
Automatically create the required tables
The following initialization script from the sample application automatically creates the required tables in your database if they don’t already exist.
This script also serves to test the connection to your database because it uses the get_session() function from the db_connect.py script.
A similar script can be useful for your own FastAPI applications to ensure that the minimum required schema is in place before your application starts serving requests.
db_initialize.py
from db_connect import get_session
from cassandra.query import BatchStatement
INIT_CQL_A = '''
CREATE TABLE IF NOT EXISTS animals (
genus TEXT,
species TEXT,
image_url TEXT,
size_cm FLOAT,
sightings INT,
taxonomy LIST<TEXT>,
PRIMARY KEY ((genus), species)
);
'''
INIT_CQL_P = '''
CREATE TABLE IF NOT EXISTS plants (
genus TEXT,
species TEXT,
sightings INT,
PRIMARY KEY ((genus), species)
);
'''
POPULATE_CQL_0 = '''
INSERT INTO animals (
genus,
species,
image_url,
size_cm,
sightings,
taxonomy
) VALUES (
'Vanessa',
'cardui',
'https://imgur.com/WrPsKkD',
5.5,
12,
['Arthropoda', 'Insecta', 'Lepidoptera', 'Nymphalidae']
);
'''
POPULATE_CQL_1 = '''
INSERT INTO animals (
genus,
species,
image_url,
size_cm,
sightings,
taxonomy
) VALUES (
'Vanessa',
'atalanta',
'https://imgur.com/2fSEnt1',
4.8,
43,
['Arthropoda', 'Insecta', 'Lepidoptera', 'Nymphalidae']
);
'''
POPULATE_CQL_2 = '''
INSERT INTO animals (
genus,
species,
image_url,
size_cm,
sightings,
taxonomy
) VALUES (
'Saitis',
'barbipes',
'https://imgur.com/coVy27e',
0.6,
4,
['Arthropoda', 'Arachnida', 'Aranea', 'Salticidae']
);
'''
PLANTAIN_SPECIES = [
'afra',
'africana',
'aitchisonii',
'alpina',
'amplexicaulis',
'arborescens',
'arenaria',
'argentea',
'aristata',
'asiatica',
'aucklandica',
'bigelovii',
'canescens',
'coreana',
'cordata',
'coronopus',
'cornuti',
'cretica',
'cynops',
'debilis',
'elongata',
'erecta',
'eriopoda',
'erosa',
'fernandezia',
'fischeri',
'gentianoides',
'glabrifolia',
'grayana',
'hawaiensis',
'hedleyi',
'helleri',
'heterophylla',
'hillebrandii',
'himalaica',
'holosteum',
'hookeriana',
'incisa',
'indica',
'krajinai',
'lagopus',
'lanceolata',
'lanigera',
'leiopetala',
'longissima',
'macrocarpa',
'major',
'maritima',
'maxima',
'media',
'melanochrous',
'moorei',
'musicola',
'nivalis',
'nubicola',
'obconica',
'ovata',
'pachyphylla',
'palmata',
'patagonica',
'polysperma',
'princeps',
'purshii',
'pusilla',
'psyllium',
'raoulii',
'rapensis',
'remota',
'reniformis',
'rhodosperma',
'rigida',
'robusta',
'rugelii',
'rupicola',
'schneideri',
'sempervirens',
'sparsiflora',
'spathulata',
'subnuda',
'tanalensis',
'taqueti',
'tenuiflora',
'triandra',
'triantha',
'tweedyi',
'virginica',
'winteri',
'wrightiana',
]
MINIMAL_INSERT_CQL = 'INSERT INTO plants (genus, species, sightings) VALUES (?, ?, ?);'
def init_db():
    session = get_session()
    print('[init_db] Running init scripts')
    session.execute(INIT_CQL_A)
    session.execute(POPULATE_CQL_0)
    session.execute(POPULATE_CQL_1)
    session.execute(POPULATE_CQL_2)
    session.execute(INIT_CQL_P)
    minimal_insert = session.prepare(MINIMAL_INSERT_CQL)
    batch = BatchStatement()
    for idx, species in enumerate(PLANTAIN_SPECIES):
        # we just scramble the numbers for fun
        batch.add(minimal_insert, ('Plantago', species, 1 + (idx) % 5 + (idx + 5) % 3))
    session.execute(batch)
    print('[init_db] Init script finished')
if __name__ == '__main__':
    init_db()
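The tuples bound to the prepared INSERT in the loop above can be inspected without a database. This is a small sketch that assumes a hypothetical plant_rows() helper, which is not in the sample application; it reproduces the same arithmetic for the sightings value.

```python
def plant_rows(species_names, genus="Plantago"):
    """Yield (genus, species, sightings) tuples as bound in init_db()."""
    for idx, species in enumerate(species_names):
        # Same arithmetic as the init script: values stay between 1 and 7.
        yield (genus, species, 1 + idx % 5 + (idx + 5) % 3)


rows = list(plant_rows(["afra", "africana", "aitchisonii"]))
```

Because every row shares the partition key value Plantago, the batch in the init script writes to a single partition of the plants table.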
Use prepared statements
As a best practice with FastAPI, your endpoint functions should focus on handling the request-response cycle, delegating any supporting logic to separate functions or modules.
In the sample application, each endpoint function invokes a separate function in db_io.py that handles the actual database operations:
from storage.db_io import (
    store_animal,
    retrieve_animal,
    retrieve_animals_by_genus,
    generator_retrieve_plant_by_genus,
)
Typically, an application’s endpoints are called many times. For this reason, it can be beneficial to use prepared statements for the corresponding CQL queries. Prepared statements are queries that you can run multiple times with different parameters. You define the statement once, and then your application can call the prepared statement as needed, passing unique parameters to each execution.
Prepared statements aren’t appropriate for every query, even repeated queries. For more information, see Prepared statements with Cassandra drivers.
In the sample application, the db_io.py module holds a cache of prepared statements, one for each type of database query.
This cache (prepared_cache) is populated on the first invocation of each endpoint.
The first call can be slightly slower while the prepared statement is created and stored in the cache.
However, performance can improve significantly on subsequent calls that reuse the cached prepared statement, if prepared statements are used appropriately across the application.
The following truncated example shows the prepared_cache, the get_prepared_statement() function that manages it, and database functions using the cached prepared statements:
prepared_cache = {}
def get_prepared_statement(session, stmt):
    if stmt not in prepared_cache:
        print(f'[get_prepared_statement] Preparing statement "{stmt}"')
        prepared_cache[stmt] = session.prepare(stmt)
    return prepared_cache[stmt]
def store_animal(session, animal):
    store_cql = 'INSERT INTO animals (genus,species,image_url,size_cm,sightings,taxonomy) VALUES (?,?,?,?,?,?);'
    prepared_store = get_prepared_statement(session, store_cql)
    session.execute(
        prepared_store,
        (
            animal.genus,
            animal.species,
            animal.image_url,
            animal.size_cm,
            animal.sightings,
            animal.taxonomy,
        ),
    )
def retrieve_animal(session, genus, species):
    get_one_cql = 'SELECT * FROM animals WHERE genus=? AND species=?;'
    prepared_get_one = get_prepared_statement(session, get_one_cql)
    row = session.execute(prepared_get_one, (genus, species)).one()
    if row:
        return Animal(**row._asdict())
    else:
        return row
def retrieve_animals_by_genus(session, genus):
    get_many_cql = 'SELECT * FROM animals WHERE genus=?;'
    prepared_get_many = get_prepared_statement(session, get_many_cql)
    rows = session.execute(prepared_get_many, (genus,))
    return (
        Animal(**row._asdict())
        for row in rows
    )
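The caching behavior can be checked without a database. In this sketch, FakeSession is a hypothetical stand-in for the driver's Session that only counts prepare() calls; get_prepared_statement() follows the same logic as the sample application, minus the logging.

```python
prepared_cache = {}


class FakeSession:
    """Hypothetical stand-in for the driver's Session; it only counts prepare() calls."""

    def __init__(self):
        self.prepare_calls = 0

    def prepare(self, stmt):
        self.prepare_calls += 1
        return ("prepared", stmt)  # placeholder for a PreparedStatement


def get_prepared_statement(session, stmt):
    # Same logic as in db_io.py: prepare once per distinct CQL string.
    if stmt not in prepared_cache:
        prepared_cache[stmt] = session.prepare(stmt)
    return prepared_cache[stmt]


session = FakeSession()
cql = "SELECT * FROM animals WHERE genus=?;"
first = get_prepared_statement(session, cql)
second = get_prepared_statement(session, cql)
other = get_prepared_statement(session, "SELECT * FROM plants WHERE genus=?;")
```

The cache is keyed by the CQL string alone, so this approach assumes a single Session for the whole application, which is what get_session() provides.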
Stream and reconstruct large responses
In some cases, your endpoints might return large responses, such as a GET that returns a long list of items.
It can be unwieldy and suboptimal to retrieve the full list at the API level, and then prepare a whole response string to return to the caller.
Instead, it is preferable to start sending out the response as soon as possible, even while the data is still being retrieved from the database.
FastAPI makes this possible through generators/iterators and streaming responses.
The Cassandra Python driver handles pagination of large result sets transparently. Regardless of the actual grouping of rows into pages, you see only a homogeneous iterable over all rows at the Python code level. This means that you can make a CQL query function into a FastAPI-compatible generator with few code changes.
As shown in the following example, the generator_retrieve_plant_by_genus() function from the sample application is a generator.
It executes a query, and then produces a response that can be streamed.
Aside from the hint in the function name and the use of yield, the body of the query is the same as the regular, non-generator version of the same query.
# Generator read
def generator_retrieve_plant_by_genus(session, genus):
    get_many_cql = 'SELECT * FROM plants WHERE genus=?;'
    prepared_get_many = get_prepared_statement(session, get_many_cql)
    rows = session.execute(prepared_get_many, (genus,))
    for row in rows:
        yield Plant(**row._asdict())
# Non-generator read
def retrieve_plants_by_genus(session, genus):
    get_many_cql = 'SELECT * FROM plants WHERE genus=?;'
    prepared_get_many = get_prepared_statement(session, get_many_cql)
    rows = session.execute(prepared_get_many, (genus,))
    return (
        Plant(**row._asdict())
        for row in rows
    )
However, the interaction between the endpoint function and the caller is more complex than for non-streamed responses.
FastAPI’s StreamingResponse consumes a generator and returns its items as a chunked response that is sent to the caller piece by piece.
This means that, while the Cassandra driver handles pagination on the database side and exposes the results as an iterator, the API code must also stream the response piece by piece using FastAPI’s StreamingResponse:
@app.get('/plant/{genus}')
async def get_plant(genus, session=Depends(g_get_session)):
    plants = generator_retrieve_plant_by_genus(session, genus)
    return StreamingResponse(
        format_streaming_response(plants),
        media_type='application/json',
    )
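One practical difference between a generator function and a function that returns a generator expression is when the query runs. Calling a generator function only creates the generator; its body, including the query, runs on the first iteration. The sketch below illustrates this with a hypothetical FakeSession that counts execute() calls; generator_read() and eager_query_read() are simplified stand-ins for the two read functions in the sample application.

```python
class FakeSession:
    """Hypothetical stand-in for the driver's Session; it counts execute() calls."""

    def __init__(self, rows):
        self.rows = rows
        self.execute_calls = 0

    def execute(self, statement, params):
        self.execute_calls += 1
        return iter(self.rows)


def generator_read(session, genus):
    # Generator function: nothing here runs until the first next().
    rows = session.execute('SELECT * FROM plants WHERE genus=?;', (genus,))
    for row in rows:
        yield row


def eager_query_read(session, genus):
    # Regular function: the query runs at call time, rows are consumed lazily.
    rows = session.execute('SELECT * FROM plants WHERE genus=?;', (genus,))
    return (row for row in rows)


session = FakeSession(['major', 'media'])
lazy = generator_read(session, 'Plantago')
calls_before_iteration = session.execute_calls
first_row = next(lazy)
calls_after_first_row = session.execute_calls
eager = eager_query_read(session, 'Plantago')
calls_after_plain_call = session.execute_calls
```

In the streaming endpoint, this means the query is issued only when the response starts consuming the generator.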
As the response pieces are generated, your endpoint function must manually construct a syntactically valid response from the pieces produced by the generator. How you do this depends on the content and format of the streamed response and the format that the caller expects.
For example, the sample application uses a format_streaming_response() helper function to handle the square brackets and commas needed to build a valid JSON list from the individual items produced by the generator.
This function is, itself, also a generator.
Effectively, it consumes the results of the first generator and outputs a stream of well-formed JSON pieces that can be sent to the caller.
def format_streaming_response(iterable):
    yield '['
    for index, item in enumerate(iterable):
        yield '%s%s' % (
            '' if index == 0 else ',',
            # the `.json()` method is available for Pydantic models
            # and it is equivalent to calling `json.dumps(some_dict)`.
            item.json(),
        )
    yield ']'
Although the response is sent in multiple pieces, the client still receives the full response, and it can start processing it once the full response is present. The benefit of streaming the response is that you avoid holding the entire response in memory on the API side for the duration of the request.
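The pieces produced by format_streaming_response() can be checked without running a server. In this sketch, Item is a hypothetical stand-in for a Pydantic model that exposes only a json() method; the formatting function has the same logic as the sample application's version.

```python
import json


class Item:
    """Hypothetical stand-in for a Pydantic model: it exposes only .json()."""

    def __init__(self, **fields):
        self.fields = fields

    def json(self):
        return json.dumps(self.fields)


def format_streaming_response(iterable):
    yield '['
    for index, item in enumerate(iterable):
        # Prefix every item except the first with a comma.
        yield '%s%s' % ('' if index == 0 else ',', item.json())
    yield ']'


items = [
    Item(genus='Plantago', species='major', sightings=3),
    Item(genus='Plantago', species='media', sightings=2),
]
chunks = list(format_streaming_response(items))
body = ''.join(chunks)
parsed = json.loads(body)
```

Joining the chunks gives a document that json.loads() accepts, and an empty iterable still produces a valid empty list.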
In the module where you define your read/write endpoint functions, make sure your imports include StreamingResponse from fastapi.responses.
Pydantic usage
FastAPI is fully compatible with Pydantic. As shown in the sample application, you can define Pydantic models to represent your table schemas:
from pydantic import BaseModel
from typing import List
class Animal(BaseModel):
    genus: str
    species: str
    image_url: str
    size_cm: float
    sightings: int
    taxonomy: List[str]
class Plant(BaseModel):
    genus: str
    species: str
    sightings: int
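The query functions in this guide build models with the pattern Plant(**row._asdict()). With the driver's default row factory, rows are named tuples, so the pattern can be tried with a standard-library namedtuple. In this sketch, Row is a hypothetical stand-in for a driver row, and a dataclass stands in for the Pydantic model so that the example runs without Pydantic installed.

```python
from collections import namedtuple
from dataclasses import dataclass

# Hypothetical stand-in for a row returned by the driver.
Row = namedtuple('Row', ['genus', 'species', 'sightings'])


@dataclass
class Plant:
    # Stand-in for the Pydantic Plant model; same field names and types.
    genus: str
    species: str
    sightings: int


row = Row('Plantago', 'major', 3)
# _asdict() maps column names to values, which become keyword arguments.
plant = Plant(**row._asdict())
```

Unlike a Pydantic model, a dataclass does not validate or coerce field types, so this only demonstrates the row-to-model mapping.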