Module astrapy.admin
Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import asyncio
import logging
import re
import time
import warnings
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
import httpx
from deprecation import DeprecatedWarning
from astrapy.api_commander import APICommander
from astrapy.authentication import coerce_token_provider
from astrapy.constants import Environment
from astrapy.core.defaults import DEFAULT_AUTH_HEADER
from astrapy.core.ops import AstraDBOps
from astrapy.cursors import CommandCursor
from astrapy.exceptions import (
DataAPIFaultyResponseException,
DevOpsAPIException,
MultiCallTimeoutManager,
base_timeout_info,
ops_recast_method_async,
ops_recast_method_sync,
to_dataapi_timeout_exception,
)
from astrapy.info import AdminDatabaseInfo, DatabaseInfo, FindEmbeddingProvidersResult
if TYPE_CHECKING:
from astrapy import AsyncDatabase, Database
from astrapy.authentication import TokenProvider
logger = logging.getLogger(__name__)
DATABASE_POLL_NAMESPACE_SLEEP_TIME = 2
DATABASE_POLL_SLEEP_TIME = 15
STATUS_MAINTENANCE = "MAINTENANCE"
STATUS_ACTIVE = "ACTIVE"
STATUS_PENDING = "PENDING"
STATUS_INITIALIZING = "INITIALIZING"
STATUS_ERROR = "ERROR"
STATUS_TERMINATING = "TERMINATING"
database_id_matcher = re.compile(
"^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$"
)
api_endpoint_parser = re.compile(
r"https://"
r"([0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12})"
r"-"
r"([a-z0-9\-]+)"
r".apps.astra[\-]{0,1}"
r"(dev|test)?"
r".datastax.com"
)
generic_api_url_matcher = re.compile(r"^https?:\/\/[a-zA-Z0-9\-.]+(\:[0-9]{1,6}){0,1}$")
DEV_OPS_URL_MAP = {
Environment.PROD: "https://api.astra.datastax.com",
Environment.DEV: "https://api.dev.cloud.datastax.com",
Environment.TEST: "https://api.test.cloud.datastax.com",
}
API_ENDPOINT_TEMPLATE_MAP = {
Environment.PROD: "https://{database_id}-{region}.apps.astra.datastax.com",
Environment.DEV: "https://{database_id}-{region}.apps.astra-dev.datastax.com",
Environment.TEST: "https://{database_id}-{region}.apps.astra-test.datastax.com",
}
API_PATH_ENV_MAP = {
Environment.PROD: "/api/json",
Environment.DEV: "/api/json",
Environment.TEST: "/api/json",
#
Environment.DSE: "",
Environment.HCD: "",
Environment.CASSANDRA: "",
Environment.OTHER: "",
}
API_VERSION_ENV_MAP = {
Environment.PROD: "/v1",
Environment.DEV: "/v1",
Environment.TEST: "/v1",
#
Environment.DSE: "v1",
Environment.HCD: "v1",
Environment.CASSANDRA: "v1",
Environment.OTHER: "v1",
}
@dataclass
class ParsedAPIEndpoint:
"""
The results of successfully parsing an Astra DB API endpoint, for internal
by database metadata-related functions.
Attributes:
database_id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
region: a region ID, such as "us-west1".
environment: a label, whose value is one of Environment.PROD,
Environment.DEV or Environment.TEST.
"""
database_id: str
region: str
environment: str
def parse_api_endpoint(api_endpoint: str) -> Optional[ParsedAPIEndpoint]:
"""
Parse an API Endpoint into a ParsedAPIEndpoint structure.
Args:
api_endpoint: a full API endpoint for the Data Api.
Returns:
The parsed ParsedAPIEndpoint. If parsing fails, return None.
"""
match = api_endpoint_parser.match(api_endpoint)
if match and match.groups():
d_id, d_re, d_en_x = match.groups()
return ParsedAPIEndpoint(
database_id=d_id,
region=d_re,
environment=d_en_x if d_en_x else "prod",
)
else:
return None
def parse_generic_api_url(api_endpoint: str) -> Optional[str]:
"""
Validate a generic API Endpoint string,
such as `http://10.1.1.1:123` or `https://my.domain`.
Args:
api_endpoint: a string supposedly expressing a valid API Endpoint
(not necessarily an Astra DB one).
Returns:
a normalized (stripped) version of the endpoint if valid. If invalid,
return None.
"""
if api_endpoint and api_endpoint[-1] == "/":
_api_endpoint = api_endpoint[:-1]
else:
_api_endpoint = api_endpoint
match = generic_api_url_matcher.match(_api_endpoint)
if match:
return match[0].rstrip("/")
else:
return None
def build_api_endpoint(environment: str, database_id: str, region: str) -> str:
"""
Build the API Endpoint full strings from database parameters.
Args:
environment: a label, whose value can be Environment.PROD
or another of Environment.* for which this operation makes sense.
database_id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
region: a region ID, such as "us-west1".
Returns:
the endpoint string, such as "https://01234567-...-eu-west1.apps.datastax.com"
"""
return API_ENDPOINT_TEMPLATE_MAP[environment].format(
database_id=database_id,
region=region,
)
def fetch_raw_database_info_from_id_token(
id: str,
*,
token: Optional[str],
environment: str = Environment.PROD,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Fetch database information through the DevOps API and return it in
full, exactly like the API gives it back.
Args:
id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
token: a valid token to access the database information.
max_time_ms: a timeout, in milliseconds, for waiting on a response.
Returns:
The full response from the DevOps API about the database.
"""
astra_db_ops = AstraDBOps(
token=token,
dev_ops_url=DEV_OPS_URL_MAP[environment],
)
try:
gd_response = astra_db_ops.get_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
return gd_response
except httpx.TimeoutException as texc:
raise to_dataapi_timeout_exception(texc)
async def async_fetch_raw_database_info_from_id_token(
id: str,
*,
token: Optional[str],
environment: str = Environment.PROD,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Fetch database information through the DevOps API and return it in
full, exactly like the API gives it back.
Async version of the function, for use in an asyncio context.
Args:
id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
token: a valid token to access the database information.
max_time_ms: a timeout, in milliseconds, for waiting on a response.
Returns:
The full response from the DevOps API about the database.
"""
astra_db_ops = AstraDBOps(
token=token,
dev_ops_url=DEV_OPS_URL_MAP[environment],
)
try:
gd_response = await astra_db_ops.async_get_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
return gd_response
except httpx.TimeoutException as texc:
raise to_dataapi_timeout_exception(texc)
def fetch_database_info(
api_endpoint: str,
token: Optional[str],
namespace: Optional[str],
max_time_ms: Optional[int] = None,
) -> Optional[DatabaseInfo]:
"""
Fetch database information through the DevOps API.
Args:
api_endpoint: a full API endpoint for the Data Api.
token: a valid token to access the database information.
namespace: the desired namespace that will be used in the result.
If not specified, the resulting database info will show it as None.
max_time_ms: a timeout, in milliseconds, for waiting on a response.
Returns:
A DatabaseInfo object.
If the API endpoint fails to be parsed, None is returned.
For valid-looking endpoints, if something goes wrong an exception is raised.
"""
parsed_endpoint = parse_api_endpoint(api_endpoint)
if parsed_endpoint:
gd_response = fetch_raw_database_info_from_id_token(
id=parsed_endpoint.database_id,
token=token,
environment=parsed_endpoint.environment,
max_time_ms=max_time_ms,
)
raw_info = gd_response["info"]
if namespace is not None and namespace not in raw_info["keyspaces"]:
raise DevOpsAPIException(f"Namespace {namespace} not found on DB.")
else:
return DatabaseInfo(
id=parsed_endpoint.database_id,
region=parsed_endpoint.region,
namespace=namespace,
name=raw_info["name"],
environment=parsed_endpoint.environment,
raw_info=raw_info,
)
else:
return None
async def async_fetch_database_info(
api_endpoint: str,
token: Optional[str],
namespace: Optional[str],
max_time_ms: Optional[int] = None,
) -> Optional[DatabaseInfo]:
"""
Fetch database information through the DevOps API.
Async version of the function, for use in an asyncio context.
Args:
api_endpoint: a full API endpoint for the Data Api.
token: a valid token to access the database information.
namespace: the desired namespace that will be used in the result.
If not specified, the resulting database info will show it as None.
max_time_ms: a timeout, in milliseconds, for waiting on a response.
Returns:
A DatabaseInfo object.
If the API endpoint fails to be parsed, None is returned.
For valid-looking endpoints, if something goes wrong an exception is raised.
"""
parsed_endpoint = parse_api_endpoint(api_endpoint)
if parsed_endpoint:
gd_response = await async_fetch_raw_database_info_from_id_token(
id=parsed_endpoint.database_id,
token=token,
environment=parsed_endpoint.environment,
max_time_ms=max_time_ms,
)
raw_info = gd_response["info"]
if namespace is not None and namespace not in raw_info["keyspaces"]:
raise DevOpsAPIException(f"Namespace {namespace} not found on DB.")
else:
return DatabaseInfo(
id=parsed_endpoint.database_id,
region=parsed_endpoint.region,
namespace=namespace,
name=raw_info["name"],
environment=parsed_endpoint.environment,
raw_info=raw_info,
)
else:
return None
def _recast_as_admin_database_info(
admin_database_info_dict: Dict[str, Any],
*,
environment: str,
) -> AdminDatabaseInfo:
return AdminDatabaseInfo(
info=DatabaseInfo(
id=admin_database_info_dict["id"],
region=admin_database_info_dict["info"]["region"],
namespace=admin_database_info_dict["info"]["keyspace"],
name=admin_database_info_dict["info"]["name"],
environment=environment,
raw_info=admin_database_info_dict["info"],
),
available_actions=admin_database_info_dict.get("availableActions"),
cost=admin_database_info_dict["cost"],
cqlsh_url=admin_database_info_dict["cqlshUrl"],
creation_time=admin_database_info_dict["creationTime"],
data_endpoint_url=admin_database_info_dict["dataEndpointUrl"],
grafana_url=admin_database_info_dict["grafanaUrl"],
graphql_url=admin_database_info_dict["graphqlUrl"],
id=admin_database_info_dict["id"],
last_usage_time=admin_database_info_dict["lastUsageTime"],
metrics=admin_database_info_dict["metrics"],
observed_status=admin_database_info_dict["observedStatus"],
org_id=admin_database_info_dict["orgId"],
owner_id=admin_database_info_dict["ownerId"],
status=admin_database_info_dict["status"],
storage=admin_database_info_dict["storage"],
termination_time=admin_database_info_dict["terminationTime"],
raw_info=admin_database_info_dict,
)
def normalize_api_endpoint(
id_or_endpoint: str,
region: Optional[str],
token: TokenProvider,
environment: str,
max_time_ms: Optional[int] = None,
) -> str:
"""
Ensure that a id(+region) / endpoint init signature is normalized into
an api_endpoint string.
This is an impure function: if necessary, attempt a DevOps API call to
integrate the information (i.e. if a DB ID without region is passed).
This function is tasked with raising an exception if region is passed along
with an API endpoint (and they do not match).
Args:
id_or_endpoint: either the Database ID or a full standard endpoint.
region: a string with the database region.
token: a TokenProvider for the possible DevOps request to issue.
environment: one of the Astra DB `astrapy.constants.Environment` values.
max_time_ms: used in case the DevOps API request is necessary.
Returns:
a normalized API Endpoint string (unless it raises an exception).
"""
_api_endpoint: str
parsed_endpoint = parse_api_endpoint(id_or_endpoint)
if parsed_endpoint is not None:
if region is not None and region != parsed_endpoint.region:
raise ValueError(
"An explicit `region` parameter is provided, which does not match "
"the supplied API endpoint. Please refrain from specifying `region`."
)
_api_endpoint = id_or_endpoint
else:
# it's a genuine ID
_region: str
if region:
_region = region
else:
logger.info(f"fetching raw database info for {id_or_endpoint}")
this_db_info = fetch_raw_database_info_from_id_token(
id=id_or_endpoint,
token=token.get_token(),
environment=environment,
max_time_ms=max_time_ms,
)
logger.info(f"finished fetching raw database info for {id_or_endpoint}")
_region = this_db_info["info"]["region"]
_api_endpoint = build_api_endpoint(
environment=environment,
database_id=id_or_endpoint,
region=_region,
)
return _api_endpoint.strip("/")
def normalize_id_endpoint_parameters(
id: Optional[str], api_endpoint: Optional[str]
) -> str:
if id is None:
if api_endpoint is None:
raise ValueError(
"Exactly one of the `id` and `api_endpoint` "
"synonymous parameters must be passed."
)
else:
return api_endpoint
else:
if api_endpoint is not None:
raise ValueError(
"The `id` and `api_endpoint` synonymous parameters "
"cannot be supplied at the same time."
)
else:
return id
class AstraDBAdmin:
"""
An "admin" object, able to perform administrative tasks at the databases
level, such as creating, listing or dropping databases.
Args:
token: an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
environment: a label, whose value is one of Environment.PROD (default),
Environment.DEV or Environment.TEST.
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
dev_ops_url: in case of custom deployments, this can be used to specify
the URL to the DevOps API, such as "https://api.astra.datastax.com".
Generally it can be omitted. The environment (prod/dev/...) is
determined from the API Endpoint.
dev_ops_api_version: this can specify a custom version of the DevOps API
(such as "v2"). Generally not needed.
Example:
>>> from astrapy import DataAPIClient
>>> my_client = DataAPIClient("AstraCS:...")
>>> my_astra_db_admin = my_client.get_admin()
>>> database_list = my_astra_db_admin.list_databases()
>>> len(database_list)
3
>>> database_list[2].id
'01234567-...'
>>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...")
>>> my_db_admin.list_namespaces()
['default_keyspace', 'staging_namespace']
"""
def __init__(
self,
token: Optional[Union[str, TokenProvider]] = None,
*,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> None:
self.token_provider = coerce_token_provider(token)
self.environment = (environment or Environment.PROD).lower()
if dev_ops_url is None:
self.dev_ops_url = DEV_OPS_URL_MAP[self.environment]
else:
self.dev_ops_url = dev_ops_url
self._caller_name = caller_name
self._caller_version = caller_version
self._dev_ops_url = dev_ops_url
self._dev_ops_api_version = dev_ops_api_version
self._astra_db_ops = AstraDBOps(
token=self.token_provider.get_token(),
dev_ops_url=self.dev_ops_url,
dev_ops_api_version=dev_ops_api_version,
caller_name=caller_name,
caller_version=caller_version,
)
def __repr__(self) -> str:
env_desc: str
if self.environment == Environment.PROD:
env_desc = ""
else:
env_desc = f', environment="{self.environment}"'
return (
f'{self.__class__.__name__}("{str(self.token_provider)[:12]}..."{env_desc})'
)
def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBAdmin):
return all(
[
self.token_provider == other.token_provider,
self.environment == other.environment,
self.dev_ops_url == other.dev_ops_url,
self.dev_ops_url == other.dev_ops_url,
self._caller_name == other._caller_name,
self._caller_version == other._caller_version,
self._dev_ops_url == other._dev_ops_url,
self._dev_ops_api_version == other._dev_ops_api_version,
self._astra_db_ops == other._astra_db_ops,
]
)
else:
return False
def _copy(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBAdmin:
return AstraDBAdmin(
token=coerce_token_provider(token) or self.token_provider,
environment=environment or self.environment,
caller_name=caller_name or self._caller_name,
caller_version=caller_version or self._caller_version,
dev_ops_url=dev_ops_url or self._dev_ops_url,
dev_ops_api_version=dev_ops_api_version or self._dev_ops_api_version,
)
def with_options(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBAdmin:
"""
Create a clone of this AstraDBAdmin with some changed attributes.
Args:
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.
Returns:
a new AstraDBAdmin instance.
Example:
>>> another_astra_db_admin = my_astra_db_admin.with_options(
... caller_name="caller_identity",
... caller_version="1.2.0",
... )
"""
return self._copy(
token=token,
caller_name=caller_name,
caller_version=caller_version,
)
def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> my_astra_db_admin.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""
logger.info(f"setting caller to {caller_name}/{caller_version}")
self._caller_name = caller_name
self._caller_version = caller_version
self._astra_db_ops.set_caller(caller_name, caller_version)
@ops_recast_method_sync
def list_databases(
self,
*,
max_time_ms: Optional[int] = None,
) -> CommandCursor[AdminDatabaseInfo]:
"""
Get the list of databases, as obtained with a request to the DevOps API.
Args:
max_time_ms: a timeout, in milliseconds, for the API request.
Returns:
A CommandCursor to iterate over the detected databases,
represented as AdminDatabaseInfo objects.
Example:
>>> database_cursor = my_astra_db_admin.list_databases()
>>> database_list = list(database_cursor)
>>> len(database_list)
3
>>> database_list[2].id
'01234567-...'
>>> database_list[2].status
'ACTIVE'
>>> database_list[2].info.region
'eu-west-1'
"""
logger.info("getting databases")
gd_list_response = self._astra_db_ops.get_databases(
timeout_info=base_timeout_info(max_time_ms)
)
logger.info("finished getting databases")
if not isinstance(gd_list_response, list):
raise DevOpsAPIException(
"Faulty response from get-databases DevOps API command.",
)
else:
# we know this is a list of dicts which need a little adjusting
return CommandCursor(
address=self._astra_db_ops.base_url,
items=[
_recast_as_admin_database_info(
db_dict,
environment=self.environment,
)
for db_dict in gd_list_response
],
)
@ops_recast_method_async
async def async_list_databases(
self,
*,
max_time_ms: Optional[int] = None,
) -> CommandCursor[AdminDatabaseInfo]:
"""
Get the list of databases, as obtained with a request to the DevOps API.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the API request.
Returns:
A CommandCursor to iterate over the detected databases,
represented as AdminDatabaseInfo objects.
Note that the return type is not an awaitable, rather
a regular iterable, e.g. for use in ordinary "for" loops.
Example:
>>> async def check_if_db_exists(db_id: str) -> bool:
... db_cursor = await my_astra_db_admin.async_list_databases()
... db_list = list(dd_cursor)
... return db_id in db_list
...
>>> asyncio.run(check_if_db_exists("xyz"))
True
>>> asyncio.run(check_if_db_exists("01234567-..."))
False
"""
logger.info("getting databases, async")
gd_list_response = await self._astra_db_ops.async_get_databases(
timeout_info=base_timeout_info(max_time_ms)
)
logger.info("finished getting databases, async")
if not isinstance(gd_list_response, list):
raise DevOpsAPIException(
"Faulty response from get-databases DevOps API command.",
)
else:
# we know this is a list of dicts which need a little adjusting
return CommandCursor(
address=self._astra_db_ops.base_url,
items=[
_recast_as_admin_database_info(
db_dict,
environment=self.environment,
)
for db_dict in gd_list_response
],
)
@ops_recast_method_sync
def database_info(
self, id: str, *, max_time_ms: Optional[int] = None
) -> AdminDatabaseInfo:
"""
Get the full information on a given database, through a request to the DevOps API.
Args:
id: the ID of the target database, e. g.
"01234567-89ab-cdef-0123-456789abcdef".
max_time_ms: a timeout, in milliseconds, for the API request.
Returns:
An AdminDatabaseInfo object.
Example:
>>> details_of_my_db = my_astra_db_admin.database_info("01234567-...")
>>> details_of_my_db.id
'01234567-...'
>>> details_of_my_db.status
'ACTIVE'
>>> details_of_my_db.info.region
'eu-west-1'
"""
logger.info(f"getting database info for '{id}'")
gd_response = self._astra_db_ops.get_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(f"finished getting database info for '{id}'")
if not isinstance(gd_response, dict):
raise DevOpsAPIException(
"Faulty response from get-database DevOps API command.",
)
else:
return _recast_as_admin_database_info(
gd_response,
environment=self.environment,
)
@ops_recast_method_async
async def async_database_info(
self, id: str, *, max_time_ms: Optional[int] = None
) -> AdminDatabaseInfo:
"""
Get the full information on a given database, through a request to the DevOps API.
This is an awaitable method suitable for use within an asyncio event loop.
Args:
id: the ID of the target database, e. g.
"01234567-89ab-cdef-0123-456789abcdef".
max_time_ms: a timeout, in milliseconds, for the API request.
Returns:
An AdminDatabaseInfo object.
Example:
>>> async def check_if_db_active(db_id: str) -> bool:
... db_info = await my_astra_db_admin.async_database_info(db_id)
... return db_info.status == "ACTIVE"
...
>>> asyncio.run(check_if_db_active("01234567-..."))
True
"""
logger.info(f"getting database info for '{id}', async")
gd_response = await self._astra_db_ops.async_get_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(f"finished getting database info for '{id}', async")
if not isinstance(gd_response, dict):
raise DevOpsAPIException(
"Faulty response from get-database DevOps API command.",
)
else:
return _recast_as_admin_database_info(
gd_response,
environment=self.environment,
)
@ops_recast_method_sync
def create_database(
self,
name: str,
*,
cloud_provider: str,
region: str,
namespace: Optional[str] = None,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> AstraDBDatabaseAdmin:
"""
Create a database as requested, optionally waiting for it to be ready.
Args:
name: the desired name for the database.
cloud_provider: one of 'aws', 'gcp' or 'azure'.
region: any of the available cloud regions.
namespace: name for the one namespace the database starts with.
If omitted, DevOps API will use its default.
wait_until_active: if True (default), the method returns only after
the newly-created database is in ACTIVE state (a few minutes,
usually). If False, it will return right after issuing the
creation request to the DevOps API, and it will be responsibility
of the caller to check the database status before working with it.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
An AstraDBDatabaseAdmin instance.
Example:
>>> my_new_db_admin = my_astra_db_admin.create_database(
... "new_database",
... cloud_provider="aws",
... region="ap-south-1",
... )
>>> my_new_db = my_new_db_admin.get_database()
>>> my_coll = my_new_db.create_collection("movies", dimension=2)
>>> my_coll.insert_one({"title": "The Title", "$vector": [0.1, 0.2]})
"""
database_definition = {
k: v
for k, v in {
"name": name,
"tier": "serverless",
"cloudProvider": cloud_provider,
"region": region,
"capacityUnits": 1,
"dbType": "vector",
"keyspace": namespace,
}.items()
if v is not None
}
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"creating database {name}/({cloud_provider}, {region})")
cd_response = self._astra_db_ops.create_database(
database_definition=database_definition,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
"devops api returned from creating database "
f"{name}/({cloud_provider}, {region})"
)
if cd_response is not None and "id" in cd_response:
new_database_id = cd_response["id"]
if wait_until_active:
last_status_seen = STATUS_PENDING
while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}:
logger.info(f"sleeping to poll for status of '{new_database_id}'")
time.sleep(DATABASE_POLL_SLEEP_TIME)
last_db_info = self.database_info(
id=new_database_id,
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
last_status_seen = last_db_info.status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database {name} entered unexpected status {last_status_seen} after PENDING"
)
# return the database instance
logger.info(
f"finished creating database '{new_database_id}' = "
f"{name}/({cloud_provider}, {region})"
)
return AstraDBDatabaseAdmin.from_astra_db_admin(
id=new_database_id,
region=region,
astra_db_admin=self,
)
else:
raise DevOpsAPIException("Could not create the database.")
@ops_recast_method_async
async def async_create_database(
self,
name: str,
*,
cloud_provider: str,
region: str,
namespace: Optional[str] = None,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> AstraDBDatabaseAdmin:
"""
Create a database as requested, optionally waiting for it to be ready.
This is an awaitable method suitable for use within an asyncio event loop.
Args:
name: the desired name for the database.
cloud_provider: one of 'aws', 'gcp' or 'azure'.
region: any of the available cloud regions.
namespace: name for the one namespace the database starts with.
If omitted, DevOps API will use its default.
wait_until_active: if True (default), the method returns only after
the newly-created database is in ACTIVE state (a few minutes,
usually). If False, it will return right after issuing the
creation request to the DevOps API, and it will be responsibility
of the caller to check the database status before working with it.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
An AstraDBDatabaseAdmin instance.
Example:
>>> asyncio.run(
... my_astra_db_admin.async_create_database(
... "new_database",
... cloud_provider="aws",
... region="ap-south-1",
.... )
... )
AstraDBDatabaseAdmin(id=...)
"""
database_definition = {
k: v
for k, v in {
"name": name,
"tier": "serverless",
"cloudProvider": cloud_provider,
"region": region,
"capacityUnits": 1,
"dbType": "vector",
"keyspace": namespace,
}.items()
if v is not None
}
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"creating database {name}/({cloud_provider}, {region}), async")
cd_response = await self._astra_db_ops.async_create_database(
database_definition=database_definition,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
"devops api returned from creating database "
f"{name}/({cloud_provider}, {region}), async"
)
if cd_response is not None and "id" in cd_response:
new_database_id = cd_response["id"]
if wait_until_active:
last_status_seen = STATUS_PENDING
while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}:
logger.info(
f"sleeping to poll for status of '{new_database_id}', async"
)
await asyncio.sleep(DATABASE_POLL_SLEEP_TIME)
last_db_info = await self.async_database_info(
id=new_database_id,
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
last_status_seen = last_db_info.status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database {name} entered unexpected status {last_status_seen} after PENDING"
)
# return the database instance
logger.info(
f"finished creating database '{new_database_id}' = "
f"{name}/({cloud_provider}, {region}), async"
)
return AstraDBDatabaseAdmin.from_astra_db_admin(
id=new_database_id,
region=region,
astra_db_admin=self,
)
else:
raise DevOpsAPIException("Could not create the database.")
@ops_recast_method_sync
def drop_database(
self,
id: str,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop a database, i.e. delete it completely and permanently with all its data.
Args:
id: The ID of the database to drop, e. g.
"01234567-89ab-cdef-0123-456789abcdef".
wait_until_active: if True (default), the method returns only after
the database has actually been deleted (generally a few minutes).
If False, it will return right after issuing the
drop request to the DevOps API, and it will be responsibility
of the caller to check the database status/availability
after that, if desired.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> database_list_pre = my_astra_db_admin.list_databases()
>>> len(database_list_pre)
3
>>> my_astra_db_admin.drop_database("01234567-...")
{'ok': 1}
>>> database_list_post = my_astra_db_admin.list_databases()
>>> len(database_list_post)
2
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"dropping database '{id}'")
te_response = self._astra_db_ops.terminate_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(f"devops api returned from dropping database '{id}'")
if te_response == id:
if wait_until_active:
last_status_seen: Optional[str] = STATUS_TERMINATING
_db_name: Optional[str] = None
while last_status_seen == STATUS_TERMINATING:
logger.info(f"sleeping to poll for status of '{id}'")
time.sleep(DATABASE_POLL_SLEEP_TIME)
#
detected_databases = [
a_db_info
for a_db_info in self.list_databases(
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
if a_db_info.id == id
]
if detected_databases:
last_status_seen = detected_databases[0].status
_db_name = detected_databases[0].info.name
else:
last_status_seen = None
if last_status_seen is not None:
_name_desc = f" ({_db_name})" if _db_name else ""
raise DevOpsAPIException(
f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING"
)
logger.info(f"finished dropping database '{id}'")
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful terminate-database DevOps API request for {id}."
)
@ops_recast_method_async
async def async_drop_database(
self,
id: str,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop a database, i.e. delete it completely and permanently with all its data.
Async version of the method, for use in an asyncio context.
Args:
id: The ID of the database to drop, e. g.
"01234567-89ab-cdef-0123-456789abcdef".
wait_until_active: if True (default), the method returns only after
the database has actually been deleted (generally a few minutes).
If False, it will return right after issuing the
drop request to the DevOps API, and it will be responsibility
of the caller to check the database status/availability
after that, if desired.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> asyncio.run(
... my_astra_db_admin.async_drop_database("01234567-...")
... )
{'ok': 1}
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"dropping database '{id}', async")
te_response = await self._astra_db_ops.async_terminate_database(
database=id,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(f"devops api returned from dropping database '{id}', async")
if te_response == id:
if wait_until_active:
last_status_seen: Optional[str] = STATUS_TERMINATING
_db_name: Optional[str] = None
while last_status_seen == STATUS_TERMINATING:
logger.info(f"sleeping to poll for status of '{id}', async")
await asyncio.sleep(DATABASE_POLL_SLEEP_TIME)
#
detected_databases = [
a_db_info
for a_db_info in await self.async_list_databases(
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
if a_db_info.id == id
]
if detected_databases:
last_status_seen = detected_databases[0].status
_db_name = detected_databases[0].info.name
else:
last_status_seen = None
if last_status_seen is not None:
_name_desc = f" ({_db_name})" if _db_name else ""
raise DevOpsAPIException(
f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING"
)
logger.info(f"finished dropping database '{id}', async")
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful terminate-database DevOps API request for {id}."
)
def get_database_admin(
self,
id: Optional[str] = None,
*,
api_endpoint: Optional[str] = None,
region: Optional[str] = None,
max_time_ms: Optional[int] = None,
) -> AstraDBDatabaseAdmin:
"""
Create an AstraDBDatabaseAdmin object for admin work within a certain database.
Args:
id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`)
or the corresponding API Endpoint
(e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`).
api_endpoint: a named alias for the `id` first (positional) parameter,
with the same meaning. It cannot be passed together with `id`.
region: the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as `id`.
Note that if this parameter is not passed, and cannot be inferred
from the API endpoint, an additional DevOps API request is made
to determine the default region and use it subsequently.
max_time_ms: a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the `region` argument).
Returns:
An AstraDBDatabaseAdmin instance representing the requested database.
Example:
>>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...")
>>> my_db_admin.list_namespaces()
['default_keyspace']
>>> my_db_admin.create_namespace("that_other_one")
{'ok': 1}
>>> my_db_admin.list_namespaces()
['default_keyspace', 'that_other_one']
Note:
This method does not perform any admin-level operation through
the DevOps API. For actual creation of a database, see the
`create_database` method.
"""
_id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint)
return AstraDBDatabaseAdmin.from_astra_db_admin(
id=_id_or_endpoint,
region=region,
astra_db_admin=self,
max_time_ms=max_time_ms,
)
def get_database(
self,
id: Optional[str] = None,
*,
api_endpoint: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
region: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
max_time_ms: Optional[int] = None,
) -> Database:
"""
Create a Database instance for a specific database, to be used
when doing data-level work (such as creating/managing collections).
Args:
id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`)
or the corresponding API Endpoint
(e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`).
api_endpoint: a named alias for the `id` first (positional) parameter,
with the same meaning. It cannot be passed together with `id`.
token: if supplied, is passed to the Database instead of
the one set for this object.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
namespace: used to specify a certain namespace the resulting
Database will primarily work on. If not specified, similar
as for `region`, an additional DevOps API call reveals
the default namespace for the target database.
region: the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as `id`.
Note that if this parameter is not passed, and cannot be inferred
from the API endpoint, an additional DevOps API request is made
to determine the default region and use it subsequently.
api_path: path to append to the API Endpoint. In typical usage, this
should be left to its default of "/api/json".
api_version: version specifier to append to the API path. In typical
usage, this should be left to its default of "v1".
max_time_ms: a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the `region` argument).
Returns:
A Database object ready to be used.
Example:
>>> my_db = my_astra_db_admin.get_database(
... "01234567-...",
... region="us-east1",
... )
>>> coll = my_db.create_collection("movies", dimension=2)
>>> my_coll.insert_one({"title": "The Title", "$vector": [0.3, 0.4]})
Note:
This method does not perform any admin-level operation through
the DevOps API. For actual creation of a database, see the
`create_database` method of class AstraDBAdmin.
"""
# lazy importing here to avoid circular dependency
from astrapy import Database
_id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint)
_token = coerce_token_provider(token) or self.token_provider
normalized_api_endpoint = normalize_api_endpoint(
id_or_endpoint=_id_or_endpoint,
region=region,
token=_token,
environment=self.environment,
max_time_ms=max_time_ms,
)
_namespace: str
if namespace:
_namespace = namespace
else:
parsed_api_endpoint = parse_api_endpoint(normalized_api_endpoint)
if parsed_api_endpoint is None:
raise ValueError(
f"Cannot parse the API endpoint ({normalized_api_endpoint})."
)
this_db_info = self.database_info(
parsed_api_endpoint.database_id,
max_time_ms=max_time_ms,
)
_namespace = this_db_info.info.namespace
return Database(
api_endpoint=normalized_api_endpoint,
token=_token,
namespace=_namespace,
caller_name=self._caller_name,
caller_version=self._caller_version,
environment=self.environment,
api_path=api_path,
api_version=api_version,
)
def get_async_database(
self,
id: Optional[str] = None,
*,
api_endpoint: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
region: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
) -> AsyncDatabase:
"""
Create an AsyncDatabase instance for a specific database, to be used
when doing data-level work (such as creating/managing collections).
This method has identical behavior and signature as the sync
counterpart `get_database`: please see that one for more details.
"""
return self.get_database(
id=id,
api_endpoint=api_endpoint,
token=token,
namespace=namespace,
region=region,
api_path=api_path,
api_version=api_version,
).to_async()
class DatabaseAdmin(ABC):
"""
An abstract class defining the interface for a database admin object.
This supports generic namespace crud, as well as spawning databases,
without committing to a specific database architecture (e.g. Astra DB).
"""
environment: str
spawner_database: Union[Database, AsyncDatabase]
@abstractmethod
def list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]:
"""Get a list of namespaces for the database."""
...
@abstractmethod
def create_namespace(
self,
name: str,
*,
update_db_namespace: Optional[bool] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
"""
...
@abstractmethod
def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database, returning {'ok': 1} if successful.
"""
...
@abstractmethod
async def async_list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]:
"""
Get a list of namespaces for the database.
(Async version of the method.)
"""
...
@abstractmethod
async def async_create_namespace(
self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any
) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
(Async version of the method.)
"""
...
@abstractmethod
async def async_drop_namespace(
self, name: str, *pargs: Any, **kwargs: Any
) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database, returning {'ok': 1} if successful.
(Async version of the method.)
"""
...
@abstractmethod
def get_database(self, *pargs: Any, **kwargs: Any) -> Database:
"""Get a Database object from this database admin."""
...
@abstractmethod
def get_async_database(self, *pargs: Any, **kwargs: Any) -> AsyncDatabase:
"""Get an AsyncDatabase object from this database admin."""
...
@abstractmethod
def find_embedding_providers(
self, *pargs: Any, **kwargs: Any
) -> FindEmbeddingProvidersResult:
"""Query the Data API for the available embedding providers."""
...
@abstractmethod
async def async_find_embedding_providers(
self, *pargs: Any, **kwargs: Any
) -> FindEmbeddingProvidersResult:
"""
Query the Data API for the available embedding providers.
(Async version of the method.)
"""
...
class AstraDBDatabaseAdmin(DatabaseAdmin):
"""
An "admin" object, able to perform administrative tasks at the namespaces level
(i.e. within a certain database), such as creating/listing/dropping namespaces.
This is one layer below the AstraDBAdmin concept, in that it is tied to
a single database and enables admin work within it. As such, it is generally
created by a method call on an AstraDBAdmin.
Args:
id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`)
or the corresponding API Endpoint
(e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`).
api_endpoint: a named alias for the `id` first (positional) parameter,
with the same meaning. It cannot be passed together with `id`.
token: an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
region: the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as `id`.
Note that if this parameter is not passed, and cannot be inferred
from the API endpoint, an additional DevOps API request is made
to determine the default region and use it subsequently.
environment: a label, whose value is one of Environment.PROD (default),
Environment.DEV or Environment.TEST.
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
dev_ops_url: in case of custom deployments, this can be used to specify
the URL to the DevOps API, such as "https://api.astra.datastax.com".
Generally it can be omitted. The environment (prod/dev/...) is
determined from the API Endpoint.
dev_ops_api_version: this can specify a custom version of the DevOps API
(such as "v2"). Generally not needed.
api_path: path to append to the API Endpoint. In typical usage, this
class is created by a method such as `Database.get_database_admin()`,
which passes the matching value. Generally to be left to its Astra DB
default of "/api/json".
api_version: version specifier to append to the API path. In typical
usage, this class is created by a method such as
`Database.get_database_admin()`, which passes the matching value.
Generally to be left to its Astra DB default of "/v1".
spawner_database: either a Database or an AsyncDatabase instance. This represents
the database class which spawns this admin object, so that, if required,
a namespace creation can retroactively "use" the new namespace in the spawner.
Used to enable the Async/Database.get_admin_database().create_namespace() pattern.
max_time_ms: a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the `region` argument).
Example:
>>> from astrapy import DataAPIClient
>>> my_client = DataAPIClient("AstraCS:...")
>>> admin_for_my_db = my_client.get_admin().get_database_admin("01234567-...")
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'staging_namespace']
>>> admin_for_my_db.info().status
'ACTIVE'
Note:
creating an instance of AstraDBDatabaseAdmin does not trigger actual creation
of the database itself, which should exist beforehand. To create databases,
see the AstraDBAdmin class.
"""
def __init__(
self,
id: Optional[str] = None,
*,
api_endpoint: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
region: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
spawner_database: Optional[Union[Database, AsyncDatabase]] = None,
max_time_ms: Optional[int] = None,
) -> None:
# lazy import here to avoid circular dependency
from astrapy.database import Database
self.token_provider = coerce_token_provider(token)
self.environment = (environment or Environment.PROD).lower()
_id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint)
normalized_api_endpoint = normalize_api_endpoint(
id_or_endpoint=_id_or_endpoint,
region=region,
token=self.token_provider,
environment=self.environment,
max_time_ms=max_time_ms,
)
self.api_endpoint = normalized_api_endpoint
parsed_api_endpoint = parse_api_endpoint(self.api_endpoint)
if parsed_api_endpoint is None:
raise ValueError(
f"Cannot parse the provided API endpoint ({self.api_endpoint})."
)
self._database_id = parsed_api_endpoint.database_id
self._region = parsed_api_endpoint.region
if parsed_api_endpoint.environment != self.environment:
raise ValueError(
"Environment mismatch between client and provided "
"API endpoint. You can try adding "
f'`environment="{parsed_api_endpoint.environment}"` '
"to the class constructor."
)
#
self.caller_name = caller_name
self.caller_version = caller_version
self._astra_db_admin = AstraDBAdmin(
token=self.token_provider,
environment=self.environment,
caller_name=self.caller_name,
caller_version=self.caller_version,
dev_ops_url=dev_ops_url,
dev_ops_api_version=dev_ops_api_version,
)
# API Commander (for the vectorizeOps invocations)
self.api_path = (
api_path if api_path is not None else API_PATH_ENV_MAP[self.environment]
)
self.api_version = (
api_version
if api_version is not None
else API_VERSION_ENV_MAP[self.environment]
)
self._commander_headers = {
DEFAULT_AUTH_HEADER: self.token_provider.get_token(),
}
self._api_commander = APICommander(
api_endpoint=self.api_endpoint,
path="/".join(comp for comp in [self.api_path, self.api_version] if comp),
headers=self._commander_headers,
callers=[(self.caller_name, self.caller_version)],
)
if spawner_database is not None:
self.spawner_database = spawner_database
else:
# leaving the namespace to its per-environment default
# (a task for the Database)
self.spawner_database = Database(
api_endpoint=self.api_endpoint,
token=self.token_provider,
namespace=None,
caller_name=self.caller_name,
caller_version=self.caller_version,
environment=self.environment,
api_path=self.api_path,
api_version=self.api_version,
)
def __repr__(self) -> str:
env_desc: str
if self.environment == Environment.PROD:
env_desc = ""
else:
env_desc = f', environment="{self.environment}"'
return (
f'{self.__class__.__name__}(api_endpoint="{self.api_endpoint}", '
f'"{str(self.token_provider)[:12]}..."{env_desc})'
)
def __eq__(self, other: Any) -> bool:
if isinstance(other, AstraDBDatabaseAdmin):
return all(
[
self.api_endpoint == other.api_endpoint,
self.token_provider == other.token_provider,
self.environment == other.environment,
self._astra_db_admin == other._astra_db_admin,
]
)
else:
return False
def _copy(
self,
id: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
region: Optional[str] = None,
environment: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
return AstraDBDatabaseAdmin(
id=id or self._database_id,
token=coerce_token_provider(token) or self.token_provider,
region=region or self._region,
environment=environment or self.environment,
caller_name=caller_name or self._astra_db_admin._caller_name,
caller_version=caller_version or self._astra_db_admin._caller_version,
dev_ops_url=dev_ops_url or self._astra_db_admin._dev_ops_url,
dev_ops_api_version=dev_ops_api_version
or self._astra_db_admin._dev_ops_api_version,
)
def with_options(
self,
*,
id: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
"""
Create a clone of this AstraDBDatabaseAdmin with some changed attributes.
Args:
id: e. g. "01234567-89ab-cdef-0123-456789abcdef".
token: an Access Token to the database. Example: `"AstraCS:xyz..."`.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
caller_name: name of the application, or framework, on behalf of which
the Data API and DevOps API calls are performed. This ends up in
the request user-agent.
caller_version: version of the caller.
Returns:
a new AstraDBDatabaseAdmin instance.
Example:
>>> admin_for_my_other_db = admin_for_my_db.with_options(
... id="abababab-0101-2323-4545-6789abcdef01",
... )
"""
return self._copy(
id=id,
token=token,
caller_name=caller_name,
caller_version=caller_version,
)
def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> admin_for_my_db.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""
logger.info(f"setting caller to {caller_name}/{caller_version}")
self._astra_db_admin.set_caller(caller_name, caller_version)
@property
def id(self) -> str:
"""
The ID of this database admin.
Example:
>>> my_db_admin.id
'01234567-89ab-cdef-0123-456789abcdef'
"""
return self._database_id
@property
def region(self) -> str:
"""
The region for this database admin.
Example:
>>> my_db_admin.region
'us-east-1'
"""
return self._region
@staticmethod
def from_astra_db_admin(
id: str,
*,
region: Optional[str],
astra_db_admin: AstraDBAdmin,
max_time_ms: Optional[int] = None,
) -> AstraDBDatabaseAdmin:
"""
Create an AstraDBDatabaseAdmin from an AstraDBAdmin and a database ID.
Args:
id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`)
or the corresponding API Endpoint
(e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`).
region: the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as `id`.
Note that if this parameter is not passed, and cannot be inferred
from the API endpoint, an additional DevOps API request is made
to determine the default region and use it subsequently.
astra_db_admin: an AstraDBAdmin object that has visibility over
the target database.
max_time_ms: a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the `region` argument).
Returns:
An AstraDBDatabaseAdmin object, for admin work within the database.
Example:
>>> from astrapy import DataAPIClient, AstraDBDatabaseAdmin
>>> admin_for_my_db = AstraDBDatabaseAdmin.from_astra_db_admin(
... id="01234567-...",
... astra_db_admin=DataAPIClient("AstraCS:...").get_admin(),
... )
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'staging_namespace']
>>> admin_for_my_db.info().status
'ACTIVE'
Note:
Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation
of the database itself, which should exist beforehand. To create databases,
see the AstraDBAdmin class.
"""
return AstraDBDatabaseAdmin(
id=id,
token=astra_db_admin.token_provider,
region=region,
environment=astra_db_admin.environment,
caller_name=astra_db_admin._caller_name,
caller_version=astra_db_admin._caller_version,
dev_ops_url=astra_db_admin._dev_ops_url,
dev_ops_api_version=astra_db_admin._dev_ops_api_version,
max_time_ms=max_time_ms,
)
@staticmethod
def from_api_endpoint(
api_endpoint: str,
*,
token: Optional[Union[str, TokenProvider]] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
dev_ops_url: Optional[str] = None,
dev_ops_api_version: Optional[str] = None,
) -> AstraDBDatabaseAdmin:
"""
Create an AstraDBDatabaseAdmin from an API Endpoint and optionally a token.
Args:
api_endpoint: a full API endpoint for the Data Api.
token: an access token with enough permissions to do admin work.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
dev_ops_url: in case of custom deployments, this can be used to specify
the URL to the DevOps API, such as "https://api.astra.datastax.com".
Generally it can be omitted. The environment (prod/dev/...) is
determined from the API Endpoint.
dev_ops_api_version: this can specify a custom version of the DevOps API
(such as "v2"). Generally not needed.
Returns:
An AstraDBDatabaseAdmin object, for admin work within the database.
Example:
>>> from astrapy import AstraDBDatabaseAdmin
>>> admin_for_my_db = AstraDBDatabaseAdmin.from_api_endpoint(
... api_endpoint="https://01234567-....apps.astra.datastax.com",
... token="AstraCS:...",
... )
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'another_namespace']
>>> admin_for_my_db.info().status
'ACTIVE'
Note:
Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation
of the database itself, which should exist beforehand. To create databases,
see the AstraDBAdmin class.
"""
parsed_api_endpoint = parse_api_endpoint(api_endpoint)
if parsed_api_endpoint:
return AstraDBDatabaseAdmin(
id=parsed_api_endpoint.database_id,
token=token,
region=parsed_api_endpoint.region,
environment=parsed_api_endpoint.environment,
caller_name=caller_name,
caller_version=caller_version,
dev_ops_url=dev_ops_url,
dev_ops_api_version=dev_ops_api_version,
)
else:
raise ValueError("Cannot parse the provided API endpoint.")
def info(self, *, max_time_ms: Optional[int] = None) -> AdminDatabaseInfo:
"""
Query the DevOps API for the full info on this database.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
An AdminDatabaseInfo object.
Example:
>>> my_db_info = admin_for_my_db.info()
>>> my_db_info.status
'ACTIVE'
>>> my_db_info.info.region
'us-east1'
"""
logger.info(f"getting info ('{self._database_id}')")
req_response = self._astra_db_admin.database_info(
id=self._database_id,
max_time_ms=max_time_ms,
)
logger.info(f"finished getting info ('{self._database_id}')")
return req_response # type: ignore[no-any-return]
async def async_info(
self, *, max_time_ms: Optional[int] = None
) -> AdminDatabaseInfo:
"""
Query the DevOps API for the full info on this database.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
An AdminDatabaseInfo object.
Example:
>>> async def wait_until_active(db_admin: AstraDBDatabaseAdmin) -> None:
... while True:
... info = await db_admin.async_info()
... if info.status == "ACTIVE":
... return
...
>>> asyncio.run(wait_until_active(admin_for_my_db))
"""
logger.info(f"getting info ('{self._database_id}'), async")
req_response = await self._astra_db_admin.async_database_info(
id=self._database_id,
max_time_ms=max_time_ms,
)
logger.info(f"finished getting info ('{self._database_id}'), async")
return req_response # type: ignore[no-any-return]
def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]:
"""
Query the DevOps API for a list of the namespaces in the database.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A list of the namespaces, each a string, in no particular order.
Example:
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'staging_namespace']
"""
logger.info(f"getting namespaces ('{self._database_id}')")
info = self.info(max_time_ms=max_time_ms)
logger.info(f"finished getting namespaces ('{self._database_id}')")
if info.raw_info is None:
raise DevOpsAPIException("Could not get the namespace list.")
else:
return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return]
async def async_list_namespaces(
self, *, max_time_ms: Optional[int] = None
) -> List[str]:
"""
Query the DevOps API for a list of the namespaces in the database.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A list of the namespaces, each a string, in no particular order.
Example:
>>> async def check_if_ns_exists(
... db_admin: AstraDBDatabaseAdmin, namespace: str
... ) -> bool:
... ns_list = await db_admin.async_list_namespaces()
... return namespace in ns_list
...
>>> asyncio.run(check_if_ns_exists(admin_for_my_db, "dragons"))
False
>>> asyncio.run(check_if_db_exists(admin_for_my_db, "app_namespace"))
True
"""
logger.info(f"getting namespaces ('{self._database_id}'), async")
info = await self.async_info(max_time_ms=max_time_ms)
logger.info(f"finished getting namespaces ('{self._database_id}'), async")
if info.raw_info is None:
raise DevOpsAPIException("Could not get the namespace list.")
else:
return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return]
@ops_recast_method_sync
def create_namespace(
self,
name: str,
*,
wait_until_active: bool = True,
update_db_namespace: Optional[bool] = None,
max_time_ms: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Create a namespace in this database as requested,
optionally waiting for it to be ready.
Args:
name: the namespace name. If supplying a namespace that exists
already, the method call proceeds as usual, no errors are
raised, and the whole invocation is a no-op.
wait_until_active: if True (default), the method returns only after
the target database is in ACTIVE state again (a few
seconds, usually). If False, it will return right after issuing the
creation request to the DevOps API, and it will be responsibility
of the caller to check the database status/namespace availability
before working with it.
update_db_namespace: if True, the `Database` or `AsyncDatabase` class
that spawned this DatabaseAdmin, if any, gets updated to work on
the newly-created namespace starting when this method returns.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> my_db_admin.list_namespaces()
['default_keyspace']
>>> my_db_admin.create_namespace("that_other_one")
{'ok': 1}
>>> my_db_admin.list_namespaces()
['default_keyspace', 'that_other_one']
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"creating namespace '{name}' on '{self._database_id}'")
cn_response = self._astra_db_admin._astra_db_ops.create_keyspace(
database=self._database_id,
keyspace=name,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
f"devops api returned from creating namespace '{name}' on '{self._database_id}'"
)
if cn_response is not None and name == cn_response.get("name"):
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
logger.info(f"sleeping to poll for status of '{self._database_id}'")
time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_status_seen = self.info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
).status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database entered unexpected status {last_status_seen} after MAINTENANCE."
)
# is the namespace found?
if name not in self.list_namespaces():
raise DevOpsAPIException("Could not create the namespace.")
logger.info(
f"finished creating namespace '{name}' on '{self._database_id}'"
)
if update_db_namespace:
self.spawner_database.use_namespace(name)
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful create-namespace DevOps API request for {name}."
)
# the 'override' is because the error-recast decorator washes out the signature
@ops_recast_method_async
async def async_create_namespace( # type: ignore[override]
self,
name: str,
*,
wait_until_active: bool = True,
update_db_namespace: Optional[bool] = None,
max_time_ms: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Create a namespace in this database as requested,
optionally waiting for it to be ready.
Async version of the method, for use in an asyncio context.
Args:
name: the namespace name. If supplying a namespace that exists
already, the method call proceeds as usual, no errors are
raised, and the whole invocation is a no-op.
wait_until_active: if True (default), the method returns only after
the target database is in ACTIVE state again (a few
seconds, usually). If False, it will return right after issuing the
creation request to the DevOps API, and it will be responsibility
of the caller to check the database status/namespace availability
before working with it.
update_db_namespace: if True, the `Database` or `AsyncDatabase` class
that spawned this DatabaseAdmin, if any, gets updated to work on
the newly-created namespace starting when this method returns.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> asyncio.run(
... my_db_admin.async_create_namespace("app_namespace")
... )
{'ok': 1}
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"creating namespace '{name}' on '{self._database_id}', async")
cn_response = await self._astra_db_admin._astra_db_ops.async_create_keyspace(
database=self._database_id,
keyspace=name,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
f"devops api returned from creating namespace "
f"'{name}' on '{self._database_id}', async"
)
if cn_response is not None and name == cn_response.get("name"):
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
logger.info(
f"sleeping to poll for status of '{self._database_id}', async"
)
await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_db_info = await self.async_info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
last_status_seen = last_db_info.status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database entered unexpected status {last_status_seen} after MAINTENANCE."
)
# is the namespace found?
if name not in await self.async_list_namespaces():
raise DevOpsAPIException("Could not create the namespace.")
logger.info(
f"finished creating namespace '{name}' on '{self._database_id}', async"
)
if update_db_namespace:
self.spawner_database.use_namespace(name)
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful create-namespace DevOps API request for {name}."
)
@ops_recast_method_sync
def drop_namespace(
self,
name: str,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Delete a namespace from the database, optionally waiting for it
to become active again.
Args:
name: the namespace to delete. If it does not exist in this database,
an error is raised.
wait_until_active: if True (default), the method returns only after
the target database is in ACTIVE state again (a few
seconds, usually). If False, it will return right after issuing the
deletion request to the DevOps API, and it will be responsibility
of the caller to check the database status/namespace availability
before working with it.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> my_db_admin.list_namespaces()
['default_keyspace', 'that_other_one']
>>> my_db_admin.drop_namespace("that_other_one")
{'ok': 1}
>>> my_db_admin.list_namespaces()
['default_keyspace']
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"dropping namespace '{name}' on '{self._database_id}'")
dk_response = self._astra_db_admin._astra_db_ops.delete_keyspace(
database=self._database_id,
keyspace=name,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
f"devops api returned from dropping namespace '{name}' on '{self._database_id}'"
)
if dk_response == name:
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
logger.info(f"sleeping to poll for status of '{self._database_id}'")
time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_status_seen = self.info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
).status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database entered unexpected status {last_status_seen} after MAINTENANCE."
)
# is the namespace found?
if name in self.list_namespaces():
raise DevOpsAPIException("Could not drop the namespace.")
logger.info(
f"finished dropping namespace '{name}' on '{self._database_id}'"
)
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful delete-namespace DevOps API request for {name}."
)
# the 'override' is because the error-recast decorator washes out the signature
@ops_recast_method_async
async def async_drop_namespace( # type: ignore[override]
self,
name: str,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Delete a namespace from the database, optionally waiting for it
to become active again.
Async version of the method, for use in an asyncio context.
Args:
name: the namespace to delete. If it does not exist in this database,
an error is raised.
wait_until_active: if True (default), the method returns only after
the target database is in ACTIVE state again (a few
seconds, usually). If False, it will return right after issuing the
deletion request to the DevOps API, and it will be responsibility
of the caller to check the database status/namespace availability
before working with it.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> asyncio.run(
... my_db_admin.async_drop_namespace("app_namespace")
... )
{'ok': 1}
"""
timeout_manager = MultiCallTimeoutManager(
overall_max_time_ms=max_time_ms, exception_type="devops_api"
)
logger.info(f"dropping namespace '{name}' on '{self._database_id}', async")
dk_response = await self._astra_db_admin._astra_db_ops.async_delete_keyspace(
database=self._database_id,
keyspace=name,
timeout_info=base_timeout_info(max_time_ms),
)
logger.info(
f"devops api returned from dropping namespace "
f"'{name}' on '{self._database_id}', async"
)
if dk_response == name:
if wait_until_active:
last_status_seen = STATUS_MAINTENANCE
while last_status_seen == STATUS_MAINTENANCE:
logger.info(
f"sleeping to poll for status of '{self._database_id}', async"
)
await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME)
last_db_info = await self.async_info(
max_time_ms=timeout_manager.remaining_timeout_ms(),
)
last_status_seen = last_db_info.status
if last_status_seen != STATUS_ACTIVE:
raise DevOpsAPIException(
f"Database entered unexpected status {last_status_seen} after MAINTENANCE."
)
# is the namespace found?
if name in await self.async_list_namespaces():
raise DevOpsAPIException("Could not drop the namespace.")
logger.info(
f"finished dropping namespace '{name}' on '{self._database_id}', async"
)
return {"ok": 1}
else:
raise DevOpsAPIException(
f"Could not issue a successful delete-namespace DevOps API request for {name}."
)
def drop(
self,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop this database, i.e. delete it completely and permanently with all its data.
This method wraps the `drop_database` method of the AstraDBAdmin class,
where more information may be found.
Args:
wait_until_active: if True (default), the method returns only after
the database has actually been deleted (generally a few minutes).
If False, it will return right after issuing the
drop request to the DevOps API, and it will be responsibility
of the caller to check the database status/availability
after that, if desired.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> my_db_admin.list_namespaces()
['default_keyspace', 'that_other_one']
>>> my_db_admin.drop()
{'ok': 1}
>>> my_db_admin.list_namespaces() # raises a 404 Not Found http error
Note:
Once the method succeeds, methods on this object -- such as `info()`,
or `list_namespaces()` -- can still be invoked: however, this hardly
makes sense as the underlying actual database is no more.
It is responsibility of the developer to design a correct flow
which avoids using a deceased database any further.
"""
logger.info(f"dropping this database ('{self._database_id}')")
return self._astra_db_admin.drop_database( # type: ignore[no-any-return]
id=self._database_id,
wait_until_active=wait_until_active,
max_time_ms=max_time_ms,
)
logger.info(f"finished dropping this database ('{self._database_id}')")
async def async_drop(
self,
*,
wait_until_active: bool = True,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop this database, i.e. delete it completely and permanently with all its data.
Async version of the method, for use in an asyncio context.
This method wraps the `drop_database` method of the AstraDBAdmin class,
where more information may be found.
Args:
wait_until_active: if True (default), the method returns only after
the database has actually been deleted (generally a few minutes).
If False, it will return right after issuing the
drop request to the DevOps API, and it will be responsibility
of the caller to check the database status/availability
after that, if desired.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> asyncio.run(my_db_admin.async_drop())
{'ok': 1}
Note:
Once the method succeeds, methods on this object -- such as `info()`,
or `list_namespaces()` -- can still be invoked: however, this hardly
makes sense as the underlying actual database is no more.
It is responsibility of the developer to design a correct flow
which avoids using a deceased database any further.
"""
logger.info(f"dropping this database ('{self._database_id}'), async")
return await self._astra_db_admin.async_drop_database( # type: ignore[no-any-return]
id=self._database_id,
wait_until_active=wait_until_active,
max_time_ms=max_time_ms,
)
logger.info(f"finished dropping this database ('{self._database_id}'), async")
def get_database(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
region: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
max_time_ms: Optional[int] = None,
) -> Database:
"""
Create a Database instance from this database admin, for data-related tasks.
Args:
token: if supplied, is passed to the Database instead of
the one set for this object. Useful if one wants to work in
a least-privilege manner, limiting the permissions for non-admin work.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
namespace: an optional namespace to set in the resulting Database.
The same default logic as for `AstraDBAdmin.get_database` applies.
region: *This parameter is deprecated and should not be used.*
Ignored in the method.
api_path: path to append to the API Endpoint. In typical usage, this
should be left to its default of "/api/json".
api_version: version specifier to append to the API path. In typical
usage, this should be left to its default of "v1".
Returns:
A Database object, ready to be used for working with data and collections.
Example:
>>> my_db = my_db_admin.get_database()
>>> my_db.list_collection_names()
['movies', 'another_collection']
Note:
creating an instance of Database does not trigger actual creation
of the database itself, which should exist beforehand. To create databases,
see the AstraDBAdmin class.
"""
if region is not None:
the_warning = DeprecatedWarning(
"The 'region' parameter is deprecated in this method and will be ignored.",
deprecated_in="1.3.2",
removed_in="2.0.0",
details="The database class whose method is invoked already has a region set.",
)
warnings.warn(
the_warning,
stacklevel=2,
)
return self._astra_db_admin.get_database(
id=self.api_endpoint,
token=token,
namespace=namespace,
api_path=api_path,
api_version=api_version,
max_time_ms=max_time_ms,
)
def get_async_database(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
region: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
max_time_ms: Optional[int] = None,
) -> AsyncDatabase:
"""
Create an AsyncDatabase instance out of this class for working
with the data in it.
This method has identical behavior and signature as the sync
counterpart `get_database`: please see that one for more details.
"""
return self.get_database(
token=token,
namespace=namespace,
region=region,
api_path=api_path,
api_version=api_version,
max_time_ms=max_time_ms,
).to_async()
def find_embedding_providers(
self, *, max_time_ms: Optional[int] = None
) -> FindEmbeddingProvidersResult:
"""
Query the API for the full information on available embedding providers.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A `FindEmbeddingProvidersResult` object with the complete information
returned by the API about available embedding providers
Example (output abridged and indented for clarity):
>>> admin_for_my_db.find_embedding_providers()
FindEmbeddingProvidersResult(embedding_providers=..., openai, ...)
>>> admin_for_my_db.find_embedding_providers().embedding_providers
{
'openai': EmbeddingProvider(
display_name='OpenAI',
models=[
EmbeddingProviderModel(name='text-embedding-3-small'),
...
]
),
...
}
"""
logger.info("getting list of embedding providers")
fe_response = self._api_commander.request(
payload={"findEmbeddingProviders": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "embeddingProviders" not in fe_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findEmbeddingProviders API command.",
raw_response=fe_response,
)
else:
logger.info("finished getting list of embedding providers")
return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
async def async_find_embedding_providers(
self, *, max_time_ms: Optional[int] = None
) -> FindEmbeddingProvidersResult:
"""
Query the API for the full information on available embedding providers.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A `FindEmbeddingProvidersResult` object with the complete information
returned by the API about available embedding providers
Example (output abridged and indented for clarity):
>>> admin_for_my_db.find_embedding_providers()
FindEmbeddingProvidersResult(embedding_providers=..., openai, ...)
>>> admin_for_my_db.find_embedding_providers().embedding_providers
{
'openai': EmbeddingProvider(
display_name='OpenAI',
models=[
EmbeddingProviderModel(name='text-embedding-3-small'),
...
]
),
...
}
"""
logger.info("getting list of embedding providers, async")
fe_response = await self._api_commander.async_request(
payload={"findEmbeddingProviders": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "embeddingProviders" not in fe_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findEmbeddingProviders API command.",
raw_response=fe_response,
)
else:
logger.info("finished getting list of embedding providers, async")
return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
class DataAPIDatabaseAdmin(DatabaseAdmin):
"""
An "admin" object for non-Astra Data API environments, to perform administrative
tasks at the namespaces level such as creating/listing/dropping namespaces.
Conforming to the architecture of non-Astra deployments of the Data API,
this object works within the one existing database. It is within that database
that the namespace CRUD operations (and possibly other admin operations)
are performed. Since non-Astra environment lack the concept of an overall
admin (such as the all-databases AstraDBAdmin class), a `DataAPIDatabaseAdmin`
is generally created by invoking the `get_database_admin` method of the
corresponding `Database` object (which in turn is spawned by a DataAPIClient).
Args:
api_endpoint: the full URI to access the Data API,
e.g. "http://localhost:8181".
token: an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
environment: a label, whose value is one of Environment.OTHER (default)
or other non-Astra environment values in the `Environment` enum.
api_path: path to append to the API Endpoint. In typical usage, this
class is created by a method such as `Database.get_database_admin()`,
which passes the matching value. Defaults to this portion of the path
being absent.
api_version: version specifier to append to the API path. In typical
usage, this class is created by a method such as
`Database.get_database_admin()`, which passes the matching value.
Defaults to this portion of the path being absent.
caller_name: name of the application, or framework, on behalf of which
the admin API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
spawner_database: either a Database or an AsyncDatabase instance. This represents
the database class which spawns this admin object, so that, if required,
a namespace creation can retroactively "use" the new namespace in the spawner.
Used to enable the Async/Database.get_admin_database().create_namespace() pattern.
Example:
>>> from astrapy import DataAPIClient
>>> from astrapy.constants import Environment
>>> from astrapy.authentication import UsernamePasswordTokenProvider
>>>
>>> token_provider = UsernamePasswordTokenProvider("username", "password")
>>> endpoint = "http://localhost:8181"
>>>
>>> client = DataAPIClient(
>>> token=token_provider,
>>> environment=Environment.OTHER,
>>> )
>>> database = client.get_database(endpoint)
>>> admin_for_my_db = database.get_database_admin()
>>>
>>> admin_for_my_db.list_namespaces()
['namespace1', 'namespace2']
"""
def __init__(
self,
api_endpoint: str,
*,
token: Optional[Union[str, TokenProvider]] = None,
environment: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
spawner_database: Optional[Union[Database, AsyncDatabase]] = None,
) -> None:
# lazy import here to avoid circular dependency
from astrapy.database import Database
self.environment = (environment or Environment.OTHER).lower()
self.token_provider = coerce_token_provider(token)
self.api_endpoint = api_endpoint
#
self.caller_name = caller_name
self.caller_version = caller_version
#
self.api_path = api_path if api_path is not None else ""
self.api_version = api_version if api_version is not None else ""
#
self._commander_headers = {
DEFAULT_AUTH_HEADER: self.token_provider.get_token(),
}
self._api_commander = APICommander(
api_endpoint=self.api_endpoint,
path="/".join(comp for comp in [self.api_path, self.api_version] if comp),
headers=self._commander_headers,
callers=[(self.caller_name, self.caller_version)],
)
if spawner_database is not None:
self.spawner_database = spawner_database
else:
# leaving the namespace to its per-environment default
# (a task for the Database)
self.spawner_database = Database(
api_endpoint=self.api_endpoint,
token=self.token_provider,
namespace=None,
caller_name=self.caller_name,
caller_version=self.caller_version,
environment=self.environment,
api_path=self.api_path,
api_version=self.api_version,
)
def __repr__(self) -> str:
env_desc = f', environment="{self.environment}"'
return (
f'{self.__class__.__name__}(endpoint="{self.api_endpoint}", '
f'"{str(self.token_provider)[:12]}..."{env_desc})'
)
def __eq__(self, other: Any) -> bool:
if isinstance(other, DataAPIDatabaseAdmin):
return all(
[
self.environment == other.environment,
self._api_commander == other._api_commander,
]
)
else:
return False
def _copy(
self,
api_endpoint: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
environment: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIDatabaseAdmin:
return DataAPIDatabaseAdmin(
api_endpoint=api_endpoint or self.api_endpoint,
token=coerce_token_provider(token) or self.token_provider,
environment=environment or self.environment,
api_path=api_path or self.api_path,
api_version=api_version or self.api_version,
caller_name=caller_name or self.caller_name,
caller_version=caller_version or self.caller_version,
)
def with_options(
self,
*,
api_endpoint: Optional[str] = None,
token: Optional[Union[str, TokenProvider]] = None,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> DataAPIDatabaseAdmin:
"""
Create a clone of this DataAPIDatabaseAdmin with some changed attributes.
Args:
api_endpoint: the full URI to access the Data API,
e.g. "http://localhost:8181".
token: an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
caller_name: name of the application, or framework, on behalf of which
the admin API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Returns:
a new DataAPIDatabaseAdmin instance.
Example:
>>> admin_for_my_other_db = admin_for_my_db.with_options(
... api_endpoint="http://10.1.1.5:8181",
... )
"""
return self._copy(
api_endpoint=api_endpoint,
token=token,
caller_name=caller_name,
caller_version=caller_version,
)
def set_caller(
self,
caller_name: Optional[str] = None,
caller_version: Optional[str] = None,
) -> None:
"""
Set a new identity for the application/framework on behalf of which
the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args:
caller_name: name of the application, or framework, on behalf of which
the DevOps API calls are performed. This ends up in the request user-agent.
caller_version: version of the caller.
Example:
>>> admin_for_my_db.set_caller(
... caller_name="the_caller",
... caller_version="0.1.0",
... )
"""
logger.info(f"setting caller to {caller_name}/{caller_version}")
self.caller_name = caller_name
self.caller_version = caller_version
self._api_commander = APICommander(
api_endpoint=self.api_endpoint,
path="/".join(comp for comp in [self.api_path, self.api_version] if comp),
headers=self._commander_headers,
callers=[(self.caller_name, self.caller_version)],
)
def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]:
"""
Query the API for a list of the namespaces in the database.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A list of the namespaces, each a string, in no particular order.
Example:
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'staging_namespace']
"""
logger.info("getting list of namespaces")
fn_response = self._api_commander.request(
payload={"findNamespaces": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "namespaces" not in fn_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findNamespaces API command.",
raw_response=fn_response,
)
else:
logger.info("finished getting list of namespaces")
return fn_response["status"]["namespaces"] # type: ignore[no-any-return]
def create_namespace(
self,
name: str,
*,
replication_options: Optional[Dict[str, Any]] = None,
update_db_namespace: Optional[bool] = None,
max_time_ms: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
Args:
name: the namespace name. If supplying a namespace that exists
already, the method call proceeds as usual, no errors are
raised, and the whole invocation is a no-op.
replication_options: this dictionary can specify the options about
replication of the namespace (across database nodes). If provided,
it must have a structure similar to:
`{"class": "SimpleStrategy", "replication_factor": 1}`.
update_db_namespace: if True, the `Database` or `AsyncDatabase` class
that spawned this DatabaseAdmin, if any, gets updated to work on
the newly-created namespace starting when this method returns.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> admin_for_my_db.list_namespaces()
['default_keyspace']
>>> admin_for_my_db.create_namespace("that_other_one")
{'ok': 1}
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'that_other_one']
"""
options = {
k: v
for k, v in {
"replication": replication_options,
}.items()
if v
}
payload = {
"createNamespace": {
**{"name": name},
**({"options": options} if options else {}),
}
}
logger.info("creating namespace")
cn_response = self._api_commander.request(
payload=payload,
timeout_info=base_timeout_info(max_time_ms),
)
if (cn_response.get("status") or {}).get("ok") != 1:
raise DataAPIFaultyResponseException(
text="Faulty response from createNamespace API command.",
raw_response=cn_response,
)
else:
logger.info("finished creating namespace")
if update_db_namespace:
self.spawner_database.use_namespace(name)
return cn_response["status"] # type: ignore[no-any-return]
def drop_namespace(
self,
name: str,
*,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database.
Args:
name: the namespace to delete. If it does not exist in this database,
an error is raised.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'that_other_one']
>>> admin_for_my_db.drop_namespace("that_other_one")
{'ok': 1}
>>> admin_for_my_db.list_namespaces()
['default_keyspace']
"""
logger.info("dropping namespace")
dn_response = self._api_commander.request(
payload={"dropNamespace": {"name": name}},
timeout_info=base_timeout_info(max_time_ms),
)
if (dn_response.get("status") or {}).get("ok") != 1:
raise DataAPIFaultyResponseException(
text="Faulty response from dropNamespace API command.",
raw_response=dn_response,
)
else:
logger.info("finished dropping namespace")
return dn_response["status"] # type: ignore[no-any-return]
async def async_list_namespaces(
self, *, max_time_ms: Optional[int] = None
) -> List[str]:
"""
Query the API for a list of the namespaces in the database.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A list of the namespaces, each a string, in no particular order.
Example:
>>> asyncio.run(admin_for_my_db.async_list_namespaces())
['default_keyspace', 'staging_namespace']
"""
logger.info("getting list of namespaces, async")
fn_response = await self._api_commander.async_request(
payload={"findNamespaces": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "namespaces" not in fn_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findNamespaces API command.",
raw_response=fn_response,
)
else:
logger.info("finished getting list of namespaces, async")
return fn_response["status"]["namespaces"] # type: ignore[no-any-return]
async def async_create_namespace(
self,
name: str,
*,
replication_options: Optional[Dict[str, Any]] = None,
update_db_namespace: Optional[bool] = None,
max_time_ms: Optional[int] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Create a namespace in the database, returning {'ok': 1} if successful.
Async version of the method, for use in an asyncio context.
Args:
name: the namespace name. If supplying a namespace that exists
already, the method call proceeds as usual, no errors are
raised, and the whole invocation is a no-op.
replication_options: this dictionary can specify the options about
replication of the namespace (across database nodes). If provided,
it must have a structure similar to:
`{"class": "SimpleStrategy", "replication_factor": 1}`.
update_db_namespace: if True, the `Database` or `AsyncDatabase` class
that spawned this DatabaseAdmin, if any, gets updated to work on
the newly-created namespace starting when this method returns.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the creation request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> admin_for_my_db.list_namespaces()
['default_keyspace']
>>> asyncio.run(admin_for_my_db.async_create_namespace(
... "that_other_one"
... ))
{'ok': 1}
>>> admin_for_my_db.list_namespaces()
['default_keyspace', 'that_other_one']
"""
options = {
k: v
for k, v in {
"replication": replication_options,
}.items()
if v
}
payload = {
"createNamespace": {
**{"name": name},
**({"options": options} if options else {}),
}
}
logger.info("creating namespace, async")
cn_response = await self._api_commander.async_request(
payload=payload,
timeout_info=base_timeout_info(max_time_ms),
)
if (cn_response.get("status") or {}).get("ok") != 1:
raise DataAPIFaultyResponseException(
text="Faulty response from createNamespace API command.",
raw_response=cn_response,
)
else:
logger.info("finished creating namespace, async")
if update_db_namespace:
self.spawner_database.use_namespace(name)
return cn_response["status"] # type: ignore[no-any-return]
async def async_drop_namespace(
self,
name: str,
*,
max_time_ms: Optional[int] = None,
) -> Dict[str, Any]:
"""
Drop (delete) a namespace from the database.
Async version of the method, for use in an asyncio context.
Args:
name: the namespace to delete. If it does not exist in this database,
an error is raised.
max_time_ms: a timeout, in milliseconds, for the whole requested
operation to complete.
Note that a timeout is no guarantee that the deletion request
has not reached the API server.
Returns:
A dictionary of the form {"ok": 1} in case of success.
Otherwise, an exception is raised.
Example:
>>> admin_for_my_db.list_namespaces()
['that_other_one', 'default_keyspace']
>>> asyncio.run(admin_for_my_db.async_drop_namespace(
... "that_other_one"
... ))
{'ok': 1}
>>> admin_for_my_db.list_namespaces()
['default_keyspace']
"""
logger.info("dropping namespace, async")
dn_response = await self._api_commander.async_request(
payload={"dropNamespace": {"name": name}},
timeout_info=base_timeout_info(max_time_ms),
)
if (dn_response.get("status") or {}).get("ok") != 1:
raise DataAPIFaultyResponseException(
text="Faulty response from dropNamespace API command.",
raw_response=dn_response,
)
else:
logger.info("finished dropping namespace, async")
return dn_response["status"] # type: ignore[no-any-return]
def get_database(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
) -> Database:
"""
Create a Database instance out of this class for working with the data in it.
Args:
token: if supplied, is passed to the Database instead of
the one set for this object. Useful if one wants to work in
a least-privilege manner, limiting the permissions for non-admin work.
This can be either a literal token string or a subclass of
`astrapy.authentication.TokenProvider`.
namespace: an optional namespace to set in the resulting Database.
If not provided, no namespace is set, limiting what the Database
can do until setting it with e.g. a `useNamespace` method call.
api_path: path to append to the API Endpoint. In typical usage, this
should be left to its default of "".
api_version: version specifier to append to the API path. In typical
usage, this should be left to its default of "v1".
Returns:
A Database object, ready to be used for working with data and collections.
Example:
>>> my_db = admin_for_my_db.get_database()
>>> my_db.list_collection_names()
['movies', 'another_collection']
Note:
creating an instance of Database does not trigger actual creation
of the database itself, which should exist beforehand.
"""
# lazy importing here to avoid circular dependency
from astrapy import Database
return Database(
api_endpoint=self.api_endpoint,
token=coerce_token_provider(token) or self.token_provider,
namespace=namespace,
caller_name=self.caller_name,
caller_version=self.caller_version,
environment=self.environment,
api_path=api_path,
api_version=api_version,
)
def get_async_database(
self,
*,
token: Optional[Union[str, TokenProvider]] = None,
namespace: Optional[str] = None,
api_path: Optional[str] = None,
api_version: Optional[str] = None,
) -> AsyncDatabase:
"""
Create an AsyncDatabase instance for the database, to be used
when doing data-level work (such as creating/managing collections).
This method has identical behavior and signature as the sync
counterpart `get_database`: please see that one for more details.
"""
return self.get_database(
token=token,
namespace=namespace,
api_path=api_path,
api_version=api_version,
).to_async()
def find_embedding_providers(
self, *, max_time_ms: Optional[int] = None
) -> FindEmbeddingProvidersResult:
"""
Query the API for the full information on available embedding providers.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A `FindEmbeddingProvidersResult` object with the complete information
returned by the API about available embedding providers
Example (output abridged and indented for clarity):
>>> admin_for_my_db.find_embedding_providers()
FindEmbeddingProvidersResult(embedding_providers=..., openai, ...)
>>> admin_for_my_db.find_embedding_providers().embedding_providers
{
'openai': EmbeddingProvider(
display_name='OpenAI',
models=[
EmbeddingProviderModel(name='text-embedding-3-small'),
...
]
),
...
}
"""
logger.info("getting list of embedding providers")
fe_response = self._api_commander.request(
payload={"findEmbeddingProviders": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "embeddingProviders" not in fe_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findEmbeddingProviders API command.",
raw_response=fe_response,
)
else:
logger.info("finished getting list of embedding providers")
return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
async def async_find_embedding_providers(
self, *, max_time_ms: Optional[int] = None
) -> FindEmbeddingProvidersResult:
"""
Query the API for the full information on available embedding providers.
Async version of the method, for use in an asyncio context.
Args:
max_time_ms: a timeout, in milliseconds, for the DevOps API request.
Returns:
A `FindEmbeddingProvidersResult` object with the complete information
returned by the API about available embedding providers
Example (output abridged and indented for clarity):
>>> admin_for_my_db.find_embedding_providers()
FindEmbeddingProvidersResult(embedding_providers=..., openai, ...)
>>> admin_for_my_db.find_embedding_providers().embedding_providers
{
'openai': EmbeddingProvider(
display_name='OpenAI',
models=[
EmbeddingProviderModel(name='text-embedding-3-small'),
...
]
),
...
}
"""
logger.info("getting list of embedding providers, async")
fe_response = await self._api_commander.async_request(
payload={"findEmbeddingProviders": {}},
timeout_info=base_timeout_info(max_time_ms),
)
if "embeddingProviders" not in fe_response.get("status", {}):
raise DataAPIFaultyResponseException(
text="Faulty response from findEmbeddingProviders API command.",
raw_response=fe_response,
)
else:
logger.info("finished getting list of embedding providers, async")
return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
Functions
async def async_fetch_database_info(api_endpoint: str, token: Optional[str], namespace: Optional[str], max_time_ms: Optional[int] = None) ‑> Optional[DatabaseInfo]
-
Fetch database information through the DevOps API. Async version of the function, for use in an asyncio context.
Args
api_endpoint
- a full API endpoint for the Data Api.
token
- a valid token to access the database information.
namespace
- the desired namespace that will be used in the result. If not specified, the resulting database info will show it as None.
max_time_ms
- a timeout, in milliseconds, for waiting on a response.
Returns
A DatabaseInfo object. If the API endpoint fails to be parsed, None is returned. For valid-looking endpoints, if something goes wrong an exception is raised.
Expand source code
async def async_fetch_database_info( api_endpoint: str, token: Optional[str], namespace: Optional[str], max_time_ms: Optional[int] = None, ) -> Optional[DatabaseInfo]: """ Fetch database information through the DevOps API. Async version of the function, for use in an asyncio context. Args: api_endpoint: a full API endpoint for the Data Api. token: a valid token to access the database information. namespace: the desired namespace that will be used in the result. If not specified, the resulting database info will show it as None. max_time_ms: a timeout, in milliseconds, for waiting on a response. Returns: A DatabaseInfo object. If the API endpoint fails to be parsed, None is returned. For valid-looking endpoints, if something goes wrong an exception is raised. """ parsed_endpoint = parse_api_endpoint(api_endpoint) if parsed_endpoint: gd_response = await async_fetch_raw_database_info_from_id_token( id=parsed_endpoint.database_id, token=token, environment=parsed_endpoint.environment, max_time_ms=max_time_ms, ) raw_info = gd_response["info"] if namespace is not None and namespace not in raw_info["keyspaces"]: raise DevOpsAPIException(f"Namespace {namespace} not found on DB.") else: return DatabaseInfo( id=parsed_endpoint.database_id, region=parsed_endpoint.region, namespace=namespace, name=raw_info["name"], environment=parsed_endpoint.environment, raw_info=raw_info, ) else: return None
async def async_fetch_raw_database_info_from_id_token(id: str, *, token: Optional[str], environment: str = 'prod', max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Fetch database information through the DevOps API and return it in full, exactly like the API gives it back. Async version of the function, for use in an asyncio context.
Args
id
- e. g. "01234567-89ab-cdef-0123-456789abcdef".
token
- a valid token to access the database information.
max_time_ms
- a timeout, in milliseconds, for waiting on a response.
Returns
The full response from the DevOps API about the database.
Expand source code
async def async_fetch_raw_database_info_from_id_token( id: str, *, token: Optional[str], environment: str = Environment.PROD, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Fetch database information through the DevOps API and return it in full, exactly like the API gives it back. Async version of the function, for use in an asyncio context. Args: id: e. g. "01234567-89ab-cdef-0123-456789abcdef". token: a valid token to access the database information. max_time_ms: a timeout, in milliseconds, for waiting on a response. Returns: The full response from the DevOps API about the database. """ astra_db_ops = AstraDBOps( token=token, dev_ops_url=DEV_OPS_URL_MAP[environment], ) try: gd_response = await astra_db_ops.async_get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) return gd_response except httpx.TimeoutException as texc: raise to_dataapi_timeout_exception(texc)
def build_api_endpoint(environment: str, database_id: str, region: str) ‑> str
-
Build the API Endpoint full strings from database parameters.
Args
environment
- a label, whose value can be Environment.PROD or another of Environment.* for which this operation makes sense.
database_id
- e. g. "01234567-89ab-cdef-0123-456789abcdef".
region
- a region ID, such as "us-west1".
Returns
the endpoint string, such as "https://01234567-…-eu-west1.apps.datastax.com"
Expand source code
def build_api_endpoint(environment: str, database_id: str, region: str) -> str: """ Build the API Endpoint full strings from database parameters. Args: environment: a label, whose value can be Environment.PROD or another of Environment.* for which this operation makes sense. database_id: e. g. "01234567-89ab-cdef-0123-456789abcdef". region: a region ID, such as "us-west1". Returns: the endpoint string, such as "https://01234567-...-eu-west1.apps.datastax.com" """ return API_ENDPOINT_TEMPLATE_MAP[environment].format( database_id=database_id, region=region, )
def fetch_database_info(api_endpoint: str, token: Optional[str], namespace: Optional[str], max_time_ms: Optional[int] = None) ‑> Optional[DatabaseInfo]
-
Fetch database information through the DevOps API.
Args
api_endpoint
- a full API endpoint for the Data Api.
token
- a valid token to access the database information.
namespace
- the desired namespace that will be used in the result. If not specified, the resulting database info will show it as None.
max_time_ms
- a timeout, in milliseconds, for waiting on a response.
Returns
A DatabaseInfo object. If the API endpoint fails to be parsed, None is returned. For valid-looking endpoints, if something goes wrong an exception is raised.
Expand source code
def fetch_database_info( api_endpoint: str, token: Optional[str], namespace: Optional[str], max_time_ms: Optional[int] = None, ) -> Optional[DatabaseInfo]: """ Fetch database information through the DevOps API. Args: api_endpoint: a full API endpoint for the Data Api. token: a valid token to access the database information. namespace: the desired namespace that will be used in the result. If not specified, the resulting database info will show it as None. max_time_ms: a timeout, in milliseconds, for waiting on a response. Returns: A DatabaseInfo object. If the API endpoint fails to be parsed, None is returned. For valid-looking endpoints, if something goes wrong an exception is raised. """ parsed_endpoint = parse_api_endpoint(api_endpoint) if parsed_endpoint: gd_response = fetch_raw_database_info_from_id_token( id=parsed_endpoint.database_id, token=token, environment=parsed_endpoint.environment, max_time_ms=max_time_ms, ) raw_info = gd_response["info"] if namespace is not None and namespace not in raw_info["keyspaces"]: raise DevOpsAPIException(f"Namespace {namespace} not found on DB.") else: return DatabaseInfo( id=parsed_endpoint.database_id, region=parsed_endpoint.region, namespace=namespace, name=raw_info["name"], environment=parsed_endpoint.environment, raw_info=raw_info, ) else: return None
def fetch_raw_database_info_from_id_token(id: str, *, token: Optional[str], environment: str = 'prod', max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Fetch database information through the DevOps API and return it in full, exactly like the API gives it back.
Args
id
- e. g. "01234567-89ab-cdef-0123-456789abcdef".
token
- a valid token to access the database information.
max_time_ms
- a timeout, in milliseconds, for waiting on a response.
Returns
The full response from the DevOps API about the database.
Expand source code
def fetch_raw_database_info_from_id_token( id: str, *, token: Optional[str], environment: str = Environment.PROD, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Fetch database information through the DevOps API and return it in full, exactly like the API gives it back. Args: id: e. g. "01234567-89ab-cdef-0123-456789abcdef". token: a valid token to access the database information. max_time_ms: a timeout, in milliseconds, for waiting on a response. Returns: The full response from the DevOps API about the database. """ astra_db_ops = AstraDBOps( token=token, dev_ops_url=DEV_OPS_URL_MAP[environment], ) try: gd_response = astra_db_ops.get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) return gd_response except httpx.TimeoutException as texc: raise to_dataapi_timeout_exception(texc)
def normalize_api_endpoint(id_or_endpoint: str, region: Optional[str], token: TokenProvider, environment: str, max_time_ms: Optional[int] = None) ‑> str
-
Ensure that a id(+region) / endpoint init signature is normalized into an api_endpoint string.
This is an impure function: if necessary, attempt a DevOps API call to integrate the information (i.e. if a DB ID without region is passed).
This function is tasked with raising an exception if region is passed along with an API endpoint (and they do not match).
Args
id_or_endpoint
- either the Database ID or a full standard endpoint.
region
- a string with the database region.
token
- a TokenProvider for the possible DevOps request to issue.
environment
- one of the Astra DB
Environment
values. max_time_ms
- used in case the DevOps API request is necessary.
Returns
a normalized API Endpoint string (unless it raises an exception).
Expand source code
def normalize_api_endpoint( id_or_endpoint: str, region: Optional[str], token: TokenProvider, environment: str, max_time_ms: Optional[int] = None, ) -> str: """ Ensure that a id(+region) / endpoint init signature is normalized into an api_endpoint string. This is an impure function: if necessary, attempt a DevOps API call to integrate the information (i.e. if a DB ID without region is passed). This function is tasked with raising an exception if region is passed along with an API endpoint (and they do not match). Args: id_or_endpoint: either the Database ID or a full standard endpoint. region: a string with the database region. token: a TokenProvider for the possible DevOps request to issue. environment: one of the Astra DB `astrapy.constants.Environment` values. max_time_ms: used in case the DevOps API request is necessary. Returns: a normalized API Endpoint string (unless it raises an exception). """ _api_endpoint: str parsed_endpoint = parse_api_endpoint(id_or_endpoint) if parsed_endpoint is not None: if region is not None and region != parsed_endpoint.region: raise ValueError( "An explicit `region` parameter is provided, which does not match " "the supplied API endpoint. Please refrain from specifying `region`." ) _api_endpoint = id_or_endpoint else: # it's a genuine ID _region: str if region: _region = region else: logger.info(f"fetching raw database info for {id_or_endpoint}") this_db_info = fetch_raw_database_info_from_id_token( id=id_or_endpoint, token=token.get_token(), environment=environment, max_time_ms=max_time_ms, ) logger.info(f"finished fetching raw database info for {id_or_endpoint}") _region = this_db_info["info"]["region"] _api_endpoint = build_api_endpoint( environment=environment, database_id=id_or_endpoint, region=_region, ) return _api_endpoint.strip("/")
def normalize_id_endpoint_parameters(id: Optional[str], api_endpoint: Optional[str]) ‑> str
-
Expand source code
def normalize_id_endpoint_parameters( id: Optional[str], api_endpoint: Optional[str] ) -> str: if id is None: if api_endpoint is None: raise ValueError( "Exactly one of the `id` and `api_endpoint` " "synonymous parameters must be passed." ) else: return api_endpoint else: if api_endpoint is not None: raise ValueError( "The `id` and `api_endpoint` synonymous parameters " "cannot be supplied at the same time." ) else: return id
def parse_api_endpoint(api_endpoint: str) ‑> Optional[ParsedAPIEndpoint]
-
Parse an API Endpoint into a ParsedAPIEndpoint structure.
Args
api_endpoint
- a full API endpoint for the Data Api.
Returns
The parsed ParsedAPIEndpoint. If parsing fails, return None.
Expand source code
def parse_api_endpoint(api_endpoint: str) -> Optional[ParsedAPIEndpoint]: """ Parse an API Endpoint into a ParsedAPIEndpoint structure. Args: api_endpoint: a full API endpoint for the Data Api. Returns: The parsed ParsedAPIEndpoint. If parsing fails, return None. """ match = api_endpoint_parser.match(api_endpoint) if match and match.groups(): d_id, d_re, d_en_x = match.groups() return ParsedAPIEndpoint( database_id=d_id, region=d_re, environment=d_en_x if d_en_x else "prod", ) else: return None
def parse_generic_api_url(api_endpoint: str) ‑> Optional[str]
-
Validate a generic API Endpoint string, such as
http://10.1.1.1:123
orhttps://my.domain
.Args
api_endpoint
- a string supposedly expressing a valid API Endpoint
(not necessarily an Astra DB one).
Returns
a normalized (stripped) version of the endpoint if valid. If invalid, return None.
Expand source code
def parse_generic_api_url(api_endpoint: str) -> Optional[str]: """ Validate a generic API Endpoint string, such as `http://10.1.1.1:123` or `https://my.domain`. Args: api_endpoint: a string supposedly expressing a valid API Endpoint (not necessarily an Astra DB one). Returns: a normalized (stripped) version of the endpoint if valid. If invalid, return None. """ if api_endpoint and api_endpoint[-1] == "/": _api_endpoint = api_endpoint[:-1] else: _api_endpoint = api_endpoint match = generic_api_url_matcher.match(_api_endpoint) if match: return match[0].rstrip("/") else: return None
Classes
class AstraDBAdmin (token: Optional[Union[str, TokenProvider]] = None, *, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None)
-
An "admin" object, able to perform administrative tasks at the databases level, such as creating, listing or dropping databases.
Args
token
- an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
TokenProvider
. environment
- a label, whose value is one of Environment.PROD (default), Environment.DEV or Environment.TEST.
caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
dev_ops_url
- in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/…) is determined from the API Endpoint.
dev_ops_api_version
- this can specify a custom version of the DevOps API (such as "v2"). Generally not needed.
Example
>>> from astrapy import DataAPIClient >>> my_client = DataAPIClient("AstraCS:...") >>> my_astra_db_admin = my_client.get_admin() >>> database_list = my_astra_db_admin.list_databases() >>> len(database_list) 3 >>> database_list[2].id '01234567-...' >>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...") >>> my_db_admin.list_namespaces() ['default_keyspace', 'staging_namespace']
Expand source code
class AstraDBAdmin: """ An "admin" object, able to perform administrative tasks at the databases level, such as creating, listing or dropping databases. Args: token: an access token with enough permission to perform admin tasks. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. environment: a label, whose value is one of Environment.PROD (default), Environment.DEV or Environment.TEST. caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. dev_ops_url: in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/...) is determined from the API Endpoint. dev_ops_api_version: this can specify a custom version of the DevOps API (such as "v2"). Generally not needed. Example: >>> from astrapy import DataAPIClient >>> my_client = DataAPIClient("AstraCS:...") >>> my_astra_db_admin = my_client.get_admin() >>> database_list = my_astra_db_admin.list_databases() >>> len(database_list) 3 >>> database_list[2].id '01234567-...' >>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...") >>> my_db_admin.list_namespaces() ['default_keyspace', 'staging_namespace'] """ def __init__( self, token: Optional[Union[str, TokenProvider]] = None, *, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, ) -> None: self.token_provider = coerce_token_provider(token) self.environment = (environment or Environment.PROD).lower() if dev_ops_url is None: self.dev_ops_url = DEV_OPS_URL_MAP[self.environment] else: self.dev_ops_url = dev_ops_url self._caller_name = caller_name self._caller_version = caller_version self._dev_ops_url = dev_ops_url self._dev_ops_api_version = dev_ops_api_version self._astra_db_ops = AstraDBOps( token=self.token_provider.get_token(), dev_ops_url=self.dev_ops_url, dev_ops_api_version=dev_ops_api_version, caller_name=caller_name, caller_version=caller_version, ) def __repr__(self) -> str: env_desc: str if self.environment == Environment.PROD: env_desc = "" else: env_desc = f', environment="{self.environment}"' return ( f'{self.__class__.__name__}("{str(self.token_provider)[:12]}..."{env_desc})' ) def __eq__(self, other: Any) -> bool: if isinstance(other, AstraDBAdmin): return all( [ self.token_provider == other.token_provider, self.environment == other.environment, self.dev_ops_url == other.dev_ops_url, self.dev_ops_url == other.dev_ops_url, self._caller_name == other._caller_name, self._caller_version == other._caller_version, self._dev_ops_url == other._dev_ops_url, self._dev_ops_api_version == other._dev_ops_api_version, self._astra_db_ops == other._astra_db_ops, ] ) else: return False def _copy( self, *, token: Optional[Union[str, TokenProvider]] = None, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, ) -> AstraDBAdmin: return AstraDBAdmin( token=coerce_token_provider(token) or self.token_provider, environment=environment or self.environment, caller_name=caller_name or self._caller_name, caller_version=caller_version or self._caller_version, dev_ops_url=dev_ops_url or self._dev_ops_url, dev_ops_api_version=dev_ops_api_version or self._dev_ops_api_version, ) def with_options( self, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> AstraDBAdmin: """ Create a clone of this AstraDBAdmin with some changed attributes. Args: token: an Access Token to the database. Example: `"AstraCS:xyz..."`. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new AstraDBAdmin instance. Example: >>> another_astra_db_admin = my_astra_db_admin.with_options( ... caller_name="caller_identity", ... caller_version="1.2.0", ... ) """ return self._copy( token=token, caller_name=caller_name, caller_version=caller_version, ) def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> my_astra_db_admin.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self._caller_name = caller_name self._caller_version = caller_version self._astra_db_ops.set_caller(caller_name, caller_version) @ops_recast_method_sync def list_databases( self, *, max_time_ms: Optional[int] = None, ) -> CommandCursor[AdminDatabaseInfo]: """ Get the list of databases, as obtained with a request to the DevOps API. Args: max_time_ms: a timeout, in milliseconds, for the API request. Returns: A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects. Example: >>> database_cursor = my_astra_db_admin.list_databases() >>> database_list = list(database_cursor) >>> len(database_list) 3 >>> database_list[2].id '01234567-...' >>> database_list[2].status 'ACTIVE' >>> database_list[2].info.region 'eu-west-1' """ logger.info("getting databases") gd_list_response = self._astra_db_ops.get_databases( timeout_info=base_timeout_info(max_time_ms) ) logger.info("finished getting databases") if not isinstance(gd_list_response, list): raise DevOpsAPIException( "Faulty response from get-databases DevOps API command.", ) else: # we know this is a list of dicts which need a little adjusting return CommandCursor( address=self._astra_db_ops.base_url, items=[ _recast_as_admin_database_info( db_dict, environment=self.environment, ) for db_dict in gd_list_response ], ) @ops_recast_method_async async def async_list_databases( self, *, max_time_ms: Optional[int] = None, ) -> CommandCursor[AdminDatabaseInfo]: """ Get the list of databases, as obtained with a request to the DevOps API. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the API request. Returns: A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects. Note that the return type is not an awaitable, rather a regular iterable, e.g. for use in ordinary "for" loops. Example: >>> async def check_if_db_exists(db_id: str) -> bool: ... db_cursor = await my_astra_db_admin.async_list_databases() ... db_list = list(dd_cursor) ... return db_id in db_list ... >>> asyncio.run(check_if_db_exists("xyz")) True >>> asyncio.run(check_if_db_exists("01234567-...")) False """ logger.info("getting databases, async") gd_list_response = await self._astra_db_ops.async_get_databases( timeout_info=base_timeout_info(max_time_ms) ) logger.info("finished getting databases, async") if not isinstance(gd_list_response, list): raise DevOpsAPIException( "Faulty response from get-databases DevOps API command.", ) else: # we know this is a list of dicts which need a little adjusting return CommandCursor( address=self._astra_db_ops.base_url, items=[ _recast_as_admin_database_info( db_dict, environment=self.environment, ) for db_dict in gd_list_response ], ) @ops_recast_method_sync def database_info( self, id: str, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Get the full information on a given database, through a request to the DevOps API. Args: id: the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef". max_time_ms: a timeout, in milliseconds, for the API request. Returns: An AdminDatabaseInfo object. Example: >>> details_of_my_db = my_astra_db_admin.database_info("01234567-...") >>> details_of_my_db.id '01234567-...' >>> details_of_my_db.status 'ACTIVE' >>> details_of_my_db.info.region 'eu-west-1' """ logger.info(f"getting database info for '{id}'") gd_response = self._astra_db_ops.get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"finished getting database info for '{id}'") if not isinstance(gd_response, dict): raise DevOpsAPIException( "Faulty response from get-database DevOps API command.", ) else: return _recast_as_admin_database_info( gd_response, environment=self.environment, ) @ops_recast_method_async async def async_database_info( self, id: str, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Get the full information on a given database, through a request to the DevOps API. This is an awaitable method suitable for use within an asyncio event loop. Args: id: the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef". max_time_ms: a timeout, in milliseconds, for the API request. Returns: An AdminDatabaseInfo object. Example: >>> async def check_if_db_active(db_id: str) -> bool: ... db_info = await my_astra_db_admin.async_database_info(db_id) ... return db_info.status == "ACTIVE" ... >>> asyncio.run(check_if_db_active("01234567-...")) True """ logger.info(f"getting database info for '{id}', async") gd_response = await self._astra_db_ops.async_get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"finished getting database info for '{id}', async") if not isinstance(gd_response, dict): raise DevOpsAPIException( "Faulty response from get-database DevOps API command.", ) else: return _recast_as_admin_database_info( gd_response, environment=self.environment, ) @ops_recast_method_sync def create_database( self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create a database as requested, optionally waiting for it to be ready. Args: name: the desired name for the database. cloud_provider: one of 'aws', 'gcp' or 'azure'. region: any of the available cloud regions. namespace: name for the one namespace the database starts with. If omitted, DevOps API will use its default. wait_until_active: if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: An AstraDBDatabaseAdmin instance. Example: >>> my_new_db_admin = my_astra_db_admin.create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", ... ) >>> my_new_db = my_new_db_admin.get_database() >>> my_coll = my_new_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.1, 0.2]}) """ database_definition = { k: v for k, v in { "name": name, "tier": "serverless", "cloudProvider": cloud_provider, "region": region, "capacityUnits": 1, "dbType": "vector", "keyspace": namespace, }.items() if v is not None } timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating database {name}/({cloud_provider}, {region})") cd_response = self._astra_db_ops.create_database( database_definition=database_definition, timeout_info=base_timeout_info(max_time_ms), ) logger.info( "devops api returned from creating database " f"{name}/({cloud_provider}, {region})" ) if cd_response is not None and "id" in cd_response: new_database_id = cd_response["id"] if wait_until_active: last_status_seen = STATUS_PENDING while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}: logger.info(f"sleeping to poll for status of '{new_database_id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) last_db_info = self.database_info( id=new_database_id, max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database {name} entered unexpected status {last_status_seen} after PENDING" ) # return the database instance logger.info( f"finished creating database '{new_database_id}' = " f"{name}/({cloud_provider}, {region})" ) return AstraDBDatabaseAdmin.from_astra_db_admin( id=new_database_id, region=region, astra_db_admin=self, ) else: raise DevOpsAPIException("Could not create the database.") @ops_recast_method_async async def async_create_database( self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create a database as requested, optionally waiting for it to be ready. This is an awaitable method suitable for use within an asyncio event loop. Args: name: the desired name for the database. cloud_provider: one of 'aws', 'gcp' or 'azure'. region: any of the available cloud regions. namespace: name for the one namespace the database starts with. If omitted, DevOps API will use its default. wait_until_active: if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: An AstraDBDatabaseAdmin instance. Example: >>> asyncio.run( ... my_astra_db_admin.async_create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", .... ) ... ) AstraDBDatabaseAdmin(id=...) """ database_definition = { k: v for k, v in { "name": name, "tier": "serverless", "cloudProvider": cloud_provider, "region": region, "capacityUnits": 1, "dbType": "vector", "keyspace": namespace, }.items() if v is not None } timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating database {name}/({cloud_provider}, {region}), async") cd_response = await self._astra_db_ops.async_create_database( database_definition=database_definition, timeout_info=base_timeout_info(max_time_ms), ) logger.info( "devops api returned from creating database " f"{name}/({cloud_provider}, {region}), async" ) if cd_response is not None and "id" in cd_response: new_database_id = cd_response["id"] if wait_until_active: last_status_seen = STATUS_PENDING while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}: logger.info( f"sleeping to poll for status of '{new_database_id}', async" ) await asyncio.sleep(DATABASE_POLL_SLEEP_TIME) last_db_info = await self.async_database_info( id=new_database_id, max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database {name} entered unexpected status {last_status_seen} after PENDING" ) # return the database instance logger.info( f"finished creating database '{new_database_id}' = " f"{name}/({cloud_provider}, {region}), async" ) return AstraDBDatabaseAdmin.from_astra_db_admin( id=new_database_id, region=region, astra_db_admin=self, ) else: raise DevOpsAPIException("Could not create the database.") @ops_recast_method_sync def drop_database( self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop a database, i.e. delete it completely and permanently with all its data. Args: id: The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef". wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> database_list_pre = my_astra_db_admin.list_databases() >>> len(database_list_pre) 3 >>> my_astra_db_admin.drop_database("01234567-...") {'ok': 1} >>> database_list_post = my_astra_db_admin.list_databases() >>> len(database_list_post) 2 """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping database '{id}'") te_response = self._astra_db_ops.terminate_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"devops api returned from dropping database '{id}'") if te_response == id: if wait_until_active: last_status_seen: Optional[str] = STATUS_TERMINATING _db_name: Optional[str] = None while last_status_seen == STATUS_TERMINATING: logger.info(f"sleeping to poll for status of '{id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) # detected_databases = [ a_db_info for a_db_info in self.list_databases( max_time_ms=timeout_manager.remaining_timeout_ms(), ) if a_db_info.id == id ] if detected_databases: last_status_seen = detected_databases[0].status _db_name = detected_databases[0].info.name else: last_status_seen = None if last_status_seen is not None: _name_desc = f" ({_db_name})" if _db_name else "" raise DevOpsAPIException( f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING" ) logger.info(f"finished dropping database '{id}'") return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful terminate-database DevOps API request for {id}." ) @ops_recast_method_async async def async_drop_database( self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop a database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context. Args: id: The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef". wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_astra_db_admin.async_drop_database("01234567-...") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping database '{id}', async") te_response = await self._astra_db_ops.async_terminate_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"devops api returned from dropping database '{id}', async") if te_response == id: if wait_until_active: last_status_seen: Optional[str] = STATUS_TERMINATING _db_name: Optional[str] = None while last_status_seen == STATUS_TERMINATING: logger.info(f"sleeping to poll for status of '{id}', async") await asyncio.sleep(DATABASE_POLL_SLEEP_TIME) # detected_databases = [ a_db_info for a_db_info in await self.async_list_databases( max_time_ms=timeout_manager.remaining_timeout_ms(), ) if a_db_info.id == id ] if detected_databases: last_status_seen = detected_databases[0].status _db_name = detected_databases[0].info.name else: last_status_seen = None if last_status_seen is not None: _name_desc = f" ({_db_name})" if _db_name else "" raise DevOpsAPIException( f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING" ) logger.info(f"finished dropping database '{id}', async") return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful terminate-database DevOps API request for {id}." ) def get_database_admin( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, region: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin object for admin work within a certain database. Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). api_endpoint: a named alias for the `id` first (positional) parameter, with the same meaning. It cannot be passed together with `id`. region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: An AstraDBDatabaseAdmin instance representing the requested database. Example: >>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...") >>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] Note: This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the `create_database` method. """ _id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint) return AstraDBDatabaseAdmin.from_astra_db_admin( id=_id_or_endpoint, region=region, astra_db_admin=self, max_time_ms=max_time_ms, ) def get_database( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> Database: """ Create a Database instance for a specific database, to be used when doing data-level work (such as creating/managing collections). Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). api_endpoint: a named alias for the `id` first (positional) parameter, with the same meaning. It cannot be passed together with `id`. token: if supplied, is passed to the Database instead of the one set for this object. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: used to specify a certain namespace the resulting Database will primarily work on. If not specified, similar as for `region`, an additional DevOps API call reveals the default namespace for the target database. region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: A Database object ready to be used. Example: >>> my_db = my_astra_db_admin.get_database( ... "01234567-...", ... region="us-east1", ... ) >>> coll = my_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.3, 0.4]}) Note: This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the `create_database` method of class AstraDBAdmin. """ # lazy importing here to avoid circular dependency from astrapy import Database _id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint) _token = coerce_token_provider(token) or self.token_provider normalized_api_endpoint = normalize_api_endpoint( id_or_endpoint=_id_or_endpoint, region=region, token=_token, environment=self.environment, max_time_ms=max_time_ms, ) _namespace: str if namespace: _namespace = namespace else: parsed_api_endpoint = parse_api_endpoint(normalized_api_endpoint) if parsed_api_endpoint is None: raise ValueError( f"Cannot parse the API endpoint ({normalized_api_endpoint})." ) this_db_info = self.database_info( parsed_api_endpoint.database_id, max_time_ms=max_time_ms, ) _namespace = this_db_info.info.namespace return Database( api_endpoint=normalized_api_endpoint, token=_token, namespace=_namespace, caller_name=self._caller_name, caller_version=self._caller_version, environment=self.environment, api_path=api_path, api_version=api_version, ) def get_async_database( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance for a specific database, to be used when doing data-level work (such as creating/managing collections). This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( id=id, api_endpoint=api_endpoint, token=token, namespace=namespace, region=region, api_path=api_path, api_version=api_version, ).to_async()
Methods
async def async_create_database(self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> AstraDBDatabaseAdmin
-
Create a database as requested, optionally waiting for it to be ready. This is an awaitable method suitable for use within an asyncio event loop.
Args
name
- the desired name for the database.
cloud_provider
- one of 'aws', 'gcp' or 'azure'.
region
- any of the available cloud regions.
namespace
- name for the one namespace the database starts with. If omitted, DevOps API will use its default.
wait_until_active
- if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
An AstraDBDatabaseAdmin instance.
Example
>>> asyncio.run( ... my_astra_db_admin.async_create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", .... ) ... ) AstraDBDatabaseAdmin(id=...)
Expand source code
@ops_recast_method_async async def async_create_database( self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create a database as requested, optionally waiting for it to be ready. This is an awaitable method suitable for use within an asyncio event loop. Args: name: the desired name for the database. cloud_provider: one of 'aws', 'gcp' or 'azure'. region: any of the available cloud regions. namespace: name for the one namespace the database starts with. If omitted, DevOps API will use its default. wait_until_active: if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: An AstraDBDatabaseAdmin instance. Example: >>> asyncio.run( ... my_astra_db_admin.async_create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", .... ) ... ) AstraDBDatabaseAdmin(id=...) """ database_definition = { k: v for k, v in { "name": name, "tier": "serverless", "cloudProvider": cloud_provider, "region": region, "capacityUnits": 1, "dbType": "vector", "keyspace": namespace, }.items() if v is not None } timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating database {name}/({cloud_provider}, {region}), async") cd_response = await self._astra_db_ops.async_create_database( database_definition=database_definition, timeout_info=base_timeout_info(max_time_ms), ) logger.info( "devops api returned from creating database " f"{name}/({cloud_provider}, {region}), async" ) if cd_response is not None and "id" in cd_response: new_database_id = cd_response["id"] if wait_until_active: last_status_seen = STATUS_PENDING while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}: logger.info( f"sleeping to poll for status of '{new_database_id}', async" ) await asyncio.sleep(DATABASE_POLL_SLEEP_TIME) last_db_info = await self.async_database_info( id=new_database_id, max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database {name} entered unexpected status {last_status_seen} after PENDING" ) # return the database instance logger.info( f"finished creating database '{new_database_id}' = " f"{name}/({cloud_provider}, {region}), async" ) return AstraDBDatabaseAdmin.from_astra_db_admin( id=new_database_id, region=region, astra_db_admin=self, ) else: raise DevOpsAPIException("Could not create the database.")
async def async_database_info(self, id: str, *, max_time_ms: Optional[int] = None) ‑> AdminDatabaseInfo
-
Get the full information on a given database, through a request to the DevOps API. This is an awaitable method suitable for use within an asyncio event loop.
Args
id
- the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef".
max_time_ms
- a timeout, in milliseconds, for the API request.
Returns
An AdminDatabaseInfo object.
Example
>>> async def check_if_db_active(db_id: str) -> bool: ... db_info = await my_astra_db_admin.async_database_info(db_id) ... return db_info.status == "ACTIVE" ... >>> asyncio.run(check_if_db_active("01234567-...")) True
Expand source code
@ops_recast_method_async async def async_database_info( self, id: str, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Get the full information on a given database, through a request to the DevOps API. This is an awaitable method suitable for use within an asyncio event loop. Args: id: the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef". max_time_ms: a timeout, in milliseconds, for the API request. Returns: An AdminDatabaseInfo object. Example: >>> async def check_if_db_active(db_id: str) -> bool: ... db_info = await my_astra_db_admin.async_database_info(db_id) ... return db_info.status == "ACTIVE" ... >>> asyncio.run(check_if_db_active("01234567-...")) True """ logger.info(f"getting database info for '{id}', async") gd_response = await self._astra_db_ops.async_get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"finished getting database info for '{id}', async") if not isinstance(gd_response, dict): raise DevOpsAPIException( "Faulty response from get-database DevOps API command.", ) else: return _recast_as_admin_database_info( gd_response, environment=self.environment, )
async def async_drop_database(self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop a database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context.
Args
id
- The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef".
wait_until_active
- if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> asyncio.run( ... my_astra_db_admin.async_drop_database("01234567-...") ... ) {'ok': 1}
Expand source code
@ops_recast_method_async async def async_drop_database( self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop a database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context. Args: id: The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef". wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_astra_db_admin.async_drop_database("01234567-...") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping database '{id}', async") te_response = await self._astra_db_ops.async_terminate_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"devops api returned from dropping database '{id}', async") if te_response == id: if wait_until_active: last_status_seen: Optional[str] = STATUS_TERMINATING _db_name: Optional[str] = None while last_status_seen == STATUS_TERMINATING: logger.info(f"sleeping to poll for status of '{id}', async") await asyncio.sleep(DATABASE_POLL_SLEEP_TIME) # detected_databases = [ a_db_info for a_db_info in await self.async_list_databases( max_time_ms=timeout_manager.remaining_timeout_ms(), ) if a_db_info.id == id ] if detected_databases: last_status_seen = detected_databases[0].status _db_name = detected_databases[0].info.name else: last_status_seen = None if last_status_seen is not None: _name_desc = f" ({_db_name})" if _db_name else "" raise DevOpsAPIException( f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING" ) logger.info(f"finished dropping database '{id}', async") return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful terminate-database DevOps API request for {id}." )
async def async_list_databases(self, *, max_time_ms: Optional[int] = None) ‑> CommandCursor[AdminDatabaseInfo]
-
Get the list of databases, as obtained with a request to the DevOps API. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the API request.
Returns
A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects. Note that the return type is not an awaitable, rather a regular iterable, e.g. for use in ordinary "for" loops.
Example
>>> async def check_if_db_exists(db_id: str) -> bool: ... db_cursor = await my_astra_db_admin.async_list_databases() ... db_list = list(dd_cursor) ... return db_id in db_list ... >>> asyncio.run(check_if_db_exists("xyz")) True >>> asyncio.run(check_if_db_exists("01234567-...")) False
Expand source code
@ops_recast_method_async async def async_list_databases( self, *, max_time_ms: Optional[int] = None, ) -> CommandCursor[AdminDatabaseInfo]: """ Get the list of databases, as obtained with a request to the DevOps API. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the API request. Returns: A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects. Note that the return type is not an awaitable, rather a regular iterable, e.g. for use in ordinary "for" loops. Example: >>> async def check_if_db_exists(db_id: str) -> bool: ... db_cursor = await my_astra_db_admin.async_list_databases() ... db_list = list(dd_cursor) ... return db_id in db_list ... >>> asyncio.run(check_if_db_exists("xyz")) True >>> asyncio.run(check_if_db_exists("01234567-...")) False """ logger.info("getting databases, async") gd_list_response = await self._astra_db_ops.async_get_databases( timeout_info=base_timeout_info(max_time_ms) ) logger.info("finished getting databases, async") if not isinstance(gd_list_response, list): raise DevOpsAPIException( "Faulty response from get-databases DevOps API command.", ) else: # we know this is a list of dicts which need a little adjusting return CommandCursor( address=self._astra_db_ops.base_url, items=[ _recast_as_admin_database_info( db_dict, environment=self.environment, ) for db_dict in gd_list_response ], )
def create_database(self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> AstraDBDatabaseAdmin
-
Create a database as requested, optionally waiting for it to be ready.
Args
name
- the desired name for the database.
cloud_provider
- one of 'aws', 'gcp' or 'azure'.
region
- any of the available cloud regions.
namespace
- name for the one namespace the database starts with. If omitted, DevOps API will use its default.
wait_until_active
- if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
An AstraDBDatabaseAdmin instance.
Example
>>> my_new_db_admin = my_astra_db_admin.create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", ... ) >>> my_new_db = my_new_db_admin.get_database() >>> my_coll = my_new_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.1, 0.2]})
Expand source code
@ops_recast_method_sync def create_database( self, name: str, *, cloud_provider: str, region: str, namespace: Optional[str] = None, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create a database as requested, optionally waiting for it to be ready. Args: name: the desired name for the database. cloud_provider: one of 'aws', 'gcp' or 'azure'. region: any of the available cloud regions. namespace: name for the one namespace the database starts with. If omitted, DevOps API will use its default. wait_until_active: if True (default), the method returns only after the newly-created database is in ACTIVE state (a few minutes, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: An AstraDBDatabaseAdmin instance. Example: >>> my_new_db_admin = my_astra_db_admin.create_database( ... "new_database", ... cloud_provider="aws", ... region="ap-south-1", ... ) >>> my_new_db = my_new_db_admin.get_database() >>> my_coll = my_new_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.1, 0.2]}) """ database_definition = { k: v for k, v in { "name": name, "tier": "serverless", "cloudProvider": cloud_provider, "region": region, "capacityUnits": 1, "dbType": "vector", "keyspace": namespace, }.items() if v is not None } timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating database {name}/({cloud_provider}, {region})") cd_response = self._astra_db_ops.create_database( database_definition=database_definition, timeout_info=base_timeout_info(max_time_ms), ) logger.info( "devops api returned from creating database " f"{name}/({cloud_provider}, {region})" ) if cd_response is not None and "id" in cd_response: new_database_id = cd_response["id"] if wait_until_active: last_status_seen = STATUS_PENDING while last_status_seen in {STATUS_PENDING, STATUS_INITIALIZING}: logger.info(f"sleeping to poll for status of '{new_database_id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) last_db_info = self.database_info( id=new_database_id, max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database {name} entered unexpected status {last_status_seen} after PENDING" ) # return the database instance logger.info( f"finished creating database '{new_database_id}' = " f"{name}/({cloud_provider}, {region})" ) return AstraDBDatabaseAdmin.from_astra_db_admin( id=new_database_id, region=region, astra_db_admin=self, ) else: raise DevOpsAPIException("Could not create the database.")
def database_info(self, id: str, *, max_time_ms: Optional[int] = None) ‑> AdminDatabaseInfo
-
Get the full information on a given database, through a request to the DevOps API.
Args
id
- the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef".
max_time_ms
- a timeout, in milliseconds, for the API request.
Returns
An AdminDatabaseInfo object.
Example
>>> details_of_my_db = my_astra_db_admin.database_info("01234567-...") >>> details_of_my_db.id '01234567-...' >>> details_of_my_db.status 'ACTIVE' >>> details_of_my_db.info.region 'eu-west-1'
Expand source code
@ops_recast_method_sync def database_info( self, id: str, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Get the full information on a given database, through a request to the DevOps API. Args: id: the ID of the target database, e. g. "01234567-89ab-cdef-0123-456789abcdef". max_time_ms: a timeout, in milliseconds, for the API request. Returns: An AdminDatabaseInfo object. Example: >>> details_of_my_db = my_astra_db_admin.database_info("01234567-...") >>> details_of_my_db.id '01234567-...' >>> details_of_my_db.status 'ACTIVE' >>> details_of_my_db.info.region 'eu-west-1' """ logger.info(f"getting database info for '{id}'") gd_response = self._astra_db_ops.get_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"finished getting database info for '{id}'") if not isinstance(gd_response, dict): raise DevOpsAPIException( "Faulty response from get-database DevOps API command.", ) else: return _recast_as_admin_database_info( gd_response, environment=self.environment, )
def drop_database(self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop a database, i.e. delete it completely and permanently with all its data.
Args
id
- The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef".
wait_until_active
- if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> database_list_pre = my_astra_db_admin.list_databases() >>> len(database_list_pre) 3 >>> my_astra_db_admin.drop_database("01234567-...") {'ok': 1} >>> database_list_post = my_astra_db_admin.list_databases() >>> len(database_list_post) 2
Expand source code
@ops_recast_method_sync def drop_database( self, id: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop a database, i.e. delete it completely and permanently with all its data. Args: id: The ID of the database to drop, e. g. "01234567-89ab-cdef-0123-456789abcdef". wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> database_list_pre = my_astra_db_admin.list_databases() >>> len(database_list_pre) 3 >>> my_astra_db_admin.drop_database("01234567-...") {'ok': 1} >>> database_list_post = my_astra_db_admin.list_databases() >>> len(database_list_post) 2 """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping database '{id}'") te_response = self._astra_db_ops.terminate_database( database=id, timeout_info=base_timeout_info(max_time_ms), ) logger.info(f"devops api returned from dropping database '{id}'") if te_response == id: if wait_until_active: last_status_seen: Optional[str] = STATUS_TERMINATING _db_name: Optional[str] = None while last_status_seen == STATUS_TERMINATING: logger.info(f"sleeping to poll for status of '{id}'") time.sleep(DATABASE_POLL_SLEEP_TIME) # detected_databases = [ a_db_info for a_db_info in self.list_databases( max_time_ms=timeout_manager.remaining_timeout_ms(), ) if a_db_info.id == id ] if detected_databases: last_status_seen = detected_databases[0].status _db_name = detected_databases[0].info.name else: last_status_seen = None if last_status_seen is not None: _name_desc = f" ({_db_name})" if _db_name else "" raise DevOpsAPIException( f"Database {id}{_name_desc} entered unexpected status {last_status_seen} after PENDING" ) logger.info(f"finished dropping database '{id}'") return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful terminate-database DevOps API request for {id}." )
def get_async_database(self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None) ‑> AsyncDatabase
-
Create an AsyncDatabase instance for a specific database, to be used when doing data-level work (such as creating/managing collections).
This method has identical behavior and signature as the sync counterpart
get_database
: please see that one for more details.Expand source code
def get_async_database( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance for a specific database, to be used when doing data-level work (such as creating/managing collections). This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( id=id, api_endpoint=api_endpoint, token=token, namespace=namespace, region=region, api_path=api_path, api_version=api_version, ).to_async()
def get_database(self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None) ‑> Database
-
Create a Database instance for a specific database, to be used when doing data-level work (such as creating/managing collections).
Args
id
- the target database ID (e.g.
01234567-89ab-cdef-0123-456789abcdef
) or the corresponding API Endpoint (e.g.https://<ID>-<REGION>.apps.astra.datastax.com
). api_endpoint
- a named alias for the
id
first (positional) parameter, with the same meaning. It cannot be passed together withid
. token
- if supplied, is passed to the Database instead of
the one set for this object.
This can be either a literal token string or a subclass of
TokenProvider
. namespace
- used to specify a certain namespace the resulting
Database will primarily work on. If not specified, similar
as for
region
, an additional DevOps API call reveals the default namespace for the target database. region
- the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as
id
. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. api_path
- path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json".
api_version
- version specifier to append to the API path. In typical usage, this should be left to its default of "v1".
max_time_ms
- a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the
region
argument).
Returns
A Database object ready to be used.
Example
>>> my_db = my_astra_db_admin.get_database( ... "01234567-...", ... region="us-east1", ... ) >>> coll = my_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.3, 0.4]})
Note
This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the
create_database
method of class AstraDBAdmin.Expand source code
def get_database( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> Database: """ Create a Database instance for a specific database, to be used when doing data-level work (such as creating/managing collections). Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). api_endpoint: a named alias for the `id` first (positional) parameter, with the same meaning. It cannot be passed together with `id`. token: if supplied, is passed to the Database instead of the one set for this object. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: used to specify a certain namespace the resulting Database will primarily work on. If not specified, similar as for `region`, an additional DevOps API call reveals the default namespace for the target database. region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: A Database object ready to be used. Example: >>> my_db = my_astra_db_admin.get_database( ... "01234567-...", ... region="us-east1", ... ) >>> coll = my_db.create_collection("movies", dimension=2) >>> my_coll.insert_one({"title": "The Title", "$vector": [0.3, 0.4]}) Note: This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the `create_database` method of class AstraDBAdmin. """ # lazy importing here to avoid circular dependency from astrapy import Database _id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint) _token = coerce_token_provider(token) or self.token_provider normalized_api_endpoint = normalize_api_endpoint( id_or_endpoint=_id_or_endpoint, region=region, token=_token, environment=self.environment, max_time_ms=max_time_ms, ) _namespace: str if namespace: _namespace = namespace else: parsed_api_endpoint = parse_api_endpoint(normalized_api_endpoint) if parsed_api_endpoint is None: raise ValueError( f"Cannot parse the API endpoint ({normalized_api_endpoint})." ) this_db_info = self.database_info( parsed_api_endpoint.database_id, max_time_ms=max_time_ms, ) _namespace = this_db_info.info.namespace return Database( api_endpoint=normalized_api_endpoint, token=_token, namespace=_namespace, caller_name=self._caller_name, caller_version=self._caller_version, environment=self.environment, api_path=api_path, api_version=api_version, )
def get_database_admin(self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, region: Optional[str] = None, max_time_ms: Optional[int] = None) ‑> AstraDBDatabaseAdmin
-
Create an AstraDBDatabaseAdmin object for admin work within a certain database.
Args
id
- the target database ID (e.g.
01234567-89ab-cdef-0123-456789abcdef
) or the corresponding API Endpoint (e.g.https://<ID>-<REGION>.apps.astra.datastax.com
). api_endpoint
- a named alias for the
id
first (positional) parameter, with the same meaning. It cannot be passed together withid
. region
- the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as
id
. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. max_time_ms
- a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the
region
argument).
Returns
An AstraDBDatabaseAdmin instance representing the requested database.
Example
>>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...") >>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one']
Note
This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the
create_database
method.Expand source code
def get_database_admin( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, region: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin object for admin work within a certain database. Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). api_endpoint: a named alias for the `id` first (positional) parameter, with the same meaning. It cannot be passed together with `id`. region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: An AstraDBDatabaseAdmin instance representing the requested database. Example: >>> my_db_admin = my_astra_db_admin.get_database_admin("01234567-...") >>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] Note: This method does not perform any admin-level operation through the DevOps API. For actual creation of a database, see the `create_database` method. """ _id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint) return AstraDBDatabaseAdmin.from_astra_db_admin( id=_id_or_endpoint, region=region, astra_db_admin=self, max_time_ms=max_time_ms, )
def list_databases(self, *, max_time_ms: Optional[int] = None) ‑> CommandCursor[AdminDatabaseInfo]
-
Get the list of databases, as obtained with a request to the DevOps API.
Args
max_time_ms
- a timeout, in milliseconds, for the API request.
Returns
A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects.
Example
>>> database_cursor = my_astra_db_admin.list_databases() >>> database_list = list(database_cursor) >>> len(database_list) 3 >>> database_list[2].id '01234567-...' >>> database_list[2].status 'ACTIVE' >>> database_list[2].info.region 'eu-west-1'
Expand source code
@ops_recast_method_sync def list_databases( self, *, max_time_ms: Optional[int] = None, ) -> CommandCursor[AdminDatabaseInfo]: """ Get the list of databases, as obtained with a request to the DevOps API. Args: max_time_ms: a timeout, in milliseconds, for the API request. Returns: A CommandCursor to iterate over the detected databases, represented as AdminDatabaseInfo objects. Example: >>> database_cursor = my_astra_db_admin.list_databases() >>> database_list = list(database_cursor) >>> len(database_list) 3 >>> database_list[2].id '01234567-...' >>> database_list[2].status 'ACTIVE' >>> database_list[2].info.region 'eu-west-1' """ logger.info("getting databases") gd_list_response = self._astra_db_ops.get_databases( timeout_info=base_timeout_info(max_time_ms) ) logger.info("finished getting databases") if not isinstance(gd_list_response, list): raise DevOpsAPIException( "Faulty response from get-databases DevOps API command.", ) else: # we know this is a list of dicts which need a little adjusting return CommandCursor( address=self._astra_db_ops.base_url, items=[ _recast_as_admin_database_info( db_dict, environment=self.environment, ) for db_dict in gd_list_response ], )
def set_caller(self, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> None
-
Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args
caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Example
>>> my_astra_db_admin.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... )
Expand source code
def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> my_astra_db_admin.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self._caller_name = caller_name self._caller_version = caller_version self._astra_db_ops.set_caller(caller_name, caller_version)
def with_options(self, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> AstraDBAdmin
-
Create a clone of this AstraDBAdmin with some changed attributes.
Args
token
- an Access Token to the database. Example:
"AstraCS:xyz..."
. This can be either a literal token string or a subclass ofTokenProvider
. caller_name
- name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Returns
a new AstraDBAdmin instance.
Example
>>> another_astra_db_admin = my_astra_db_admin.with_options( ... caller_name="caller_identity", ... caller_version="1.2.0", ... )
Expand source code
def with_options( self, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> AstraDBAdmin: """ Create a clone of this AstraDBAdmin with some changed attributes. Args: token: an Access Token to the database. Example: `"AstraCS:xyz..."`. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new AstraDBAdmin instance. Example: >>> another_astra_db_admin = my_astra_db_admin.with_options( ... caller_name="caller_identity", ... caller_version="1.2.0", ... ) """ return self._copy( token=token, caller_name=caller_name, caller_version=caller_version, )
class AstraDBDatabaseAdmin (id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, region: Optional[str] = None, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, spawner_database: Optional[Union[Database, AsyncDatabase]] = None, max_time_ms: Optional[int] = None)
-
An "admin" object, able to perform administrative tasks at the namespaces level (i.e. within a certain database), such as creating/listing/dropping namespaces.
This is one layer below the AstraDBAdmin concept, in that it is tied to a single database and enables admin work within it. As such, it is generally created by a method call on an AstraDBAdmin.
Args
id
- the target database ID (e.g.
01234567-89ab-cdef-0123-456789abcdef
) or the corresponding API Endpoint (e.g.https://<ID>-<REGION>.apps.astra.datastax.com
). api_endpoint
- a named alias for the
id
first (positional) parameter, with the same meaning. It cannot be passed together withid
. token
- an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
TokenProvider
. region
- the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as
id
. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. environment
- a label, whose value is one of Environment.PROD (default), Environment.DEV or Environment.TEST.
caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
dev_ops_url
- in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/…) is determined from the API Endpoint.
dev_ops_api_version
- this can specify a custom version of the DevOps API (such as "v2"). Generally not needed.
api_path
- path to append to the API Endpoint. In typical usage, this
class is created by a method such as
Database.get_database_admin()
, which passes the matching value. Generally to be left to its Astra DB default of "/api/json". api_version
- version specifier to append to the API path. In typical
usage, this class is created by a method such as
Database.get_database_admin()
, which passes the matching value. Generally to be left to its Astra DB default of "/v1". spawner_database
- either a Database or an AsyncDatabase instance. This represents the database class which spawns this admin object, so that, if required, a namespace creation can retroactively "use" the new namespace in the spawner. Used to enable the Async/Database.get_admin_database().create_namespace() pattern.
max_time_ms
- a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the
region
argument).
Example
>>> from astrapy import DataAPIClient >>> my_client = DataAPIClient("AstraCS:...") >>> admin_for_my_db = my_client.get_admin().get_database_admin("01234567-...") >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] >>> admin_for_my_db.info().status 'ACTIVE'
Note
creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class.
Expand source code
class AstraDBDatabaseAdmin(DatabaseAdmin): """ An "admin" object, able to perform administrative tasks at the namespaces level (i.e. within a certain database), such as creating/listing/dropping namespaces. This is one layer below the AstraDBAdmin concept, in that it is tied to a single database and enables admin work within it. As such, it is generally created by a method call on an AstraDBAdmin. Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). api_endpoint: a named alias for the `id` first (positional) parameter, with the same meaning. It cannot be passed together with `id`. token: an access token with enough permission to perform admin tasks. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. environment: a label, whose value is one of Environment.PROD (default), Environment.DEV or Environment.TEST. caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. dev_ops_url: in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/...) is determined from the API Endpoint. dev_ops_api_version: this can specify a custom version of the DevOps API (such as "v2"). Generally not needed. api_path: path to append to the API Endpoint. In typical usage, this class is created by a method such as `Database.get_database_admin()`, which passes the matching value. Generally to be left to its Astra DB default of "/api/json". api_version: version specifier to append to the API path. In typical usage, this class is created by a method such as `Database.get_database_admin()`, which passes the matching value. Generally to be left to its Astra DB default of "/v1". spawner_database: either a Database or an AsyncDatabase instance. This represents the database class which spawns this admin object, so that, if required, a namespace creation can retroactively "use" the new namespace in the spawner. Used to enable the Async/Database.get_admin_database().create_namespace() pattern. max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Example: >>> from astrapy import DataAPIClient >>> my_client = DataAPIClient("AstraCS:...") >>> admin_for_my_db = my_client.get_admin().get_database_admin("01234567-...") >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] >>> admin_for_my_db.info().status 'ACTIVE' Note: creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ def __init__( self, id: Optional[str] = None, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, region: Optional[str] = None, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, spawner_database: Optional[Union[Database, AsyncDatabase]] = None, max_time_ms: Optional[int] = None, ) -> None: # lazy import here to avoid circular dependency from astrapy.database import Database self.token_provider = coerce_token_provider(token) self.environment = (environment or Environment.PROD).lower() _id_or_endpoint = normalize_id_endpoint_parameters(id, api_endpoint) normalized_api_endpoint = normalize_api_endpoint( id_or_endpoint=_id_or_endpoint, region=region, token=self.token_provider, environment=self.environment, max_time_ms=max_time_ms, ) self.api_endpoint = normalized_api_endpoint parsed_api_endpoint = parse_api_endpoint(self.api_endpoint) if parsed_api_endpoint is None: raise ValueError( f"Cannot parse the provided API endpoint ({self.api_endpoint})." ) self._database_id = parsed_api_endpoint.database_id self._region = parsed_api_endpoint.region if parsed_api_endpoint.environment != self.environment: raise ValueError( "Environment mismatch between client and provided " "API endpoint. You can try adding " f'`environment="{parsed_api_endpoint.environment}"` ' "to the class constructor." ) # self.caller_name = caller_name self.caller_version = caller_version self._astra_db_admin = AstraDBAdmin( token=self.token_provider, environment=self.environment, caller_name=self.caller_name, caller_version=self.caller_version, dev_ops_url=dev_ops_url, dev_ops_api_version=dev_ops_api_version, ) # API Commander (for the vectorizeOps invocations) self.api_path = ( api_path if api_path is not None else API_PATH_ENV_MAP[self.environment] ) self.api_version = ( api_version if api_version is not None else API_VERSION_ENV_MAP[self.environment] ) self._commander_headers = { DEFAULT_AUTH_HEADER: self.token_provider.get_token(), } self._api_commander = APICommander( api_endpoint=self.api_endpoint, path="/".join(comp for comp in [self.api_path, self.api_version] if comp), headers=self._commander_headers, callers=[(self.caller_name, self.caller_version)], ) if spawner_database is not None: self.spawner_database = spawner_database else: # leaving the namespace to its per-environment default # (a task for the Database) self.spawner_database = Database( api_endpoint=self.api_endpoint, token=self.token_provider, namespace=None, caller_name=self.caller_name, caller_version=self.caller_version, environment=self.environment, api_path=self.api_path, api_version=self.api_version, ) def __repr__(self) -> str: env_desc: str if self.environment == Environment.PROD: env_desc = "" else: env_desc = f', environment="{self.environment}"' return ( f'{self.__class__.__name__}(api_endpoint="{self.api_endpoint}", ' f'"{str(self.token_provider)[:12]}..."{env_desc})' ) def __eq__(self, other: Any) -> bool: if isinstance(other, AstraDBDatabaseAdmin): return all( [ self.api_endpoint == other.api_endpoint, self.token_provider == other.token_provider, self.environment == other.environment, self._astra_db_admin == other._astra_db_admin, ] ) else: return False def _copy( self, id: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, region: Optional[str] = None, environment: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, ) -> AstraDBDatabaseAdmin: return AstraDBDatabaseAdmin( id=id or self._database_id, token=coerce_token_provider(token) or self.token_provider, region=region or self._region, environment=environment or self.environment, caller_name=caller_name or self._astra_db_admin._caller_name, caller_version=caller_version or self._astra_db_admin._caller_version, dev_ops_url=dev_ops_url or self._astra_db_admin._dev_ops_url, dev_ops_api_version=dev_ops_api_version or self._astra_db_admin._dev_ops_api_version, ) def with_options( self, *, id: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> AstraDBDatabaseAdmin: """ Create a clone of this AstraDBDatabaseAdmin with some changed attributes. Args: id: e. g. "01234567-89ab-cdef-0123-456789abcdef". token: an Access Token to the database. Example: `"AstraCS:xyz..."`. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new AstraDBDatabaseAdmin instance. Example: >>> admin_for_my_other_db = admin_for_my_db.with_options( ... id="abababab-0101-2323-4545-6789abcdef01", ... ) """ return self._copy( id=id, token=token, caller_name=caller_name, caller_version=caller_version, ) def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db_admin.set_caller(caller_name, caller_version) @property def id(self) -> str: """ The ID of this database admin. Example: >>> my_db_admin.id '01234567-89ab-cdef-0123-456789abcdef' """ return self._database_id @property def region(self) -> str: """ The region for this database admin. Example: >>> my_db_admin.region 'us-east-1' """ return self._region @staticmethod def from_astra_db_admin( id: str, *, region: Optional[str], astra_db_admin: AstraDBAdmin, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin from an AstraDBAdmin and a database ID. Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. astra_db_admin: an AstraDBAdmin object that has visibility over the target database. max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: An AstraDBDatabaseAdmin object, for admin work within the database. Example: >>> from astrapy import DataAPIClient, AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_astra_db_admin( ... id="01234567-...", ... astra_db_admin=DataAPIClient("AstraCS:...").get_admin(), ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] >>> admin_for_my_db.info().status 'ACTIVE' Note: Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ return AstraDBDatabaseAdmin( id=id, token=astra_db_admin.token_provider, region=region, environment=astra_db_admin.environment, caller_name=astra_db_admin._caller_name, caller_version=astra_db_admin._caller_version, dev_ops_url=astra_db_admin._dev_ops_url, dev_ops_api_version=astra_db_admin._dev_ops_api_version, max_time_ms=max_time_ms, ) @staticmethod def from_api_endpoint( api_endpoint: str, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin from an API Endpoint and optionally a token. Args: api_endpoint: a full API endpoint for the Data Api. token: an access token with enough permissions to do admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. dev_ops_url: in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/...) is determined from the API Endpoint. dev_ops_api_version: this can specify a custom version of the DevOps API (such as "v2"). Generally not needed. Returns: An AstraDBDatabaseAdmin object, for admin work within the database. Example: >>> from astrapy import AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_api_endpoint( ... api_endpoint="https://01234567-....apps.astra.datastax.com", ... token="AstraCS:...", ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'another_namespace'] >>> admin_for_my_db.info().status 'ACTIVE' Note: Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ parsed_api_endpoint = parse_api_endpoint(api_endpoint) if parsed_api_endpoint: return AstraDBDatabaseAdmin( id=parsed_api_endpoint.database_id, token=token, region=parsed_api_endpoint.region, environment=parsed_api_endpoint.environment, caller_name=caller_name, caller_version=caller_version, dev_ops_url=dev_ops_url, dev_ops_api_version=dev_ops_api_version, ) else: raise ValueError("Cannot parse the provided API endpoint.") def info(self, *, max_time_ms: Optional[int] = None) -> AdminDatabaseInfo: """ Query the DevOps API for the full info on this database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: An AdminDatabaseInfo object. Example: >>> my_db_info = admin_for_my_db.info() >>> my_db_info.status 'ACTIVE' >>> my_db_info.info.region 'us-east1' """ logger.info(f"getting info ('{self._database_id}')") req_response = self._astra_db_admin.database_info( id=self._database_id, max_time_ms=max_time_ms, ) logger.info(f"finished getting info ('{self._database_id}')") return req_response # type: ignore[no-any-return] async def async_info( self, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Query the DevOps API for the full info on this database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: An AdminDatabaseInfo object. Example: >>> async def wait_until_active(db_admin: AstraDBDatabaseAdmin) -> None: ... while True: ... info = await db_admin.async_info() ... if info.status == "ACTIVE": ... return ... >>> asyncio.run(wait_until_active(admin_for_my_db)) """ logger.info(f"getting info ('{self._database_id}'), async") req_response = await self._astra_db_admin.async_database_info( id=self._database_id, max_time_ms=max_time_ms, ) logger.info(f"finished getting info ('{self._database_id}'), async") return req_response # type: ignore[no-any-return] def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: """ Query the DevOps API for a list of the namespaces in the database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] """ logger.info(f"getting namespaces ('{self._database_id}')") info = self.info(max_time_ms=max_time_ms) logger.info(f"finished getting namespaces ('{self._database_id}')") if info.raw_info is None: raise DevOpsAPIException("Could not get the namespace list.") else: return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return] async def async_list_namespaces( self, *, max_time_ms: Optional[int] = None ) -> List[str]: """ Query the DevOps API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> async def check_if_ns_exists( ... db_admin: AstraDBDatabaseAdmin, namespace: str ... ) -> bool: ... ns_list = await db_admin.async_list_namespaces() ... return namespace in ns_list ... >>> asyncio.run(check_if_ns_exists(admin_for_my_db, "dragons")) False >>> asyncio.run(check_if_db_exists(admin_for_my_db, "app_namespace")) True """ logger.info(f"getting namespaces ('{self._database_id}'), async") info = await self.async_info(max_time_ms=max_time_ms) logger.info(f"finished getting namespaces ('{self._database_id}'), async") if info.raw_info is None: raise DevOpsAPIException("Could not get the namespace list.") else: return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return] @ops_recast_method_sync def create_namespace( self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in this database as requested, optionally waiting for it to be ready. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating namespace '{name}' on '{self._database_id}'") cn_response = self._astra_db_admin._astra_db_ops.create_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from creating namespace '{name}' on '{self._database_id}'" ) if cn_response is not None and name == cn_response.get("name"): if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info(f"sleeping to poll for status of '{self._database_id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), ).status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name not in self.list_namespaces(): raise DevOpsAPIException("Could not create the namespace.") logger.info( f"finished creating namespace '{name}' on '{self._database_id}'" ) if update_db_namespace: self.spawner_database.use_namespace(name) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful create-namespace DevOps API request for {name}." ) # the 'override' is because the error-recast decorator washes out the signature @ops_recast_method_async async def async_create_namespace( # type: ignore[override] self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in this database as requested, optionally waiting for it to be ready. Async version of the method, for use in an asyncio context. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_db_admin.async_create_namespace("app_namespace") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating namespace '{name}' on '{self._database_id}', async") cn_response = await self._astra_db_admin._astra_db_ops.async_create_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from creating namespace " f"'{name}' on '{self._database_id}', async" ) if cn_response is not None and name == cn_response.get("name"): if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info( f"sleeping to poll for status of '{self._database_id}', async" ) await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_db_info = await self.async_info( max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name not in await self.async_list_namespaces(): raise DevOpsAPIException("Could not create the namespace.") logger.info( f"finished creating namespace '{name}' on '{self._database_id}', async" ) if update_db_namespace: self.spawner_database.use_namespace(name) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful create-namespace DevOps API request for {name}." ) @ops_recast_method_sync def drop_namespace( self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Delete a namespace from the database, optionally waiting for it to become active again. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace'] """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping namespace '{name}' on '{self._database_id}'") dk_response = self._astra_db_admin._astra_db_ops.delete_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from dropping namespace '{name}' on '{self._database_id}'" ) if dk_response == name: if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info(f"sleeping to poll for status of '{self._database_id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), ).status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name in self.list_namespaces(): raise DevOpsAPIException("Could not drop the namespace.") logger.info( f"finished dropping namespace '{name}' on '{self._database_id}'" ) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful delete-namespace DevOps API request for {name}." ) # the 'override' is because the error-recast decorator washes out the signature @ops_recast_method_async async def async_drop_namespace( # type: ignore[override] self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Delete a namespace from the database, optionally waiting for it to become active again. Async version of the method, for use in an asyncio context. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_db_admin.async_drop_namespace("app_namespace") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping namespace '{name}' on '{self._database_id}', async") dk_response = await self._astra_db_admin._astra_db_ops.async_delete_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from dropping namespace " f"'{name}' on '{self._database_id}', async" ) if dk_response == name: if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info( f"sleeping to poll for status of '{self._database_id}', async" ) await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_db_info = await self.async_info( max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name in await self.async_list_namespaces(): raise DevOpsAPIException("Could not drop the namespace.") logger.info( f"finished dropping namespace '{name}' on '{self._database_id}', async" ) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful delete-namespace DevOps API request for {name}." ) def drop( self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop this database, i.e. delete it completely and permanently with all its data. This method wraps the `drop_database` method of the AstraDBAdmin class, where more information may be found. Args: wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop() {'ok': 1} >>> my_db_admin.list_namespaces() # raises a 404 Not Found http error Note: Once the method succeeds, methods on this object -- such as `info()`, or `list_namespaces()` -- can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further. """ logger.info(f"dropping this database ('{self._database_id}')") return self._astra_db_admin.drop_database( # type: ignore[no-any-return] id=self._database_id, wait_until_active=wait_until_active, max_time_ms=max_time_ms, ) logger.info(f"finished dropping this database ('{self._database_id}')") async def async_drop( self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop this database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context. This method wraps the `drop_database` method of the AstraDBAdmin class, where more information may be found. Args: wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run(my_db_admin.async_drop()) {'ok': 1} Note: Once the method succeeds, methods on this object -- such as `info()`, or `list_namespaces()` -- can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further. """ logger.info(f"dropping this database ('{self._database_id}'), async") return await self._astra_db_admin.async_drop_database( # type: ignore[no-any-return] id=self._database_id, wait_until_active=wait_until_active, max_time_ms=max_time_ms, ) logger.info(f"finished dropping this database ('{self._database_id}'), async") def get_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> Database: """ Create a Database instance from this database admin, for data-related tasks. Args: token: if supplied, is passed to the Database instead of the one set for this object. Useful if one wants to work in a least-privilege manner, limiting the permissions for non-admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: an optional namespace to set in the resulting Database. The same default logic as for `AstraDBAdmin.get_database` applies. region: *This parameter is deprecated and should not be used.* Ignored in the method. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". Returns: A Database object, ready to be used for working with data and collections. Example: >>> my_db = my_db_admin.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection'] Note: creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ if region is not None: the_warning = DeprecatedWarning( "The 'region' parameter is deprecated in this method and will be ignored.", deprecated_in="1.3.2", removed_in="2.0.0", details="The database class whose method is invoked already has a region set.", ) warnings.warn( the_warning, stacklevel=2, ) return self._astra_db_admin.get_database( id=self.api_endpoint, token=token, namespace=namespace, api_path=api_path, api_version=api_version, max_time_ms=max_time_ms, ) def get_async_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance out of this class for working with the data in it. This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( token=token, namespace=namespace, region=region, api_path=api_path, api_version=api_version, max_time_ms=max_time_ms, ).to_async() def find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers") fe_response = self._api_commander.request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers") return FindEmbeddingProvidersResult.from_dict(fe_response["status"]) async def async_find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers, async") fe_response = await self._api_commander.async_request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers, async") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
Ancestors
- DatabaseAdmin
- abc.ABC
Static methods
def from_api_endpoint(api_endpoint: str, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None) ‑> AstraDBDatabaseAdmin
-
Create an AstraDBDatabaseAdmin from an API Endpoint and optionally a token.
Args
api_endpoint
- a full API endpoint for the Data Api.
token
- an access token with enough permissions to do admin work.
This can be either a literal token string or a subclass of
TokenProvider
. caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
dev_ops_url
- in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/…) is determined from the API Endpoint.
dev_ops_api_version
- this can specify a custom version of the DevOps API (such as "v2"). Generally not needed.
Returns
An AstraDBDatabaseAdmin object, for admin work within the database.
Example
>>> from astrapy import AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_api_endpoint( ... api_endpoint="https://01234567-....apps.astra.datastax.com", ... token="AstraCS:...", ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'another_namespace'] >>> admin_for_my_db.info().status 'ACTIVE'
Note
Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class.
Expand source code
@staticmethod def from_api_endpoint( api_endpoint: str, *, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, dev_ops_url: Optional[str] = None, dev_ops_api_version: Optional[str] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin from an API Endpoint and optionally a token. Args: api_endpoint: a full API endpoint for the Data Api. token: an access token with enough permissions to do admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. dev_ops_url: in case of custom deployments, this can be used to specify the URL to the DevOps API, such as "https://api.astra.datastax.com". Generally it can be omitted. The environment (prod/dev/...) is determined from the API Endpoint. dev_ops_api_version: this can specify a custom version of the DevOps API (such as "v2"). Generally not needed. Returns: An AstraDBDatabaseAdmin object, for admin work within the database. Example: >>> from astrapy import AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_api_endpoint( ... api_endpoint="https://01234567-....apps.astra.datastax.com", ... token="AstraCS:...", ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'another_namespace'] >>> admin_for_my_db.info().status 'ACTIVE' Note: Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ parsed_api_endpoint = parse_api_endpoint(api_endpoint) if parsed_api_endpoint: return AstraDBDatabaseAdmin( id=parsed_api_endpoint.database_id, token=token, region=parsed_api_endpoint.region, environment=parsed_api_endpoint.environment, caller_name=caller_name, caller_version=caller_version, dev_ops_url=dev_ops_url, dev_ops_api_version=dev_ops_api_version, ) else: raise ValueError("Cannot parse the provided API endpoint.")
def from_astra_db_admin(id: str, *, region: Optional[str], astra_db_admin: AstraDBAdmin, max_time_ms: Optional[int] = None) ‑> AstraDBDatabaseAdmin
-
Create an AstraDBDatabaseAdmin from an AstraDBAdmin and a database ID.
Args
id
- the target database ID (e.g.
01234567-89ab-cdef-0123-456789abcdef
) or the corresponding API Endpoint (e.g.https://<ID>-<REGION>.apps.astra.datastax.com
). region
- the region to use for connecting to the database. The
database must be located in that region.
The region cannot be specified when the API endoint is used as
id
. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. astra_db_admin
- an AstraDBAdmin object that has visibility over the target database.
max_time_ms
- a timeout, in milliseconds, for the DevOps API
HTTP request should it be necessary (see the
region
argument).
Returns
An AstraDBDatabaseAdmin object, for admin work within the database.
Example
>>> from astrapy import DataAPIClient, AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_astra_db_admin( ... id="01234567-...", ... astra_db_admin=DataAPIClient("AstraCS:...").get_admin(), ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] >>> admin_for_my_db.info().status 'ACTIVE'
Note
Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class.
Expand source code
@staticmethod def from_astra_db_admin( id: str, *, region: Optional[str], astra_db_admin: AstraDBAdmin, max_time_ms: Optional[int] = None, ) -> AstraDBDatabaseAdmin: """ Create an AstraDBDatabaseAdmin from an AstraDBAdmin and a database ID. Args: id: the target database ID (e.g. `01234567-89ab-cdef-0123-456789abcdef`) or the corresponding API Endpoint (e.g. `https://<ID>-<REGION>.apps.astra.datastax.com`). region: the region to use for connecting to the database. The database must be located in that region. The region cannot be specified when the API endoint is used as `id`. Note that if this parameter is not passed, and cannot be inferred from the API endpoint, an additional DevOps API request is made to determine the default region and use it subsequently. astra_db_admin: an AstraDBAdmin object that has visibility over the target database. max_time_ms: a timeout, in milliseconds, for the DevOps API HTTP request should it be necessary (see the `region` argument). Returns: An AstraDBDatabaseAdmin object, for admin work within the database. Example: >>> from astrapy import DataAPIClient, AstraDBDatabaseAdmin >>> admin_for_my_db = AstraDBDatabaseAdmin.from_astra_db_admin( ... id="01234567-...", ... astra_db_admin=DataAPIClient("AstraCS:...").get_admin(), ... ) >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] >>> admin_for_my_db.info().status 'ACTIVE' Note: Creating an instance of AstraDBDatabaseAdmin does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ return AstraDBDatabaseAdmin( id=id, token=astra_db_admin.token_provider, region=region, environment=astra_db_admin.environment, caller_name=astra_db_admin._caller_name, caller_version=astra_db_admin._caller_version, dev_ops_url=astra_db_admin._dev_ops_url, dev_ops_api_version=astra_db_admin._dev_ops_api_version, max_time_ms=max_time_ms, )
Instance variables
var id : str
-
The ID of this database admin.
Example
>>> my_db_admin.id '01234567-89ab-cdef-0123-456789abcdef'
Expand source code
@property def id(self) -> str: """ The ID of this database admin. Example: >>> my_db_admin.id '01234567-89ab-cdef-0123-456789abcdef' """ return self._database_id
var region : str
-
The region for this database admin.
Example
>>> my_db_admin.region 'us-east-1'
Expand source code
@property def region(self) -> str: """ The region for this database admin. Example: >>> my_db_admin.region 'us-east-1' """ return self._region
Methods
async def async_create_namespace(self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in this database as requested, optionally waiting for it to be ready. Async version of the method, for use in an asyncio context.
Args
name
- the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op.
wait_until_active
- if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it.
update_db_namespace
- if True, the
Database
orAsyncDatabase
class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> asyncio.run( ... my_db_admin.async_create_namespace("app_namespace") ... ) {'ok': 1}
Expand source code
@ops_recast_method_async async def async_create_namespace( # type: ignore[override] self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in this database as requested, optionally waiting for it to be ready. Async version of the method, for use in an asyncio context. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_db_admin.async_create_namespace("app_namespace") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating namespace '{name}' on '{self._database_id}', async") cn_response = await self._astra_db_admin._astra_db_ops.async_create_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from creating namespace " f"'{name}' on '{self._database_id}', async" ) if cn_response is not None and name == cn_response.get("name"): if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info( f"sleeping to poll for status of '{self._database_id}', async" ) await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_db_info = await self.async_info( max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name not in await self.async_list_namespaces(): raise DevOpsAPIException("Could not create the namespace.") logger.info( f"finished creating namespace '{name}' on '{self._database_id}', async" ) if update_db_namespace: self.spawner_database.use_namespace(name) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful create-namespace DevOps API request for {name}." )
async def async_drop(self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop this database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context.
This method wraps the
drop_database
method of the AstraDBAdmin class, where more information may be found.Args
wait_until_active
- if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> asyncio.run(my_db_admin.async_drop()) {'ok': 1}
Note
Once the method succeeds, methods on this object – such as
info()
, orlist_namespaces()
– can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further.Expand source code
async def async_drop( self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop this database, i.e. delete it completely and permanently with all its data. Async version of the method, for use in an asyncio context. This method wraps the `drop_database` method of the AstraDBAdmin class, where more information may be found. Args: wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run(my_db_admin.async_drop()) {'ok': 1} Note: Once the method succeeds, methods on this object -- such as `info()`, or `list_namespaces()` -- can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further. """ logger.info(f"dropping this database ('{self._database_id}'), async") return await self._astra_db_admin.async_drop_database( # type: ignore[no-any-return] id=self._database_id, wait_until_active=wait_until_active, max_time_ms=max_time_ms, ) logger.info(f"finished dropping this database ('{self._database_id}'), async")
async def async_drop_namespace(self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Delete a namespace from the database, optionally waiting for it to become active again. Async version of the method, for use in an asyncio context.
Args
name
- the namespace to delete. If it does not exist in this database, an error is raised.
wait_until_active
- if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> asyncio.run( ... my_db_admin.async_drop_namespace("app_namespace") ... ) {'ok': 1}
Expand source code
@ops_recast_method_async async def async_drop_namespace( # type: ignore[override] self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Delete a namespace from the database, optionally waiting for it to become active again. Async version of the method, for use in an asyncio context. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> asyncio.run( ... my_db_admin.async_drop_namespace("app_namespace") ... ) {'ok': 1} """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping namespace '{name}' on '{self._database_id}', async") dk_response = await self._astra_db_admin._astra_db_ops.async_delete_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from dropping namespace " f"'{name}' on '{self._database_id}', async" ) if dk_response == name: if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info( f"sleeping to poll for status of '{self._database_id}', async" ) await asyncio.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_db_info = await self.async_info( max_time_ms=timeout_manager.remaining_timeout_ms(), ) last_status_seen = last_db_info.status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name in await self.async_list_namespaces(): raise DevOpsAPIException("Could not drop the namespace.") logger.info( f"finished dropping namespace '{name}' on '{self._database_id}', async" ) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful delete-namespace DevOps API request for {name}." )
async def async_find_embedding_providers(self, *, max_time_ms: Optional[int] = None) ‑> FindEmbeddingProvidersResult
-
Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A
FindEmbeddingProvidersResult
object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=…, openai, …) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), … ] ), … }Expand source code
async def async_find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers, async") fe_response = await self._api_commander.async_request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers, async") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
async def async_info(self, *, max_time_ms: Optional[int] = None) ‑> AdminDatabaseInfo
-
Query the DevOps API for the full info on this database. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
An AdminDatabaseInfo object.
Example
>>> async def wait_until_active(db_admin: AstraDBDatabaseAdmin) -> None: ... while True: ... info = await db_admin.async_info() ... if info.status == "ACTIVE": ... return ... >>> asyncio.run(wait_until_active(admin_for_my_db))
Expand source code
async def async_info( self, *, max_time_ms: Optional[int] = None ) -> AdminDatabaseInfo: """ Query the DevOps API for the full info on this database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: An AdminDatabaseInfo object. Example: >>> async def wait_until_active(db_admin: AstraDBDatabaseAdmin) -> None: ... while True: ... info = await db_admin.async_info() ... if info.status == "ACTIVE": ... return ... >>> asyncio.run(wait_until_active(admin_for_my_db)) """ logger.info(f"getting info ('{self._database_id}'), async") req_response = await self._astra_db_admin.async_database_info( id=self._database_id, max_time_ms=max_time_ms, ) logger.info(f"finished getting info ('{self._database_id}'), async") return req_response # type: ignore[no-any-return]
async def async_list_namespaces(self, *, max_time_ms: Optional[int] = None) ‑> List[str]
-
Query the DevOps API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A list of the namespaces, each a string, in no particular order.
Example
>>> async def check_if_ns_exists( ... db_admin: AstraDBDatabaseAdmin, namespace: str ... ) -> bool: ... ns_list = await db_admin.async_list_namespaces() ... return namespace in ns_list ... >>> asyncio.run(check_if_ns_exists(admin_for_my_db, "dragons")) False >>> asyncio.run(check_if_db_exists(admin_for_my_db, "app_namespace")) True
Expand source code
async def async_list_namespaces( self, *, max_time_ms: Optional[int] = None ) -> List[str]: """ Query the DevOps API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> async def check_if_ns_exists( ... db_admin: AstraDBDatabaseAdmin, namespace: str ... ) -> bool: ... ns_list = await db_admin.async_list_namespaces() ... return namespace in ns_list ... >>> asyncio.run(check_if_ns_exists(admin_for_my_db, "dragons")) False >>> asyncio.run(check_if_db_exists(admin_for_my_db, "app_namespace")) True """ logger.info(f"getting namespaces ('{self._database_id}'), async") info = await self.async_info(max_time_ms=max_time_ms) logger.info(f"finished getting namespaces ('{self._database_id}'), async") if info.raw_info is None: raise DevOpsAPIException("Could not get the namespace list.") else: return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return]
def create_namespace(self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in this database as requested, optionally waiting for it to be ready.
Args
name
- the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op.
wait_until_active
- if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it.
update_db_namespace
- if True, the
Database
orAsyncDatabase
class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one']
Expand source code
@ops_recast_method_sync def create_namespace( self, name: str, *, wait_until_active: bool = True, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in this database as requested, optionally waiting for it to be ready. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the creation request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace'] >>> my_db_admin.create_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"creating namespace '{name}' on '{self._database_id}'") cn_response = self._astra_db_admin._astra_db_ops.create_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from creating namespace '{name}' on '{self._database_id}'" ) if cn_response is not None and name == cn_response.get("name"): if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info(f"sleeping to poll for status of '{self._database_id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), ).status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name not in self.list_namespaces(): raise DevOpsAPIException("Could not create the namespace.") logger.info( f"finished creating namespace '{name}' on '{self._database_id}'" ) if update_db_namespace: self.spawner_database.use_namespace(name) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful create-namespace DevOps API request for {name}." )
def drop(self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop this database, i.e. delete it completely and permanently with all its data.
This method wraps the
drop_database
method of the AstraDBAdmin class, where more information may be found.Args
wait_until_active
- if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop() {'ok': 1} >>> my_db_admin.list_namespaces() # raises a 404 Not Found http error
Note
Once the method succeeds, methods on this object – such as
info()
, orlist_namespaces()
– can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further.Expand source code
def drop( self, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop this database, i.e. delete it completely and permanently with all its data. This method wraps the `drop_database` method of the AstraDBAdmin class, where more information may be found. Args: wait_until_active: if True (default), the method returns only after the database has actually been deleted (generally a few minutes). If False, it will return right after issuing the drop request to the DevOps API, and it will be responsibility of the caller to check the database status/availability after that, if desired. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop() {'ok': 1} >>> my_db_admin.list_namespaces() # raises a 404 Not Found http error Note: Once the method succeeds, methods on this object -- such as `info()`, or `list_namespaces()` -- can still be invoked: however, this hardly makes sense as the underlying actual database is no more. It is responsibility of the developer to design a correct flow which avoids using a deceased database any further. """ logger.info(f"dropping this database ('{self._database_id}')") return self._astra_db_admin.drop_database( # type: ignore[no-any-return] id=self._database_id, wait_until_active=wait_until_active, max_time_ms=max_time_ms, ) logger.info(f"finished dropping this database ('{self._database_id}')")
def drop_namespace(self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Delete a namespace from the database, optionally waiting for it to become active again.
Args
name
- the namespace to delete. If it does not exist in this database, an error is raised.
wait_until_active
- if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace']
Expand source code
@ops_recast_method_sync def drop_namespace( self, name: str, *, wait_until_active: bool = True, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Delete a namespace from the database, optionally waiting for it to become active again. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. wait_until_active: if True (default), the method returns only after the target database is in ACTIVE state again (a few seconds, usually). If False, it will return right after issuing the deletion request to the DevOps API, and it will be responsibility of the caller to check the database status/namespace availability before working with it. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> my_db_admin.list_namespaces() ['default_keyspace', 'that_other_one'] >>> my_db_admin.drop_namespace("that_other_one") {'ok': 1} >>> my_db_admin.list_namespaces() ['default_keyspace'] """ timeout_manager = MultiCallTimeoutManager( overall_max_time_ms=max_time_ms, exception_type="devops_api" ) logger.info(f"dropping namespace '{name}' on '{self._database_id}'") dk_response = self._astra_db_admin._astra_db_ops.delete_keyspace( database=self._database_id, keyspace=name, timeout_info=base_timeout_info(max_time_ms), ) logger.info( f"devops api returned from dropping namespace '{name}' on '{self._database_id}'" ) if dk_response == name: if wait_until_active: last_status_seen = STATUS_MAINTENANCE while last_status_seen == STATUS_MAINTENANCE: logger.info(f"sleeping to poll for status of '{self._database_id}'") time.sleep(DATABASE_POLL_NAMESPACE_SLEEP_TIME) last_status_seen = self.info( max_time_ms=timeout_manager.remaining_timeout_ms(), ).status if last_status_seen != STATUS_ACTIVE: raise DevOpsAPIException( f"Database entered unexpected status {last_status_seen} after MAINTENANCE." ) # is the namespace found? if name in self.list_namespaces(): raise DevOpsAPIException("Could not drop the namespace.") logger.info( f"finished dropping namespace '{name}' on '{self._database_id}'" ) return {"ok": 1} else: raise DevOpsAPIException( f"Could not issue a successful delete-namespace DevOps API request for {name}." )
def find_embedding_providers(self, *, max_time_ms: Optional[int] = None) ‑> FindEmbeddingProvidersResult
-
Query the API for the full information on available embedding providers.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A
FindEmbeddingProvidersResult
object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=…, openai, …) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), … ] ), … }Expand source code
def find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers") fe_response = self._api_commander.request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
def get_async_database(self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None) ‑> AsyncDatabase
-
Create an AsyncDatabase instance out of this class for working with the data in it.
This method has identical behavior and signature as the sync counterpart
get_database
: please see that one for more details.Expand source code
def get_async_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance out of this class for working with the data in it. This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( token=token, namespace=namespace, region=region, api_path=api_path, api_version=api_version, max_time_ms=max_time_ms, ).to_async()
def get_database(self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None) ‑> Database
-
Create a Database instance from this database admin, for data-related tasks.
Args
token
- if supplied, is passed to the Database instead of
the one set for this object. Useful if one wants to work in
a least-privilege manner, limiting the permissions for non-admin work.
This can be either a literal token string or a subclass of
TokenProvider
. namespace
- an optional namespace to set in the resulting Database.
The same default logic as for
AstraDBAdmin.get_database()
applies. region
- This parameter is deprecated and should not be used. Ignored in the method.
api_path
- path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json".
api_version
- version specifier to append to the API path. In typical usage, this should be left to its default of "v1".
Returns
A Database object, ready to be used for working with data and collections.
Example
>>> my_db = my_db_admin.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection']
Note
creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class.
Expand source code
def get_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, region: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, max_time_ms: Optional[int] = None, ) -> Database: """ Create a Database instance from this database admin, for data-related tasks. Args: token: if supplied, is passed to the Database instead of the one set for this object. Useful if one wants to work in a least-privilege manner, limiting the permissions for non-admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: an optional namespace to set in the resulting Database. The same default logic as for `AstraDBAdmin.get_database` applies. region: *This parameter is deprecated and should not be used.* Ignored in the method. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "/api/json". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". Returns: A Database object, ready to be used for working with data and collections. Example: >>> my_db = my_db_admin.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection'] Note: creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand. To create databases, see the AstraDBAdmin class. """ if region is not None: the_warning = DeprecatedWarning( "The 'region' parameter is deprecated in this method and will be ignored.", deprecated_in="1.3.2", removed_in="2.0.0", details="The database class whose method is invoked already has a region set.", ) warnings.warn( the_warning, stacklevel=2, ) return self._astra_db_admin.get_database( id=self.api_endpoint, token=token, namespace=namespace, api_path=api_path, api_version=api_version, max_time_ms=max_time_ms, )
def info(self, *, max_time_ms: Optional[int] = None) ‑> AdminDatabaseInfo
-
Query the DevOps API for the full info on this database.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
An AdminDatabaseInfo object.
Example
>>> my_db_info = admin_for_my_db.info() >>> my_db_info.status 'ACTIVE' >>> my_db_info.info.region 'us-east1'
Expand source code
def info(self, *, max_time_ms: Optional[int] = None) -> AdminDatabaseInfo: """ Query the DevOps API for the full info on this database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: An AdminDatabaseInfo object. Example: >>> my_db_info = admin_for_my_db.info() >>> my_db_info.status 'ACTIVE' >>> my_db_info.info.region 'us-east1' """ logger.info(f"getting info ('{self._database_id}')") req_response = self._astra_db_admin.database_info( id=self._database_id, max_time_ms=max_time_ms, ) logger.info(f"finished getting info ('{self._database_id}')") return req_response # type: ignore[no-any-return]
def list_namespaces(self, *, max_time_ms: Optional[int] = None) ‑> List[str]
-
Query the DevOps API for a list of the namespaces in the database.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A list of the namespaces, each a string, in no particular order.
Example
>>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace']
Expand source code
def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: """ Query the DevOps API for a list of the namespaces in the database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] """ logger.info(f"getting namespaces ('{self._database_id}')") info = self.info(max_time_ms=max_time_ms) logger.info(f"finished getting namespaces ('{self._database_id}')") if info.raw_info is None: raise DevOpsAPIException("Could not get the namespace list.") else: return info.raw_info["info"]["keyspaces"] # type: ignore[no-any-return]
def set_caller(self, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> None
-
Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args
caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Example
>>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... )
Expand source code
def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self._astra_db_admin.set_caller(caller_name, caller_version)
def with_options(self, *, id: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> AstraDBDatabaseAdmin
-
Create a clone of this AstraDBDatabaseAdmin with some changed attributes.
Args
id
- e. g. "01234567-89ab-cdef-0123-456789abcdef".
token
- an Access Token to the database. Example:
"AstraCS:xyz..."
. This can be either a literal token string or a subclass ofTokenProvider
. caller_name
- name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Returns
a new AstraDBDatabaseAdmin instance.
Example
>>> admin_for_my_other_db = admin_for_my_db.with_options( ... id="abababab-0101-2323-4545-6789abcdef01", ... )
Expand source code
def with_options( self, *, id: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> AstraDBDatabaseAdmin: """ Create a clone of this AstraDBDatabaseAdmin with some changed attributes. Args: id: e. g. "01234567-89ab-cdef-0123-456789abcdef". token: an Access Token to the database. Example: `"AstraCS:xyz..."`. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the Data API and DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new AstraDBDatabaseAdmin instance. Example: >>> admin_for_my_other_db = admin_for_my_db.with_options( ... id="abababab-0101-2323-4545-6789abcdef01", ... ) """ return self._copy( id=id, token=token, caller_name=caller_name, caller_version=caller_version, )
class DataAPIDatabaseAdmin (api_endpoint: str, *, token: Optional[Union[str, TokenProvider]] = None, environment: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, spawner_database: Optional[Union[Database, AsyncDatabase]] = None)
-
An "admin" object for non-Astra Data API environments, to perform administrative tasks at the namespaces level such as creating/listing/dropping namespaces.
Conforming to the architecture of non-Astra deployments of the Data API, this object works within the one existing database. It is within that database that the namespace CRUD operations (and possibly other admin operations) are performed. Since non-Astra environment lack the concept of an overall admin (such as the all-databases AstraDBAdmin class), a
DataAPIDatabaseAdmin
is generally created by invoking theget_database_admin
method of the correspondingDatabase
object (which in turn is spawned by a DataAPIClient).Args
api_endpoint
- the full URI to access the Data API, e.g. "http://localhost:8181".
token
- an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
TokenProvider
. environment
- a label, whose value is one of Environment.OTHER (default)
or other non-Astra environment values in the
Environment
enum. api_path
- path to append to the API Endpoint. In typical usage, this
class is created by a method such as
Database.get_database_admin()
, which passes the matching value. Defaults to this portion of the path being absent. api_version
- version specifier to append to the API path. In typical
usage, this class is created by a method such as
Database.get_database_admin()
, which passes the matching value. Defaults to this portion of the path being absent. caller_name
- name of the application, or framework, on behalf of which the admin API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
spawner_database
- either a Database or an AsyncDatabase instance. This represents the database class which spawns this admin object, so that, if required, a namespace creation can retroactively "use" the new namespace in the spawner. Used to enable the Async/Database.get_admin_database().create_namespace() pattern.
Example
>>> from astrapy import DataAPIClient >>> from astrapy.constants import Environment >>> from astrapy.authentication import UsernamePasswordTokenProvider >>> >>> token_provider = UsernamePasswordTokenProvider("username", "password") >>> endpoint = "http://localhost:8181" >>> >>> client = DataAPIClient( >>> token=token_provider, >>> environment=Environment.OTHER, >>> ) >>> database = client.get_database(endpoint) >>> admin_for_my_db = database.get_database_admin() >>> >>> admin_for_my_db.list_namespaces() ['namespace1', 'namespace2']
Expand source code
class DataAPIDatabaseAdmin(DatabaseAdmin): """ An "admin" object for non-Astra Data API environments, to perform administrative tasks at the namespaces level such as creating/listing/dropping namespaces. Conforming to the architecture of non-Astra deployments of the Data API, this object works within the one existing database. It is within that database that the namespace CRUD operations (and possibly other admin operations) are performed. Since non-Astra environment lack the concept of an overall admin (such as the all-databases AstraDBAdmin class), a `DataAPIDatabaseAdmin` is generally created by invoking the `get_database_admin` method of the corresponding `Database` object (which in turn is spawned by a DataAPIClient). Args: api_endpoint: the full URI to access the Data API, e.g. "http://localhost:8181". token: an access token with enough permission to perform admin tasks. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. environment: a label, whose value is one of Environment.OTHER (default) or other non-Astra environment values in the `Environment` enum. api_path: path to append to the API Endpoint. In typical usage, this class is created by a method such as `Database.get_database_admin()`, which passes the matching value. Defaults to this portion of the path being absent. api_version: version specifier to append to the API path. In typical usage, this class is created by a method such as `Database.get_database_admin()`, which passes the matching value. Defaults to this portion of the path being absent. caller_name: name of the application, or framework, on behalf of which the admin API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. spawner_database: either a Database or an AsyncDatabase instance. This represents the database class which spawns this admin object, so that, if required, a namespace creation can retroactively "use" the new namespace in the spawner. Used to enable the Async/Database.get_admin_database().create_namespace() pattern. Example: >>> from astrapy import DataAPIClient >>> from astrapy.constants import Environment >>> from astrapy.authentication import UsernamePasswordTokenProvider >>> >>> token_provider = UsernamePasswordTokenProvider("username", "password") >>> endpoint = "http://localhost:8181" >>> >>> client = DataAPIClient( >>> token=token_provider, >>> environment=Environment.OTHER, >>> ) >>> database = client.get_database(endpoint) >>> admin_for_my_db = database.get_database_admin() >>> >>> admin_for_my_db.list_namespaces() ['namespace1', 'namespace2'] """ def __init__( self, api_endpoint: str, *, token: Optional[Union[str, TokenProvider]] = None, environment: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, spawner_database: Optional[Union[Database, AsyncDatabase]] = None, ) -> None: # lazy import here to avoid circular dependency from astrapy.database import Database self.environment = (environment or Environment.OTHER).lower() self.token_provider = coerce_token_provider(token) self.api_endpoint = api_endpoint # self.caller_name = caller_name self.caller_version = caller_version # self.api_path = api_path if api_path is not None else "" self.api_version = api_version if api_version is not None else "" # self._commander_headers = { DEFAULT_AUTH_HEADER: self.token_provider.get_token(), } self._api_commander = APICommander( api_endpoint=self.api_endpoint, path="/".join(comp for comp in [self.api_path, self.api_version] if comp), headers=self._commander_headers, callers=[(self.caller_name, self.caller_version)], ) if spawner_database is not None: self.spawner_database = spawner_database else: # leaving the namespace to its per-environment default # (a task for the Database) self.spawner_database = Database( api_endpoint=self.api_endpoint, token=self.token_provider, namespace=None, caller_name=self.caller_name, caller_version=self.caller_version, environment=self.environment, api_path=self.api_path, api_version=self.api_version, ) def __repr__(self) -> str: env_desc = f', environment="{self.environment}"' return ( f'{self.__class__.__name__}(endpoint="{self.api_endpoint}", ' f'"{str(self.token_provider)[:12]}..."{env_desc})' ) def __eq__(self, other: Any) -> bool: if isinstance(other, DataAPIDatabaseAdmin): return all( [ self.environment == other.environment, self._api_commander == other._api_commander, ] ) else: return False def _copy( self, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, environment: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> DataAPIDatabaseAdmin: return DataAPIDatabaseAdmin( api_endpoint=api_endpoint or self.api_endpoint, token=coerce_token_provider(token) or self.token_provider, environment=environment or self.environment, api_path=api_path or self.api_path, api_version=api_version or self.api_version, caller_name=caller_name or self.caller_name, caller_version=caller_version or self.caller_version, ) def with_options( self, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> DataAPIDatabaseAdmin: """ Create a clone of this DataAPIDatabaseAdmin with some changed attributes. Args: api_endpoint: the full URI to access the Data API, e.g. "http://localhost:8181". token: an access token with enough permission to perform admin tasks. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the admin API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new DataAPIDatabaseAdmin instance. Example: >>> admin_for_my_other_db = admin_for_my_db.with_options( ... api_endpoint="http://10.1.1.5:8181", ... ) """ return self._copy( api_endpoint=api_endpoint, token=token, caller_name=caller_name, caller_version=caller_version, ) def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self.caller_name = caller_name self.caller_version = caller_version self._api_commander = APICommander( api_endpoint=self.api_endpoint, path="/".join(comp for comp in [self.api_path, self.api_version] if comp), headers=self._commander_headers, callers=[(self.caller_name, self.caller_version)], ) def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: """ Query the API for a list of the namespaces in the database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] """ logger.info("getting list of namespaces") fn_response = self._api_commander.request( payload={"findNamespaces": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "namespaces" not in fn_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findNamespaces API command.", raw_response=fn_response, ) else: logger.info("finished getting list of namespaces") return fn_response["status"]["namespaces"] # type: ignore[no-any-return] def create_namespace( self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. replication_options: this dictionary can specify the options about replication of the namespace (across database nodes). If provided, it must have a structure similar to: `{"class": "SimpleStrategy", "replication_factor": 1}`. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> admin_for_my_db.create_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] """ options = { k: v for k, v in { "replication": replication_options, }.items() if v } payload = { "createNamespace": { **{"name": name}, **({"options": options} if options else {}), } } logger.info("creating namespace") cn_response = self._api_commander.request( payload=payload, timeout_info=base_timeout_info(max_time_ms), ) if (cn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from createNamespace API command.", raw_response=cn_response, ) else: logger.info("finished creating namespace") if update_db_namespace: self.spawner_database.use_namespace(name) return cn_response["status"] # type: ignore[no-any-return] def drop_namespace( self, name: str, *, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] >>> admin_for_my_db.drop_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace'] """ logger.info("dropping namespace") dn_response = self._api_commander.request( payload={"dropNamespace": {"name": name}}, timeout_info=base_timeout_info(max_time_ms), ) if (dn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from dropNamespace API command.", raw_response=dn_response, ) else: logger.info("finished dropping namespace") return dn_response["status"] # type: ignore[no-any-return] async def async_list_namespaces( self, *, max_time_ms: Optional[int] = None ) -> List[str]: """ Query the API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> asyncio.run(admin_for_my_db.async_list_namespaces()) ['default_keyspace', 'staging_namespace'] """ logger.info("getting list of namespaces, async") fn_response = await self._api_commander.async_request( payload={"findNamespaces": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "namespaces" not in fn_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findNamespaces API command.", raw_response=fn_response, ) else: logger.info("finished getting list of namespaces, async") return fn_response["status"]["namespaces"] # type: ignore[no-any-return] async def async_create_namespace( self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. Async version of the method, for use in an asyncio context. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. replication_options: this dictionary can specify the options about replication of the namespace (across database nodes). If provided, it must have a structure similar to: `{"class": "SimpleStrategy", "replication_factor": 1}`. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> asyncio.run(admin_for_my_db.async_create_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] """ options = { k: v for k, v in { "replication": replication_options, }.items() if v } payload = { "createNamespace": { **{"name": name}, **({"options": options} if options else {}), } } logger.info("creating namespace, async") cn_response = await self._api_commander.async_request( payload=payload, timeout_info=base_timeout_info(max_time_ms), ) if (cn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from createNamespace API command.", raw_response=cn_response, ) else: logger.info("finished creating namespace, async") if update_db_namespace: self.spawner_database.use_namespace(name) return cn_response["status"] # type: ignore[no-any-return] async def async_drop_namespace( self, name: str, *, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database. Async version of the method, for use in an asyncio context. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['that_other_one', 'default_keyspace'] >>> asyncio.run(admin_for_my_db.async_drop_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace'] """ logger.info("dropping namespace, async") dn_response = await self._api_commander.async_request( payload={"dropNamespace": {"name": name}}, timeout_info=base_timeout_info(max_time_ms), ) if (dn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from dropNamespace API command.", raw_response=dn_response, ) else: logger.info("finished dropping namespace, async") return dn_response["status"] # type: ignore[no-any-return] def get_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> Database: """ Create a Database instance out of this class for working with the data in it. Args: token: if supplied, is passed to the Database instead of the one set for this object. Useful if one wants to work in a least-privilege manner, limiting the permissions for non-admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: an optional namespace to set in the resulting Database. If not provided, no namespace is set, limiting what the Database can do until setting it with e.g. a `useNamespace` method call. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". Returns: A Database object, ready to be used for working with data and collections. Example: >>> my_db = admin_for_my_db.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection'] Note: creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand. """ # lazy importing here to avoid circular dependency from astrapy import Database return Database( api_endpoint=self.api_endpoint, token=coerce_token_provider(token) or self.token_provider, namespace=namespace, caller_name=self.caller_name, caller_version=self.caller_version, environment=self.environment, api_path=api_path, api_version=api_version, ) def get_async_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance for the database, to be used when doing data-level work (such as creating/managing collections). This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( token=token, namespace=namespace, api_path=api_path, api_version=api_version, ).to_async() def find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers") fe_response = self._api_commander.request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers") return FindEmbeddingProvidersResult.from_dict(fe_response["status"]) async def async_find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers, async") fe_response = await self._api_commander.async_request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers, async") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
Ancestors
- DatabaseAdmin
- abc.ABC
Methods
async def async_create_namespace(self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in the database, returning {'ok': 1} if successful. Async version of the method, for use in an asyncio context.
Args
name
- the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op.
replication_options
- this dictionary can specify the options about
replication of the namespace (across database nodes). If provided,
it must have a structure similar to:
{"class": "SimpleStrategy", "replication_factor": 1}
. update_db_namespace
- if True, the
Database
orAsyncDatabase
class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> asyncio.run(admin_for_my_db.async_create_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one']
Expand source code
async def async_create_namespace( self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. Async version of the method, for use in an asyncio context. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. replication_options: this dictionary can specify the options about replication of the namespace (across database nodes). If provided, it must have a structure similar to: `{"class": "SimpleStrategy", "replication_factor": 1}`. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> asyncio.run(admin_for_my_db.async_create_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] """ options = { k: v for k, v in { "replication": replication_options, }.items() if v } payload = { "createNamespace": { **{"name": name}, **({"options": options} if options else {}), } } logger.info("creating namespace, async") cn_response = await self._api_commander.async_request( payload=payload, timeout_info=base_timeout_info(max_time_ms), ) if (cn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from createNamespace API command.", raw_response=cn_response, ) else: logger.info("finished creating namespace, async") if update_db_namespace: self.spawner_database.use_namespace(name) return cn_response["status"] # type: ignore[no-any-return]
async def async_drop_namespace(self, name: str, *, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop (delete) a namespace from the database. Async version of the method, for use in an asyncio context.
Args
name
- the namespace to delete. If it does not exist in this database, an error is raised.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> admin_for_my_db.list_namespaces() ['that_other_one', 'default_keyspace'] >>> asyncio.run(admin_for_my_db.async_drop_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace']
Expand source code
async def async_drop_namespace( self, name: str, *, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database. Async version of the method, for use in an asyncio context. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['that_other_one', 'default_keyspace'] >>> asyncio.run(admin_for_my_db.async_drop_namespace( ... "that_other_one" ... )) {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace'] """ logger.info("dropping namespace, async") dn_response = await self._api_commander.async_request( payload={"dropNamespace": {"name": name}}, timeout_info=base_timeout_info(max_time_ms), ) if (dn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from dropNamespace API command.", raw_response=dn_response, ) else: logger.info("finished dropping namespace, async") return dn_response["status"] # type: ignore[no-any-return]
async def async_find_embedding_providers(self, *, max_time_ms: Optional[int] = None) ‑> FindEmbeddingProvidersResult
-
Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A
FindEmbeddingProvidersResult
object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=…, openai, …) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), … ] ), … }Expand source code
async def async_find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers, async") fe_response = await self._api_commander.async_request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers, async") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
async def async_list_namespaces(self, *, max_time_ms: Optional[int] = None) ‑> List[str]
-
Query the API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A list of the namespaces, each a string, in no particular order.
Example
>>> asyncio.run(admin_for_my_db.async_list_namespaces()) ['default_keyspace', 'staging_namespace']
Expand source code
async def async_list_namespaces( self, *, max_time_ms: Optional[int] = None ) -> List[str]: """ Query the API for a list of the namespaces in the database. Async version of the method, for use in an asyncio context. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> asyncio.run(admin_for_my_db.async_list_namespaces()) ['default_keyspace', 'staging_namespace'] """ logger.info("getting list of namespaces, async") fn_response = await self._api_commander.async_request( payload={"findNamespaces": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "namespaces" not in fn_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findNamespaces API command.", raw_response=fn_response, ) else: logger.info("finished getting list of namespaces, async") return fn_response["status"]["namespaces"] # type: ignore[no-any-return]
def create_namespace(self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in the database, returning {'ok': 1} if successful.
Args
name
- the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op.
replication_options
- this dictionary can specify the options about
replication of the namespace (across database nodes). If provided,
it must have a structure similar to:
{"class": "SimpleStrategy", "replication_factor": 1}
. update_db_namespace
- if True, the
Database
orAsyncDatabase
class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> admin_for_my_db.create_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one']
Expand source code
def create_namespace( self, name: str, *, replication_options: Optional[Dict[str, Any]] = None, update_db_namespace: Optional[bool] = None, max_time_ms: Optional[int] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. Args: name: the namespace name. If supplying a namespace that exists already, the method call proceeds as usual, no errors are raised, and the whole invocation is a no-op. replication_options: this dictionary can specify the options about replication of the namespace (across database nodes). If provided, it must have a structure similar to: `{"class": "SimpleStrategy", "replication_factor": 1}`. update_db_namespace: if True, the `Database` or `AsyncDatabase` class that spawned this DatabaseAdmin, if any, gets updated to work on the newly-created namespace starting when this method returns. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the creation request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace'] >>> admin_for_my_db.create_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] """ options = { k: v for k, v in { "replication": replication_options, }.items() if v } payload = { "createNamespace": { **{"name": name}, **({"options": options} if options else {}), } } logger.info("creating namespace") cn_response = self._api_commander.request( payload=payload, timeout_info=base_timeout_info(max_time_ms), ) if (cn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from createNamespace API command.", raw_response=cn_response, ) else: logger.info("finished creating namespace") if update_db_namespace: self.spawner_database.use_namespace(name) return cn_response["status"] # type: ignore[no-any-return]
def drop_namespace(self, name: str, *, max_time_ms: Optional[int] = None) ‑> Dict[str, Any]
-
Drop (delete) a namespace from the database.
Args
name
- the namespace to delete. If it does not exist in this database, an error is raised.
max_time_ms
- a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server.
Returns
A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised.
Example
>>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] >>> admin_for_my_db.drop_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace']
Expand source code
def drop_namespace( self, name: str, *, max_time_ms: Optional[int] = None, ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database. Args: name: the namespace to delete. If it does not exist in this database, an error is raised. max_time_ms: a timeout, in milliseconds, for the whole requested operation to complete. Note that a timeout is no guarantee that the deletion request has not reached the API server. Returns: A dictionary of the form {"ok": 1} in case of success. Otherwise, an exception is raised. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'that_other_one'] >>> admin_for_my_db.drop_namespace("that_other_one") {'ok': 1} >>> admin_for_my_db.list_namespaces() ['default_keyspace'] """ logger.info("dropping namespace") dn_response = self._api_commander.request( payload={"dropNamespace": {"name": name}}, timeout_info=base_timeout_info(max_time_ms), ) if (dn_response.get("status") or {}).get("ok") != 1: raise DataAPIFaultyResponseException( text="Faulty response from dropNamespace API command.", raw_response=dn_response, ) else: logger.info("finished dropping namespace") return dn_response["status"] # type: ignore[no-any-return]
def find_embedding_providers(self, *, max_time_ms: Optional[int] = None) ‑> FindEmbeddingProvidersResult
-
Query the API for the full information on available embedding providers.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A
FindEmbeddingProvidersResult
object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=…, openai, …) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), … ] ), … }Expand source code
def find_embedding_providers( self, *, max_time_ms: Optional[int] = None ) -> FindEmbeddingProvidersResult: """ Query the API for the full information on available embedding providers. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A `FindEmbeddingProvidersResult` object with the complete information returned by the API about available embedding providers Example (output abridged and indented for clarity): >>> admin_for_my_db.find_embedding_providers() FindEmbeddingProvidersResult(embedding_providers=..., openai, ...) >>> admin_for_my_db.find_embedding_providers().embedding_providers { 'openai': EmbeddingProvider( display_name='OpenAI', models=[ EmbeddingProviderModel(name='text-embedding-3-small'), ... ] ), ... } """ logger.info("getting list of embedding providers") fe_response = self._api_commander.request( payload={"findEmbeddingProviders": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "embeddingProviders" not in fe_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findEmbeddingProviders API command.", raw_response=fe_response, ) else: logger.info("finished getting list of embedding providers") return FindEmbeddingProvidersResult.from_dict(fe_response["status"])
def get_async_database(self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None) ‑> AsyncDatabase
-
Create an AsyncDatabase instance for the database, to be used when doing data-level work (such as creating/managing collections).
This method has identical behavior and signature as the sync counterpart
get_database
: please see that one for more details.Expand source code
def get_async_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> AsyncDatabase: """ Create an AsyncDatabase instance for the database, to be used when doing data-level work (such as creating/managing collections). This method has identical behavior and signature as the sync counterpart `get_database`: please see that one for more details. """ return self.get_database( token=token, namespace=namespace, api_path=api_path, api_version=api_version, ).to_async()
def get_database(self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None) ‑> Database
-
Create a Database instance out of this class for working with the data in it.
Args
token
- if supplied, is passed to the Database instead of
the one set for this object. Useful if one wants to work in
a least-privilege manner, limiting the permissions for non-admin work.
This can be either a literal token string or a subclass of
TokenProvider
. namespace
- an optional namespace to set in the resulting Database.
If not provided, no namespace is set, limiting what the Database
can do until setting it with e.g. a
useNamespace
method call. api_path
- path to append to the API Endpoint. In typical usage, this should be left to its default of "".
api_version
- version specifier to append to the API path. In typical usage, this should be left to its default of "v1".
Returns
A Database object, ready to be used for working with data and collections.
Example
>>> my_db = admin_for_my_db.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection']
Note
creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand.
Expand source code
def get_database( self, *, token: Optional[Union[str, TokenProvider]] = None, namespace: Optional[str] = None, api_path: Optional[str] = None, api_version: Optional[str] = None, ) -> Database: """ Create a Database instance out of this class for working with the data in it. Args: token: if supplied, is passed to the Database instead of the one set for this object. Useful if one wants to work in a least-privilege manner, limiting the permissions for non-admin work. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. namespace: an optional namespace to set in the resulting Database. If not provided, no namespace is set, limiting what the Database can do until setting it with e.g. a `useNamespace` method call. api_path: path to append to the API Endpoint. In typical usage, this should be left to its default of "". api_version: version specifier to append to the API path. In typical usage, this should be left to its default of "v1". Returns: A Database object, ready to be used for working with data and collections. Example: >>> my_db = admin_for_my_db.get_database() >>> my_db.list_collection_names() ['movies', 'another_collection'] Note: creating an instance of Database does not trigger actual creation of the database itself, which should exist beforehand. """ # lazy importing here to avoid circular dependency from astrapy import Database return Database( api_endpoint=self.api_endpoint, token=coerce_token_provider(token) or self.token_provider, namespace=namespace, caller_name=self.caller_name, caller_version=self.caller_version, environment=self.environment, api_path=api_path, api_version=api_version, )
def list_namespaces(self, *, max_time_ms: Optional[int] = None) ‑> List[str]
-
Query the API for a list of the namespaces in the database.
Args
max_time_ms
- a timeout, in milliseconds, for the DevOps API request.
Returns
A list of the namespaces, each a string, in no particular order.
Example
>>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace']
Expand source code
def list_namespaces(self, *, max_time_ms: Optional[int] = None) -> List[str]: """ Query the API for a list of the namespaces in the database. Args: max_time_ms: a timeout, in milliseconds, for the DevOps API request. Returns: A list of the namespaces, each a string, in no particular order. Example: >>> admin_for_my_db.list_namespaces() ['default_keyspace', 'staging_namespace'] """ logger.info("getting list of namespaces") fn_response = self._api_commander.request( payload={"findNamespaces": {}}, timeout_info=base_timeout_info(max_time_ms), ) if "namespaces" not in fn_response.get("status", {}): raise DataAPIFaultyResponseException( text="Faulty response from findNamespaces API command.", raw_response=fn_response, ) else: logger.info("finished getting list of namespaces") return fn_response["status"]["namespaces"] # type: ignore[no-any-return]
def set_caller(self, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> None
-
Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller").
New objects spawned from this client afterwards will inherit the new settings.
Args
caller_name
- name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Example
>>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... )
Expand source code
def set_caller( self, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> None: """ Set a new identity for the application/framework on behalf of which the DevOps API calls will be performed (the "caller"). New objects spawned from this client afterwards will inherit the new settings. Args: caller_name: name of the application, or framework, on behalf of which the DevOps API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Example: >>> admin_for_my_db.set_caller( ... caller_name="the_caller", ... caller_version="0.1.0", ... ) """ logger.info(f"setting caller to {caller_name}/{caller_version}") self.caller_name = caller_name self.caller_version = caller_version self._api_commander = APICommander( api_endpoint=self.api_endpoint, path="/".join(comp for comp in [self.api_path, self.api_version] if comp), headers=self._commander_headers, callers=[(self.caller_name, self.caller_version)], )
def with_options(self, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None) ‑> DataAPIDatabaseAdmin
-
Create a clone of this DataAPIDatabaseAdmin with some changed attributes.
Args
api_endpoint
- the full URI to access the Data API, e.g. "http://localhost:8181".
token
- an access token with enough permission to perform admin tasks.
This can be either a literal token string or a subclass of
TokenProvider
. caller_name
- name of the application, or framework, on behalf of which the admin API calls are performed. This ends up in the request user-agent.
caller_version
- version of the caller.
Returns
a new DataAPIDatabaseAdmin instance.
Example
>>> admin_for_my_other_db = admin_for_my_db.with_options( ... api_endpoint="http://10.1.1.5:8181", ... )
Expand source code
def with_options( self, *, api_endpoint: Optional[str] = None, token: Optional[Union[str, TokenProvider]] = None, caller_name: Optional[str] = None, caller_version: Optional[str] = None, ) -> DataAPIDatabaseAdmin: """ Create a clone of this DataAPIDatabaseAdmin with some changed attributes. Args: api_endpoint: the full URI to access the Data API, e.g. "http://localhost:8181". token: an access token with enough permission to perform admin tasks. This can be either a literal token string or a subclass of `astrapy.authentication.TokenProvider`. caller_name: name of the application, or framework, on behalf of which the admin API calls are performed. This ends up in the request user-agent. caller_version: version of the caller. Returns: a new DataAPIDatabaseAdmin instance. Example: >>> admin_for_my_other_db = admin_for_my_db.with_options( ... api_endpoint="http://10.1.1.5:8181", ... ) """ return self._copy( api_endpoint=api_endpoint, token=token, caller_name=caller_name, caller_version=caller_version, )
class DatabaseAdmin
-
An abstract class defining the interface for a database admin object. This supports generic namespace crud, as well as spawning databases, without committing to a specific database architecture (e.g. Astra DB).
Expand source code
class DatabaseAdmin(ABC): """ An abstract class defining the interface for a database admin object. This supports generic namespace crud, as well as spawning databases, without committing to a specific database architecture (e.g. Astra DB). """ environment: str spawner_database: Union[Database, AsyncDatabase] @abstractmethod def list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]: """Get a list of namespaces for the database.""" ... @abstractmethod def create_namespace( self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. """ ... @abstractmethod def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]: """ Drop (delete) a namespace from the database, returning {'ok': 1} if successful. """ ... @abstractmethod async def async_list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]: """ Get a list of namespaces for the database. (Async version of the method.) """ ... @abstractmethod async def async_create_namespace( self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. (Async version of the method.) """ ... @abstractmethod async def async_drop_namespace( self, name: str, *pargs: Any, **kwargs: Any ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database, returning {'ok': 1} if successful. (Async version of the method.) """ ... @abstractmethod def get_database(self, *pargs: Any, **kwargs: Any) -> Database: """Get a Database object from this database admin.""" ... @abstractmethod def get_async_database(self, *pargs: Any, **kwargs: Any) -> AsyncDatabase: """Get an AsyncDatabase object from this database admin.""" ... @abstractmethod def find_embedding_providers( self, *pargs: Any, **kwargs: Any ) -> FindEmbeddingProvidersResult: """Query the Data API for the available embedding providers.""" ... @abstractmethod async def async_find_embedding_providers( self, *pargs: Any, **kwargs: Any ) -> FindEmbeddingProvidersResult: """ Query the Data API for the available embedding providers. (Async version of the method.) """ ...
Ancestors
- abc.ABC
Subclasses
Class variables
var environment : str
var spawner_database : Union[Database, AsyncDatabase]
Methods
async def async_create_namespace(self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in the database, returning {'ok': 1} if successful. (Async version of the method.)
Expand source code
@abstractmethod async def async_create_namespace( self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. (Async version of the method.) """ ...
async def async_drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) ‑> Dict[str, Any]
-
Drop (delete) a namespace from the database, returning {'ok': 1} if successful. (Async version of the method.)
Expand source code
@abstractmethod async def async_drop_namespace( self, name: str, *pargs: Any, **kwargs: Any ) -> Dict[str, Any]: """ Drop (delete) a namespace from the database, returning {'ok': 1} if successful. (Async version of the method.) """ ...
async def async_find_embedding_providers(self, *pargs: Any, **kwargs: Any) ‑> FindEmbeddingProvidersResult
-
Query the Data API for the available embedding providers. (Async version of the method.)
Expand source code
@abstractmethod async def async_find_embedding_providers( self, *pargs: Any, **kwargs: Any ) -> FindEmbeddingProvidersResult: """ Query the Data API for the available embedding providers. (Async version of the method.) """ ...
async def async_list_namespaces(self, *pargs: Any, **kwargs: Any) ‑> List[str]
-
Get a list of namespaces for the database. (Async version of the method.)
Expand source code
@abstractmethod async def async_list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]: """ Get a list of namespaces for the database. (Async version of the method.) """ ...
def create_namespace(self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any) ‑> Dict[str, Any]
-
Create a namespace in the database, returning {'ok': 1} if successful.
Expand source code
@abstractmethod def create_namespace( self, name: str, *, update_db_namespace: Optional[bool] = None, **kwargs: Any, ) -> Dict[str, Any]: """ Create a namespace in the database, returning {'ok': 1} if successful. """ ...
def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) ‑> Dict[str, Any]
-
Drop (delete) a namespace from the database, returning {'ok': 1} if successful.
Expand source code
@abstractmethod def drop_namespace(self, name: str, *pargs: Any, **kwargs: Any) -> Dict[str, Any]: """ Drop (delete) a namespace from the database, returning {'ok': 1} if successful. """ ...
def find_embedding_providers(self, *pargs: Any, **kwargs: Any) ‑> FindEmbeddingProvidersResult
-
Query the Data API for the available embedding providers.
Expand source code
@abstractmethod def find_embedding_providers( self, *pargs: Any, **kwargs: Any ) -> FindEmbeddingProvidersResult: """Query the Data API for the available embedding providers.""" ...
def get_async_database(self, *pargs: Any, **kwargs: Any) ‑> AsyncDatabase
-
Get an AsyncDatabase object from this database admin.
Expand source code
@abstractmethod def get_async_database(self, *pargs: Any, **kwargs: Any) -> AsyncDatabase: """Get an AsyncDatabase object from this database admin.""" ...
def get_database(self, *pargs: Any, **kwargs: Any) ‑> Database
-
Get a Database object from this database admin.
Expand source code
@abstractmethod def get_database(self, *pargs: Any, **kwargs: Any) -> Database: """Get a Database object from this database admin.""" ...
def list_namespaces(self, *pargs: Any, **kwargs: Any) ‑> List[str]
-
Get a list of namespaces for the database.
Expand source code
@abstractmethod def list_namespaces(self, *pargs: Any, **kwargs: Any) -> List[str]: """Get a list of namespaces for the database.""" ...
class ParsedAPIEndpoint (database_id: str, region: str, environment: str)
-
The results of successfully parsing an Astra DB API endpoint, for internal by database metadata-related functions.
Attributes
database_id
- e. g. "01234567-89ab-cdef-0123-456789abcdef".
region
- a region ID, such as "us-west1".
environment
- a label, whose value is one of Environment.PROD, Environment.DEV or Environment.TEST.
Expand source code
@dataclass class ParsedAPIEndpoint: """ The results of successfully parsing an Astra DB API endpoint, for internal by database metadata-related functions. Attributes: database_id: e. g. "01234567-89ab-cdef-0123-456789abcdef". region: a region ID, such as "us-west1". environment: a label, whose value is one of Environment.PROD, Environment.DEV or Environment.TEST. """ database_id: str region: str environment: str
Class variables
var database_id : str
var environment : str
var region : str