Module astrapy.operations
Expand source code
# Copyright DataStax, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from functools import reduce
from typing import Any, Iterable
from astrapy.collection import AsyncCollection, Collection
from astrapy.constants import DocumentType, SortType, VectorType
from astrapy.meta import check_deprecated_vector_ize
from astrapy.results import (
BulkWriteResult,
DeleteResult,
InsertManyResult,
InsertOneResult,
UpdateResult,
)
def reduce_bulk_write_results(results: list[BulkWriteResult]) -> BulkWriteResult:
    """
    Reduce a list of bulk write results into a single one.

    All counters are summed. The `bulk_api_results` and `upserted_ids`
    mappings are merged, and on a key collision the entry coming from a
    later result in the list wins.

    Args:
        results: a list of BulkWriteResult instances.

    Returns:
        A new BulkWriteResult object which summarizes the whole input list.
        For an empty input, its fields equal those of `BulkWriteResult.zero()`.
    """
    zero = BulkWriteResult.zero()

    # Accumulate in a single pass. A pairwise fold building `{**acc, **r}`
    # at each step copies the whole (growing) accumulator every time, which
    # is quadratic in the number of results; `dict.update` is not.
    bulk_api_results = dict(zero.bulk_api_results)
    upserted_ids = dict(zero.upserted_ids)
    deleted_count = zero.deleted_count
    inserted_count = zero.inserted_count
    matched_count = zero.matched_count
    modified_count = zero.modified_count
    upserted_count = zero.upserted_count

    for result in results:
        bulk_api_results.update(result.bulk_api_results)
        upserted_ids.update(result.upserted_ids)
        deleted_count += result.deleted_count
        inserted_count += result.inserted_count
        matched_count += result.matched_count
        modified_count += result.modified_count
        upserted_count += result.upserted_count

    return BulkWriteResult(
        bulk_api_results=bulk_api_results,
        deleted_count=deleted_count,
        inserted_count=inserted_count,
        matched_count=matched_count,
        modified_count=modified_count,
        upserted_count=upserted_count,
        upserted_ids=upserted_ids,
    )
class BaseOperation(ABC):
    """
    Abstract base for every operation that can take part in a bulk write
    on a (sync) collection.

    Concrete subclasses implement `execute`, which runs the operation
    against a collection and reports the outcome as a BulkWriteResult.
    """

    @abstractmethod
    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Run this operation against `collection` as one step of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: a timeout in milliseconds, or None.
                The implementations in this module forward it to the
                collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult describing this single operation.
        """
@dataclass
class InsertOne(BaseOperation):
    """
    An `insert_one` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the document instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the document instead.
    """

    document: DocumentType
    vector: VectorType | None
    vectorize: str | None

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="insert",
        )
        self.document, self.vector, self.vectorize = document, vector, vectorize

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `insert_one` as `max_time_ms`.

        Returns:
            the outcome of the insertion, as a BulkWriteResult.
        """
        inserted: InsertOneResult = collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return inserted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class InsertMany(BaseOperation):
    """
    An `insert_many` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the documents instead.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the documents instead.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Iterable[VectorType | None] | None
    vectorize: Iterable[str | None] | None
    ordered: bool
    chunk_size: int | None
    concurrency: int | None

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Iterable[VectorType | None] | None = None,
        vectorize: Iterable[str | None] | None = None,
        ordered: bool = True,
        chunk_size: int | None = None,
        concurrency: int | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=None,
            vectors=vectors,
            vectorize=vectorize,
            kind="insert",
        )
        self.documents = documents
        self.vectors, self.vectorize = vectors, vectorize
        self.ordered = ordered
        self.chunk_size, self.concurrency = chunk_size, concurrency

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `insert_many` as `max_time_ms`.

        Returns:
            the outcome of the insertions, as a BulkWriteResult.
        """
        inserted: InsertManyResult = collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return inserted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class UpdateOne(BaseOperation):
    """
    An `update_one` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter, self.update = filter, update
        self.vector, self.vectorize = vector, vectorize
        self.sort, self.upsert = sort, upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `update_one` as `max_time_ms`.

        Returns:
            the outcome of the update, as a BulkWriteResult.
        """
        updated: UpdateResult = collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return updated.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class UpdateMany(BaseOperation):
    """
    An `update_many` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter, self.update, self.upsert = filter, update, upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `update_many` as `max_time_ms`.

        Returns:
            the outcome of the update, as a BulkWriteResult.
        """
        updated: UpdateResult = collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return updated.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class ReplaceOne(BaseOperation):
    """
    A `replace_one` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    replacement: DocumentType
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        replacement: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter, self.replacement = filter, replacement
        self.vector, self.vectorize = vector, vectorize
        self.sort, self.upsert = sort, upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `replace_one` as `max_time_ms`.

        Returns:
            the outcome of the replacement, as a BulkWriteResult.
        """
        replaced: UpdateResult = collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return replaced.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class DeleteOne(BaseOperation):
    """
    A `delete_one` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None

    def __init__(
        self,
        filter: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter = filter
        self.vector, self.vectorize, self.sort = vector, vectorize, sort

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `delete_one` as `max_time_ms`.

        Returns:
            the outcome of the deletion, as a BulkWriteResult.
        """
        deleted: DeleteResult = collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return deleted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class DeleteMany(BaseOperation):
    """
    A `delete_many` operation, for use in a bulk write on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: dict[str, Any]

    def __init__(
        self,
        filter: dict[str, Any],
    ) -> None:
        self.filter = filter

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `delete_many` as `max_time_ms`.

        Returns:
            the outcome of the deletion, as a BulkWriteResult.
        """
        deleted: DeleteResult = collection.delete_many(
            filter=self.filter,
            max_time_ms=bulk_write_timeout_ms,
        )
        return deleted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncBaseOperation(ABC):
    """
    Abstract base for every operation that can take part in a bulk write
    on an (async) collection.

    Concrete subclasses implement the coroutine `execute`, which runs the
    operation against a collection and reports the outcome as a
    BulkWriteResult.
    """

    @abstractmethod
    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Run this operation against `collection` as one step of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: a timeout in milliseconds, or None.
                The implementations in this module forward it to the
                collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult describing this single operation.
        """
@dataclass
class AsyncInsertOne(AsyncBaseOperation):
    """
    An `insert_one` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the document instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the document instead.
    """

    document: DocumentType
    vector: VectorType | None
    vectorize: str | None

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="insert",
        )
        self.document, self.vector, self.vectorize = document, vector, vectorize

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `insert_one` as `max_time_ms`.

        Returns:
            the outcome of the insertion, as a BulkWriteResult.
        """
        inserted: InsertOneResult = await collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return inserted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncInsertMany(AsyncBaseOperation):
    """
    An `insert_many` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the documents instead.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the documents instead.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Iterable[VectorType | None] | None
    vectorize: Iterable[str | None] | None
    ordered: bool
    chunk_size: int | None
    concurrency: int | None

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Iterable[VectorType | None] | None = None,
        vectorize: Iterable[str | None] | None = None,
        ordered: bool = True,
        chunk_size: int | None = None,
        concurrency: int | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=None,
            vectors=vectors,
            vectorize=vectorize,
            kind="insert",
        )
        self.documents = documents
        self.vectors, self.vectorize = vectors, vectorize
        self.ordered = ordered
        self.chunk_size, self.concurrency = chunk_size, concurrency

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `insert_many` as `max_time_ms`.

        Returns:
            the outcome of the insertions, as a BulkWriteResult.
        """
        inserted: InsertManyResult = await collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return inserted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
    """
    An `update_one` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter, self.update = filter, update
        self.vector, self.vectorize = vector, vectorize
        self.sort, self.upsert = sort, upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `update_one` as `max_time_ms`.

        Returns:
            the outcome of the update, as a BulkWriteResult.
        """
        updated: UpdateResult = await collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return updated.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
    """
    An `update_many` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter, self.update, self.upsert = filter, update, upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `update_many` as `max_time_ms`.

        Returns:
            the outcome of the update, as a BulkWriteResult.
        """
        updated: UpdateResult = await collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return updated.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
    """
    A `replace_one` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    replacement: DocumentType
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        replacement: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter, self.replacement = filter, replacement
        self.vector, self.vectorize = vector, vectorize
        self.sort, self.upsert = sort, upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `replace_one` as `max_time_ms`.

        Returns:
            the outcome of the replacement, as a BulkWriteResult.
        """
        replaced: UpdateResult = await collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return replaced.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
    """
    A `delete_one` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None

    def __init__(
        self,
        filter: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
    ) -> None:
        # Run the deprecation check on the legacy vector parameters first,
        # then store everything.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.filter = filter
        self.vector, self.vectorize, self.sort = vector, vectorize, sort

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `delete_one` as `max_time_ms`.

        Returns:
            the outcome of the deletion, as a BulkWriteResult.
        """
        deleted: DeleteResult = await collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return deleted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
    """
    A `delete_many` operation, for use in a bulk write on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: dict[str, Any]

    def __init__(
        self,
        filter: dict[str, Any],
    ) -> None:
        self.filter = filter

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `delete_many` as `max_time_ms`.

        Returns:
            the outcome of the deletion, as a BulkWriteResult.
        """
        deleted: DeleteResult = await collection.delete_many(
            filter=self.filter,
            max_time_ms=bulk_write_timeout_ms,
        )
        return deleted.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Functions
def reduce_bulk_write_results(results: list[BulkWriteResult]) ‑> BulkWriteResult
-
Reduce a list of bulk write results into a single one.
Args
results
- a list of BulkWriteResult instances.
Returns
A new BulkWriteResult object which summarizes the whole input list.
Expand source code
def reduce_bulk_write_results(results: list[BulkWriteResult]) -> BulkWriteResult:
    """
    Reduce a list of bulk write results into a single one.

    Args:
        results: a list of BulkWriteResult instances.

    Returns:
        A new BulkWriteResult object which summarizes the whole input list.
    """
    zero = BulkWriteResult.zero()

    def _sum_results(r1: BulkWriteResult, r2: BulkWriteResult) -> BulkWriteResult:
        # Merge the two mappings (entries of r2 win on key collisions)
        # and add up the counters of the two results.
        bulk_api_results = {**r1.bulk_api_results, **r2.bulk_api_results}
        deleted_count = r1.deleted_count + r2.deleted_count
        inserted_count = r1.inserted_count + r2.inserted_count
        matched_count = r1.matched_count + r2.matched_count
        modified_count = r1.modified_count + r2.modified_count
        upserted_count = r1.upserted_count + r2.upserted_count
        upserted_ids = {**r1.upserted_ids, **r2.upserted_ids}
        return BulkWriteResult(
            bulk_api_results=bulk_api_results,
            deleted_count=deleted_count,
            inserted_count=inserted_count,
            matched_count=matched_count,
            modified_count=modified_count,
            upserted_count=upserted_count,
            upserted_ids=upserted_ids,
        )

    # NOTE(review): every fold step copies the accumulated mappings, so the
    # total cost grows quadratically with len(results).
    return reduce(_sum_results, results, zero)
Classes
class AsyncBaseOperation
-
Base class for all operations amenable to be used in bulk writes on (async) collections.
Expand source code
class AsyncBaseOperation(ABC):
    """
    Base class for all operations amenable to be used
    in bulk writes on (async) collections.
    """

    @abstractmethod
    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: a timeout in milliseconds, or None; the
                subclasses in this module forward it as `max_time_ms`.

        Returns:
            a BulkWriteResult describing this single operation.
        """
        ...
Ancestors
- abc.ABC
Subclasses
- AsyncDeleteMany
- AsyncDeleteOne
- AsyncInsertMany
- AsyncInsertOne
- AsyncReplaceOne
- AsyncUpdateMany
- AsyncUpdateOne
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Expand source code
@abstractmethod
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list
            of bulk operations.
        bulk_write_timeout_ms: a timeout in milliseconds, or None; the
            subclasses in this module forward it as `max_time_ms`.

    Returns:
        a BulkWriteResult describing this single operation.
    """
    ...
class AsyncDeleteMany (filter: dict[str, Any])
-
Represents a `delete_many` operation on an (async) collection. See the documentation on the collection method for more information.
Attributes
filter
- a filter condition to select target documents.
Expand source code
@dataclass
class AsyncDeleteMany(AsyncBaseOperation):
    """
    Represents a `delete_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: dict[str, Any]

    def __init__(
        self,
        filter: dict[str, Any],
    ) -> None:
        self.filter = filter

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list
                of bulk operations.
            bulk_write_timeout_ms: forwarded to `delete_many` as `max_time_ms`.
        """
        op_result: DeleteResult = await collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- forwarded to the collection method as max_time_ms.
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list
            of bulk operations.
        bulk_write_timeout_ms: forwarded to `delete_many` as `max_time_ms`.
    """
    op_result: DeleteResult = await collection.delete_many(
        filter=self.filter, max_time_ms=bulk_write_timeout_ms
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncDeleteOne (filter: dict[str, Any], *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None)
-
Represents a `delete_one` operation on an (async) collection. See the documentation on the collection method for more information.
Attributes
filter
- a filter condition to select a target document.
vector
- a vector of numbers to use for ANN (vector-search) sorting. DEPRECATED (removal in 2.0). Use a `$vector` key in the sort clause dict instead.
vectorize
- a string to be made into a vector, with the same result as the `vector` attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a `$vectorize` key in the sort clause dict instead.
sort
- controls ordering of results, hence which document is affected.
Expand source code
@dataclass
class AsyncDeleteOne(AsyncBaseOperation):
    """
    Represents a `delete_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as
            the `vector` attribute, through an embedding service, assuming one
            is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None

    def __init__(
        self,
        filter: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
    ) -> None:
        self.filter = filter
        # check the deprecated `vector`/`vectorize` parameters (used for sorting
        # here, hence kind="find")
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `delete_one` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `delete_one` result.
        """
        op_result: DeleteResult = await collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var sort : Optional[Dict[str, Any]]
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `delete_one` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `delete_one` result.
    """
    op_result: DeleteResult = await collection.delete_one(
        filter=self.filter,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncInsertMany (documents: Iterable[DocumentType], *, vectors: Iterable[VectorType | None] | None = None, vectorize: Iterable[str | None] | None = None, ordered: bool = True, chunk_size: int | None = None, concurrency: int | None = None)
-
Represents an
insert_many
operation on an (async) collection. See the documentation on the collection method for more information. Attributes
documents
- the list of documents to insert.
vectors
- an optional list of vectors to enrich the documents at insertion.
DEPRECATED (removal in 2.0). Use a
$vector
key in the documents instead. vectorize
- an optional list of texts achieving the same effect as
vectors
except through an embedding service, if one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the documents instead. ordered
- whether the inserts should be done in sequence.
chunk_size
- how many documents to include in a single API request. Exceeding the server maximum allowed value results in an error. Leave it unspecified (recommended) to use the system default.
concurrency
- maximum number of concurrent requests to the API at a given time. It cannot be more than one for ordered insertions.
Expand source code
@dataclass
class AsyncInsertMany(AsyncBaseOperation):
    """
    Represents an `insert_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            documents instead.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for
            the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            documents instead.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at
            a given time. It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Iterable[VectorType | None] | None
    vectorize: Iterable[str | None] | None
    ordered: bool
    chunk_size: int | None
    concurrency: int | None

    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Iterable[VectorType | None] | None = None,
        vectorize: Iterable[str | None] | None = None,
        ordered: bool = True,
        chunk_size: int | None = None,
        concurrency: int | None = None,
    ) -> None:
        self.documents = documents
        # check the deprecated `vectors`/`vectorize` parameters (kind="insert")
        check_deprecated_vector_ize(
            vector=None,
            vectors=vectors,
            vectorize=vectorize,
            kind="insert",
        )
        self.vectors = vectors
        self.vectorize = vectorize
        self.ordered = ordered
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `insert_many` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `insert_many` result.
        """
        op_result: InsertManyResult = await collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var chunk_size : int | None
var concurrency : int | None
var documents : Iterable[Dict[str, Any]]
var ordered : bool
var vectorize : Optional[Iterable[str | None]]
var vectors : Optional[Iterable[Optional[Iterable[float]]]]
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `insert_many` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `insert_many` result.
    """
    op_result: InsertManyResult = await collection.insert_many(
        documents=self.documents,
        vectors=self.vectors,
        vectorize=self.vectorize,
        ordered=self.ordered,
        chunk_size=self.chunk_size,
        concurrency=self.concurrency,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncInsertOne (document: DocumentType, *, vector: VectorType | None = None, vectorize: str | None = None)
-
Represents an
insert_one
operation on an (async) collection. See the documentation on the collection method for more information. Attributes
document
- the document to insert.
vector
- an optional suitable vector to enrich the document at insertion.
DEPRECATED (removal in 2.0). Use a
$vector
key in the document instead. vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the document instead.
Expand source code
@dataclass
class AsyncInsertOne(AsyncBaseOperation):
    """
    Represents an `insert_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            document instead.
        vectorize: a string to be made into a vector, with the same result as
            the `vector` attribute, through an embedding service, assuming one
            is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            document instead.
    """

    document: DocumentType
    vector: VectorType | None
    vectorize: str | None

    def __init__(
        self,
        document: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
    ) -> None:
        self.document = document
        # check the deprecated `vector`/`vectorize` parameters (kind="insert")
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="insert",
        )
        self.vector = vector
        self.vectorize = vectorize

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `insert_one` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `insert_one` result.
        """
        op_result: InsertOneResult = await collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var document : Dict[str, Any]
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `insert_one` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `insert_one` result.
    """
    op_result: InsertOneResult = await collection.insert_one(
        document=self.document,
        vector=self.vector,
        vectorize=self.vectorize,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncReplaceOne (filter: dict[str, Any], replacement: DocumentType, *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None, upsert: bool = False)
-
Represents a
replace_one
operation on an (async) collection. See the documentation on the collection method for more information. Attributes
filter
- a filter condition to select a target document.
replacement
- the replacement document.
vector
- a vector of numbers to use for ANN (vector-search) sorting.
DEPRECATED (removal in 2.0). Use a
$vector
key in the sort clause dict instead. vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the sort clause dict instead. sort
- controls ordering of results, hence which document is affected.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncReplaceOne(AsyncBaseOperation):
    """
    Represents a `replace_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as
            the `vector` attribute, through an embedding service, assuming one
            is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    replacement: DocumentType
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        replacement: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        # check the deprecated `vector`/`vectorize` parameters (used for sorting
        # here, hence kind="find")
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `replace_one` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `replace_one` result.
        """
        op_result: UpdateResult = await collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var replacement : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `replace_one` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `replace_one` result.
    """
    op_result: UpdateResult = await collection.replace_one(
        filter=self.filter,
        replacement=self.replacement,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncUpdateMany (filter: dict[str, Any], update: dict[str, Any], *, upsert: bool = False)
-
Represents an
update_many
operation on an (async) collection. See the documentation on the collection method for more information. Attributes
filter
- a filter condition to select target documents.
update
- an update prescription to apply to the documents.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncUpdateMany(AsyncBaseOperation):
    """
    Represents an `update_many` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `update_many` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `update_many` result.
        """
        op_result: UpdateResult = await collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var update : dict[str, typing.Any]
var upsert : bool
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `update_many` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `update_many` result.
    """
    op_result: UpdateResult = await collection.update_many(
        filter=self.filter,
        update=self.update,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class AsyncUpdateOne (filter: dict[str, Any], update: dict[str, Any], *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None, upsert: bool = False)
-
Represents an
update_one
operation on an (async) collection. See the documentation on the collection method for more information. Attributes
filter
- a filter condition to select a target document.
update
- an update prescription to apply to the document.
vector
- a vector of numbers to use for ANN (vector-search) sorting.
DEPRECATED (removal in 2.0). Use a
$vector
key in the sort clause dict instead. vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the sort clause dict instead. sort
- controls ordering of results, hence which document is affected.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class AsyncUpdateOne(AsyncBaseOperation):
    """
    Represents an `update_one` operation on an (async) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as
            the `vector` attribute, through an embedding service, assuming one
            is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        # check the deprecated `vector`/`vectorize` parameters (used for sorting
        # here, hence kind="find")
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    async def execute(
        self,
        collection: AsyncCollection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `update_one` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `update_one` result.
        """
        op_result: UpdateResult = await collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- AsyncBaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var sort : Optional[Dict[str, Any]]
var update : dict[str, typing.Any]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
async def execute(self, collection: AsyncCollection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
async def execute(
    self,
    collection: AsyncCollection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `update_one` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `update_one` result.
    """
    op_result: UpdateResult = await collection.update_one(
        filter=self.filter,
        update=self.update,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class BaseOperation
-
Base class for all operations amenable to be used in bulk writes on (sync) collections.
Expand source code
class BaseOperation(ABC):
    """
    Base class for all operations amenable to be used in bulk writes on
    (sync) collections.
    """

    @abstractmethod
    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Concrete subclasses must implement this method.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations.
            bulk_write_timeout_ms: a timeout in milliseconds, or None.

        Returns:
            a BulkWriteResult describing the outcome of this operation.
        """
        ...
Ancestors
- abc.ABC
Subclasses
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Expand source code
@abstractmethod
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Concrete subclasses must implement this method.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations.
        bulk_write_timeout_ms: a timeout in milliseconds, or None.

    Returns:
        a BulkWriteResult describing the outcome of this operation.
    """
    ...
class DeleteMany (filter: dict[str, Any])
-
Represents a
delete_many
operation on a (sync) collection. See the documentation on the collection method for more information. Attributes
filter
- a filter condition to select target documents.
Expand source code
@dataclass
class DeleteMany(BaseOperation):
    """
    Represents a `delete_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
    """

    filter: dict[str, Any]

    def __init__(
        self,
        filter: dict[str, Any],
    ) -> None:
        self.filter = filter

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `delete_many` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `delete_many` result.
        """
        op_result: DeleteResult = collection.delete_many(
            filter=self.filter, max_time_ms=bulk_write_timeout_ms
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `delete_many` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `delete_many` result.
    """
    op_result: DeleteResult = collection.delete_many(
        filter=self.filter, max_time_ms=bulk_write_timeout_ms
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class DeleteOne (filter: dict[str, Any], *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None)
-
Represents a
delete_one
operation on a (sync) collection. See the documentation on the collection method for more information. Attributes
filter
- a filter condition to select a target document.
vector
- a vector of numbers to use for ANN (vector-search) sorting.
DEPRECATED (removal in 2.0). Use a
$vector
key in the sort clause dict instead. vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the sort clause dict instead. sort
- controls ordering of results, hence which document is affected.
Expand source code
@dataclass
class DeleteOne(BaseOperation):
    """
    Represents a `delete_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as
            the `vector` attribute, through an embedding service, assuming one
            is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
    """

    filter: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None

    def __init__(
        self,
        filter: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
    ) -> None:
        self.filter = filter
        # check the deprecated `vector`/`vectorize` parameters (used for sorting
        # here, hence kind="find")
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; passed on to `to_bulk_write_result`.
            bulk_write_timeout_ms: a timeout in milliseconds (or None),
                forwarded to `delete_one` as `max_time_ms`.

        Returns:
            a BulkWriteResult built from the `delete_one` result.
        """
        op_result: DeleteResult = collection.delete_one(
            filter=self.filter,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var sort : Optional[Dict[str, Any]]
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
index_in_bulk_write
- the index of this operation in the list of bulk operations
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; passed on to `to_bulk_write_result`.
        bulk_write_timeout_ms: a timeout in milliseconds (or None),
            forwarded to `delete_one` as `max_time_ms`.

    Returns:
        a BulkWriteResult built from the `delete_one` result.
    """
    op_result: DeleteResult = collection.delete_one(
        filter=self.filter,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class InsertMany (documents: Iterable[DocumentType], *, vectors: Iterable[VectorType | None] | None = None, vectorize: Iterable[str | None] | None = None, ordered: bool = True, chunk_size: int | None = None, concurrency: int | None = None)
-
Represents an
insert_many
operation on a (sync) collection. See the documentation on the collection method for more information. Attributes
documents
- the list of documents to insert.
vectors
- an optional list of vectors to enrich the documents at insertion.
DEPRECATED (removal in 2.0). Use a
$vector
key in the documents instead. vectorize
- an optional list of texts achieving the same effect as
vectors
except through an embedding service, if one is configured for the collection. DEPRECATED (removal in 2.0). Use a $vectorize
key in the documents instead. ordered
- whether the inserts should be done in sequence.
chunk_size
- how many documents to include in a single API request. Exceeding the server maximum allowed value results in an error. Leave it unspecified (recommended) to use the system default.
concurrency
- maximum number of concurrent requests to the API at a given time. It cannot be more than one for ordered insertions.
Expand source code
@dataclass
class InsertMany(BaseOperation):
    """
    Represents an `insert_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        documents: the list of documents to insert.
        vectors: an optional list of vectors to enrich the documents at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the documents instead.
        vectorize: an optional list of texts achieving the same effect as `vectors`
            except through an embedding service, if one is configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the documents instead.
        ordered: whether the inserts should be done in sequence.
        chunk_size: how many documents to include in a single API request.
            Exceeding the server maximum allowed value results in an error.
            Leave it unspecified (recommended) to use the system default.
        concurrency: maximum number of concurrent requests to the API at a given time.
            It cannot be more than one for ordered insertions.
    """

    documents: Iterable[DocumentType]
    vectors: Iterable[VectorType | None] | None
    vectorize: Iterable[str | None] | None
    ordered: bool
    chunk_size: int | None
    concurrency: int | None

    # NOTE: this explicit __init__ is kept by @dataclass (it does not overwrite
    # an __init__ defined in the class body), so the keyword-only signature holds.
    def __init__(
        self,
        documents: Iterable[DocumentType],
        *,
        vectors: Iterable[VectorType | None] | None = None,
        vectorize: Iterable[str | None] | None = None,
        ordered: bool = True,
        chunk_size: int | None = None,
        concurrency: int | None = None,
    ) -> None:
        self.documents = documents
        self.ordered = ordered
        # Presumably warns about (or rejects) use of the deprecated
        # `vectors`/`vectorize` parameters -- see
        # astrapy.meta.check_deprecated_vector_ize for the exact behavior.
        check_deprecated_vector_ize(
            vector=None,
            vectors=vectors,
            vectorize=vectorize,
            kind="insert",
        )
        self.vectors = vectors
        self.vectorize = vectorize
        self.chunk_size = chunk_size
        self.concurrency = concurrency

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to the collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult obtained from the result of `collection.insert_many`.
        """
        op_result: InsertManyResult = collection.insert_many(
            documents=self.documents,
            vectors=self.vectors,
            vectorize=self.vectorize,
            ordered=self.ordered,
            chunk_size=self.chunk_size,
            concurrency=self.concurrency,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var chunk_size : int | None
var concurrency : int | None
var documents : Iterable[Dict[str, Any]]
var ordered : bool
var vectorize : Optional[Iterable[str | None]]
var vectors : Optional[Iterable[Optional[Iterable[float]]]]
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
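index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- an optional timeout in milliseconds, forwarded to the collection method as max_time_ms.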
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is forwarded to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, passed
            to the collection method as `max_time_ms`.

    Returns:
        a BulkWriteResult obtained from the result of `collection.insert_many`.
    """
    op_result: InsertManyResult = collection.insert_many(
        documents=self.documents,
        vectors=self.vectors,
        vectorize=self.vectorize,
        ordered=self.ordered,
        chunk_size=self.chunk_size,
        concurrency=self.concurrency,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class InsertOne (document: DocumentType, *, vector: VectorType | None = None, vectorize: str | None = None)
-
Represents an
insert_one
operation on a (sync) collection. See the documentation on the collection method for more information.
Attributes
document
- the document to insert.
vector
- an optional suitable vector to enrich the document at insertion.
DEPRECATED (removal in 2.0). Use a
$vector
key in the document instead.
vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a
$vectorize
key in the document instead.
Expand source code
@dataclass
class InsertOne(BaseOperation):
    """
    Represents an `insert_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        document: the document to insert.
        vector: an optional suitable vector to enrich the document at insertion.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the document instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the document instead.
    """

    document: DocumentType
    vector: VectorType | None
    vectorize: str | None

    # NOTE: this explicit __init__ is kept by @dataclass (it does not overwrite
    # an __init__ defined in the class body), so the keyword-only signature holds.
    def __init__(
        self,
        document: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
    ) -> None:
        self.document = document
        # Presumably warns about (or rejects) use of the deprecated
        # `vector`/`vectorize` parameters -- see
        # astrapy.meta.check_deprecated_vector_ize for the exact behavior.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="insert",
        )
        self.vector = vector
        self.vectorize = vectorize

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to the collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult obtained from the result of `collection.insert_one`.
        """
        op_result: InsertOneResult = collection.insert_one(
            document=self.document,
            vector=self.vector,
            vectorize=self.vectorize,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var document : Dict[str, Any]
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
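index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- an optional timeout in milliseconds, forwarded to the collection method as max_time_ms.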
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is forwarded to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, passed
            to the collection method as `max_time_ms`.

    Returns:
        a BulkWriteResult obtained from the result of `collection.insert_one`.
    """
    op_result: InsertOneResult = collection.insert_one(
        document=self.document,
        vector=self.vector,
        vectorize=self.vectorize,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class ReplaceOne (filter: dict[str, Any], replacement: DocumentType, *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None, upsert: bool = False)
-
Represents a
replace_one
operation on a (sync) collection. See the documentation on the collection method for more information.
Attributes
filter
- a filter condition to select a target document.
replacement
- the replacement document.
vector
- a vector of numbers to use for ANN (vector-search) sorting.
DEPRECATED (removal in 2.0). Use a
$vector
key in the sort clause dict instead.
vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a
$vectorize
key in the sort clause dict instead.
sort
- controls ordering of results, hence which document is affected.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class ReplaceOne(BaseOperation):
    """
    Represents a `replace_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        replacement: the replacement document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    replacement: DocumentType
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    # NOTE: this explicit __init__ is kept by @dataclass (it does not overwrite
    # an __init__ defined in the class body), so the keyword-only signature holds.
    def __init__(
        self,
        filter: dict[str, Any],
        replacement: DocumentType,
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.replacement = replacement
        # Presumably warns about (or rejects) use of the deprecated
        # `vector`/`vectorize` parameters -- see
        # astrapy.meta.check_deprecated_vector_ize for the exact behavior.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to the collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult obtained from the result of `collection.replace_one`.
        """
        op_result: UpdateResult = collection.replace_one(
            filter=self.filter,
            replacement=self.replacement,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var replacement : Dict[str, Any]
var sort : Optional[Dict[str, Any]]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
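index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- an optional timeout in milliseconds, forwarded to the collection method as max_time_ms.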
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is forwarded to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, passed
            to the collection method as `max_time_ms`.

    Returns:
        a BulkWriteResult obtained from the result of `collection.replace_one`.
    """
    op_result: UpdateResult = collection.replace_one(
        filter=self.filter,
        replacement=self.replacement,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class UpdateMany (filter: dict[str, Any], update: dict[str, Any], *, upsert: bool = False)
-
Represents an
update_many
operation on a (sync) collection. See the documentation on the collection method for more information.
Attributes
filter
- a filter condition to select target documents.
update
- an update prescription to apply to the documents.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class UpdateMany(BaseOperation):
    """
    Represents an `update_many` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select target documents.
        update: an update prescription to apply to the documents.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    upsert: bool

    # NOTE: this explicit __init__ is kept by @dataclass (it does not overwrite
    # an __init__ defined in the class body), so the keyword-only signature holds.
    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to the collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult obtained from the result of `collection.update_many`.
        """
        op_result: UpdateResult = collection.update_many(
            filter=self.filter,
            update=self.update,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var update : dict[str, typing.Any]
var upsert : bool
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
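index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- an optional timeout in milliseconds, forwarded to the collection method as max_time_ms.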
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is forwarded to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, passed
            to the collection method as `max_time_ms`.

    Returns:
        a BulkWriteResult obtained from the result of `collection.update_many`.
    """
    op_result: UpdateResult = collection.update_many(
        filter=self.filter,
        update=self.update,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
class UpdateOne (filter: dict[str, Any], update: dict[str, Any], *, vector: VectorType | None = None, vectorize: str | None = None, sort: SortType | None = None, upsert: bool = False)
-
Represents an
update_one
operation on a (sync) collection. See the documentation on the collection method for more information.
Attributes
filter
- a filter condition to select a target document.
update
- an update prescription to apply to the document.
vector
- a vector of numbers to use for ANN (vector-search) sorting.
DEPRECATED (removal in 2.0). Use a
$vector
key in the sort clause dict instead.
vectorize
- a string to be made into a vector, with the same result as the
vector
attribute, through an embedding service, assuming one is configured for the collection. DEPRECATED (removal in 2.0). Use a
$vectorize
key in the sort clause dict instead.
sort
- controls ordering of results, hence which document is affected.
upsert
- controls what to do when no documents are found.
Expand source code
@dataclass
class UpdateOne(BaseOperation):
    """
    Represents an `update_one` operation on a (sync) collection.
    See the documentation on the collection method for more information.

    Attributes:
        filter: a filter condition to select a target document.
        update: an update prescription to apply to the document.
        vector: a vector of numbers to use for ANN (vector-search) sorting.
            *DEPRECATED* (removal in 2.0). Use a `$vector` key in the
            sort clause dict instead.
        vectorize: a string to be made into a vector, with the same result as the
            `vector` attribute, through an embedding service, assuming one is
            configured for the collection.
            *DEPRECATED* (removal in 2.0). Use a `$vectorize` key in the
            sort clause dict instead.
        sort: controls ordering of results, hence which document is affected.
        upsert: controls what to do when no documents are found.
    """

    filter: dict[str, Any]
    update: dict[str, Any]
    vector: VectorType | None
    vectorize: str | None
    sort: SortType | None
    upsert: bool

    # NOTE: this explicit __init__ is kept by @dataclass (it does not overwrite
    # an __init__ defined in the class body), so the keyword-only signature holds.
    def __init__(
        self,
        filter: dict[str, Any],
        update: dict[str, Any],
        *,
        vector: VectorType | None = None,
        vectorize: str | None = None,
        sort: SortType | None = None,
        upsert: bool = False,
    ) -> None:
        self.filter = filter
        self.update = update
        # Presumably warns about (or rejects) use of the deprecated
        # `vector`/`vectorize` parameters -- see
        # astrapy.meta.check_deprecated_vector_ize for the exact behavior.
        check_deprecated_vector_ize(
            vector=vector,
            vectors=None,
            vectorize=vectorize,
            kind="find",
        )
        self.vector = vector
        self.vectorize = vectorize
        self.sort = sort
        self.upsert = upsert

    def execute(
        self,
        collection: Collection,
        index_in_bulk_write: int,
        bulk_write_timeout_ms: int | None,
    ) -> BulkWriteResult:
        """
        Execute this operation against a collection as part of a bulk write.

        Args:
            collection: the collection this write targets.
            index_in_bulk_write: the index of this operation in the list of
                bulk operations; it is forwarded to `to_bulk_write_result`.
            bulk_write_timeout_ms: an optional timeout in milliseconds, passed
                to the collection method as `max_time_ms`.

        Returns:
            a BulkWriteResult obtained from the result of `collection.update_one`.
        """
        op_result: UpdateResult = collection.update_one(
            filter=self.filter,
            update=self.update,
            vector=self.vector,
            vectorize=self.vectorize,
            sort=self.sort,
            upsert=self.upsert,
            max_time_ms=bulk_write_timeout_ms,
        )
        return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)
Ancestors
- BaseOperation
- abc.ABC
Class variables
var filter : dict[str, typing.Any]
var sort : Optional[Dict[str, Any]]
var update : dict[str, typing.Any]
var upsert : bool
var vector : Optional[Iterable[float]]
var vectorize : str | None
Methods
def execute(self, collection: Collection, index_in_bulk_write: int, bulk_write_timeout_ms: int | None) ‑> BulkWriteResult
-
Execute this operation against a collection as part of a bulk write.
Args
collection
- the collection this write targets.
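index_in_bulk_write
- the index of this operation in the list of bulk operations.
bulk_write_timeout_ms
- an optional timeout in milliseconds, forwarded to the collection method as max_time_ms.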
Expand source code
def execute(
    self,
    collection: Collection,
    index_in_bulk_write: int,
    bulk_write_timeout_ms: int | None,
) -> BulkWriteResult:
    """
    Execute this operation against a collection as part of a bulk write.

    Args:
        collection: the collection this write targets.
        index_in_bulk_write: the index of this operation in the list of
            bulk operations; it is forwarded to `to_bulk_write_result`.
        bulk_write_timeout_ms: an optional timeout in milliseconds, passed
            to the collection method as `max_time_ms`.

    Returns:
        a BulkWriteResult obtained from the result of `collection.update_one`.
    """
    op_result: UpdateResult = collection.update_one(
        filter=self.filter,
        update=self.update,
        vector=self.vector,
        vectorize=self.vectorize,
        sort=self.sort,
        upsert=self.upsert,
        max_time_ms=bulk_write_timeout_ms,
    )
    return op_result.to_bulk_write_result(index_in_bulk_write=index_in_bulk_write)