Module astrapy.data.cursors.cursor
Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
import logging
from abc import ABC
from decimal import Decimal
from enum import Enum
from typing import Any, Generic, TypeVar
from astrapy.data_types import DataAPIVector
from astrapy.exceptions import (
CursorException,
)
from astrapy.utils.api_options import FullSerdesOptions
# A cursor reads TRAW from DB and maps them to T if any mapping.
# A new cursor returned by .map will map to TNEW
TRAW = TypeVar("TRAW")
T = TypeVar("T")
TNEW = TypeVar("TNEW")
logger = logging.getLogger(__name__)
def _revise_timeouts_for_cursor_copy(
*,
new_general_method_timeout_ms: int | None,
new_timeout_ms: int | None,
old_request_timeout_ms: int | None,
) -> tuple[int | None, int | None]:
"""
This utility applies the desired logic to get the new timeout specification
for a cursor copy done for the purpose of to_list or for_each.
Namely, the original cursor would have an old request timeout (its overall
timeout assumed empty); and the to_list method call may have a general timeout
specified (and/or its alias, timeout_ms). This function returns the
(request_timeout_ms, overall_timeout_ms)
for use in the cursor copy.
(1) the two new_* parameters put in their priority for the new 'overall"
(2) the new per-request is either the old per-request or, if (1) is shorter,
that takes precedence. This is done in a None-aware safe manner.
"""
_general_method_timeout_ms = (
new_timeout_ms if new_timeout_ms is not None else new_general_method_timeout_ms
)
# the per-request timeout, depending on what is specified, may have
# to undergo a min(...) logic if overall timeout is surprisingly narrow:
_new_request_timeout_ms: int | None
if _general_method_timeout_ms is not None:
if old_request_timeout_ms is not None:
_new_request_timeout_ms = min(
_general_method_timeout_ms,
old_request_timeout_ms,
)
else:
_new_request_timeout_ms = _general_method_timeout_ms
else:
if old_request_timeout_ms is not None:
_new_request_timeout_ms = old_request_timeout_ms
else:
_new_request_timeout_ms = None
return (_new_request_timeout_ms, _general_method_timeout_ms)
def _ensure_vector(
fvector: list[float | Decimal] | None,
options: FullSerdesOptions,
) -> list[float] | DataAPIVector | None:
"""
For Tables and - depending on the JSON response parsing - collections alike,
the sort vector included in the response from a find-like could arrive as a list
of Decimal instances. This utility makes it back to either a plain list of floats
or a DataAPIVector, according the the preferences for the table/collection being
queried.
"""
if fvector is None:
return None
else:
# this can be a list of Decimal instances (because it's from tables,
# or from collections set to use decimals).
f_list = [float(x) for x in fvector]
if options.custom_datatypes_in_reading:
return DataAPIVector(f_list)
else:
return f_list
class CursorState(Enum):
"""
This enum expresses the possible states for a `Cursor`.
Values:
IDLE: Iteration over results has not started yet (alive=T, started=F)
STARTED: Iteration has started, *can* still yield results (alive=T, started=T)
CLOSED: Finished/forcibly stopped. Won't return more documents (alive=F)
"""
# Iteration over results has not started yet (alive=T, started=F)
IDLE = "idle"
# Iteration has started, *can* still yield results (alive=T, started=T)
STARTED = "started"
# Finished/forcibly stopped. Won't return more documents (alive=F)
CLOSED = "closed"
class AbstractCursor(ABC, Generic[TRAW]):
"""
A cursor obtained from the invocation of a find-type method over a table or
a collection.
This is the main interface to scroll through the results (resp. rows or documents).
This class is not meant to be directly instantiated by the user, rather it
is a superclass capturing some basic mechanisms common to all find cursors.
Cursors provide a seamless interface to the caller code, allowing iteration
over results while chunks of new data (pages) are exchanged periodically with
the API. For this reason, cursors internally manage a local buffer that is
progressively emptied and re-filled with a new page in a manner hidden from the
user -- except, some cursor methods allow to peek into this buffer should it
be necessary.
"""
_state: CursorState
_buffer: list[TRAW]
_pages_retrieved: int
_consumed: int
_next_page_state: str | None
_last_response_status: dict[str, Any] | None
def __init__(self) -> None:
self.rewind()
def _imprint_internal_state(self, other: AbstractCursor[TRAW]) -> None:
"""Mutably copy the internal state of this cursor onto another one."""
other._state = self._state
other._buffer = self._buffer
other._pages_retrieved = self._pages_retrieved
other._consumed = self._consumed
other._next_page_state = self._next_page_state
other._last_response_status = self._last_response_status
def _ensure_alive(self) -> None:
if self._state == CursorState.CLOSED:
raise CursorException(
text="Cursor is stopped.",
cursor_state=self._state.value,
)
def _ensure_idle(self) -> None:
if self._state != CursorState.IDLE:
raise CursorException(
text="Cursor is not idle anymore.",
cursor_state=self._state.value,
)
@property
def state(self) -> CursorState:
"""
The current state of this cursor.
Returns:
a value in `astrapy.cursors.CursorState`.
"""
return self._state
@property
def consumed(self) -> int:
"""
The number of items the cursors has yielded, i.e. how many items
have been already read by the code consuming the cursor.
Returns:
consumed: a non-negative integer, the count of items yielded so far.
"""
return self._consumed
@property
def cursor_id(self) -> int:
"""
An integer uniquely identifying this cursor.
Returns:
cursor_id: an integer number uniquely identifying the cursor.
"""
return id(self)
@property
def buffered_count(self) -> int:
"""
The number of items (documents, rows) currently stored in the client-side
buffer of this cursor. Reading this property never triggers new API calls
to re-fill the buffer.
Returns:
buffered_count: a non-negative integer, the amount of items currently
stored in the local buffer.
"""
return len(self._buffer)
def close(self) -> None:
"""
Close the cursor, regardless of its state. A cursor can be closed at any
time, possibly discarding the portion of results that has not yet been
consumed, if any.
This is an in-place modification of the cursor.
"""
self._state = CursorState.CLOSED
self._buffer = []
def rewind(self) -> None:
"""
Rewind the cursor, bringing it back to its pristine state of no items
retrieved/consumed yet, regardless of its current state.
All cursor settings (filter, mapping, projection, etc) are retained.
A cursor can be rewound at any time. Keep in mind that, subject to changes
occurred on the table or collection the results may be different if a cursor
is browsed a second time after rewinding it.
This is an in-place modification of the cursor.
"""
self._state = CursorState.IDLE
self._buffer = []
self._pages_retrieved = 0
self._consumed = 0
self._next_page_state = None
self._last_response_status = None
def consume_buffer(self, n: int | None = None) -> list[TRAW]:
"""
Consume (return) up to the requested number of buffered items (rows/documents).
The returned items are marked as consumed, meaning that subsequently consuming
the cursor will start after those items.
This method is an in-place modification of the cursor and only concerns
the local buffer: it never triggers fetching of new pages from the Data API.
This method can be called regardless of the cursor state without exceptions
being raised.
Args:
n: amount of items to return. If omitted, the whole buffer is returned.
Returns:
list: a list of items (rows/document dictionaries). If there are fewer
items than requested, the whole buffer is returned without errors:
in particular, if it is empty (such as when the cursor is closed),
an empty list is returned.
"""
_n = n if n is not None else len(self._buffer)
if _n < 0:
raise ValueError("A negative amount of items was requested.")
returned, remaining = self._buffer[:_n], self._buffer[_n:]
self._buffer = remaining
self._consumed += len(returned)
return returned
Classes
class AbstractCursor
-
A cursor obtained from the invocation of a find-type method over a table or a collection. This is the main interface to scroll through the results (resp. rows or documents).
This class is not meant to be directly instantiated by the user, rather it is a superclass capturing some basic mechanisms common to all find cursors.
Cursors provide a seamless interface to the caller code, allowing iteration over results while chunks of new data (pages) are exchanged periodically with the API. For this reason, cursors internally manage a local buffer that is progressively emptied and re-filled with a new page in a manner hidden from the user – except, some cursor methods allow to peek into this buffer should it be necessary.
Expand source code
class AbstractCursor(ABC, Generic[TRAW]): """ A cursor obtained from the invocation of a find-type method over a table or a collection. This is the main interface to scroll through the results (resp. rows or documents). This class is not meant to be directly instantiated by the user, rather it is a superclass capturing some basic mechanisms common to all find cursors. Cursors provide a seamless interface to the caller code, allowing iteration over results while chunks of new data (pages) are exchanged periodically with the API. For this reason, cursors internally manage a local buffer that is progressively emptied and re-filled with a new page in a manner hidden from the user -- except, some cursor methods allow to peek into this buffer should it be necessary. """ _state: CursorState _buffer: list[TRAW] _pages_retrieved: int _consumed: int _next_page_state: str | None _last_response_status: dict[str, Any] | None def __init__(self) -> None: self.rewind() def _imprint_internal_state(self, other: AbstractCursor[TRAW]) -> None: """Mutably copy the internal state of this cursor onto another one.""" other._state = self._state other._buffer = self._buffer other._pages_retrieved = self._pages_retrieved other._consumed = self._consumed other._next_page_state = self._next_page_state other._last_response_status = self._last_response_status def _ensure_alive(self) -> None: if self._state == CursorState.CLOSED: raise CursorException( text="Cursor is stopped.", cursor_state=self._state.value, ) def _ensure_idle(self) -> None: if self._state != CursorState.IDLE: raise CursorException( text="Cursor is not idle anymore.", cursor_state=self._state.value, ) @property def state(self) -> CursorState: """ The current state of this cursor. Returns: a value in `astrapy.cursors.CursorState`. """ return self._state @property def consumed(self) -> int: """ The number of items the cursors has yielded, i.e. how many items have been already read by the code consuming the cursor. Returns: consumed: a non-negative integer, the count of items yielded so far. """ return self._consumed @property def cursor_id(self) -> int: """ An integer uniquely identifying this cursor. Returns: cursor_id: an integer number uniquely identifying the cursor. """ return id(self) @property def buffered_count(self) -> int: """ The number of items (documents, rows) currently stored in the client-side buffer of this cursor. Reading this property never triggers new API calls to re-fill the buffer. Returns: buffered_count: a non-negative integer, the amount of items currently stored in the local buffer. """ return len(self._buffer) def close(self) -> None: """ Close the cursor, regardless of its state. A cursor can be closed at any time, possibly discarding the portion of results that has not yet been consumed, if any. This is an in-place modification of the cursor. """ self._state = CursorState.CLOSED self._buffer = [] def rewind(self) -> None: """ Rewind the cursor, bringing it back to its pristine state of no items retrieved/consumed yet, regardless of its current state. All cursor settings (filter, mapping, projection, etc) are retained. A cursor can be rewound at any time. Keep in mind that, subject to changes occurred on the table or collection the results may be different if a cursor is browsed a second time after rewinding it. This is an in-place modification of the cursor. """ self._state = CursorState.IDLE self._buffer = [] self._pages_retrieved = 0 self._consumed = 0 self._next_page_state = None self._last_response_status = None def consume_buffer(self, n: int | None = None) -> list[TRAW]: """ Consume (return) up to the requested number of buffered items (rows/documents). The returned items are marked as consumed, meaning that subsequently consuming the cursor will start after those items. This method is an in-place modification of the cursor and only concerns the local buffer: it never triggers fetching of new pages from the Data API. This method can be called regardless of the cursor state without exceptions being raised. Args: n: amount of items to return. If omitted, the whole buffer is returned. Returns: list: a list of items (rows/document dictionaries). If there are fewer items than requested, the whole buffer is returned without errors: in particular, if it is empty (such as when the cursor is closed), an empty list is returned. """ _n = n if n is not None else len(self._buffer) if _n < 0: raise ValueError("A negative amount of items was requested.") returned, remaining = self._buffer[:_n], self._buffer[_n:] self._buffer = remaining self._consumed += len(returned) return returned
Ancestors
- abc.ABC
- typing.Generic
Subclasses
- AsyncCollectionFindAndRerankCursor
- CollectionFindAndRerankCursor
- AsyncCollectionFindCursor
- AsyncTableFindCursor
- CollectionFindCursor
- TableFindCursor
Instance variables
var buffered_count : int
-
The number of items (documents, rows) currently stored in the client-side buffer of this cursor. Reading this property never triggers new API calls to re-fill the buffer.
Returns
buffered_count
- a non-negative integer, the amount of items currently stored in the local buffer.
Expand source code
@property def buffered_count(self) -> int: """ The number of items (documents, rows) currently stored in the client-side buffer of this cursor. Reading this property never triggers new API calls to re-fill the buffer. Returns: buffered_count: a non-negative integer, the amount of items currently stored in the local buffer. """ return len(self._buffer)
var consumed : int
-
The number of items the cursors has yielded, i.e. how many items have been already read by the code consuming the cursor.
Returns
consumed
- a non-negative integer, the count of items yielded so far.
Expand source code
@property def consumed(self) -> int: """ The number of items the cursors has yielded, i.e. how many items have been already read by the code consuming the cursor. Returns: consumed: a non-negative integer, the count of items yielded so far. """ return self._consumed
var cursor_id : int
-
An integer uniquely identifying this cursor.
Returns
cursor_id
- an integer number uniquely identifying the cursor.
Expand source code
@property def cursor_id(self) -> int: """ An integer uniquely identifying this cursor. Returns: cursor_id: an integer number uniquely identifying the cursor. """ return id(self)
var state : CursorState
-
Expand source code
@property def state(self) -> CursorState: """ The current state of this cursor. Returns: a value in `astrapy.cursors.CursorState`. """ return self._state
Methods
def close(self) ‑> None
-
Close the cursor, regardless of its state. A cursor can be closed at any time, possibly discarding the portion of results that has not yet been consumed, if any.
This is an in-place modification of the cursor.
Expand source code
def close(self) -> None: """ Close the cursor, regardless of its state. A cursor can be closed at any time, possibly discarding the portion of results that has not yet been consumed, if any. This is an in-place modification of the cursor. """ self._state = CursorState.CLOSED self._buffer = []
def consume_buffer(self, n: int | None = None) ‑> list[~TRAW]
-
Consume (return) up to the requested number of buffered items (rows/documents). The returned items are marked as consumed, meaning that subsequently consuming the cursor will start after those items.
This method is an in-place modification of the cursor and only concerns the local buffer: it never triggers fetching of new pages from the Data API.
This method can be called regardless of the cursor state without exceptions being raised.
Args
n
- amount of items to return. If omitted, the whole buffer is returned.
Returns
list
- a list of items (rows/document dictionaries). If there are fewer items than requested, the whole buffer is returned without errors: in particular, if it is empty (such as when the cursor is closed), an empty list is returned.
Expand source code
def consume_buffer(self, n: int | None = None) -> list[TRAW]: """ Consume (return) up to the requested number of buffered items (rows/documents). The returned items are marked as consumed, meaning that subsequently consuming the cursor will start after those items. This method is an in-place modification of the cursor and only concerns the local buffer: it never triggers fetching of new pages from the Data API. This method can be called regardless of the cursor state without exceptions being raised. Args: n: amount of items to return. If omitted, the whole buffer is returned. Returns: list: a list of items (rows/document dictionaries). If there are fewer items than requested, the whole buffer is returned without errors: in particular, if it is empty (such as when the cursor is closed), an empty list is returned. """ _n = n if n is not None else len(self._buffer) if _n < 0: raise ValueError("A negative amount of items was requested.") returned, remaining = self._buffer[:_n], self._buffer[_n:] self._buffer = remaining self._consumed += len(returned) return returned
def rewind(self) ‑> None
-
Rewind the cursor, bringing it back to its pristine state of no items retrieved/consumed yet, regardless of its current state. All cursor settings (filter, mapping, projection, etc) are retained.
A cursor can be rewound at any time. Keep in mind that, subject to changes occurred on the table or collection the results may be different if a cursor is browsed a second time after rewinding it.
This is an in-place modification of the cursor.
Expand source code
def rewind(self) -> None: """ Rewind the cursor, bringing it back to its pristine state of no items retrieved/consumed yet, regardless of its current state. All cursor settings (filter, mapping, projection, etc) are retained. A cursor can be rewound at any time. Keep in mind that, subject to changes occurred on the table or collection the results may be different if a cursor is browsed a second time after rewinding it. This is an in-place modification of the cursor. """ self._state = CursorState.IDLE self._buffer = [] self._pages_retrieved = 0 self._consumed = 0 self._next_page_state = None self._last_response_status = None
class CursorState (*args, **kwds)
-
This enum expresses the possible states for a
Cursor
.Values
IDLE: Iteration over results has not started yet (alive=T, started=F) STARTED: Iteration has started, can still yield results (alive=T, started=T) CLOSED: Finished/forcibly stopped. Won't return more documents (alive=F)
Expand source code
class CursorState(Enum): """ This enum expresses the possible states for a `Cursor`. Values: IDLE: Iteration over results has not started yet (alive=T, started=F) STARTED: Iteration has started, *can* still yield results (alive=T, started=T) CLOSED: Finished/forcibly stopped. Won't return more documents (alive=F) """ # Iteration over results has not started yet (alive=T, started=F) IDLE = "idle" # Iteration has started, *can* still yield results (alive=T, started=T) STARTED = "started" # Finished/forcibly stopped. Won't return more documents (alive=F) CLOSED = "closed"
Ancestors
- enum.Enum
Class variables
var CLOSED
var IDLE
var STARTED