Python client usage
This page provides language-specific guidance for using the Data API Python client.
For information about installing and getting started with the Python client, see Get started with the Data API.
Client hierarchy
When you create apps using the Data API clients, you must instantiate a DataAPIClient object. The DataAPIClient object serves as the entry point to the client hierarchy, which includes the Database, Table, and Collection classes.
Adjacent to these classes are the administration classes for database administration. The specific administration classes you use, and how you instantiate them, depend on your client language and database type (Astra DB, HCD, or DSE):
- AstraDBAdmin
- AstraDBDatabaseAdmin or DataAPIDatabaseAdmin
You directly instantiate only the DataAPIClient object. Then, through the DataAPIClient object, you can instantiate and access the other classes. Where necessary, instructions for instantiating other classes are provided in the command reference relevant to each class.
For instructions for instantiating the DataAPIClient object, see Instantiate a client object.
DataAPIVector
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
The astrapy.data_types.DataAPIVector
class is the preferred object to represent and encode
vectors when interacting with the Data API using the Python client.
A DataAPIVector
is a wrapper around a list of float numbers, and supports the basic
access patterns of the equivalent list:
from astrapy.data_types import DataAPIVector
# Initialize a vector
vector = DataAPIVector([0.1, -0.2, 0.3])
# Access a component by index
print(vector[1])
# Loop over the vector components
for x in vector:
print(x)
# Compute the vector Euclidean norm
print(sum(x*x for x in vector)**0.5)
You can always use plain lists of numbers where a vector is expected.
However, when writing vectors to a table, using a DataAPIVector ensures that the
vector data is encoded in the faster, more efficient binary format, which speeds up
insertions, especially when inserting multiple rows. For collections,
binary encoding is the default serialization format regardless of the representation.
Similarly, during read operations, vectors are by default returned as DataAPIVector
objects. The following code example assumes default APIOptions for all involved objects:
# When inserting to a collection, binary encoding is always used
collection.insert_one({"$vector": DataAPIVector([1, 2, 3])})
collection.insert_one({"$vector": [4, 5, 6]})
# When reading from a collection, DataAPIVector is always returned
# The following outputs (reformatted for clarity):
# [
# {'$vector': DataAPIVector([4.0, 5.0, 6.0])},
# {'$vector': DataAPIVector([1.0, 2.0, 3.0])}
# ]
collection.find({}, projection={"_id": False, "$vector": True}).to_list()
# When inserting to a table, binary encoding is only used with DataAPIVector
my_table.insert_one({'primary_column': 'A', 'vector_column': DataAPIVector([9, 8, 7])})
my_table.insert_one({'primary_column': 'B', 'vector_column': [6, 5, 4]})
# When reading from a table, DataAPIVector is always returned
# The following outputs (reformatted for clarity):
# [
# {'primary_column': 'B', 'vector_column': DataAPIVector([6.0, 5.0, 4.0])},
# {'primary_column': 'A', 'vector_column': DataAPIVector([9.0, 8.0, 7.0])}
# ]
my_table.find({}).to_list()
See Serdes options and custom data types for ways to change the client's default behavior regarding the use of DataAPIVector objects.
Client custom data types
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
The Python client comes with its own set of data types to augment or replace the standard library classes. This makes it possible to more accurately model the contents of certain column types in tables.
- When reading from a table, custom classes are preferred by default. For details on how to configure a table differently (and the limitations associated with this choice), see Serdes options and custom data types.
- When writing to a table, you can use both standard-library and custom data types in most cases.
The following table summarizes the available custom data types.
Data type | Replaces | Remarks and example |
---|---|---|
DataAPIVector | list of float | See DataAPIVector for more information. |
DataAPITimestamp | datetime.datetime | See DataAPITimestamp and datetimes for more information, especially on the distinction between naive and aware datetimes. |
DataAPIDate | datetime.date | |
DataAPITime | datetime.time | |
DataAPIDuration | datetime.timedelta | Durations are not a well-defined span of time, so a duration is intrinsically different from a timedelta. Objects can be created from duration strings in both the ISO-8601 and the Apache Cassandra® formats. |
DataAPIMap | dict | |
DataAPISet | set | |
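Duration strings of the ISO-8601 kind mentioned above can be parsed with the standard library alone. The following is an illustrative sketch covering a simplified subset of the format (it is not astrapy's own DataAPIDuration parser), showing why the month and day components must stay separate from the sub-day part:

```python
import re

# Illustrative only: a minimal parser for a subset of ISO-8601 duration
# strings (e.g. "P1Y2M3DT4H5M6S"). A real parser (such as astrapy's) also
# handles the Apache Cassandra format and fractional components.
_ISO = re.compile(
    r"^P(?:(\d+)Y)?(?:(\d+)M)?(?:(\d+)D)?"
    r"(?:T(?:(\d+)H)?(?:(\d+)M)?(?:(\d+)S)?)?$"
)

def parse_iso_duration(text: str) -> dict:
    m = _ISO.match(text)
    if m is None:
        raise ValueError(f"not a supported ISO-8601 duration: {text!r}")
    y, mo, d, h, mi, s = (int(g) if g else 0 for g in m.groups())
    # Months and days are kept separate from the sub-day part because a
    # duration is not a fixed span of time (months vary in length, and a
    # calendar day is not always exactly 24 hours).
    return {"months": 12 * y + mo, "days": d, "seconds": 3600 * h + 60 * mi + s}

parsed = parse_iso_duration("P1Y2M3DT4H5M6S")
```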
APIOptions
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
You can customize many aspects of how the Python client interacts with the API, such as timeouts, serialization and deserialization, and authentication parameters.
Commonly used parameters (such as the authentication token when instantiating a DataAPIClient or a Database object) are available as named method parameters. For other parameters, use the "API Options" (class astrapy.api_options.APIOptions) to adjust the client behavior.
Each object in the abstraction hierarchy (DataAPIClient, Database, Table, Collection, and other classes) has options that determine how the object behaves.
To customize the behavior from the preset defaults, create an APIOptions object and either:
- Pass it as the api_options argument to the DataAPIClient constructor or to any of the .with_options and .to_sync/.to_async methods, to get a new instance of an object with some settings changed.
- Pass it as the spawn_api_options argument to "spawning methods", such as get_collection, create_collection, get_table, create_table, get_database, create_database, or get_database_admin, to set these option overrides for the returned object.
The APIOptions object passed as an argument can define none, some, or all of its members. The database_additional_headers, admin_additional_headers, and redacted_header_names parameters are merged with the inherited ones. Any other specified parameter overrides the inherited value: if an override is provided (even if it is None), it completely replaces the inherited value. Any unspecified options are unchanged.
The structure of APIOptions is the same throughout the object hierarchy.
This makes it possible to set, for example, a serialization option for reading
from collections at the Database level, so that each Collection spawned from the
Database has the desired behavior. However, it also makes it possible to set an
option that has no effect on the object it is set on.
Parameters of the APIOptions object constructor:
Name | Type | Summary |
---|---|---|
environment | str | An identifier for the environment for the Data API. This can describe an Astra DB environment (such as the default of "prod") or a self-deployed setup (such as "dse" or "hcd"). This setting cannot be overridden through customization: it can only be provided when creating the DataAPIClient. |
callers | Sequence of Caller | An iterable of "caller identities" to be used in identifying the caller through the User-Agent header when issuing requests to the Data API. Each caller identity is a Caller object, consisting of a name and a version. |
database_additional_headers | dict | A free-form dictionary of additional headers to employ when issuing requests to the Data API from Database, Table, and Collection objects. |
admin_additional_headers | dict | A free-form dictionary of additional headers to employ when issuing requests to both the Data API and the DevOps API from admin objects. |
redacted_header_names | set of str | A set of case-insensitive strings denoting the headers that contain secrets. These headers are masked when logging request details. |
token | TokenProvider | An instance of TokenProvider supplying the authentication token for requests. |
embedding_api_key | EmbeddingHeadersProvider | An instance of EmbeddingHeadersProvider supplying header-based authentication for embedding providers. |
timeout_options | TimeoutOptions | An instance of TimeoutOptions. See Timeout options. |
serdes_options | SerdesOptions | An instance of SerdesOptions. See Serdes options and custom data types. |
data_api_url_options | DataAPIURLOptions | An instance of DataAPIURLOptions, controlling how the Data API URL is composed. |
dev_ops_api_url_options | DevOpsAPIURLOptions | An instance of DevOpsAPIURLOptions, controlling how the DevOps API URL is composed. |
Here is an example script demonstrating customization of some API Options settings for various client objects:
from astrapy import DataAPIClient
from astrapy.api_options import (
APIOptions,
SerdesOptions,
TimeoutOptions,
)
from astrapy.authentication import (
StaticTokenProvider,
AWSEmbeddingHeadersProvider,
)
# Disable custom datatypes in all reads:
no_cdt_options = APIOptions(
serdes_options=SerdesOptions(
custom_datatypes_in_reading=False,
)
)
my_client = DataAPIClient(api_options=no_cdt_options)
# These spawned objects inherit that setting:
my_database = my_client.get_database(
"https://...",
token="my-token-1",
)
my_table = my_database.get_table("my_table")
# Make a copy of the table with some redefined timeouts
# and a certain header-based authentication for its vectorize provider:
my_table_timeouts = TimeoutOptions(
request_timeout_ms=15000,
general_method_timeout_ms=30000,
table_admin_timeout_ms=120000,
)
my_table_apikey_provider = AWSEmbeddingHeadersProvider(
embedding_access_id="my-access-id",
embedding_secret_id="my-secret-id",
)
my_table_slow_copy = my_table.with_options(
api_options=APIOptions(
embedding_api_key=my_table_apikey_provider,
timeout_options=my_table_timeouts,
),
)
# Create another 'Database' with a different auth token
# (for get_database, the 'token=' shorthand shown above does the same):
my_other_database = my_client.get_database(
"https://...",
spawn_api_options=APIOptions(
token="my-token-2",
),
)
# Spawn a collection from a database and set it to use
# another token and a different policy with decimals:
my_other_collection = my_database.get_collection(
"my_other_collection",
spawn_api_options=APIOptions(
token="my-token-3",
serdes_options=SerdesOptions(
use_decimals_in_collections=True,
)
),
)
Serdes options and custom data types
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
One of the attributes of the API Options is serdes_options, a value of type
SerdesOptions. This option controls the complementary processes of
translating Python data for rows and documents into a Data API payload,
and of converting a Data API response back into the appropriate Python objects.
The flags collected in SerdesOptions have two roles:
- Write-path settings affect how values are encoded in the JSON payload sent to the API.
- Read-path settings determine the data types used to represent the values found in API responses when a method returns data.
When creating a SerdesOptions object to customize the client behavior,
there are no mandatory arguments. Any attribute that is not specified inherits
the corresponding setting from its parent, or spawner, class.
Parameters of the SerdesOptions object constructor:
Name | Type | Summary |
---|---|---|
binary_encode_vectors | bool | Write-path. Whether to encode vectors using the faster, more efficient binary encoding, as opposed to sending plain lists of numbers. For tables, this affects vectors passed as instances of DataAPIVector; for collections, binary encoding is applied regardless of the representation. |
custom_datatypes_in_reading | bool | Read-path. Whether return values from read methods should use the client's custom classes (default setting of True) or be converted to standard-library types. |
unroll_iterables_to_lists | bool | Write-path. If this is set to True, any iterable passed where a list is expected (such as a generator) is unrolled into a list before being written. |
use_decimals_in_collections | bool | Both read- and write-path. Whether numbers in collections are handled as lossless decimal.Decimal objects instead of the standard int and float types. Before switching this setting to True, consider that all numbers read back from collections are then returned as Decimal objects. |
accept_naive_datetimes | bool | Write-path. Python datetimes can either be "naive" or "aware" of a timezone offset. Only the latter can be translated unambiguously, and without implied assumptions, into a well-defined timestamp. Because the Data API always stores timestamps, by default the client raises an error if a write is attempted with a naive datetime. If this setting is changed to True, naive datetimes are accepted for writes. |
datetime_tzinfo | datetime.timezone or None | Read-path. The timezone applied to timestamps read from tables or collections when custom_datatypes_in_reading is False and standard-library datetimes are returned. The default is UTC; if set to None, naive datetimes are returned (not recommended). |
custom_datatypes_in_reading details
The following table describes the returned data types depending on the custom_datatypes_in_reading setting. All custom data types are in the module astrapy.data_types.
When True (default) | When False | Notes |
---|---|---|
DataAPIVector | list of float | No loss of expressivity or accuracy. |
DataAPITimestamp | datetime.datetime | The Python stdlib class covers a shorter year range (1 AD to 9999 AD). When receiving values outside of this range, the client may error. Also, depending on the year, the returned datetimes may not represent the stored value exactly. |
DataAPIDate | datetime.date | Subject to the same limited year range as datetime.datetime. |
DataAPITime | datetime.time | An approximation may occur, since the standard-library class has microsecond precision (while the database stores up to nanosecond precision). |
DataAPIDuration | datetime.timedelta | Durations, as used in the database, are intrinsically different from timedelta objects; the conversion is not always possible and may be approximate (for example, for durations involving months). |
DataAPIMap | dict | Generally this is a safe recast. However, it could raise an error in future versions of the Data API, should non-hashable data types (such as lists) be admitted as keys in maps. |
DataAPISet | set | Generally, this is a safe recast. |
DataAPITimestamp and datetimes
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
The Python client accepts both standard-library datetimes and its own
DataAPITimestamp objects for writes.
Python datetimes can either be "naive" or "aware" of a timezone offset. Only the latter can be translated unambiguously, and without implied assumptions, into a well-defined timestamp. Because the Data API always stores timestamps, by default the client raises an error if a write is attempted with a naive datetime.
It is best to ensure that all datetimes are timezone-aware. However, you can
switch to a more relaxed policy by setting APIOptions.serdes_options.accept_naive_datetimes
to True. The internal conversion to a timestamp uses the datetime's .timestamp()
method, which implicitly applies the system timezone to naive datetimes. If a table or collection is shared by instances of the application running with different system timezones, this affects the resulting timestamps.
import datetime

naive_date = datetime.datetime(2024, 11, 30, 10, 30)
aware_date = datetime.datetime(
2024, 11, 30, 10, 30,
tzinfo=datetime.timezone.utc,
)
# This command gives an error (if my_collection has default APIOptions)
my_collection.insert_one(
{"mode": "naive", "event_date": naive_date}
)
# Conversely, this command succeeds:
my_collection.insert_one({"mode": "aware", "event_date": aware_date})
# You can change the settings and use naive datetimes in writes:
from astrapy.api_options import APIOptions, SerdesOptions
relaxed_collection = my_collection.with_options(
api_options=APIOptions(
serdes_options=SerdesOptions(accept_naive_datetimes=True)
)
)
# With the updated settings, this no longer errors
relaxed_collection.insert_one({"mode": "naive", "event_date": naive_date})
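The ambiguity that motivates the default error can be seen with the standard library alone: an aware datetime maps to a single epoch value on any machine, whereas .timestamp() on a naive datetime silently applies the machine's local timezone:

```python
import datetime

aware = datetime.datetime(2024, 11, 30, 10, 30, tzinfo=datetime.timezone.utc)
# An aware datetime denotes a single, unambiguous point in time:
aware_ms = int(aware.timestamp() * 1000)  # 1732962600000 on any machine

naive = datetime.datetime(2024, 11, 30, 10, 30)
# .timestamp() on a naive datetime applies the local timezone, so the
# result differs between machines configured with different timezones:
naive_ms = int(naive.timestamp() * 1000)
local_offset_ms = aware_ms - naive_ms  # reveals this machine's UTC offset
```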
When reading timestamps from tables or collections through the Python client,
the default behavior is to return DataAPITimestamp
objects.
By changing the setting APIOptions.serdes_options.custom_datatypes_in_reading
to False, ordinary datetime.datetime objects are returned instead. Depending
on the value of APIOptions.serdes_options.datetime_tzinfo, these datetimes
are either aware, with the configured timezone (the default is UTC), or naive
(if datetime_tzinfo is set to None, which is not recommended).
# Default reading behavior:
# Returns DataAPITimestamp(timestamp_ms=1732959000000 [2024-11-30T09:30:00.000Z])
my_collection.find_one({"mode": "naive"})["event_date"]
# Switch to stdlib types and unset the timezone for returned datetimes:
from astrapy.api_options import APIOptions, SerdesOptions
stdlib_collection = my_collection.with_options(
api_options=APIOptions(
serdes_options=SerdesOptions(
custom_datatypes_in_reading=False,
datetime_tzinfo=None,
)
)
)
# Returns datetime.datetime(2024, 11, 30, 10, 30)
stdlib_collection.find_one({"mode": "naive"})["event_date"]
Timeout options
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
See Replacement of client timeout settings for tips on migrating from client version 1.x.
Use the timeout_options attribute of the API Options to configure timeouts
for HTTP requests. The configuration applies to both Data API and DevOps API operations.
Each operation is subject to multiple types of timeout. For example, the general_method_timeout_ms
timeout limits the overall duration of an insert_many
operation, and the request_timeout_ms
timeout limits each HTTP request that the operation performs.
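The way these two timeouts interact can be sketched with a small illustrative model (not astrapy's implementation): each request is capped by the per-request timeout or by whatever remains of the overall method budget, whichever is smaller:

```python
import time

def run_requests(n_requests, request_timeout_ms, general_method_timeout_ms,
                 do_request):
    """Illustrative model: cap each request at request_timeout_ms, and the
    whole method at general_method_timeout_ms."""
    deadline = time.monotonic() + general_method_timeout_ms / 1000.0
    for _ in range(n_requests):
        remaining_ms = (deadline - time.monotonic()) * 1000.0
        if remaining_ms <= 0:
            # The overall method budget is exhausted, even if each
            # individual request stayed within its own limit.
            raise TimeoutError("general_method_timeout_ms exceeded")
        # Each request gets the smaller of the two remaining budgets.
        do_request(timeout_ms=min(request_timeout_ms, remaining_ms))

# With a 30 s method budget and 10 s per request, each of three quick
# requests receives the full 10 s per-request budget:
budgets = []
run_requests(3, 10_000, 30_000, lambda timeout_ms: budgets.append(timeout_ms))
```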
In addition to setting the timeout behavior through the APIOptions for a certain object
(such as a Database
or a Table
), you can set a timeout when you invoke a method that issues an HTTP request.
The method docstring and signature help determine the relevant timeout; however, all methods issuing
requests feature a timeout_ms parameter, which is an alias for the appropriate timeout.
All timeout values are expressed with an integer number of milliseconds. A timeout of zero means that the timeout is disabled, but the associated operations may still have to obey other timeouts to limit their duration.
If a timeout occurs, an exception of type astrapy.exceptions.DataAPITimeoutException is raised. The error object contains contextual information that may help determine how to resolve the problem.
Parameters of the TimeoutOptions object constructor:
Name | Type | Summary |
---|---|---|
request_timeout_ms | int | The timeout imposed on a single HTTP request. This is applied to all HTTP requests to both the Data API and the DevOps API. |
general_method_timeout_ms | int | A timeout on the overall duration of a method invocation. This is valid for data management methods that are not concerned with schema or admin operations. For methods that issue a single HTTP request, the shorter of this timeout and request_timeout_ms applies; for methods that possibly issue several HTTP requests (such as insert_many), this timeout limits the overall duration. Defaults to 30 s. |
collection_admin_timeout_ms | int | A timeout for all collection-related schema and admin operations, such as creating, dropping, and listing collections. With the exception of collection creation, each individual request is also limited by request_timeout_ms. |
table_admin_timeout_ms | int | A timeout for all table-related schema and admin operations, such as creating, altering, dropping, and listing tables or indexes. Each individual request is also limited by request_timeout_ms. |
database_admin_timeout_ms | int | A timeout for all database-related admin operations, such as creating, dropping, and listing databases, getting database info, and querying for the available embedding providers. The longest-running operations in this class are the creation and the destruction of a database. Each individual request is also limited by request_timeout_ms. |
keyspace_admin_timeout_ms | int | A timeout for all keyspace-related admin operations, such as creating, altering, dropping, and listing keyspaces. Each individual request is also limited by request_timeout_ms. |
FindCursor
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
Every time a find command is called on a Table or a Collection, a FindCursor object is returned.
FindCursor objects (in their subclasses TableFindCursor and CollectionFindCursor)
represent a lazy stream of results and implement an iterable interface that manages
progressive retrieval of new results (pagination).
The basic usage pattern is that of consuming the cursor item by item, as demonstrated here:
cursor = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
)
for row in cursor:
print(row)
# Output:
# {'winner': 'Donna'}
# {'winner': 'Erick'}
# {'winner': 'Fiona'}
rows = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
).to_list()
print(rows)
# [{'winner': 'Donna'}, {'winner': 'Erick'}, {'winner': 'Fiona'}]
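The lazy, paginated behavior of a cursor can be modeled in plain Python (an illustrative sketch, not astrapy's implementation): each simulated API response carries a page of items plus an opaque page state, and the generator only requests the next page when the current one is exhausted:

```python
from typing import Iterator, Optional

# Canned "API responses": each entry maps a page state to (items, next state).
PAGES = {
    None: (["Donna", "Erick"], "state-1"),   # first page
    "state-1": (["Fiona"], None),            # last page (no next state)
}

def fake_find(page_state: Optional[str]):
    """Stand-in for one HTTP request to the Data API."""
    return PAGES[page_state]

def lazy_results() -> Iterator[str]:
    page_state: Optional[str] = None
    while True:
        items, page_state = fake_find(page_state)
        yield from items            # drain the client-side buffer
        if page_state is None:      # no further pages to fetch
            return

# The second "request" only happens once the first page is consumed:
winners = list(lazy_results())
```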
Cursors have the following properties, which can be inspected at any time:
Name | Return type | Summary |
---|---|---|
state | FindCursorState | The current state of the cursor. Values are in the FindCursorState enum (such as IDLE, STARTED, and CLOSED). |
alive | bool | Whether the cursor has the potential to yield more data. |
consumed | int | The number of items the cursor has yielded. (How many items have already been read by the code consuming the cursor.) |
buffered_count | int | The number of items (documents or rows) currently stored in the client-side buffer of this cursor. Reading this property never triggers new API calls to re-fill the buffer. |
data_source | Table or Collection | The object on which the find was originally invoked. |
In addition to the iteration described above, the following methods alter the cursor's internal state:
Name | Return type | Summary |
---|---|---|
close | None | Closes the cursor, regardless of its state. A cursor can be closed at any time, discarding any items that have not been consumed. |
rewind | None | Rewinds the cursor, bringing it back to its initial state of no items consumed. All cursor settings (filter, mapping, projection, etc.) are retained. |
consume_buffer | list | Consumes (returns) up to the requested number of buffered items (rows or documents). The returned items are marked as consumed, meaning that subsequently consuming the cursor starts after those items. |
for_each | None | Consumes the remaining rows in the cursor, invoking a provided callback function on each item. Calling this method on a closed cursor results in an error. The callback function can return any value. The return value is generally discarded, with the following exception: if the function returns the boolean False, the iteration stops and the cursor is left in a non-closed state, ready to be consumed further. |
to_list | list | Converts all rows that remain to be consumed from a cursor into a list. Calling this method on a closed cursor results in an error. If the cursor is partially consumed, only the remaining items are returned. Calling this method is not recommended if a large list of results is anticipated, because it would involve a large number of data exchanges with the Data API and possibly massive memory usage to construct the list. In such cases, you should follow a lazy pattern of iterating and consuming the rows. |
has_next | bool | Returns a Boolean indicating whether the cursor has more documents to return. This method can trigger the fetch of a new page if the current buffer is empty. |
get_sort_vector | DataAPIVector or None | Returns the query vector used in the vector (ANN) search that originated this cursor, if applicable. If this is not an ANN search, or it was invoked without the include_sort_vector option, None is returned. Calling this method may trigger the fetch of the first page of results. |
The following methods do not alter the cursor's internal state. Instead, they produce a copy, possibly with some altered attributes. These can be used to further modify the details of the underlying find parameters. Except for the clone method, the cursor must be in the IDLE state. With the exception of map and clone, using these methods is normally not necessary, since they correspond to arguments of the find method itself.
Name | Return type | Summary |
---|---|---|
clone | FindCursor | Creates a new cursor with the same settings, rewound to the IDLE state. |
filter | FindCursor | Returns a copy of this cursor with a new filter setting. |
project | FindCursor | Returns a copy of this cursor with a new projection setting. |
sort | FindCursor | Returns a copy of this cursor with a new sort setting. |
limit | FindCursor | Returns a copy of this cursor with a new limit setting. |
include_similarity | FindCursor | Returns a copy of this cursor with a new include_similarity setting. |
include_sort_vector | FindCursor | Returns a copy of this cursor with a new include_sort_vector setting. |
skip | FindCursor | Returns a copy of this cursor with a new skip setting. |
map | FindCursor | Returns a copy of this cursor with a mapping function to transform the returned items. Calling this method on a cursor with a mapping already set results in the mapping functions being composed. This operation is allowed only if the cursor state is still IDLE. |
The following code demonstrates the use of map, to_list, and for_each.
For simplicity, the script only uses a TableFindCursor object
(such as can be obtained by calling find on a Table),
but the same usage patterns work for CollectionFindCursor objects. Similar patterns work for AsyncTableFindCursor and AsyncCollectionFindCursor objects.
Additional information on the methods outlined in this section can be found in the
client API reference.
Full example script
from astrapy import DataAPIClient
client = DataAPIClient("TOKEN")
database = client.get_database("API_ENDPOINT")
from astrapy.constants import SortMode
from astrapy.info import (
CreateTableDefinition,
ColumnType,
)
my_table = database.create_table(
"games",
definition=(
CreateTableDefinition.builder()
.add_column("match_id", ColumnType.TEXT)
.add_column("round", ColumnType.TINYINT)
.add_vector_column("m_vector", dimension=3)
.add_column("score", ColumnType.INT)
.add_column("when", ColumnType.TIMESTAMP)
.add_column("winner", ColumnType.TEXT)
.add_set_column("fighters", ColumnType.UUID)
.add_partition_by(["match_id"])
.add_partition_sort({"round": SortMode.ASCENDING})
.build()
),
)
insert_result = my_table.insert_many(
[
{"match_id": "challenge6", "round": 1, "winner": "Donna"},
{"match_id": "challenge6", "round": 2, "winner": "Erick"},
{"match_id": "challenge6", "round": 3, "winner": "Fiona"},
{"match_id": "challenge6", "round": 4, "winner": "Georg"},
{"match_id": "challenge6", "round": 5, "winner": "Helen"},
],
)
# Get a cursor
cursor = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
)
for row in cursor:
print(row)
# Output:
# {'winner': 'Donna'}
# {'winner': 'Erick'}
# {'winner': 'Fiona'}
# Applying 'map' to a cursor:
cursor_mapped = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
).map(lambda row: row["winner"])
for value in cursor_mapped:
print(value)
# Output:
# Donna
# Erick
# Fiona
# Applying 'map' twice:
cursor_mapped_twice = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
).map(lambda row: row["winner"]).map(lambda w: w.upper())
for value in cursor_mapped_twice:
print(value)
# Output:
# DONNA
# ERICK
# FIONA
# Calling 'to_list' on an IDLE cursor:
cursor_tl = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=2,
)
print(cursor_tl.to_list())
# Output:
# [{'winner': 'Donna'}, {'winner': 'Erick'}]
print(cursor_tl.state)
# Output:
# FindCursorState.CLOSED
# Calling 'to_list' on a partially-consumed (ACTIVE) cursor:
cursor_pc = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=2,
)
cursor_pc.next()
# Output:
# {'winner': 'Donna'}
print(cursor_pc.consumed)
# Output:
# 1
print(cursor_pc.state)
# Output:
# FindCursorState.STARTED
print(cursor_pc.to_list())
# Output:
# [{'winner': 'Erick'}]
# Calling 'for_each' across the whole of a cursor:
cursor_fe = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
)
def printer(row):
print(f"-> {row['winner']}")
cursor_fe.for_each(printer)
# Output:
# -> Donna
# -> Erick
# -> Fiona
print(cursor_fe.state)
# Output:
# FindCursorState.CLOSED
# Calling 'for_each' with an early-stop callback:
cursor_es = my_table.find(
{"match_id": "challenge6"},
projection={"winner": True},
limit=3,
)
def printer_es(row):
go_on = row["winner"] != "Erick"
print(f"-> {row['winner']} (go_on={go_on})")
return go_on
cursor_es.for_each(printer_es)
# Output:
# -> Donna (go_on=True)
# -> Erick (go_on=False)
print(cursor_es.consumed)
# Output:
# 2
print(cursor_es.to_list())
# Output:
# [{'winner': 'Fiona'}]
Asynchronous interface
The Python client offers a complete asynchronous API, which mirrors the synchronous
one and is designed to work with asyncio
. Although most of this documentation focuses
on the synchronous counterpart, all async classes and methods have the
same signature and behavior (except for the obvious modifications due to the async
execution model).
For the async interface:
- Classes for data-related work have an async version: AsyncDatabase, AsyncTable, and AsyncCollection. Methods of these classes that issue HTTP requests have the same name and signature as their sync equivalents, but are awaitable. The conversion methods to_async/to_sync convert between the two.
- There is one DataAPIClient, which is shared between sync and async usage. Similarly, there is one AstraDBAdmin, one AstraDBDatabaseAdmin, and one DataAPIDatabaseAdmin. These classes have pairs of methods (a sync and an async version). In particular, they all have both get_database and get_async_database methods, which return a Database and an AsyncDatabase respectively. The AstraDBAdmin class also has list_databases/async_list_databases, database_info/async_database_info, create_database/async_create_database, and drop_database/async_drop_database. The AstraDBDatabaseAdmin and DataAPIDatabaseAdmin classes also have list_keyspaces/async_list_keyspaces, create_keyspace/async_create_keyspace, drop_keyspace/async_drop_keyspace, and find_embedding_providers/async_find_embedding_providers. The AstraDBDatabaseAdmin class has info/async_info and drop/async_drop.
- Cursors obtained by calling find on async tables and collections implement the async iteration protocol (async for …). Other methods on these cursors are awaitable if they involve HTTP requests.
An asynchronous script should first instantiate a DataAPIClient and then call get_async_database on it to obtain an AsyncDatabase. Then, invoke methods such as get_collection, or awaitable methods such as create_table, on this database, and use the resulting asynchronous collections or tables through their async methods. The following sample script demonstrates these concepts.
import asyncio
import os
from astrapy import DataAPIClient
async def amain():
# get an ordinary DataAPIClient as the starting point
my_client = DataAPIClient(token=os.environ["ASTRA_DB_APPLICATION_TOKEN"])
# get an ordinary AstraDBAdmin and call an async method on it
admin = my_client.get_admin()
db_list = await admin.async_list_databases()
print(f"Databases: {', '.join(db.name for db in db_list)}")
# Get an AsyncDatabase from the client
adatabase = my_client.get_async_database(
os.environ["ASTRA_DB_API_ENDPOINT"],
)
# Create a collection on DB and get an AsyncCollection object:
acollection = await adatabase.create_collection("my_collection")
# Use the AsyncCollection object:
insertion_result = await acollection.insert_many(
[{"_id": i} for i in range(5)]
)
print(f"Inserted ids: {insertion_result.inserted_ids}")
acursor = acollection.find({})
async for doc in acursor:
print(f" * Document: {doc}")
acursor2 = acollection.find({}, limit=2)
the_docs = await acursor2.to_list()
print(f"Some docs: {the_docs}")
await acollection.delete_many({})
if __name__ == "__main__":
asyncio.run(amain())
Typing support
This option requires client version 2.0-preview or later. For more information, see the Data API client upgrade guide.
Type hints are optional but fully supported when working with
Collection
and Table
objects and their contents. The relevant classes feature type parameters
that provide the required type hints to achieve type safety.
The following code sample demonstrates how some statements change when adding full type support:
# ***************#
# No type hints: #
# ***************#
# Create and get a Table:
my_table = database.create_table(
"friends_t",
definition=table_definition,
)
the_same_table = database.get_table("friends_t")
# Create and get a Collection:
my_collection = database.create_collection("friends_c")
the_same_collection = database.get_collection("friends_c")
# Run a find and get a cursor:
projected_cursor_c = my_collection.find(
{"city_id": 6},
projection={"name": True, "nickname": True, "_id": False},
)
# *****************#
# With type hints: #
# *****************#
# Create and get a Table:
my_table: Table[MyFullTableDict] = database.create_table(
"friends_t",
definition=table_definition,
row_type=MyFullTableDict,
)
the_same_table: Table[MyFullTableDict] = database.get_table(
"friends_t",
row_type=MyFullTableDict,
)
# Create and get a Collection:
my_collection: Collection[MyFullCollectionDict] = database.create_collection(
"friends_c",
document_type=MyFullCollectionDict,
)
the_same_collection: Collection[MyFullCollectionDict] = database.get_collection(
"friends_c",
document_type=MyFullCollectionDict,
)
# Run a find and get a cursor:
projected_cursor_c = my_collection.find(
{"city_id": 6},
projection={"name": True, "nickname": True, "_id": False},
document_type=MyNameNickDict,
)
Both Collection and Table objects have one type parameter for the documents/rows they return when queried. If not specified, the type parameter defaults to a generic, unconstrained dictionary.
Similarly, FindCursor objects have two type parameters: one for the "raw" items as they are received from the Data API, and one for the type of the items after the cursor's mapping is applied. When no mapping is applied, the two type parameters coincide.
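To illustrate why two type parameters are needed, here is a minimal, self-contained analogue of a doubly typed cursor (this is not astrapy code; MiniCursor is an illustrative toy class): the first parameter tracks the raw items, while map replaces the second.

```python
from typing import Callable, Generic, TypeVar

TRaw = TypeVar("TRaw")
TItem = TypeVar("TItem")
TNew = TypeVar("TNew")

class MiniCursor(Generic[TRaw, TItem]):
    """Toy analogue of a find cursor: TRaw is the item type as received,
    TItem is the type yielded after the mapping is applied."""

    def __init__(self, raw_items: "list[TRaw]", mapper: "Callable[[TRaw], TItem]"):
        self._raw = raw_items
        self._mapper = mapper

    def map(self, fn: "Callable[[TItem], TNew]") -> "MiniCursor[TRaw, TNew]":
        # Compose mappings: the first type parameter stays TRaw,
        # the second becomes TNew.
        old = self._mapper
        return MiniCursor(self._raw, lambda r: fn(old(r)))

    def to_list(self) -> "list[TItem]":
        return [self._mapper(r) for r in self._raw]

# Unmapped: the two parameters coincide, MiniCursor[dict, dict]
cursor: "MiniCursor[dict, dict]" = MiniCursor([{"nickname": "Dee"}], lambda r: r)
# After .map, the static type becomes MiniCursor[dict, str]
mapped = cursor.map(lambda d: d["nickname"])
print(mapped.to_list())  # → ['Dee']
```
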
The standard practice to achieve type safety consists of the following steps:
-
Subclass the standard library TypedDict in a way that represents the desired dictionaries for rows and documents, including partial items for projections, if desired.
-
Make sure the row_type/document_type parameter is supplied to the database methods that return typed objects: create_collection, get_collection, create_table, and get_table. You should also add a type hint to the variable being assigned, for example my_table: Table[MyTypedDict] = ….
-
The table alter method, which can modify the schema, also accepts a row_type parameter. Provide a type hint to the alter invocation and use the return value right away in place of the previous Table instance.
-
Invocations of find return a doubly typed TableFindCursor or CollectionFindCursor. To give a hint about the type of the returned items, pass a row_type or document_type parameter to find.
-
If explicit type hints are provided at the "entry points" mentioned above, the rest of the type inference proceeds automatically, without other explicit hints in the code.
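As a standalone illustration of the first step, the following sketch defines a full-row TypedDict alongside a partial one matching a projection (standard library only; the class and function names are illustrative, not part of the client API):

```python
from typing import TypedDict

class FullRowDict(TypedDict):
    city_id: int
    name: str
    age: int

class NameOnlyDict(TypedDict):
    # Partial shape matching a projection such as {"name": True}
    name: str

def greet(row: NameOnlyDict) -> str:
    # A function typed against the projected shape
    return f"Hello, {row['name']}!"

row: FullRowDict = {"city_id": 6, "name": "Paula", "age": 39}
projected: NameOnlyDict = {"name": row["name"]}
print(greet(projected))  # → Hello, Paula!
```
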
The following full script demonstrates the steps above:
Untyped version of the same script
# ***************#
# No type hints: #
# ***************#
# Preliminaries
from astrapy import DataAPIClient
client = DataAPIClient("TOKEN")
database = client.get_database("API_ENDPOINT")
# Functions to format and transform (for demonstration purposes)
def format_name(fname, fnick):
return f"{fname} '{fnick}'"
def extract_nick(row):
return row["nickname"]
def format_nick(nick):
return f"<{nick.upper()} !!!>"
# Interaction with TABLES
from astrapy.constants import SortMode
from astrapy.info import (
CreateTableDefinition,
ColumnType,
TableScalarColumnTypeDescriptor,
AlterTableAddColumns,
)
table_definition = (
CreateTableDefinition.builder()
.add_column("city_id", ColumnType.TINYINT)
.add_column("name", ColumnType.TEXT)
.add_column("age", ColumnType.INT)
.add_partition_by(["city_id"])
.add_partition_sort({"name": SortMode.ASCENDING})
.build()
)
# Create a table
my_table = database.create_table(
"friends_t",
definition=table_definition,
)
# Use get_table to obtain an equivalent Table object
the_same_table = database.get_table("friends_t")
# Insert rows
my_table.insert_many([
{"city_id": 6, "name": "Paula", "age": 39},
{"city_id": 6, "name": "Liam", "age": 25},
{"city_id": 6, "name": "Dana", "age": 31},
])
# Get a row
paula_row = my_table.find_one({"city_id": 6, "name": "Paula"})
if paula_row is None:
raise ValueError("Paula not found. Hmm.")
# Read a row column
paula_age = paula_row["age"]
# Alter the table, getting a new Table object as result
enriched_table = my_table.alter(
AlterTableAddColumns(
columns={
"nickname": TableScalarColumnTypeDescriptor(
column_type=ColumnType.TEXT,
),
}
)
)
# Update rows with the newly-added column
enriched_table.update_one(
{"city_id": 6, "name": "Paula"}, {"$set": {"nickname": "The Wise"}}
)
enriched_table.update_one(
{"city_id": 6, "name": "Liam"}, {"$set": {"nickname": "Franz"}}
)
enriched_table.update_one(
{"city_id": 6, "name": "Dana"}, {"$set": {"nickname": "Dee"}}
)
# Scroll through the rows from a find with a certain projection, and use them
projected_cursor_t = enriched_table.find(
{"city_id": 6},
projection={"name": True, "nickname": True},
)
print("\nWith a cursor on a table:")
for nn_row in projected_cursor_t:
formatted_name = format_name(nn_row["name"], nn_row["nickname"])
print(f"* {formatted_name}")
# Scroll through rows from a find with a mapping attached
mapped_cursor_t = enriched_table.find(
{"city_id": 6},
projection={"nickname": True},
).map(extract_nick)
print("\nWith a mapped cursor on a table:")
for nn_nickname in mapped_cursor_t:
print(f"=> {format_nick(nn_nickname)}")
# Interaction with COLLECTIONS
# Create a collection
my_collection = database.create_collection("friends_c")
# Use get_collection to get an equivalent Collection object
the_same_collection = database.get_collection("friends_c")
# Insert documents (with temporary nickname for now)
my_collection.insert_many([
{"city_id": 6, "name": "Paula", "age": 39, "nickname": "n/a"},
{"city_id": 6, "name": "Liam", "age": 25, "nickname": "n/a"},
{"city_id": 6, "name": "Dana", "age": 31, "nickname": "n/a"},
])
# Get a document
paula_document = my_collection.find_one({"city_id": 6, "name": "Paula"})
if paula_document is None:
raise ValueError("Paula not found. Hmm.")
# Read a field from the document
paula_age = paula_document["age"]
# Update documents with a new field
my_collection.update_one(
{"city_id": 6, "name": "Paula"}, {"$set": {"nickname": "The Wise"}}
)
my_collection.update_one(
{"city_id": 6, "name": "Liam"}, {"$set": {"nickname": "Franz"}}
)
my_collection.update_one(
{"city_id": 6, "name": "Dana"}, {"$set": {"nickname": "Dee"}}
)
# Iterate over documents from a find with a certain projection and use them
projected_cursor_c = my_collection.find(
{"city_id": 6},
projection={"name": True, "nickname": True, "_id": False},
)
print("\nWith a cursor on a collection:")
for nn_doc in projected_cursor_c:
formatted_name = format_name(nn_doc["name"], nn_doc["nickname"])
print(f"* {formatted_name}")
# Iterate over documents from a find with a mapping attached
mapped_cursor_c = my_collection.find(
{"city_id": 6},
projection={"nickname": True}
).map(extract_nick)
print("\nWith a mapped cursor on a collection:")
for nn_nickname in mapped_cursor_c:
print(f"=> {format_nick(nn_nickname)}")
# *****************#
# With type hints: #
# *****************#
from typing import TypedDict
# Preliminaries
from astrapy import DataAPIClient
client = DataAPIClient("TOKEN")
database = client.get_database("API_ENDPOINT")
# TypedDict objects for the involved rows/documents
class NickDict(TypedDict):
nickname: str
class MyNameNickDict(TypedDict):
name: str
nickname: str
class MyFullTableDict(TypedDict):
city_id: int
name: str
age: int
class MyFullEnrichedTableDict(TypedDict):
city_id: int
name: str
age: int
nickname: str
class MyFullCollectionDict(TypedDict, total=False):
_id: str | None
city_id: int
name: str
age: int
nickname: str
# Additional import of Table/Collection needed for type hints
from astrapy import Collection, Table
# Functions to format and transform (for demonstration purposes)
def format_name(fname: str, fnick: str) -> str:
return f"{fname} '{fnick}'"
def extract_nick(row: NickDict) -> str:
return row["nickname"]
def format_nick(nick: str) -> str:
return f"<{nick.upper()} !!!>"
# Interaction with TABLES
from astrapy.constants import SortMode
from astrapy.info import (
CreateTableDefinition,
ColumnType,
TableScalarColumnTypeDescriptor,
AlterTableAddColumns,
)
table_definition = (
CreateTableDefinition.builder()
.add_column("city_id", ColumnType.TINYINT)
.add_column("name", ColumnType.TEXT)
.add_column("age", ColumnType.INT)
.add_partition_by(["city_id"])
.add_partition_sort({"name": SortMode.ASCENDING})
.build()
)
# Create a table
my_table: Table[MyFullTableDict] = database.create_table(
"friends_t",
definition=table_definition,
row_type=MyFullTableDict,
)
# Use get_table to obtain an equivalent Table object
the_same_table: Table[MyFullTableDict] = database.get_table(
"friends_t",
row_type=MyFullTableDict,
)
# Insert rows
my_table.insert_many([
{"city_id": 6, "name": "Paula", "age": 39},
{"city_id": 6, "name": "Liam", "age": 25},
{"city_id": 6, "name": "Dana", "age": 31},
])
# Get a row
paula_row = my_table.find_one({"city_id": 6, "name": "Paula"})
if paula_row is None:
raise ValueError("Paula not found. Hmm.")
# Read a row column
# Variable paula_row is now typed as MyFullTableDict -> paula_age as `int`:
paula_age = paula_row["age"]
# Alter the table, getting a new Table object as the result.
# The method returns a new object precisely so that the return value
# can encode the change in the row type:
enriched_table: Table[MyFullEnrichedTableDict] = my_table.alter(
AlterTableAddColumns(
columns={
"nickname": TableScalarColumnTypeDescriptor(
column_type=ColumnType.TEXT,
),
}
),
row_type=MyFullEnrichedTableDict,
)
# Update rows with the newly-added column
enriched_table.update_one(
{"city_id": 6, "name": "Paula"}, {"$set": {"nickname": "The Wise"}}
)
enriched_table.update_one(
{"city_id": 6, "name": "Liam"}, {"$set": {"nickname": "Franz"}}
)
enriched_table.update_one(
{"city_id": 6, "name": "Dana"}, {"$set": {"nickname": "Dee"}}
)
# Scroll through the rows from a find with a certain projection, and use them
# This `find` returns a `TableFindCursor[MyFullEnrichedTableDict, MyNameNickDict]`:
projected_cursor_t = enriched_table.find(
{"city_id": 6},
projection={"name": True, "nickname": True},
row_type=MyNameNickDict,
)
print("\nWith a cursor on a table:")
for nn_row in projected_cursor_t:
# Variable nn_row is typed as a `MyNameNickDict`
formatted_name = format_name(nn_row["name"], nn_row["nickname"])
print(f"* {formatted_name}")
# Scroll through rows from a find with a mapping attached
# (1) `find` returns a `TableFindCursor[MyFullEnrichedTableDict, NickDict]`
# (2) The map function is a `Callable[[NickDict], str]`
# => The mapped cursor gets type `TableFindCursor[MyFullEnrichedTableDict, str]`
mapped_cursor_t = enriched_table.find(
{"city_id": 6},
projection={"nickname": True},
row_type=NickDict,
).map(extract_nick)
print("\nWith a mapped cursor on a table:")
for nn_nickname in mapped_cursor_t:
# ... and nn_nickname is typed as a str:
print(f"=> {format_nick(nn_nickname)}")
# Interaction with COLLECTIONS
# Create a collection
my_collection: Collection[MyFullCollectionDict] = database.create_collection(
"friends_c",
document_type=MyFullCollectionDict,
)
# Use get_collection to get an equivalent Collection object
the_same_collection: Collection[MyFullCollectionDict] = database.get_collection(
"friends_c",
document_type=MyFullCollectionDict,
)
# Insert documents (with temporary nickname for now)
my_collection.insert_many([
{"city_id": 6, "name": "Paula", "age": 39, "nickname": "n/a"},
{"city_id": 6, "name": "Liam", "age": 25, "nickname": "n/a"},
{"city_id": 6, "name": "Dana", "age": 31, "nickname": "n/a"},
])
# Get a document
paula_document = my_collection.find_one({"city_id": 6, "name": "Paula"})
if paula_document is None:
raise ValueError("Paula not found. Hmm.")
# Read a field from the document
# Variable paula_document is inferred as MyFullCollectionDict -> paula_age as `int`:
paula_age = paula_document["age"]
# Update documents with a new field
my_collection.update_one(
{"city_id": 6, "name": "Paula"}, {"$set": {"nickname": "The Wise"}}
)
my_collection.update_one(
{"city_id": 6, "name": "Liam"}, {"$set": {"nickname": "Franz"}}
)
my_collection.update_one(
{"city_id": 6, "name": "Dana"}, {"$set": {"nickname": "Dee"}}
)
# Iterate over documents from a find with a certain projection and use them
# `find` returns a `CollectionFindCursor[MyFullCollectionDict, MyNameNickDict]`:
projected_cursor_c = my_collection.find(
{"city_id": 6},
projection={"name": True, "nickname": True, "_id": False},
document_type=MyNameNickDict,
)
print("\nWith a cursor on a collection:")
for nn_doc in projected_cursor_c:
# Variable nn_doc is typed as a `MyNameNickDict`
formatted_name = format_name(nn_doc["name"], nn_doc["nickname"])
print(f"* {formatted_name}")
# Iterate over documents from a find with a mapping attached
# (1) `find` return value is a `CollectionFindCursor[MyFullCollectionDict, NickDict]`
# (2) The map function is a `Callable[[NickDict], str]`
# => The mapped cursor gets the type `CollectionFindCursor[MyFullCollectionDict, str]`
mapped_cursor_c = my_collection.find(
{"city_id": 6},
projection={"nickname": True},
document_type=NickDict,
).map(extract_nick)
print("\nWith a mapped cursor on a collection:")
for nn_nickname in mapped_cursor_c:
# ... and nn_nickname is typed as a str:
print(f"=> {format_nick(nn_nickname)}")
See also
For command specifications, see the command references, such as Work with collections and Work with tables.
For major changes between versions, see Data API client upgrade guide.
For the complete client reference, see Python client reference.