Module astrapy.operations

Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import reduce
from typing import (
    Any,
    Dict,
    Iterable,
    List,
    Optional,
)

from astrapy.constants import DocumentType, SortType, VectorType
from astrapy.results import (
    BulkWriteResult,
    DeleteResult,
    InsertManyResult,
    InsertOneResult,
    UpdateResult,
)
from astrapy.collection import AsyncCollection, Collection


def reduce_bulk_write_results(
    results: Iterable[BulkWriteResult],
) -> BulkWriteResult:
    """
    Reduce a collection of bulk write results into a single one.

    Args:
        results: an iterable (e.g. a list) of BulkWriteResult instances.
            An empty input yields `BulkWriteResult.zero()`.

    Returns:
        A new BulkWriteResult object which summarizes the whole input.
        The count attributes are summed. The `bulk_api_results` and
        `upserted_ids` dictionaries are merged; on key collisions, entries
        from later results win. If any input has a `deleted_count` of None,
        the combined `deleted_count` is None as well.
    """

    def _sum_results(r1: BulkWriteResult, r2: BulkWriteResult) -> BulkWriteResult:
        # Combine two partial results into a fresh one (inputs are not mutated).
        bulk_api_results = {**r1.bulk_api_results, **r2.bulk_api_results}
        # A None deleted_count (presumably "count not available") cannot be
        # added to a number, so it propagates to the combined result.
        if r1.deleted_count is None or r2.deleted_count is None:
            deleted_count = None
        else:
            deleted_count = r1.deleted_count + r2.deleted_count
        return BulkWriteResult(
            bulk_api_results=bulk_api_results,
            deleted_count=deleted_count,
            inserted_count=r1.inserted_count + r2.inserted_count,
            matched_count=r1.matched_count + r2.matched_count,
            modified_count=r1.modified_count + r2.modified_count,
            upserted_count=r1.upserted_count + r2.upserted_count,
            upserted_ids={**r1.upserted_ids, **r2.upserted_ids},
        )

    # Starting from the neutral "zero" result makes empty input well-defined.
    return reduce(_sum_results, results, BulkWriteResult.zero())


class BaseOperation(ABC):
    """
    Abstract base for every operation that can take part in a bulk write
    on a (sync) collection. Concrete subclasses must implement `execute`.
    """

    @abstractmethod
    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Run this operation on `collection` as one step of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation within the
                bulk write.
            bulk_write_timeout_ms: an optional timeout, in milliseconds.

        Returns:
            a BulkWriteResult for this single operation (subclasses only;
            this abstract version does nothing).
        """


@dataclass
class InsertOne(BaseOperation):
    """
    Represents an `insert_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes `vector` and `vectorize` keyword-only with defaults.
    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `insert_one` as its `max_time_ms` argument.

        Returns:
            the outcome of `insert_one`, converted into a BulkWriteResult.
        """

        op_result: InsertOneResult = collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class InsertMany(BaseOperation):
    """
    Represents an `insert_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes all options after `documents` keyword-only.
    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `insert_many` as its `max_time_ms` argument.

        Returns:
            the outcome of `insert_many`, converted into a BulkWriteResult.
        """

        op_result: InsertManyResult = collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class UpdateOne(BaseOperation):
    """
    Represents an `update_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `update` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `update_one` as its `max_time_ms` argument.

        Returns:
            the outcome of `update_one`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class UpdateMany(BaseOperation):
    """
    Represents an `update_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes `upsert` keyword-only with a default.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `update_many` as its `max_time_ms` argument.

        Returns:
            the outcome of `update_many`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class ReplaceOne(BaseOperation):
    """
    Represents a `replace_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `replacement` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `replace_one` as its `max_time_ms` argument.

        Returns:
            the outcome of `replace_one`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class DeleteOne(BaseOperation):
    """
    Represents a `delete_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `filter` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `delete_one` as its `max_time_ms` argument.

        Returns:
            the outcome of `delete_one`, converted into a BulkWriteResult.
        """

        op_result: DeleteResult = collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class DeleteMany(BaseOperation):
    """
    Represents a `delete_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `delete_many` as its `max_time_ms` argument.

        Returns:
            the outcome of `delete_many`, converted into a BulkWriteResult.
        """

        op_result: DeleteResult = collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


class AsyncBaseOperation(ABC):
    """
    Abstract base for every operation that can take part in a bulk write
    on an (async) collection. Concrete subclasses must implement the
    `execute` coroutine.
    """

    @abstractmethod
    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Run this operation on an async `collection` as one step of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the position of this operation within the
                bulk write.
            bulk_write_timeout_ms: an optional timeout, in milliseconds.

        Returns:
            a BulkWriteResult for this single operation (subclasses only;
            this abstract version does nothing).
        """


@dataclass
class AsyncInsertOne(AsyncBaseOperation):
    """
    Represents an `insert_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes `vector` and `vectorize` keyword-only with defaults.
    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `insert_one` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `insert_one`, converted into a BulkWriteResult.
        """

        op_result: InsertOneResult = await collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncInsertMany(AsyncBaseOperation):
    """
    Represents an `insert_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes all options after `documents` keyword-only.
    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `insert_many` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `insert_many`, converted into a BulkWriteResult.
        """

        op_result: InsertManyResult = await collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
    """
    Represents an `update_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `update` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `update_one` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `update_one`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
    """
    Represents an `update_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes `upsert` keyword-only with a default.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `update_many` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `update_many`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
    """
    Represents a `replace_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `replacement` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `replace_one` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `replace_one`, converted into a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
    """
    Represents a `delete_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    # @dataclass keeps this hand-written __init__ (it only adds __repr__/__eq__),
    # which is what makes the options after `filter` keyword-only.
    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `delete_one` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `delete_one`, converted into a BulkWriteResult.
        """

        op_result: DeleteResult = await collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)


@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
    """
    Represents a `delete_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against an async collection as part of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to `delete_many` as its `max_time_ms` argument.

        Returns:
            the awaited outcome of `delete_many`, converted into a BulkWriteResult.
        """

        op_result: DeleteResult = await collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Functions

def reduce_bulk_write_results(results: List[BulkWriteResult]) ‑> BulkWriteResult

Reduce a list of bulk write results into a single one.

Args

results
a list of BulkWriteResult instances.

Returns

A new BulkWriteResult object which summarizes the whole input list.

Expand source code
def reduce_bulk_write_results(
    results: Iterable[BulkWriteResult],
) -> BulkWriteResult:
    """
    Reduce a collection of bulk write results into a single one.

    Args:
        results: an iterable (e.g. a list) of BulkWriteResult instances.
            An empty input yields `BulkWriteResult.zero()`.

    Returns:
        A new BulkWriteResult object which summarizes the whole input.
        The count attributes are summed. The `bulk_api_results` and
        `upserted_ids` dictionaries are merged; on key collisions, entries
        from later results win. If any input has a `deleted_count` of None,
        the combined `deleted_count` is None as well.
    """

    def _sum_results(r1: BulkWriteResult, r2: BulkWriteResult) -> BulkWriteResult:
        # Combine two partial results into a fresh one (inputs are not mutated).
        bulk_api_results = {**r1.bulk_api_results, **r2.bulk_api_results}
        # A None deleted_count (presumably "count not available") cannot be
        # added to a number, so it propagates to the combined result.
        if r1.deleted_count is None or r2.deleted_count is None:
            deleted_count = None
        else:
            deleted_count = r1.deleted_count + r2.deleted_count
        return BulkWriteResult(
            bulk_api_results=bulk_api_results,
            deleted_count=deleted_count,
            inserted_count=r1.inserted_count + r2.inserted_count,
            matched_count=r1.matched_count + r2.matched_count,
            modified_count=r1.modified_count + r2.modified_count,
            upserted_count=r1.upserted_count + r2.upserted_count,
            upserted_ids={**r1.upserted_ids, **r2.upserted_ids},
        )

    # Starting from the neutral "zero" result makes empty input well-defined.
    return reduce(_sum_results, results, BulkWriteResult.zero())

Classes

class AsyncBaseOperation

Base class for all operations amenable to be used in bulk writes on (async) collections.

Expand source code
class AsyncBaseOperation(ABC):
    """
    Abstract base for every operation that can take part in a bulk write
    on an (async) collection. Concrete subclasses must implement the
    `execute` coroutine.
    """

    @abstractmethod
    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Run this operation on an async `collection` as one step of a bulk write.

        Args:
            collection: the (async) collection this write targets.
            index_in_bulk_write: the position of this operation within the
                bulk write.
            bulk_write_timeout_ms: an optional timeout, in milliseconds.

        Returns:
            a BulkWriteResult for this single operation (subclasses only;
            this abstract version does nothing).
        """

Ancestors

  • abc.ABC

Subclasses

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult
Expand source code
@abstractmethod
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Run this operation on an async `collection` as one step of a bulk write.

    Subclasses must override this coroutine; the abstract version does
    nothing and resolves to None.

    Args:
        collection: the (async) collection this write targets.
        index_in_bulk_write: the position of this operation within the
            bulk write.
        bulk_write_timeout_ms: an optional timeout, in milliseconds.
    """
class AsyncDeleteMany (filter: Dict[str, Any])

Represents a delete_many operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select target documents.
Expand source code
@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
    """
    Represents a `delete_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.delete_many`.

        Returns:
            the outcome of `collection.delete_many`, converted to a BulkWriteResult.
        """

        op_result: DeleteResult = await collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
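index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.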
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.delete_many`.

    Returns:
        the outcome of `collection.delete_many`, converted to a BulkWriteResult.
    """

    op_result: DeleteResult = await collection.delete_many(
        filter=self.filter, max_time_ms=bulk_write_timeout_ms
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncDeleteOne (filter: Dict[str, Any], *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None)

Represents a delete_one operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
Expand source code
@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
    """
    Represents a `delete_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.delete_one`.

        Returns:
            the outcome of `collection.delete_one`, converted to a BulkWriteResult.
        """

        op_result: DeleteResult = await collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
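index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.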
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.delete_one`.

    Returns:
        the outcome of `collection.delete_one`, converted to a BulkWriteResult.
    """

    op_result: DeleteResult = await collection.delete_one(
        filter=self.filter,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncInsertMany (documents: Iterable[DocumentType], *, vectors: Optional[Iterable[Optional[VectorType]]] = None, vectorize: Optional[Iterable[Optional[str]]] = None, ordered: bool = True, chunk_size: Optional[int] = None, concurrency: Optional[int] = None)

Represents an insert_many operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

documents
the list of documents to insert.
vectors
an optional list of vectors to enrich the documents at insertion.
vectorize
an optional list of texts achieving the same effect as vectors except through an embedding service, if one is configured for the collection.
ordered
whether the inserts should be done in sequence.
chunk_size
how many documents to include in a single API request. Exceeding the server maximum allowed value results in an error. Leave it unspecified (recommended) to use the system default.
concurrency
maximum number of concurrent requests to the API at a given time. It cannot be more than one for ordered insertions.
Expand source code
@dataclass
class AsyncInsertMany(AsyncBaseOperation):
    """
    Represents an `insert_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.insert_many`.

        Returns:
            the outcome of `collection.insert_many`, converted to a BulkWriteResult.
        """

        op_result: InsertManyResult = await collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var chunk_size : Optional[int]
var concurrency : Optional[int]
var documents : Iterable[Dict[str, Any]]
var ordered : bool
var vectorize : Optional[Iterable[Optional[str]]]
var vectors : Optional[Iterable[Optional[Iterable[float]]]]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
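index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.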
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.insert_many`.

    Returns:
        the outcome of `collection.insert_many`, converted to a BulkWriteResult.
    """

    op_result: InsertManyResult = await collection.insert_many(
        documents=self.documents,
        vectors=self.vectors,
        vectorize=self.vectorize,
        ordered=self.ordered,
        chunk_size=self.chunk_size,
        concurrency=self.concurrency,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncInsertOne (document: DocumentType, *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None)

Represents an insert_one operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

document
the document to insert.
vector
an optional suitable vector to enrich the document at insertion.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
Expand source code
@dataclass
class AsyncInsertOne(AsyncBaseOperation):
    """
    Represents an `insert_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.insert_one`.

        Returns:
            the outcome of `collection.insert_one`, converted to a BulkWriteResult.
        """

        op_result: InsertOneResult = await collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var document : Dict[str, Any]
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
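index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.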
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.insert_one`.

    Returns:
        the outcome of `collection.insert_one`, converted to a BulkWriteResult.
    """

    op_result: InsertOneResult = await collection.insert_one(
        document=self.document,
        vector=self.vector,
        vectorize=self.vectorize,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncReplaceOne (filter: Dict[str, Any], replacement: DocumentType, *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None, upsert: bool = False)

Represents a replace_one operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
replacement
the replacement document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
    """
    Represents a `replace_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.replace_one`.

        Returns:
            the outcome of `collection.replace_one`, converted to a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var replacement : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
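index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.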
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.replace_one`.

    Returns:
        the outcome of `collection.replace_one`, converted to a BulkWriteResult.
    """

    op_result: UpdateResult = await collection.replace_one(
        filter=self.filter,
        replacement=self.replacement,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncUpdateMany (filter: Dict[str, Any], update: Dict[str, Any], *, upsert: bool = False)

Represents an update_many operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select target documents.
update
an update prescription to apply to the documents.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
    """
    Represents an `update_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.update_many`.

        Returns:
            the outcome of `collection.update_many`, converted to a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var update : Dict[str, Any]
var upsert : bool

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
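index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.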
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.update_many`.

    Returns:
        the outcome of `collection.update_many`, converted to a BulkWriteResult.
    """

    op_result: UpdateResult = await collection.update_many(
        filter=self.filter,
        update=self.update,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncUpdateOne (filter: Dict[str, Any], update: Dict[str, Any], *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None, upsert: bool = False)

Represents an update_one operation on a (async) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
update
an update prescription to apply to the document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
    """
    Represents an `update_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.update_one`.

        Returns:
            the outcome of `collection.update_one`, converted to a BulkWriteResult.
        """

        op_result: UpdateResult = await collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var update : Dict[str, Any]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
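index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.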
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.update_one`.

    Returns:
        the outcome of `collection.update_one`, converted to a BulkWriteResult.
    """

    op_result: UpdateResult = await collection.update_one(
        filter=self.filter,
        update=self.update,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class BaseOperation

Base class for all operations amenable to be used in bulk writes on (sync) collections.

Expand source code
class BaseOperation(ABC):
    """
    Abstract parent of every operation that can take part in a bulk write
    performed on a (sync) collection.

    Subclasses must implement `execute`, which runs the operation against
    a collection and reports its outcome as a BulkWriteResult.
    """

    @abstractmethod
    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Run this operation against `collection` as one step of a bulk write.

        Args:
            collection: the (sync) collection this write targets.
            index_in_bulk_write: the position of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: an optional timeout, in milliseconds,
                for this operation.

        Returns:
            a BulkWriteResult describing the outcome of this operation.
        """

Ancestors

  • abc.ABC

Subclasses

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult
Expand source code
@abstractmethod
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Run this operation against `collection` as one step of a bulk write.

    Args:
        collection: the (sync) collection this write targets.
        index_in_bulk_write: the position of this operation in the list
            of bulk operations.
        bulk_write_timeout_ms: an optional timeout, in milliseconds,
            for this operation.

    Returns:
        a BulkWriteResult describing the outcome of this operation.
    """
class DeleteMany (filter: Dict[str, Any])

Represents a delete_many operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select target documents.
Expand source code
@dataclass
class DeleteMany(BaseOperation):
    """
    Represents a `delete_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: Dict[str, Any]

    def __init__(
        self,
        filter: Dict[str, Any],
    ) -> None:
        self.filter = filter

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.delete_many`.

        Returns:
            the outcome of `collection.delete_many`, converted to a BulkWriteResult.
        """

        op_result: DeleteResult = collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
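index_in_bulk_write
the position of this operation in the list of bulk operations.
bulk_write_timeout_ms
an optional timeout in milliseconds, forwarded as max_time_ms to the collection method.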
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the position of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
            as `max_time_ms` to `collection.delete_many`.

    Returns:
        the outcome of `collection.delete_many`, converted to a BulkWriteResult.
    """

    op_result: DeleteResult = collection.delete_many(
        filter=self.filter, max_time_ms=bulk_write_timeout_ms
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class DeleteOne (filter: Dict[str, Any], *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None)

Represents a delete_one operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
Expand source code
@dataclass
class DeleteOne(BaseOperation):
    """
    Represents a `delete_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]

    def __init__(
        self,
        filter: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
    ) -> None:
        self.filter = filter
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the position of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, forwarded
                as `max_time_ms` to `collection.delete_one`.

        Returns:
            the outcome of `collection.delete_one`, converted to a BulkWriteResult.
        """

        op_result: DeleteResult = collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
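index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.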
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `DeleteResult` of `collection.delete_one`, converted into
        a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: DeleteResult = collection.delete_one(
        filter=self.filter,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class InsertMany (documents: Iterable[DocumentType], *, vectors: Optional[Iterable[Optional[VectorType]]] = None, vectorize: Optional[Iterable[Optional[str]]] = None, ordered: bool = True, chunk_size: Optional[int] = None, concurrency: Optional[int] = None)

Represents an insert_many operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

documents
the list of documents to insert.
vectors
an optional list of vectors to enrich the documents at insertion.
vectorize
an optional list of texts achieving the same effect as vectors except through an embedding service, if one is configured for the collection.
ordered
whether the inserts should be done in sequence.
chunk_size
how many documents to include in a single API request. Exceeding the server maximum allowed value results in an error. Leave it unspecified (recommended) to use the system default.
concurrency
maximum number of concurrent requests to the API at a given time. It cannot be more than one for ordered insertions.
Expand source code
@dataclass
class InsertMany(BaseOperation):
    """
    Represents an `insert_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Optional[Iterable[Optional[VectorType]]]
    vectorize: Optional[Iterable[Optional[str]]]
    ordered: bool
    chunk_size: Optional[int]
    concurrency: Optional[int]

    # The hand-written __init__ is kept by @dataclass (it never overwrites a
    # user-defined __init__); it makes every argument after `documents`
    # keyword-only with the defaults shown.
    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Optional[Iterable[Optional[VectorType]]] = None,
        vectorize: Optional[Iterable[Optional[str]]] = None,
        ordered: bool = True,
        chunk_size: Optional[int] = None,
        concurrency: Optional[int] = None,
    ) -> None:
        # NOTE(review): `documents` is stored as given; if a one-shot
        # iterator (e.g. a generator) is passed, presumably the operation
        # can only be executed once - confirm against insert_many.
        self.documents = documents
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result` when
                building the returned result.
            bulk_write_timeout_ms: a timeout forwarded to the collection
                method as its `max_time_ms` parameter (None is passed as-is).

        Returns:
            the `InsertManyResult` of `collection.insert_many`, converted
            into a `BulkWriteResult` for this position in the bulk write.
        """

        op_result: InsertManyResult = collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var chunk_size : Optional[int]
var concurrency : Optional[int]
var documents : Iterable[Dict[str, Any]]
var ordered : bool
var vectorize : Optional[Iterable[Optional[str]]]
var vectors : Optional[Iterable[Optional[Iterable[float]]]]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
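index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.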
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `InsertManyResult` of `collection.insert_many`, converted
        into a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: InsertManyResult = collection.insert_many(
        documents=self.documents,
        vectors=self.vectors,
        vectorize=self.vectorize,
        ordered=self.ordered,
        chunk_size=self.chunk_size,
        concurrency=self.concurrency,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class InsertOne (document: DocumentType, *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None)

Represents an insert_one operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

document
the document to insert.
vector
an optional suitable vector to enrich the document at insertion.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
Expand source code
@dataclass
class InsertOne(BaseOperation):
    """
    Represents an `insert_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
    """

    document: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]

    # The hand-written __init__ is kept by @dataclass (it never overwrites a
    # user-defined __init__); it makes every argument after `document`
    # keyword-only with a None default.
    def __init__(
        self,
        document: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
    ) -> None:
        self.document = document
        self.vector = vector
        self.vectorize = vectorize

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result` when
                building the returned result.
            bulk_write_timeout_ms: a timeout forwarded to the collection
                method as its `max_time_ms` parameter (None is passed as-is).

        Returns:
            the `InsertOneResult` of `collection.insert_one`, converted
            into a `BulkWriteResult` for this position in the bulk write.
        """

        op_result: InsertOneResult = collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var document : Dict[str, Any]
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
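index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.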
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `InsertOneResult` of `collection.insert_one`, converted
        into a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: InsertOneResult = collection.insert_one(
        document=self.document,
        vector=self.vector,
        vectorize=self.vectorize,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class ReplaceOne (filter: Dict[str, Any], replacement: DocumentType, *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None, upsert: bool = False)

Represents a replace_one operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
replacement
the replacement document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class ReplaceOne(BaseOperation):
    """
    Represents a `replace_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    replacement: DocumentType
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # The hand-written __init__ is kept by @dataclass (it never overwrites a
    # user-defined __init__); it makes every argument after `replacement`
    # keyword-only with the defaults shown.
    def __init__(
        self,
        filter: Dict[str, Any],
        replacement: DocumentType,
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result` when
                building the returned result.
            bulk_write_timeout_ms: a timeout forwarded to the collection
                method as its `max_time_ms` parameter (None is passed as-is).

        Returns:
            the `UpdateResult` of `collection.replace_one`, converted
            into a `BulkWriteResult` for this position in the bulk write.
        """

        op_result: UpdateResult = collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var replacement : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
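index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.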
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `UpdateResult` of `collection.replace_one`, converted
        into a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: UpdateResult = collection.replace_one(
        filter=self.filter,
        replacement=self.replacement,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class UpdateMany (filter: Dict[str, Any], update: Dict[str, Any], *, upsert: bool = False)

Represents an update_many operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select target documents.
update
an update prescription to apply to the documents.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class UpdateMany(BaseOperation):
    """
    Represents an `update_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    upsert: bool

    # The hand-written __init__ is kept by @dataclass (it never overwrites a
    # user-defined __init__); it makes `upsert` keyword-only, default False.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result` when
                building the returned result.
            bulk_write_timeout_ms: a timeout forwarded to the collection
                method as its `max_time_ms` parameter (None is passed as-is).

        Returns:
            the `UpdateResult` of `collection.update_many`, converted
            into a `BulkWriteResult` for this position in the bulk write.
        """

        op_result: UpdateResult = collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var update : Dict[str, Any]
var upsert : bool

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
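index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.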
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `UpdateResult` of `collection.update_many`, converted
        into a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: UpdateResult = collection.update_many(
        filter=self.filter,
        update=self.update,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class UpdateOne (filter: Dict[str, Any], update: Dict[str, Any], *, vector: Optional[VectorType] = None, vectorize: Optional[str] = None, sort: Optional[SortType] = None, upsert: bool = False)

Represents an update_one operation on a (sync) collection. See the documentation on the collection method for more information.

Attributes

filter
a filter condition to select a target document.
update
an update prescription to apply to the document.
vector
a vector of numbers to use for ANN (vector-search) sorting.
vectorize
a string to be made into a vector, with the same result as the vector attribute, through an embedding service, assuming one is configured for the collection.
sort
controls ordering of results, hence which document is affected.
upsert
controls what to do when no documents are found.
Expand source code
@dataclass
class UpdateOne(BaseOperation):
    """
    Represents an `update_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: Dict[str, Any]
    update: Dict[str, Any]
    vector: Optional[VectorType]
    vectorize: Optional[str]
    sort: Optional[SortType]
    upsert: bool

    # The hand-written __init__ is kept by @dataclass (it never overwrites a
    # user-defined __init__); it makes every argument after `update`
    # keyword-only with the defaults shown.
    def __init__(
        self,
        filter: Dict[str, Any],
        update: Dict[str, Any],
        *,
        vector: Optional[VectorType] = None,
        vectorize: Optional[str] = None,
        sort: Optional[SortType] = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: Optional[int],
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is passed to `to_bulk_write_result` when
                building the returned result.
            bulk_write_timeout_ms: a timeout forwarded to the collection
                method as its `max_time_ms` parameter (None is passed as-is).

        Returns:
            the `UpdateResult` of `collection.update_one`, converted
            into a `BulkWriteResult` for this position in the bulk write.
        """

        op_result: UpdateResult = collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)

Ancestors

Class variables

var filter : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var update : Dict[str, Any]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : Optional[str]

Methods

def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: Optional[int]) ‑> BulkWriteResult

Execute this operation against a collection as part of a bulk write.

Args

collection
the collection this write targets.
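index_in_bulk_write
the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
a timeout forwarded to the collection method as its max_time_ms parameter.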
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: Optional[int],
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is passed to `to_bulk_write_result` when
            building the returned result.
        bulk_write_timeout_ms: a timeout forwarded to the collection
            method as its `max_time_ms` parameter (None is passed as-is).

    Returns:
        the `UpdateResult` of `collection.update_one`, converted
        into a `BulkWriteResult` for this position in the bulk write.
    """

    op_result: UpdateResult = collection.update_one(
        filter=self.filter,
        update=self.update,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)