Vector Stores
Vector databases store vector data, which backs AI workloads like chatbots and Retrieval Augmented Generation.
Vector database components establish connections to existing vector databases or create in-memory vector stores for storing and retrieving vector data.
Vector database components are distinct from memory components, which are built specifically for storing and retrieving chat messages from external databases.
Use a vector store component in a flow
Vector databases can be populated from within Langflow with document ingestion pipelines, like the following.

This example uses the Astra DB vector store component. Your vector store component’s parameters and authentication may be different, but the document ingestion workflow is the same. A document is loaded from a local machine and chunked. The Astra DB vector store generates embeddings with the connected model component, and stores them in the connected Astra DB database.
This vector data can then be retrieved for workloads like Retrieval Augmented Generation.
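The ingestion steps described above (load, chunk, embed, store) can be sketched in plain Python. This is a minimal illustration under stated assumptions, not how Langflow or Astra DB implement them: chunk_text, toy_embed, and InMemoryVectorStore are hypothetical names, and the hashed bag-of-words vector only stands in for a real embedding model.

```python
import hashlib
import math
from dataclasses import dataclass, field


def chunk_text(text: str, chunk_size: int = 40, overlap: int = 10) -> list[str]:
    """Split text into fixed-size character chunks that overlap slightly."""
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        piece = text[start : start + chunk_size]
        if piece.strip():  # skip chunks that contain only whitespace
            chunks.append(piece)
    return chunks


def toy_embed(text: str, dims: int = 16) -> list[float]:
    """Hypothetical stand-in for an embedding model: hashed word counts, L2-normalized."""
    vec = [0.0] * dims
    for word in text.lower().split():
        bucket = int(hashlib.sha256(word.encode()).hexdigest(), 16) % dims
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec)) or 1.0
    return [v / norm for v in vec]


@dataclass
class InMemoryVectorStore:
    """Hypothetical store that keeps (chunk text, vector) pairs in a list."""

    records: list[tuple[str, list[float]]] = field(default_factory=list)

    def add_texts(self, texts: list[str]) -> None:
        for text in texts:
            self.records.append((text, toy_embed(text)))


document = "Langflow loads a file, splits it into chunks, embeds each chunk, and stores the vectors."
chunks = chunk_text(document)
store = InMemoryVectorStore()
store.add_texts(chunks)
print(f"Stored {len(store.records)} chunks")
```

In a real flow, the chunking is done by a text splitter component and the vectors come from the connected embedding model, so only the overall shape of the pipeline carries over.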

The user’s chat input is embedded and compared to the vectors embedded during document ingestion for a similarity search. The results are output from the vector database component as a Data object, and parsed into text. This text fills the {context} variable in the Prompt component, which informs the OpenAI model component’s responses.
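As a rough illustration of this retrieval step, the sketch below ranks stored chunks by cosine similarity to the question and substitutes the best match into a {context} placeholder. Everything here is an assumption made for the example: embed uses word counts instead of a real embedding model, and PROMPT_TEMPLATE is a made-up prompt, not the one shipped with Langflow.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Hypothetical stand-in for an embedding model: a sparse word-count vector."""
    return Counter(word.strip(".,?!").lower() for word in text.split())


def cosine(a: Counter, b: Counter) -> float:
    """Cosine similarity between two sparse vectors."""
    dot = sum(a[word] * b[word] for word in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(sum(v * v for v in b.values()))
    return dot / norm if norm else 0.0


def similarity_search(query: str, chunks: list[str], k: int = 2) -> list[str]:
    """Return the k chunks most similar to the query."""
    query_vec = embed(query)
    ranked = sorted(chunks, key=lambda chunk: cosine(query_vec, embed(chunk)), reverse=True)
    return ranked[:k]


# Made-up prompt template with the {context} variable described above
PROMPT_TEMPLATE = "Answer using only this context:\n{context}\n\nQuestion: {question}"

chunks = [
    "Astra DB stores vectors in collections.",
    "Bananas are rich in potassium.",
    "A collection holds documents and their embeddings.",
]
question = "Where does Astra DB keep vectors?"
results = similarity_search(question, chunks, k=1)
prompt = PROMPT_TEMPLATE.format(context="\n".join(results), question=question)
print(prompt)
```

The value of k plays the same role as the component's Number of Results setting: it caps how many chunks reach the prompt.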
Alternatively, connect the vector database component’s Retriever port to a retriever tool, and then to an agent component. This enables the agent to use your vector database as a tool and make decisions based on the available data.

Astra DB Vector Store
This component implements a Vector Store using Astra DB Serverless with search capabilities.
Parameters
Name | Display Name | Info |
---|---|---|
collection_name | Collection Name | The name of the collection within Astra DB Serverless where the vectors will be stored (required). |
token | Astra DB Application Token | Authentication token for accessing Astra DB Serverless (required). |
api_endpoint | API Endpoint | API endpoint URL for the Astra DB Serverless service (required). |
search_input | Search Input | Query string for similarity search. |
ingest_data | Ingest Data | Data to be ingested into the vector store. |
namespace | Namespace | Optional namespace within Astra DB Serverless to use for the collection. |
embedding_service | Embedding Model or Astra Vectorize | Determines whether to use an Embedding Model or Astra Vectorize for the collection. |
embedding | Embedding Model | Allows an embedding model configuration (when using Embedding Model). |
provider | Vectorize Provider | Provider for Astra Vectorize (when using Astra Vectorize). |
metric | Metric | Optional distance metric for vector comparisons. |
batch_size | Batch Size | Optional number of records to process in a single batch. |
setup_mode | Setup Mode | Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync"). |
pre_delete_collection | Pre Delete Collection | Boolean flag to determine whether to delete the collection before creating a new one. |
number_of_results | Number of Results | Number of results to return in similarity search (default: 4). |
search_type | Search Type | Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"). |
search_score_threshold | Search Score Threshold | Minimum similarity score threshold for search results. |
search_filter | Search Metadata Filter | Optional dictionary of filters to apply to the search query. |
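The component code listed further down resolves the API endpoint in a fixed order: an explicit endpoint wins, then a database name that is already a URL, then a lookup by database name. The standalone sketch below mirrors that logic for illustration only. The function names, the known_databases dictionary, and the database ID and region values are made up for the example.

```python
from __future__ import annotations


def build_api_endpoint(db_id: str, region: str, environment: str | None = None) -> str:
    """Build an endpoint URL in the same format the component code uses."""
    env_string = f"-{environment}" if environment and environment != "prod" else ""
    return f"https://{db_id}-{region}.apps.astra{env_string}.datastax.com"


def resolve_api_endpoint(
    api_endpoint: str | None,
    database_name: str | None,
    known_databases: dict[str, str],
) -> str | None:
    """Pick the endpoint: explicit value, then URL-like name, then lookup by name."""
    if api_endpoint:
        return api_endpoint
    if database_name and database_name.startswith("https://"):
        return database_name
    if not database_name:
        return None
    return known_databases.get(database_name)


# Made-up database ID and region, used only to show the URL shape
known = {"my_db": build_api_endpoint("0123abcd", "us-east-2")}
print(resolve_api_endpoint(None, "my_db", known))
```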
Name | Display Name | Info |
---|---|---|
vector_store | Vector Store | Built Astra DB Serverless vector store |
search_results | Search Results | Results of the similarity search as a list of Data objects |
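To show how the search parameters combine, the following standalone sketch mirrors the argument-building logic in the component code below: a non-empty query produces a vector search, a filter without a query produces a metadata-only search, and neither produces no search at all. The function name build_search_args and the example filter are assumptions made for this illustration.

```python
from __future__ import annotations

# Display names from the Search Type dropdown mapped to search type keys
SEARCH_TYPE_MAPPING = {
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def build_search_args(
    search_query: str | None,
    search_type: str = "Similarity",
    number_of_results: int = 4,
    search_score_threshold: float = 0.0,
    search_filter: dict | None = None,
) -> dict:
    """Return keyword arguments for a search, or {} when there is nothing to search for."""
    query = search_query if isinstance(search_query, str) and search_query.strip() else None
    if query:
        # Vector search: any unmapped search type falls back to plain similarity
        args = {
            "query": query,
            "search_type": SEARCH_TYPE_MAPPING.get(search_type, "similarity"),
            "k": number_of_results,
            "score_threshold": search_score_threshold,
        }
    elif search_filter:
        # Metadata-only search: no query, just a result limit plus the filter
        args = {"n": number_of_results}
    else:
        return {}
    if search_filter:
        args["filter"] = search_filter
    return args


# Hypothetical metadata filter, used only for illustration
print(build_search_args("what is a collection?", search_filter={"source": "handbook.pdf"}))
```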
Component code
astradb.py
from collections import defaultdict
from dataclasses import asdict, dataclass, field
from astrapy import AstraDBAdmin, DataAPIClient, Database
from astrapy.info import CollectionDescriptor
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import FloatInput, NestedDictInput
from langflow.io import (
BoolInput,
DropdownInput,
HandleInput,
IntInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
from langflow.utils.version import get_version_info
class AstraDBVectorStoreComponent(LCVectorStoreComponent):
display_name: str = "Astra DB"
description: str = "Ingest and search documents in Astra DB"
documentation: str = "https://docs.datastax.com/en/langflow/astra-components.html"
name = "AstraDB"
icon: str = "AstraDB"
_cached_vector_store: AstraDBVectorStore | None = None
@dataclass
class NewDatabaseInput:
functionality: str = "create"
fields: dict[str, dict] = field(
default_factory=lambda: {
"data": {
"node": {
"name": "create_database",
"description": "",
"display_name": "Create new database",
"field_order": ["new_database_name", "cloud_provider", "region"],
"template": {
"new_database_name": StrInput(
name="new_database_name",
display_name="Name",
info="Name of the new database to create in Astra DB.",
required=True,
),
"cloud_provider": DropdownInput(
name="cloud_provider",
display_name="Cloud provider",
info="Cloud provider for the new database.",
options=["Amazon Web Services", "Google Cloud Platform", "Microsoft Azure"],
required=True,
real_time_refresh=True,
),
"region": DropdownInput(
name="region",
display_name="Region",
info="Region for the new database.",
options=[],
required=True,
),
},
},
}
}
)
@dataclass
class NewCollectionInput:
functionality: str = "create"
fields: dict[str, dict] = field(
default_factory=lambda: {
"data": {
"node": {
"name": "create_collection",
"description": "",
"display_name": "Create new collection",
"field_order": [
"new_collection_name",
"embedding_generation_provider",
"embedding_generation_model",
"dimension",
],
"template": {
"new_collection_name": StrInput(
name="new_collection_name",
display_name="Name",
info="Name of the new collection to create in Astra DB.",
required=True,
),
"embedding_generation_provider": DropdownInput(
name="embedding_generation_provider",
display_name="Embedding generation method",
info="Provider to use for generating embeddings.",
real_time_refresh=True,
required=True,
options=["Bring your own", "Nvidia"],
),
"embedding_generation_model": DropdownInput(
name="embedding_generation_model",
display_name="Embedding model",
info="Model to use for generating embeddings.",
required=True,
options=[],
),
"dimension": IntInput(
name="dimension",
display_name="Dimensions (Required only for `Bring your own`)",
info="Dimensions of the embeddings to generate.",
required=False,
value=1024,
),
},
},
}
}
)
inputs = [
SecretStrInput(
name="token",
display_name="Astra DB Application Token",
info="Authentication token for accessing Astra DB.",
value="ASTRA_DB_APPLICATION_TOKEN",
required=True,
real_time_refresh=True,
input_types=[],
),
StrInput(
name="environment",
display_name="Environment",
info="The environment for the Astra DB API Endpoint.",
advanced=True,
real_time_refresh=True,
),
DropdownInput(
name="database_name",
display_name="Database",
info="The Database name for the Astra DB instance.",
required=True,
refresh_button=True,
real_time_refresh=True,
dialog_inputs=asdict(NewDatabaseInput()),
combobox=True,
),
StrInput(
name="api_endpoint",
display_name="Astra DB API Endpoint",
info="The API Endpoint for the Astra DB instance. Supersedes database selection.",
advanced=True,
),
DropdownInput(
name="collection_name",
display_name="Collection",
info="The name of the collection within Astra DB where the vectors will be stored.",
required=True,
refresh_button=True,
real_time_refresh=True,
dialog_inputs=asdict(NewCollectionInput()),
combobox=True,
advanced=True,
),
StrInput(
name="keyspace",
display_name="Keyspace",
info="Optional keyspace within Astra DB to use for the collection.",
advanced=True,
),
DropdownInput(
name="embedding_choice",
display_name="Embedding Model or Astra Vectorize",
info="Choose an embedding model or use Astra Vectorize.",
options=["Embedding Model", "Astra Vectorize"],
value="Embedding Model",
advanced=True,
real_time_refresh=True,
),
HandleInput(
name="embedding_model",
display_name="Embedding Model",
input_types=["Embeddings"],
info="Specify the Embedding Model. Not required for Astra Vectorize collections.",
required=False,
),
*LCVectorStoreComponent.inputs,
IntInput(
name="number_of_results",
display_name="Number of Search Results",
info="Number of search results to return.",
advanced=True,
value=4,
),
DropdownInput(
name="search_type",
display_name="Search Type",
info="Search type to use",
options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
value="Similarity",
advanced=True,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results. "
"(when using 'Similarity with score threshold')",
value=0,
advanced=True,
),
NestedDictInput(
name="advanced_search_filter",
display_name="Search Metadata Filter",
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
),
BoolInput(
name="autodetect_collection",
display_name="Autodetect Collection",
info="Boolean flag to determine whether to autodetect the collection.",
advanced=True,
value=True,
),
StrInput(
name="content_field",
display_name="Content Field",
info="Field to use as the text content field for the vector store.",
advanced=True,
),
StrInput(
name="deletion_field",
display_name="Deletion Based On Field",
info="When this parameter is provided, documents in the target collection with "
"metadata field values matching the input metadata field value will be deleted "
"before new data is loaded.",
advanced=True,
),
BoolInput(
name="ignore_invalid_documents",
display_name="Ignore Invalid Documents",
info="Boolean flag to determine whether to ignore invalid documents at runtime.",
advanced=True,
),
NestedDictInput(
name="astradb_vectorstore_kwargs",
display_name="AstraDBVectorStore Parameters",
info="Optional dictionary of additional parameters for the AstraDBVectorStore.",
advanced=True,
),
]
@classmethod
def map_cloud_providers(cls):
# TODO: Programmatically fetch the regions for each cloud provider
return {
"Amazon Web Services": {
"id": "aws",
"regions": ["us-east-2", "ap-south-1", "eu-west-1"],
},
"Google Cloud Platform": {
"id": "gcp",
"regions": ["us-east1"],
},
"Microsoft Azure": {
"id": "azure",
"regions": ["westus3"],
},
}
@classmethod
def get_vectorize_providers(cls, token: str, environment: str | None = None, api_endpoint: str | None = None):
try:
# Get the admin object
admin = AstraDBAdmin(token=token, environment=environment)
db_admin = admin.get_database_admin(api_endpoint=api_endpoint)
# Get the list of embedding providers
embedding_providers = db_admin.find_embedding_providers().as_dict()
vectorize_providers_mapping = {}
# Map the provider display name to the provider key and models
for provider_key, provider_data in embedding_providers["embeddingProviders"].items():
# Get the provider display name and models
display_name = provider_data["displayName"]
models = [model["name"] for model in provider_data["models"]]
# Build our mapping
vectorize_providers_mapping[display_name] = [provider_key, models]
# Sort the resulting dictionary
return defaultdict(list, dict(sorted(vectorize_providers_mapping.items())))
except Exception as e:
msg = f"Error fetching vectorize providers: {e}"
raise ValueError(msg) from e
@classmethod
async def create_database_api(
cls,
new_database_name: str,
cloud_provider: str,
region: str,
token: str,
environment: str | None = None,
keyspace: str | None = None,
):
client = DataAPIClient(token=token, environment=environment)
# Get the admin object
admin_client = client.get_admin(token=token)
# Call the create database function
return await admin_client.async_create_database(
name=new_database_name,
cloud_provider=cls.map_cloud_providers()[cloud_provider]["id"],
region=region,
keyspace=keyspace,
wait_until_active=False,
)
@classmethod
async def create_collection_api(
cls,
new_collection_name: str,
token: str,
api_endpoint: str,
environment: str | None = None,
keyspace: str | None = None,
dimension: int | None = None,
embedding_generation_provider: str | None = None,
embedding_generation_model: str | None = None,
):
# Create the data API client
client = DataAPIClient(token=token, environment=environment)
# Get the database object
database = client.get_async_database(api_endpoint=api_endpoint, token=token)
# Build vectorize options, if needed
vectorize_options = None
if not dimension:
vectorize_options = CollectionVectorServiceOptions(
provider=cls.get_vectorize_providers(
token=token, environment=environment, api_endpoint=api_endpoint
).get(embedding_generation_provider, [None, []])[0],
model_name=embedding_generation_model,
)
# Create the collection
return await database.create_collection(
name=new_collection_name,
keyspace=keyspace,
dimension=dimension,
service=vectorize_options,
)
@classmethod
def get_database_list_static(cls, token: str, environment: str | None = None):
client = DataAPIClient(token=token, environment=environment)
# Get the admin object
admin_client = client.get_admin(token=token)
# Get the list of databases
db_list = list(admin_client.list_databases())
# Set the environment properly
env_string = ""
if environment and environment != "prod":
env_string = f"-{environment}"
# Generate the api endpoint for each database
db_info_dict = {}
for db in db_list:
try:
# Get the API endpoint for the database
api_endpoint = f"https://{db.info.id}-{db.info.region}.apps.astra{env_string}.datastax.com"
# Get the number of collections
try:
num_collections = len(
list(
client.get_database(
api_endpoint=api_endpoint, token=token, keyspace=db.info.keyspace
).list_collection_names(keyspace=db.info.keyspace)
)
)
except Exception: # noqa: BLE001
# If the collections cannot be listed, skip the database unless it is still PENDING
if db.status != "PENDING":
continue
num_collections = 0
# Add the database to the dictionary
db_info_dict[db.info.name] = {
"api_endpoint": api_endpoint,
"collections": num_collections,
"status": db.status if db.status != "ACTIVE" else None,
}
except Exception: # noqa: BLE001, S110
pass
return db_info_dict
def get_database_list(self):
return self.get_database_list_static(token=self.token, environment=self.environment)
@classmethod
def get_api_endpoint_static(
cls,
token: str,
environment: str | None = None,
api_endpoint: str | None = None,
database_name: str | None = None,
):
# If the api_endpoint is set, return it
if api_endpoint:
return api_endpoint
# Check if the database_name is like a url
if database_name and database_name.startswith("https://"):
return database_name
# If the database is not set, nothing we can do.
if not database_name:
return None
# Grab the database object
db = cls.get_database_list_static(token=token, environment=environment).get(database_name)
if not db:
return None
# Otherwise, get the URL from the database list
return db.get("api_endpoint")
def get_api_endpoint(self):
return self.get_api_endpoint_static(
token=self.token,
environment=self.environment,
api_endpoint=self.api_endpoint,
database_name=self.database_name,
)
def get_keyspace(self):
keyspace = self.keyspace
if keyspace:
return keyspace.strip()
return None
def get_database_object(self, api_endpoint: str | None = None):
try:
client = DataAPIClient(token=self.token, environment=self.environment)
return client.get_database(
api_endpoint=api_endpoint or self.get_api_endpoint(),
token=self.token,
keyspace=self.get_keyspace(),
)
except Exception as e:
msg = f"Error fetching database object: {e}"
raise ValueError(msg) from e
def collection_data(self, collection_name: str, database: Database | None = None):
try:
if not database:
client = DataAPIClient(token=self.token, environment=self.environment)
database = client.get_database(
api_endpoint=self.get_api_endpoint(),
token=self.token,
keyspace=self.get_keyspace(),
)
collection = database.get_collection(collection_name, keyspace=self.get_keyspace())
return collection.estimated_document_count()
except Exception as e: # noqa: BLE001
self.log(f"Error checking collection data: {e}")
return None
def _initialize_database_options(self):
try:
return [
{
"name": name,
"status": info["status"],
"collections": info["collections"],
"api_endpoint": info["api_endpoint"],
"icon": "data",
}
for name, info in self.get_database_list().items()
]
except Exception as e:
msg = f"Error fetching database options: {e}"
raise ValueError(msg) from e
@classmethod
def get_provider_icon(cls, collection: CollectionDescriptor | None = None, provider_name: str | None = None) -> str:
# Get the provider name from the collection
provider_name = provider_name or (
collection.options.vector.service.provider
if collection and collection.options and collection.options.vector and collection.options.vector.service
else None
)
# If there is no provider, use the vector store icon
if not provider_name or provider_name == "bring your own":
return "vectorstores"
# Special case for certain models
# TODO: Add more icons
if provider_name == "nvidia":
return "NVIDIA"
if provider_name == "openai":
return "OpenAI"
# Title case on the provider for the icon if no special case
return provider_name.title()
def _initialize_collection_options(self, api_endpoint: str | None = None):
# Nothing to generate if we don't have an API endpoint yet
api_endpoint = api_endpoint or self.get_api_endpoint()
if not api_endpoint:
return []
# Retrieve the database object
database = self.get_database_object(api_endpoint=api_endpoint)
# Get the list of collections
collection_list = list(database.list_collections(keyspace=self.get_keyspace()))
# Return the list of collections and metadata associated
return [
{
"name": col.name,
"records": self.collection_data(collection_name=col.name, database=database),
"provider": (
col.options.vector.service.provider if col.options.vector and col.options.vector.service else None
),
"icon": self.get_provider_icon(collection=col),
"model": (
col.options.vector.service.model_name if col.options.vector and col.options.vector.service else None
),
}
for col in collection_list
]
def reset_provider_options(self, build_config: dict):
# Get the list of vectorize providers
vectorize_providers = self.get_vectorize_providers(
token=self.token,
environment=self.environment,
api_endpoint=build_config["api_endpoint"]["value"],
)
# Append a special case for Bring your own
vectorize_providers["Bring your own"] = [None, ["Bring your own"]]
# If the collection is set, allow user to see embedding options
build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_provider"
]["options"] = ["Bring your own", "Nvidia", *[key for key in vectorize_providers if key != "Nvidia"]]
# For all not Bring your own or Nvidia providers, add metadata saying configure in Astra DB Portal
provider_options = build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_provider"
]["options"]
# Go over each possible provider and add metadata to configure in Astra DB Portal
for provider in provider_options:
# Skip Bring your own and Nvidia, automatically configured
if provider in ["Bring your own", "Nvidia"]:
build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_provider"
]["options_metadata"].append({"icon": self.get_provider_icon(provider_name=provider.lower())})
continue
# Add metadata to configure in Astra DB Portal
build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_provider"
]["options_metadata"].append({" ": "Configure in Astra DB Portal"})
# And allow the user to see the models based on a selected provider
embedding_provider = build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_provider"
]["value"]
# Set the options for the embedding model based on the provider
build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
"embedding_generation_model"
]["options"] = vectorize_providers.get(embedding_provider, [[], []])[1]
return build_config
def reset_collection_list(self, build_config: dict):
# Get the list of options we have based on the token provided
collection_options = self._initialize_collection_options(api_endpoint=build_config["api_endpoint"]["value"])
# If we retrieved options based on the token, show the dropdown
build_config["collection_name"]["options"] = [col["name"] for col in collection_options]
build_config["collection_name"]["options_metadata"] = [
{k: v for k, v in col.items() if k not in ["name"]} for col in collection_options
]
# Reset the selected collection
if build_config["collection_name"]["value"] not in build_config["collection_name"]["options"]:
build_config["collection_name"]["value"] = ""
# If we have a database, collection name should not be advanced
build_config["collection_name"]["advanced"] = not build_config["database_name"]["value"]
return build_config
def reset_database_list(self, build_config: dict):
# Get the list of options we have based on the token provided
database_options = self._initialize_database_options()
# If we retrieved options based on the token, show the dropdown
build_config["database_name"]["options"] = [db["name"] for db in database_options]
build_config["database_name"]["options_metadata"] = [
{k: v for k, v in db.items() if k not in ["name"]} for db in database_options
]
# Reset the selected database
if build_config["database_name"]["value"] not in build_config["database_name"]["options"]:
build_config["database_name"]["value"] = ""
build_config["api_endpoint"]["value"] = ""
build_config["collection_name"]["advanced"] = True
# If we have a token, database name should not be advanced
build_config["database_name"]["advanced"] = not build_config["token"]["value"]
return build_config
def reset_build_config(self, build_config: dict):
# Reset the list of databases we have based on the token provided
build_config["database_name"]["options"] = []
build_config["database_name"]["options_metadata"] = []
build_config["database_name"]["value"] = ""
build_config["database_name"]["advanced"] = True
build_config["api_endpoint"]["value"] = ""
# Reset the list of collections and metadata associated
build_config["collection_name"]["options"] = []
build_config["collection_name"]["options_metadata"] = []
build_config["collection_name"]["value"] = ""
build_config["collection_name"]["advanced"] = True
return build_config
async def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None):
# Callback for database creation
if field_name == "database_name" and isinstance(field_value, dict) and "new_database_name" in field_value:
try:
await self.create_database_api(
new_database_name=field_value["new_database_name"],
token=self.token,
keyspace=self.get_keyspace(),
environment=self.environment,
cloud_provider=field_value["cloud_provider"],
region=field_value["region"],
)
except Exception as e:
msg = f"Error creating database: {e}"
raise ValueError(msg) from e
# Add the new database to the list of options
build_config["database_name"]["options"] = build_config["database_name"]["options"] + [
field_value["new_database_name"]
]
build_config["database_name"]["options_metadata"] = build_config["database_name"]["options_metadata"] + [
{"status": "PENDING"}
]
return self.reset_collection_list(build_config)
# This is the callback required to update the list of regions for a cloud provider
if field_name == "database_name" and isinstance(field_value, dict) and "new_database_name" not in field_value:
cloud_provider = field_value["cloud_provider"]
build_config["database_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"]["region"][
"options"
] = self.map_cloud_providers()[cloud_provider]["regions"]
return build_config
# Callback for the creation of collections
if field_name == "collection_name" and isinstance(field_value, dict) and "new_collection_name" in field_value:
try:
# Get the dimension if it's a BYO provider
dimension = (
field_value["dimension"]
if field_value["embedding_generation_provider"] == "Bring your own"
else None
)
# Create the collection
await self.create_collection_api(
new_collection_name=field_value["new_collection_name"],
token=self.token,
api_endpoint=build_config["api_endpoint"]["value"],
environment=self.environment,
keyspace=self.get_keyspace(),
dimension=dimension,
embedding_generation_provider=field_value["embedding_generation_provider"],
embedding_generation_model=field_value["embedding_generation_model"],
)
except Exception as e:
msg = f"Error creating collection: {e}"
raise ValueError(msg) from e
# Add the new collection to the list of options
build_config["collection_name"]["value"] = field_value["new_collection_name"]
build_config["collection_name"]["options"].append(field_value["new_collection_name"])
# Get the provider and model for the new collection
generation_provider = field_value["embedding_generation_provider"]
provider = generation_provider if generation_provider != "Bring your own" else None
generation_model = field_value["embedding_generation_model"]
model = generation_model if generation_model and generation_model != "Bring your own" else None
# Set the embedding choice
build_config["embedding_choice"]["value"] = "Astra Vectorize" if provider else "Embedding Model"
build_config["embedding_model"]["advanced"] = bool(provider)
# Add the new collection to the list of options
icon = "NVIDIA" if provider == "Nvidia" else "vectorstores"
build_config["collection_name"]["options_metadata"] = build_config["collection_name"][
"options_metadata"
] + [{"records": 0, "provider": provider, "icon": icon, "model": model}]
return build_config
# Callback to update the model list based on the embedding provider
if (
field_name == "collection_name"
and isinstance(field_value, dict)
and "new_collection_name" not in field_value
):
return self.reset_provider_options(build_config)
# When the component first executes, this is the update refresh call
first_run = field_name == "collection_name" and not field_value and not build_config["database_name"]["options"]
# If the token has not been provided, simply return the empty build config
if not self.token:
return self.reset_build_config(build_config)
# If this is the first execution of the component, reset and build database list
if first_run or field_name in ["token", "environment"]:
return self.reset_database_list(build_config)
# Refresh the collection name options
if field_name == "database_name" and not isinstance(field_value, dict):
# If missing, refresh the database options
if field_value not in build_config["database_name"]["options"]:
build_config = await self.update_build_config(build_config, field_value=self.token, field_name="token")
build_config["database_name"]["value"] = ""
else:
# Find the position of the selected database to align with metadata
index_of_name = build_config["database_name"]["options"].index(field_value)
# Initializing database condition
pending = build_config["database_name"]["options_metadata"][index_of_name]["status"] == "PENDING"
if pending:
return await self.update_build_config(build_config, field_value=self.token, field_name="token")
# Set the API endpoint based on the selected database
build_config["api_endpoint"]["value"] = build_config["database_name"]["options_metadata"][
index_of_name
]["api_endpoint"]
# Reset the provider options
build_config = self.reset_provider_options(build_config)
# Reset the list of collections we have based on the token provided
return self.reset_collection_list(build_config)
# Hide embedding model option if options_metadata provider is not null
if field_name == "collection_name" and not isinstance(field_value, dict):
# Assume we will be autodetecting the collection:
build_config["autodetect_collection"]["value"] = True
# Reload the collection list
build_config = self.reset_collection_list(build_config)
# Set the options for collection name to be the field value if it's a new collection
if field_value and field_value not in build_config["collection_name"]["options"]:
# Add the new collection to the list of options
build_config["collection_name"]["options"].append(field_value)
build_config["collection_name"]["options_metadata"].append(
{"records": 0, "provider": None, "icon": "", "model": None}
)
# Ensure that autodetect collection is set to False, since it's a new collection
build_config["autodetect_collection"]["value"] = False
# If nothing is selected, can't detect provider - return
if not field_value:
return build_config
# Find the position of the selected collection to align with metadata
index_of_name = build_config["collection_name"]["options"].index(field_value)
value_of_provider = build_config["collection_name"]["options_metadata"][index_of_name]["provider"]
# If we were able to determine the Vectorize provider, set it accordingly
if value_of_provider:
build_config["embedding_model"]["advanced"] = True
build_config["embedding_choice"]["value"] = "Astra Vectorize"
else:
build_config["embedding_model"]["advanced"] = False
build_config["embedding_choice"]["value"] = "Embedding Model"
return build_config
return build_config
@check_cached_vector_store
def build_vector_store(self):
try:
from langchain_astradb import AstraDBVectorStore
except ImportError as e:
msg = (
"Could not import langchain Astra DB integration package. "
"Please install it with `pip install langchain-astradb`."
)
raise ImportError(msg) from e
# Get the embedding model and additional params
embedding_params = (
{"embedding": self.embedding_model}
if self.embedding_model and self.embedding_choice == "Embedding Model"
else {}
)
# Get the additional parameters
additional_params = self.astradb_vectorstore_kwargs or {}
# Get Langflow version and platform information
__version__ = get_version_info()["version"]
langflow_prefix = ""
# if os.getenv("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE": # TODO: More precise way of detecting
# langflow_prefix = "ds-"
# Get the database object
database = self.get_database_object()
autodetect = self.collection_name in database.list_collection_names() and self.autodetect_collection
# Bundle up the auto-detect parameters
autodetect_params = {
"autodetect_collection": autodetect,
"content_field": (
self.content_field
if self.content_field and embedding_params
else (
"page_content"
if embedding_params
and self.collection_data(collection_name=self.collection_name, database=database) == 0
else None
)
),
"ignore_invalid_documents": self.ignore_invalid_documents,
}
# Attempt to build the Vector Store object
try:
vector_store = AstraDBVectorStore(
# Astra DB Authentication Parameters
token=self.token,
api_endpoint=database.api_endpoint,
namespace=database.keyspace,
collection_name=self.collection_name,
environment=self.environment,
# Astra DB Usage Tracking Parameters
ext_callers=[(f"{langflow_prefix}langflow", __version__)],
# Astra DB Vector Store Parameters
**autodetect_params,
**embedding_params,
**additional_params,
)
except Exception as e:
msg = f"Error initializing AstraDBVectorStore: {e}"
raise ValueError(msg) from e
# Add documents to the vector store
self._add_documents_to_vector_store(vector_store)
return vector_store
    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)
        if documents and self.deletion_field:
            self.log(f"Deleting documents where {self.deletion_field}")
            try:
                database = self.get_database_object()
                collection = database.get_collection(self.collection_name, keyspace=database.keyspace)
                delete_values = list({doc.metadata[self.deletion_field] for doc in documents})
                self.log(f"Deleting documents where {self.deletion_field} matches {delete_values}.")
                collection.delete_many({f"metadata.{self.deletion_field}": {"$in": delete_values}})
            except Exception as e:
                msg = f"Error deleting documents from AstraDBVectorStore based on '{self.deletion_field}': {e}"
                raise ValueError(msg) from e
        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")
    def _map_search_type(self) -> str:
        search_type_mapping = {
            "Similarity with score threshold": "similarity_score_threshold",
            "MMR (Max Marginal Relevance)": "mmr",
        }
        return search_type_mapping.get(self.search_type, "similarity")

    def _build_search_args(self):
        query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None
        if query:
            args = {
                "query": query,
                "search_type": self._map_search_type(),
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            }
        elif self.advanced_search_filter:
            args = {
                "n": self.number_of_results,
            }
        else:
            return {}
        filter_arg = self.advanced_search_filter or {}
        if filter_arg:
            args["filter"] = filter_arg
        return args
    def search_documents(self, vector_store=None) -> list[Data]:
        vector_store = vector_store or self.build_vector_store()
        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")
        try:
            search_args = self._build_search_args()
        except Exception as e:
            msg = f"Error in AstraDBVectorStore._build_search_args: {e}"
            raise ValueError(msg) from e
        if not search_args:
            self.log("No search input or filters provided. Skipping search.")
            return []
        docs = []
        search_method = "search" if "query" in search_args else "metadata_search"
        try:
            self.log(f"Calling vector_store.{search_method} with args: {search_args}")
            docs = getattr(vector_store, search_method)(**search_args)
        except Exception as e:
            msg = f"Error performing {search_method} in AstraDBVectorStore: {e}"
            raise ValueError(msg) from e
        self.log(f"Retrieved documents: {len(docs)}")
        data = docs_to_data(docs)
        self.log(f"Converted documents to data: {len(data)}")
        self.status = data
        return data

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
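The Astra DB component code above chooses between a vector search and a metadata-only search depending on which inputs are set. The following is a minimal sketch of that decision outside Langflow, not the component's implementation: `build_search_args` and `pick_search_method` are hypothetical standalone functions that take the component's fields as plain arguments.

```python
from typing import Any, Optional

# Display label -> search type string, mirroring the component's mapping.
# Any other label falls back to "similarity".
SEARCH_TYPES = {
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def build_search_args(
    search_query: Optional[str],
    search_type: str,
    number_of_results: int,
    score_threshold: float,
    advanced_filter: Optional[dict] = None,
) -> dict:
    """Hypothetical standalone version of the argument-building logic."""
    # A query counts only if it is a non-blank string.
    query = search_query if isinstance(search_query, str) and search_query.strip() else None
    if query:
        args: dict[str, Any] = {
            "query": query,
            "search_type": SEARCH_TYPES.get(search_type, "similarity"),
            "k": number_of_results,
            "score_threshold": score_threshold,
        }
    elif advanced_filter:
        # No query, but a filter: metadata-only search, limited to n results.
        args = {"n": number_of_results}
    else:
        # Neither a query nor a filter: nothing to search for.
        return {}
    if advanced_filter:
        args["filter"] = advanced_filter
    return args


def pick_search_method(args: dict) -> str:
    """Name of the vector store method the arguments are meant for."""
    return "search" if "query" in args else "metadata_search"
```

With a query, the arguments target `search`; with only a filter, they target `metadata_search`; with neither, the empty result signals that the search should be skipped.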
Astra DB Graph Vector Store
This component implements a Vector Store using Astra DB Serverless with graph capabilities.
Parameters
Inputs
Name | Type | Description |
---|---|---|
token | SecretString | Authentication token for accessing Astra DB. |
api_endpoint | SecretString | API endpoint URL for the Astra DB service. |
collection_name | String | The name of the collection within Astra DB where vectors will be stored. |
embedding_model | Handle | Embedding model to use. |
search_query | Multiline | Input text for searching documents. |
ingest_data | Data | Data to be ingested into the vector store. |
Outputs
Name | Type | Description |
---|---|---|
vector_store | VectorStore | An instance of AstraDBGraphVectorStore for storing and searching vectors. |
Component code
astradb_graph.py
import os

import orjson
from astrapy.admin import parse_api_endpoint

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import (
    BoolInput,
    DictInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class AstraDBGraphVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Astra DB Graph"
    description: str = "Implementation of Graph Vector Store using Astra DB"
    name = "AstraDBGraph"
    icon: str = "AstraDB"
    inputs = [
        SecretStrInput(
            name="token",
            display_name="Astra DB Application Token",
            info="Authentication token for accessing Astra DB.",
            value="ASTRA_DB_APPLICATION_TOKEN",
            required=True,
            advanced=os.getenv("ASTRA_ENHANCED", "false").lower() == "true",
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="Database" if os.getenv("ASTRA_ENHANCED", "false").lower() == "true" else "API Endpoint",
            info="API endpoint URL for the Astra DB service.",
            value="ASTRA_DB_API_ENDPOINT",
            required=True,
        ),
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within Astra DB where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="metadata_incoming_links_key",
            display_name="Metadata incoming links key",
            info="Metadata key used for incoming links.",
            advanced=True,
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="keyspace",
            display_name="Keyspace",
            info="Optional keyspace within Astra DB to use for the collection.",
            advanced=True,
        ),
        HandleInput(
            name="embedding_model",
            display_name="Embedding Model",
            input_types=["Embeddings"],
            info="Allows an embedding model configuration.",
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            value="cosine",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', or 'Off'.",
            options=["Sync", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
            value=False,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
            list=True,
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
            list=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info='Optional JSON string for the "indexing" field of the collection. '
            "See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=[
                "Similarity",
                "Similarity with score threshold",
                "MMR (Max Marginal Relevance)",
                "Graph Traversal",
                "MMR (Max Marginal Relevance) Graph Traversal",
            ],
            value="MMR (Max Marginal Relevance) Graph Traversal",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]
    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBGraphVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e
        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]
            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e
        try:
            self.log(f"Initializing Graph Vector Store {self.collection_name}")
            vector_store = AstraDBGraphVectorStore(
                embedding=self.embedding_model,
                collection_name=self.collection_name,
                metadata_incoming_links_key=self.metadata_incoming_links_key or "incoming_links",
                token=self.token,
                api_endpoint=self.api_endpoint,
                namespace=self.keyspace or None,
                environment=parse_api_endpoint(self.api_endpoint).environment if self.api_endpoint else None,
                metric=self.metric or None,
                batch_size=self.batch_size or None,
                bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
                bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
                bulk_delete_concurrency=self.bulk_delete_concurrency or None,
                setup_mode=setup_mode_value,
                pre_delete_collection=self.pre_delete_collection,
                metadata_indexing_include=[s for s in self.metadata_indexing_include if s] or None,
                metadata_indexing_exclude=[s for s in self.metadata_indexing_exclude if s] or None,
                collection_indexing_policy=orjson.loads(self.collection_indexing_policy.encode("utf-8"))
                if self.collection_indexing_policy
                else None,
            )
        except Exception as e:
            msg = f"Error initializing AstraDBGraphVectorStore: {e}"
            raise ValueError(msg) from e
        self.log(f"Vector Store initialized: {vector_store.astra_env.collection_name}")
        self._add_documents_to_vector_store(vector_store)
        return vector_store
    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)
        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBGraphVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        match self.search_type:
            case "Similarity":
                return "similarity"
            case "Similarity with score threshold":
                return "similarity_score_threshold"
            case "MMR (Max Marginal Relevance)":
                return "mmr"
            case "Graph Traversal":
                return "traversal"
            case "MMR (Max Marginal Relevance) Graph Traversal":
                return "mmr_traversal"
            case _:
                return "similarity"

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }
        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args
    def search_documents(self, vector_store=None) -> list[Data]:
        if not vector_store:
            vector_store = self.build_vector_store()
        self.log("Searching for documents in AstraDBGraphVectorStore.")
        self.log(f"Search query: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")
        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()
                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
                # Drop links from the metadata. At this point the links don't add any value for building the
                # context and haven't been restored to json which causes the conversion to fail.
                self.log("Removing links from metadata.")
                for doc in docs:
                    if "links" in doc.metadata:
                        doc.metadata.pop("links")
            except Exception as e:
                msg = f"Error performing search in AstraDBGraphVectorStore: {e}"
                raise ValueError(msg) from e
            self.log(f"Retrieved documents: {len(docs)}")
            data = docs_to_data(docs)
            self.log(f"Converted documents to data: {len(data)}")
            self.status = data
            return data
        self.log("No search input provided. Skipping search.")
        return []

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
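The graph component removes the `links` key from each result's metadata before converting the results to Data objects. As a rough illustration of that step only, the sketch below uses a hypothetical `Doc` dataclass as a stand-in for a LangChain document, so it runs without any Astra DB or LangChain dependency.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Doc:
    """Hypothetical stand-in for a LangChain document (illustration only)."""

    page_content: str
    metadata: dict[str, Any] = field(default_factory=dict)


def drop_links(docs: list[Doc]) -> list[Doc]:
    """Remove the 'links' metadata key in place; other keys are left untouched."""
    for doc in docs:
        # pop with a default is a no-op when the key is absent.
        doc.metadata.pop("links", None)
    return docs


docs = [
    Doc("first chunk", {"links": ["<link object>"], "source": "a.pdf"}),
    Doc("second chunk", {"source": "b.pdf"}),
]
cleaned = drop_links(docs)
```

After the call, each document keeps its remaining metadata (here, `source`), which is what ends up in the converted search results.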
Cassandra
This component creates a Cassandra Vector Store with search capabilities. For more information, see the Cassandra documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
database_ref | String | Contact points for the database or AstraDB database ID |
username | String | Username for the database (leave empty for AstraDB) |
token | SecretString | User password for the database or AstraDB token |
keyspace | String | Table Keyspace or AstraDB namespace |
table_name | String | Name of the table or AstraDB collection |
ttl_seconds | Integer | Time-to-live for added texts |
batch_size | Integer | Number of data to process in a single batch |
setup_mode | String | Configuration mode for setting up the Cassandra table |
cluster_kwargs | Dict | Additional keyword arguments for the Cassandra cluster |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |
search_type | String | Type of search to perform |
search_score_threshold | Float | Minimum similarity score for search results |
search_filter | Dict | Metadata filters for search query |
body_search | String | Document textual search terms |
enable_body_search | Boolean | Flag to enable body search |
Outputs
Name | Type | Description |
---|---|---|
vector_store | Cassandra | Cassandra vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
cassandra.py
from langchain_community.vectorstores import Cassandra

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, DictInput, FloatInput
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra"
    description = "Cassandra Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/cassandra"
    name = "Cassandra"
    icon = "Cassandra"
    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        IntInput(
            name="ttl_seconds",
            display_name="TTL Seconds",
            info="Optional time-to-live for the added texts.",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            value=16,
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            list=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            list=True,
        ),
        MessageTextInput(
            name="body_search",
            display_name="Search Body",
            info="Document textual search terms to apply to the search query.",
            advanced=True,
        ),
        BoolInput(
            name="enable_body_search",
            display_name="Enable Body Search",
            info="Flag to enable body search. This must be enabled BEFORE the table is created.",
            value=False,
            advanced=True,
        ),
    ]
    @check_cached_vector_store
    def build_vector_store(self) -> Cassandra:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e
        from uuid import UUID

        database_ref = self.database_ref
        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")
        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)
        body_index_options = [("index_analyzer", "STANDARD")] if self.enable_body_search else None
        if self.setup_mode == "Off":
            setup_mode = SetupMode.OFF
        elif self.setup_mode == "Sync":
            setup_mode = SetupMode.SYNC
        else:
            setup_mode = SetupMode.ASYNC
        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            table = Cassandra.from_documents(
                documents=documents,
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                batch_size=self.batch_size,
                body_index_options=body_index_options,
            )
        else:
            self.log("No documents to add to the Vector Store.")
            table = Cassandra(
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                body_index_options=body_index_options,
                setup_mode=setup_mode,
            )
        return table
    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()
        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")
        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()
                self.log(f"Search args: {search_args}")
                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field name 'content'."
                    )
                    raise ValueError(msg) from e
                raise
            self.log(f"Retrieved documents: {len(docs)}")
            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }
        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        if self.body_search:
            if not self.enable_body_search:
                msg = "You should enable body search when creating the table to search the body field."
                raise ValueError(msg)
            args["body_search"] = self.body_search
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
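The Cassandra component decides between an Astra DB connection and a self-managed cluster by checking whether `database_ref` parses as a UUID. The sketch below isolates that check using only the standard library; `parse_database_ref` is a hypothetical helper name, and the actual connection call (`cassio.init`) is deliberately left out.

```python
from typing import Union
from uuid import UUID


def parse_database_ref(database_ref: str) -> tuple[bool, Union[str, list[str]]]:
    """Return (is_astra, reference) for a database_ref value.

    A valid UUID is treated as an Astra DB database ID. Anything else is
    treated as contact points, split on commas when several are given.
    """
    try:
        UUID(database_ref)
    except ValueError:
        if "," in database_ref:
            # Several contact points: split into a list (entries are not stripped).
            return False, database_ref.split(",")
        # A single contact point is passed through unchanged, as a string.
        return False, database_ref
    return True, database_ref
```

For example, `parse_database_ref("10.0.0.1,10.0.0.2")` yields contact points as a list, while a value such as `"01234567-89ab-cdef-0123-456789abcdef"` is classified as an Astra DB database ID. Because the split does not trim whitespace, contact points are best entered without spaces after the commas.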
Cassandra Graph Vector Store
This component implements a Cassandra Graph Vector Store with search capabilities.
Parameters
Inputs
Name | Display Name | Info |
---|---|---|
database_ref | Contact Points / Astra Database ID | Contact points for the database or AstraDB database ID (required). |
username | Username | Username for the database (leave empty for AstraDB). |
token | Password / AstraDB Token | User password for the database or AstraDB token (required). |
keyspace | Keyspace | Table Keyspace or AstraDB namespace (required). |
table_name | Table Name | The name of the table or AstraDB collection where vectors will be stored (required). |
setup_mode | Setup Mode | Configuration mode for setting up the Cassandra table (options: "Sync", "Off", default: "Sync"). |
cluster_kwargs | Cluster arguments | Optional dictionary of additional keyword arguments for the Cassandra cluster. |
search_query | Search Query | Query string for similarity search. |
ingest_data | Ingest Data | Data to be ingested into the vector store (list of Data objects). |
embedding | Embedding | Embedding model to use. |
number_of_results | Number of Results | Number of results to return in similarity search (default: 4). |
search_type | Search Type | Search type to use (options: "Traversal", "MMR traversal", "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Traversal"). |
depth | Depth of traversal | The maximum depth of edges to traverse (for "Traversal" or "MMR traversal" search types, default: 1). |
search_score_threshold | Search Score Threshold | Minimum similarity score threshold for search results (for "Similarity with score threshold" search type). |
search_filter | Search Metadata Filter | Optional dictionary of filters to apply to the search query. |
Outputs
Name | Display Name | Info |
---|---|---|
vector_store | Vector Store | Built Cassandra Graph vector store. |
search_results | Search Results | Results of the similarity search as a list of Data objects. |
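Each Search Type label listed in the parameters corresponds to a search type string passed to the vector store. One way to picture the correspondence is a lookup table with "traversal" as the fallback, matching the documented default. This is a sketch under that assumption: `SEARCH_TYPE_MAP` and `map_search_type` are illustrative names, not part of the component.

```python
# Display label (as listed in the parameter table) -> search type string.
SEARCH_TYPE_MAP = {
    "Traversal": "traversal",
    "MMR traversal": "mmr_traversal",
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def map_search_type(label: str) -> str:
    """Look up a label exactly (case-sensitive); unknown labels fall back to traversal."""
    return SEARCH_TYPE_MAP.get(label, "traversal")
```

Because the lookup is exact, a label that differs only in capitalization, such as "MMR Traversal", is not recognized and silently falls back to "traversal".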
Component code
cassandra_graph.py
from uuid import UUID

from langchain_community.graph_vectorstores import CassandraGraphVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra Graph"
    description = "Cassandra Graph Vector Store"
    name = "CassandraGraph"
    icon = "Cassandra"
    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync' or 'Off'.",
            options=["Sync", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            list=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=[
                "Traversal",
                "MMR traversal",
                "Similarity",
                "Similarity with score threshold",
                "MMR (Max Marginal Relevance)",
            ],
            value="Traversal",
            advanced=True,
        ),
        IntInput(
            name="depth",
            display_name="Depth of traversal",
            info="The maximum depth of edges to traverse. (when using 'Traversal' or 'MMR traversal')",
            value=1,
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            list=True,
        ),
    ]
    @check_cached_vector_store
    def build_vector_store(self) -> CassandraGraphVectorStore:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e
        database_ref = self.database_ref
        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")
        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)
        setup_mode = SetupMode.OFF if self.setup_mode == "Off" else SetupMode.SYNC
        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            store = CassandraGraphVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
            )
        else:
            self.log("No documents to add to the Vector Store.")
            store = CassandraGraphVectorStore(
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
                setup_mode=setup_mode,
            )
        return store
    def _map_search_type(self) -> str:
        if self.search_type == "Similarity":
            return "similarity"
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        # Must match the dropdown option exactly: "MMR traversal" (lowercase "t").
        if self.search_type == "MMR traversal":
            return "mmr_traversal"
        return "traversal"
    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()
        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")
        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()
                self.log(f"Search args: {search_args}")
                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field name 'content'."
                    )
                    raise ValueError(msg) from e
                raise
            self.log(f"Retrieved documents: {len(docs)}")
            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
            "depth": self.depth,
        }
        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
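The search filter is cleaned with a dictionary comprehension that keeps only entries whose key and value are both truthy. The sketch below reproduces that comprehension in a hypothetical standalone function, `clean_search_filter`, to show which entries survive.

```python
from typing import Any, Optional


def clean_search_filter(search_filter: Optional[dict]) -> dict[str, Any]:
    """Keep only entries whose key and value are both truthy."""
    return {k: v for k, v in (search_filter or {}).items() if k and v}


raw_filter = {"source": "a.pdf", "": "orphan value", "page": 0, "lang": None}
cleaned_filter = clean_search_filter(raw_filter)
```

Here only the `source` entry remains. One consequence of the truthiness test is that legitimate falsy values such as `0`, `False`, or an empty string are dropped along with blank rows, so a filter like `page == 0` cannot be expressed this way.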
Chroma DB
This component creates a Chroma Vector Store with search capabilities. For more information, see the Chroma documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name | String | The name of the Chroma collection. Default: "langflow". |
persist_directory | String | The directory to persist the Chroma database. |
search_query | String | The query to search for in the vector store. |
ingest_data | Data | The data to ingest into the vector store (list of Data objects). |
embedding | Embeddings | The embedding function to use for the vector store. |
chroma_server_cors_allow_origins | String | CORS allow origins for the Chroma server. |
chroma_server_host | String | Host for the Chroma server. |
chroma_server_http_port | Integer | HTTP port for the Chroma server. |
chroma_server_grpc_port | Integer | gRPC port for the Chroma server. |
chroma_server_ssl_enabled | Boolean | Enable SSL for the Chroma server. |
allow_duplicates | Boolean | Allow duplicate documents in the vector store. |
search_type | String | Type of search to perform: "Similarity" or "MMR". |
number_of_results | Integer | Number of results to return from the search. Default: 10. |
limit | Integer | Limit the number of records to compare when Allow Duplicates is False. |
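The interaction between Allow Duplicates and Limit can be pictured as a comparison of incoming records against the stored records with their IDs removed. The following sketch uses plain dictionaries instead of Langflow Data objects and a hypothetical function name, `filter_new_records`. It illustrates the idea only and does not call Chroma.

```python
from copy import deepcopy


def filter_new_records(incoming: list[dict], stored: list[dict], allow_duplicates: bool = False) -> list[dict]:
    """Return the incoming records that should be added to the store.

    `stored` stands for the records fetched from the collection, which in the
    component would be capped by the Limit parameter.
    """
    if allow_duplicates:
        return list(incoming)
    stored_without_id = []
    # Work on a copy so the fetched records keep their IDs.
    for record in deepcopy(stored):
        record.pop("id", None)
        stored_without_id.append(record)
    return [record for record in incoming if record not in stored_without_id]


stored = [{"id": "1", "text": "hello"}]
incoming = [{"text": "hello"}, {"text": "world"}]
to_add = filter_new_records(incoming, stored)
```

In this example only the "world" record is added. Under the same assumption, a stored record that falls outside the fetched limit is never compared, so its duplicate would still be added.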
Component code
chroma.py
from copy import deepcopy

from chromadb.config import Settings
from langchain_chroma import Chroma
from typing_extensions import override

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.base.vectorstores.utils import chroma_collection_to_data
from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, StrInput
from langflow.schema import Data


class ChromaVectorStoreComponent(LCVectorStoreComponent):
    """Chroma Vector Store with search capabilities."""

    display_name: str = "Chroma DB"
    description: str = "Chroma Vector Store with search capabilities"
    name = "Chroma"
    icon = "Chroma"
    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            value="langflow",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        StrInput(
            name="chroma_server_cors_allow_origins",
            display_name="Server CORS Allow Origins",
            advanced=True,
        ),
        StrInput(
            name="chroma_server_host",
            display_name="Server Host",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_http_port",
            display_name="Server HTTP Port",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_grpc_port",
            display_name="Server gRPC Port",
            advanced=True,
        ),
        BoolInput(
            name="chroma_server_ssl_enabled",
            display_name="Server SSL Enabled",
            advanced=True,
        ),
        BoolInput(
            name="allow_duplicates",
            display_name="Allow Duplicates",
            advanced=True,
            info="If false, will not add documents that are already in the Vector Store.",
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["Similarity", "MMR"],
            value="Similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=10,
        ),
        IntInput(
            name="limit",
            display_name="Limit",
            advanced=True,
            info="Limit the number of records to compare when Allow Duplicates is False.",
        ),
    ]
@override
@check_cached_vector_store
def build_vector_store(self) -> Chroma:
"""Builds the Chroma object."""
try:
from chromadb import Client
from langchain_chroma import Chroma
except ImportError as e:
msg = "Could not import Chroma integration package. Please install it with `pip install langchain-chroma`."
raise ImportError(msg) from e
# Chroma settings
chroma_settings = None
client = None
if self.chroma_server_host:
chroma_settings = Settings(
chroma_server_cors_allow_origins=self.chroma_server_cors_allow_origins or [],
chroma_server_host=self.chroma_server_host,
chroma_server_http_port=self.chroma_server_http_port or None,
chroma_server_grpc_port=self.chroma_server_grpc_port or None,
chroma_server_ssl_enabled=self.chroma_server_ssl_enabled,
)
client = Client(settings=chroma_settings)
# Check persist_directory and expand it if it is a relative path
persist_directory = self.resolve_path(self.persist_directory) if self.persist_directory is not None else None
chroma = Chroma(
persist_directory=persist_directory,
client=client,
embedding_function=self.embedding,
collection_name=self.collection_name,
)
self._add_documents_to_vector_store(chroma)
self.status = chroma_collection_to_data(chroma.get(limit=self.limit))
return chroma
def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
"""Adds documents to the Vector Store."""
if not self.ingest_data:
self.status = ""
return
stored_documents_without_id = []
if self.allow_duplicates:
stored_data = []
else:
stored_data = chroma_collection_to_data(vector_store.get(limit=self.limit))
for value in deepcopy(stored_data):
del value.id
stored_documents_without_id.append(value)
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
if _input not in stored_documents_without_id:
documents.append(_input.to_lc_document())
else:
msg = "Vector Store Inputs must be Data objects."
raise TypeError(msg)
if documents and self.embedding is not None:
self.log(f"Adding {len(documents)} documents to the Vector Store.")
vector_store.add_documents(documents)
else:
self.log("No documents to add to the Vector Store.")
Clickhouse
This component implements a Clickhouse Vector Store with search capabilities using the LangChain framework.
Parameters
Name | Display Name | Info |
---|---|---|
host | hostname | Clickhouse server hostname (required, default: "localhost") |
port | port | Clickhouse server port (required, default: 8123) |
database | database | Clickhouse database name (required) |
table | Table name | Clickhouse table name (required) |
username | The ClickHouse user name. | Username for authentication (required) |
password | The password for username. | Password for authentication (required) |
index_type | index_type | Type of the index (options: "annoy", "vector_similarity", default: "annoy") |
metric | metric | Metric to compute distance (options: "angular", "euclidean", "manhattan", "hamming", "dot", default: "angular") |
secure | Use https/TLS | Whether to use HTTPS/TLS. Overrides values inferred from the interface or port arguments (default: false) |
index_param | Param of the index | Comma-separated index parameters (default: "100,'L2Distance'") |
index_query_params | index query params | Additional index query parameters |
search_query | Search Query | Query string for similarity search |
ingest_data | Ingest Data | Data to be ingested into the vector store |
embedding | Embedding | Embedding model to use |
number_of_results | Number of Results | Number of results to return in similarity search (default: 4) |
score_threshold | Score threshold | Threshold for similarity scores |

Name | Display Name | Info |
---|---|---|
vector_store | Vector Store | Built Clickhouse vector store |
search_results | Search Results | Results of the similarity search as a list of Data objects |
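The component code on this page passes `index_param` to the Clickhouse settings as a list obtained by splitting the string on commas. A minimal sketch of that step follows; `parse_index_param` is a hypothetical helper name used only for illustration.

```python
def parse_index_param(index_param):
    """Split a comma-separated index parameter string into a list of strings."""
    if not index_param:
        # An empty value means no index parameters are passed at all.
        return None
    return index_param.split(",")


print(parse_index_param("100,'L2Distance'"))
print(parse_index_param("100, 'L2Distance'"))
```

Because `str.split` does not trim whitespace, a space after the comma ends up inside the second element, so it is probably safest to write the value without spaces.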
Component code
clickhouse.py
from langchain_community.vectorstores import Clickhouse, ClickhouseSettings

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, FloatInput
from langflow.io import (
    DictInput,
    DropdownInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Clickhouse"
    description = "Clickhouse Vector Store with search capabilities"
    name = "Clickhouse"
    icon = "Clickhouse"

    inputs = [
        StrInput(name="host", display_name="hostname", required=True, value="localhost"),
        IntInput(name="port", display_name="port", required=True, value=8123),
        StrInput(name="database", display_name="database", required=True),
        StrInput(name="table", display_name="Table name", required=True),
        StrInput(name="username", display_name="The ClickHouse user name.", required=True),
        SecretStrInput(name="password", display_name="The password for username.", required=True),
        DropdownInput(
            name="index_type",
            display_name="index_type",
            options=["annoy", "vector_similarity"],
            info="Type of the index.",
            value="annoy",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="metric",
            options=["angular", "euclidean", "manhattan", "hamming", "dot"],
            info="Metric to compute distance.",
            value="angular",
            advanced=True,
        ),
        BoolInput(
            name="secure",
            display_name="Use https/TLS. This overrides inferred values from the interface or port arguments.",
            value=False,
            advanced=True,
        ),
        StrInput(name="index_param", display_name="Param of the index", value="100,'L2Distance'", advanced=True),
        DictInput(name="index_query_params", display_name="index query params", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        FloatInput(name="score_threshold", display_name="Score threshold", advanced=True),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Clickhouse:
        try:
            import clickhouse_connect
        except ImportError as e:
            msg = (
                "Failed to import Clickhouse dependencies. "
                "Install it using `pip install langflow[clickhouse-connect] --pre`"
            )
            raise ImportError(msg) from e

        try:
            client = clickhouse_connect.get_client(
                host=self.host, port=self.port, username=self.username, password=self.password
            )
            client.command("SELECT 1")
        except Exception as e:
            msg = f"Failed to connect to Clickhouse: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        kwargs = {}
        if self.index_param:
            kwargs["index_param"] = self.index_param.split(",")
        if self.index_query_params:
            kwargs["index_query_params"] = self.index_query_params

        settings = ClickhouseSettings(
            table=self.table,
            database=self.database,
            host=self.host,
            index_type=self.index_type,
            metric=self.metric,
            password=self.password,
            port=self.port,
            secure=self.secure,
            username=self.username,
            **kwargs,
        )
        if documents:
            clickhouse_vs = Clickhouse.from_documents(documents=documents, embedding=self.embedding, config=settings)
        else:
            clickhouse_vs = Clickhouse(embedding=self.embedding, config=settings)

        return clickhouse_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            kwargs = {}
            if self.score_threshold:
                kwargs["score_threshold"] = self.score_threshold

            docs = vector_store.similarity_search(query=self.search_query, k=self.number_of_results, **kwargs)

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
Couchbase
This component creates a Couchbase Vector Store with search capabilities. For more information, see the Couchbase documentation.
Parameters
Name | Type | Description |
---|---|---|
couchbase_connection_string | SecretString | Couchbase Cluster connection string (required). |
couchbase_username | String | Couchbase username (required). |
couchbase_password | SecretString | Couchbase password (required). |
bucket_name | String | Name of the Couchbase bucket (required). |
scope_name | String | Name of the Couchbase scope (required). |
collection_name | String | Name of the Couchbase collection (required). |
index_name | String | Name of the Couchbase index (required). |
search_query | String | The query to search for in the vector store. |
ingest_data | Data | The data to ingest into the vector store (list of Data objects). |
embedding | Embeddings | The embedding function to use for the vector store. |
number_of_results | Integer | Number of results to return from the search. Default: 4 (advanced). |

Name | Type | Description |
---|---|---|
vector_store | CouchbaseVectorStore | A Couchbase vector store instance configured with the specified parameters. |
Component code
couchbase.py
from datetime import timedelta

from langchain_community.vectorstores import CouchbaseVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Couchbase"
    description = "Couchbase Vector Store with search capabilities"
    name = "Couchbase"
    icon = "Couchbase"

    inputs = [
        SecretStrInput(
            name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True
        ),
        StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
        SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
        StrInput(name="bucket_name", display_name="Bucket Name", required=True),
        StrInput(name="scope_name", display_name="Scope Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CouchbaseVectorStore:
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
            raise ImportError(msg) from e

        try:
            auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
            options = ClusterOptions(auth)
            cluster = Cluster(self.couchbase_connection_string, options)

            cluster.wait_until_ready(timedelta(seconds=5))
        except Exception as e:
            msg = f"Failed to connect to Couchbase: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            couchbase_vs = CouchbaseVectorStore.from_documents(
                documents=documents,
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )
        else:
            couchbase_vs = CouchbaseVectorStore(
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )

        return couchbase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
Elasticsearch
This component creates an Elasticsearch Vector Store with search capabilities. For more information, see the Elasticsearch vector store documentation.
Parameters
Name | Type | Description |
---|---|---|
elasticsearch_url | String | URL for self-managed Elasticsearch deployments, such as http://localhost:9200. Do not use together with cloud_id. |
cloud_id | SecretString | Elastic Cloud ID for cloud deployments. Do not use together with elasticsearch_url. |
index_name | String | Name of the index in the Elasticsearch cluster where the vectors are stored. |
search_query | Multiline | Search query for retrieving documents. |
username | String | Elasticsearch username for authentication. |
password | SecretString | Elasticsearch password for authentication. |
ingest_data | Data | Data to be ingested into the vector store. |
embedding | Handle | Embedding model to use. |
search_type | Dropdown | Type of search to perform (similarity or mmr). |
number_of_results | Integer | Number of results to return. |
search_score_threshold | Float | Minimum similarity score threshold for search results. |
api_key | SecretString | API Key for Elastic Cloud authentication. |

Name | Type | Description |
---|---|---|
vector_store | VectorStore | An instance of ElasticsearchStore for storing and searching vectors. |
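The connection parameters are mutually exclusive in one respect: the component code on this page rejects a configuration that sets both a Cloud ID and a URL. The following sketch mirrors that selection logic in plain Python. `build_es_params` is a hypothetical helper, and the dictionary keys are copied from the component code rather than checked against the ElasticsearchStore API.

```python
def build_es_params(index_name, elasticsearch_url="", cloud_id="", username="", password="", api_key=""):
    """Assemble connection keyword arguments for a self-managed or Elastic Cloud deployment."""
    if cloud_id and elasticsearch_url:
        msg = "Both 'cloud_id' and 'elasticsearch_url' provided. Please use only one."
        raise ValueError(msg)

    params = {
        "index_name": index_name,
        # Empty strings are normalized to None so that unset credentials are not sent.
        "es_user": username or None,
        "es_password": password or None,
    }
    if cloud_id:
        params["es_cloud_id"] = cloud_id
    else:
        params["es_url"] = elasticsearch_url
    if api_key:
        params["api_key"] = api_key
    return params


print(build_es_params("langflow", elasticsearch_url="http://localhost:9200", username="elastic", password="changeme"))
print(build_es_params("langflow", cloud_id="my-deployment:abc123", api_key="my-api-key"))
```

Note that the component's URL field defaults to http://localhost:9200, so under this logic the URL presumably has to be cleared before a Cloud ID can be used.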
Component code
elasticsearch.py
from typing import Any

from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "Elasticsearch"
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments, use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in Elasticsearch cluster.",
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Builds the Elasticsearch Vector Store object."""
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            "es_user": self.username or None,
            "es_password": self.password or None,
        }

        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url

        if self.api_key:
            es_params["api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store
        if self.ingest_data:
            documents = self._prepare_documents()
            if documents:
                elasticsearch.add_documents(documents)

        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Prepares documents from the input data to add to the vector store."""
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                self.log(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
        """Adds documents to the Vector Store."""
        documents = self._prepare_documents()
        if documents and self.embedding:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            self.log("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents in the vector store or retrieve all documents if no query is provided."""
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            search_type = self.search_type.lower()
            if search_type not in {"similarity", "mmr"}:
                msg = f"Invalid search type: {self.search_type}"
                self.log(msg)
                raise ValueError(msg)
            try:
                if search_type == "similarity":
                    results = vector_store.similarity_search_with_score(query, **search_kwargs)
                elif search_type == "mmr":
                    results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
            except Exception as e:
                msg = (
                    "Error occurred while querying the Elasticsearch VectorStore,"
                    " there is no Data into the VectorStore."
                )
                self.log(msg)
                raise ValueError(msg) from e
            return [
                {"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results
            ]
        results = self.get_all_documents(vector_store, **search_kwargs)
        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Retrieve all documents from the vector store."""
        client = vector_store.client
        index_name = self.index_name

        query = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }

        response = client.search(index=index_name, body=query)

        results = []
        for hit in response["hits"]["hits"]:
            doc = Document(
                page_content=hit["_source"].get("text", ""),
                metadata=hit["_source"].get("metadata", {}),
            )
            score = hit["_score"]
            results.append((doc, score))

        return results

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        results = self.search(self.search_query)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Get the keyword arguments for the retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }
FAISS
This component creates a FAISS Vector Store with search capabilities.
For more information, see the FAISS documentation.
Parameters
Name | Type | Description |
---|---|---|
index_name | String | The name of the FAISS index. Default: "langflow_index". |
persist_directory | String | Path where the FAISS index is saved. Relative paths are resolved against the directory where Langflow is running. |
search_query | String | The query to search for in the vector store. |
ingest_data | Data | The data to ingest into the vector store (list of Data objects or documents). |
allow_dangerous_deserialization | Boolean | Set to True to allow loading pickle files from untrusted sources. Only enable this if you trust the source of the data. Default: True (advanced). |
embedding | Embeddings | The embedding function to use for the vector store. |
number_of_results | Integer | Number of results to return from the search. Default: 4 (advanced). |

Name | Type | Description |
---|---|---|
vector_store | FAISS | A FAISS vector store instance configured with the specified parameters. |
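How `persist_directory` and `index_name` combine into an index file location can be sketched with `pathlib`. This is an illustration under stated assumptions rather than the component itself: `faiss_index_path` is a hypothetical helper, and the `.faiss` suffix follows the file check in the component code on this page.

```python
from pathlib import Path


def faiss_index_path(persist_directory, index_name="langflow_index"):
    """Return the path of the index file for a given directory and index name."""
    # A relative directory is resolved against the current working directory;
    # an empty value falls back to the current directory itself.
    base = Path(persist_directory).resolve() if persist_directory else Path()
    return base / f"{index_name}.faiss"


print(faiss_index_path("vector_data"))
print(faiss_index_path("", "my_index"))
```

Under this logic, starting Langflow from a different working directory changes where a relative `persist_directory` points, so an absolute path is the more predictable choice.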
Component code
faiss.py
from pathlib import Path

from langchain_community.vectorstores import FAISS

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, HandleInput, IntInput, StrInput
from langflow.schema import Data


class FaissVectorStoreComponent(LCVectorStoreComponent):
    """FAISS Vector Store with search capabilities."""

    display_name: str = "FAISS"
    description: str = "FAISS Vector Store with search capabilities"
    name = "FAISS"
    icon = "FAISS"

    inputs = [
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow_index",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
            info="Path to save the FAISS index. It will be relative to where Langflow is running.",
        ),
        *LCVectorStoreComponent.inputs,
        BoolInput(
            name="allow_dangerous_deserialization",
            display_name="Allow Dangerous Deserialization",
            info="Set to True to allow loading pickle files from untrusted sources. "
            "Only enable this if you trust the source of the data.",
            advanced=True,
            value=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
    ]

    @staticmethod
    def resolve_path(path: str) -> str:
        """Resolve the path relative to the Langflow root.

        Args:
            path: The path to resolve
        Returns:
            str: The resolved path as a string
        """
        return str(Path(path).resolve())

    def get_persist_directory(self) -> Path:
        """Returns the resolved persist directory path or the current directory if not set."""
        if self.persist_directory:
            return Path(self.resolve_path(self.persist_directory))
        return Path()

    @check_cached_vector_store
    def build_vector_store(self) -> FAISS:
        """Builds the FAISS object."""
        path = self.get_persist_directory()
        path.mkdir(parents=True, exist_ok=True)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        faiss = FAISS.from_documents(documents=documents, embedding=self.embedding)
        faiss.save_local(str(path), self.index_name)
        return faiss

    def search_documents(self) -> list[Data]:
        """Search for documents in the FAISS vector store."""
        path = self.get_persist_directory()
        index_path = path / f"{self.index_name}.faiss"

        if not index_path.exists():
            vector_store = self.build_vector_store()
        else:
            vector_store = FAISS.load_local(
                folder_path=str(path),
                embeddings=self.embedding,
                index_name=self.index_name,
                allow_dangerous_deserialization=self.allow_dangerous_deserialization,
            )

        if not vector_store:
            msg = "Failed to load the FAISS index."
            raise ValueError(msg)

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            return docs_to_data(docs)
        return []
Hyper-Converged Database (HCD) Vector Store
This component implements a Vector Store using Hyper-Converged Database (HCD).
Parameters
Name | Display Name | Info |
---|---|---|
collection_name | Collection Name | The name of the collection within HCD where the vectors will be stored (required) |
username | HCD Username | Authentication username for accessing HCD (default: "hcd-superuser", required) |
password | HCD Password | Authentication password for accessing HCD (required) |
api_endpoint | HCD API Endpoint | API endpoint URL for the HCD service (required) |
search_input | Search Input | Query string for similarity search |
ingest_data | Ingest Data | Data to be ingested into the vector store |
namespace | Namespace | Optional namespace within HCD to use for the collection (default: "default_namespace") |
ca_certificate | CA Certificate | Optional CA certificate for TLS connections to HCD |
metric | Metric | Optional distance metric for vector comparisons (options: "cosine", "dot_product", "euclidean") |
batch_size | Batch Size | Optional number of records to process in a single batch |
bulk_insert_batch_concurrency | Bulk Insert Batch Concurrency | Optional concurrency level for bulk insert operations |
bulk_insert_overwrite_concurrency | Bulk Insert Overwrite Concurrency | Optional concurrency level for bulk insert operations that overwrite existing data |
bulk_delete_concurrency | Bulk Delete Concurrency | Optional concurrency level for bulk delete operations |
setup_mode | Setup Mode | Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync") |
pre_delete_collection | Pre Delete Collection | Whether to delete the collection before creating a new one |
metadata_indexing_include | Metadata Indexing Include | Optional list of metadata fields to include in the indexing |
embedding | Embedding or Astra Vectorize | Accepts either an embedding model or an Astra Vectorize configuration |
metadata_indexing_exclude | Metadata Indexing Exclude | Optional list of metadata fields to exclude from the indexing |
collection_indexing_policy | Collection Indexing Policy | Optional dictionary defining the indexing policy for the collection |
number_of_results | Number of Results | Number of results to return in similarity search (default: 4) |
search_type | Search Type | Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Similarity") |
search_score_threshold | Search Score Threshold | Minimum similarity score threshold for search results, used with "Similarity with score threshold" (default: 0) |
search_filter | Search Metadata Filter | Optional dictionary of filters to apply to the search query |

Name | Display Name | Info |
---|---|---|
vector_store | Vector Store | Built HCD vector store instance |
search_results | Search Results | Results of similarity search as a list of Data objects |
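Two of these parameters are resolved before the store is built: the Setup Mode label is mapped to an enum member, and only one of the three indexing options is forwarded. The sketch below illustrates both steps in plain Python. The `SetupMode` class here is a stand-in for the enum imported from langchain_astradb, and `resolve_setup_mode` and `pick_indexing_option` are hypothetical helper names; the precedence order follows the component code on this page.

```python
from enum import Enum


class SetupMode(Enum):
    """Stand-in for the SetupMode enum used by the component (an assumption)."""

    SYNC = 1
    ASYNC = 2
    OFF = 3


def resolve_setup_mode(value, default="Sync"):
    """Map a dropdown label such as "Sync" to an enum member, using the default when unset."""
    label = value or default
    try:
        return SetupMode[label.upper()]
    except KeyError as e:
        msg = f"Invalid setup mode: {label}"
        raise ValueError(msg) from e


def pick_indexing_option(include=None, exclude=None, policy=None):
    """Return at most one indexing option: include wins over exclude, which wins over policy."""
    if include:
        return {"metadata_indexing_include": include}
    if exclude:
        return {"metadata_indexing_exclude": exclude}
    if policy:
        return {"collection_indexing_policy": policy}
    return {}


print(resolve_setup_mode("Async"))
print(resolve_setup_mode(""))
print(pick_indexing_option(include="title", exclude="body"))
```

One consequence of this precedence is that setting both an include list and an exclude list silently drops the exclude list, so only one of the three should be filled in.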
Component code
hcd.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    BoolInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class HCDVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Hyper-Converged Database"
    description: str = "Implementation of Vector Store using Hyper-Converged Database (HCD) with search capabilities"
    name = "HCD"
    icon: str = "HCD"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within HCD where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="username",
            display_name="HCD Username",
            info="Authentication username for accessing HCD.",
            value="hcd-superuser",
            required=True,
        ),
        SecretStrInput(
            name="password",
            display_name="HCD Password",
            info="Authentication password for accessing HCD.",
            value="HCD_PASSWORD",
            required=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="HCD API Endpoint",
            info="API endpoint URL for the HCD service.",
            value="HCD_API_ENDPOINT",
            required=True,
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Optional namespace within HCD to use for the collection.",
            value="default_namespace",
            advanced=True,
        ),
        MultilineInput(
            name="ca_certificate",
            display_name="CA Certificate",
            info="Optional CA certificate for TLS connections to HCD.",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding or Astra Vectorize",
            input_types=["Embeddings", "dict"],
            # TODO: This should be optional, but need to refactor langchain-astradb first.
            info="Allows either an embedding model or an Astra Vectorize configuration.",
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info="Optional dictionary defining the indexing policy for the collection.",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        try:
            from astrapy.authentication import UsernamePasswordTokenProvider
            from astrapy.constants import Environment
        except ImportError as e:
            msg = "Could not import astrapy integration package. Please install it with `pip install astrapy`."
            raise ImportError(msg) from e

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e

        if not isinstance(self.embedding, dict):
            embedding_dict = {"embedding": self.embedding}
        else:
            from astrapy.info import CollectionVectorServiceOptions

            dict_options = self.embedding.get("collection_vector_service_options", {})
            dict_options["authentication"] = {
                k: v for k, v in dict_options.get("authentication", {}).items() if k and v
            }
            dict_options["parameters"] = {k: v for k, v in dict_options.get("parameters", {}).items() if k and v}
            embedding_dict = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(dict_options)
            }
            collection_embedding_api_key = self.embedding.get("collection_embedding_api_key")
            if collection_embedding_api_key:
                embedding_dict["collection_embedding_api_key"] = collection_embedding_api_key

        token_provider = UsernamePasswordTokenProvider(self.username, self.password)
        vector_store_kwargs = {
            **embedding_dict,
            "collection_name": self.collection_name,
            "token": token_provider,
            "api_endpoint": self.api_endpoint,
            "namespace": self.namespace,
            "metric": self.metric or None,
            "batch_size": self.batch_size or None,
            "bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
            "bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
            "bulk_delete_concurrency": self.bulk_delete_concurrency or None,
            "setup_mode": setup_mode_value,
            "pre_delete_collection": self.pre_delete_collection or False,
            "environment": Environment.HCD,
        }

        if self.metadata_indexing_include:
            vector_store_kwargs["metadata_indexing_include"] = self.metadata_indexing_include
        elif self.metadata_indexing_exclude:
            vector_store_kwargs["metadata_indexing_exclude"] = self.metadata_indexing_exclude
        elif self.collection_indexing_policy:
            vector_store_kwargs["collection_indexing_policy"] = self.collection_indexing_policy

        try:
            vector_store = AstraDBVectorStore(**vector_store_kwargs)
        except Exception as e:
            msg = f"Error initializing AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self._add_documents_to_vector_store(vector_store)
        return vector_store
def _add_documents_to_vector_store(self, vector_store) -> None:
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
msg = "Vector Store Inputs must be Data objects."
raise TypeError(msg)
if documents:
self.log(f"Adding {len(documents)} documents to the Vector Store.")
try:
vector_store.add_documents(documents)
except Exception as e:
msg = f"Error adding documents to AstraDBVectorStore: {e}"
raise ValueError(msg) from e
else:
self.log("No documents to add to the Vector Store.")
def _map_search_type(self) -> str:
if self.search_type == "Similarity with score threshold":
return "similarity_score_threshold"
if self.search_type == "MMR (Max Marginal Relevance)":
return "mmr"
return "similarity"
def _build_search_args(self):
args = {
"k": self.number_of_results,
"score_threshold": self.search_score_threshold,
}
if self.search_filter:
clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
if len(clean_filter) > 0:
args["filter"] = clean_filter
return args
def search_documents(self) -> list[Data]:
vector_store = self.build_vector_store()
self.log(f"Search query: {self.search_query}")
self.log(f"Search type: {self.search_type}")
self.log(f"Number of results: {self.number_of_results}")
if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
try:
search_type = self._map_search_type()
search_args = self._build_search_args()
docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
except Exception as e:
msg = f"Error performing search in AstraDBVectorStore: {e}"
raise ValueError(msg) from e
self.log(f"Retrieved documents: {len(docs)}")
data = docs_to_data(docs)
self.log(f"Converted documents to data: {len(data)}")
self.status = data
return data
self.log("No search input provided. Skipping search.")
return []
def get_retriever_kwargs(self):
search_args = self._build_search_args()
return {
"search_type": self._map_search_type(),
"search_kwargs": search_args,
}
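The search helpers in the listing above turn the dropdown label and filter inputs into retriever arguments. The following is a minimal standalone sketch of that logic, not the component itself: the function names and the sample filter are illustrative, and only plain Python is used.

```python
# Standalone sketch of the label mapping and filter cleaning shown above.
# The function names and sample values are illustrative assumptions.
SEARCH_TYPE_MAP = {
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def map_search_type(label):
    # Any unrecognized label falls back to plain similarity search.
    return SEARCH_TYPE_MAP.get(label, "similarity")


def build_search_args(number_of_results, score_threshold, search_filter=None):
    args = {"k": number_of_results, "score_threshold": score_threshold}
    if search_filter:
        # Drop entries whose key or value is empty.
        clean_filter = {k: v for k, v in search_filter.items() if k and v}
        if clean_filter:
            args["filter"] = clean_filter
    return args


retriever_kwargs = {
    "search_type": map_search_type("MMR (Max Marginal Relevance)"),
    "search_kwargs": build_search_args(4, 0, {"source": "faq.md", "": "ignored", "lang": ""}),
}
print(retriever_kwargs)
```

Because the cleaning step tests truthiness, filter values such as `0` or `False` are dropped along with empty strings, which is worth keeping in mind when filtering on numeric or boolean metadata.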
Milvus
This component creates a Milvus Vector Store with search capabilities.
For more information, see the Milvus documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name | String | Name of the Milvus collection |
collection_description | String | Description of the Milvus collection |
uri | String | Connection URI for Milvus |
password | SecretString | Connection password (if required) |
connection_args | Dict | Additional connection arguments |
primary_field | String | Name of the primary field |
text_field | String | Name of the text field |
vector_field | String | Name of the vector field |
consistency_level | String | Consistency level for operations |
index_params | Dict | Parameters for indexing |
search_params | Dict | Parameters for searching |
drop_old | Boolean | Whether to drop old collection |
timeout | Float | Timeout for operations |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |

Name | Type | Description |
---|---|---|
vector_store | Milvus | Milvus vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
milvus.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    BoolInput,
    DictInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class MilvusVectorStoreComponent(LCVectorStoreComponent):
    """Milvus vector store with search capabilities."""

    display_name: str = "Milvus"
    description: str = "Milvus vector store with search capabilities"
    name = "Milvus"
    icon = "Milvus"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", value="langflow"),
        StrInput(name="collection_description", display_name="Collection Description", value=""),
        StrInput(
            name="uri",
            display_name="Connection URI",
            value="http://localhost:19530",
        ),
        SecretStrInput(
            name="password",
            display_name="Token",
            value="",
            info="Ignore this field if no token is required to make connection.",
        ),
        DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True),
        StrInput(name="primary_field", display_name="Primary Field Name", value="pk"),
        StrInput(name="text_field", display_name="Text Field Name", value="text"),
        StrInput(name="vector_field", display_name="Vector Field Name", value="vector"),
        DropdownInput(
            name="consistency_level",
            display_name="Consistency Level",
            options=["Bounded", "Session", "Strong", "Eventual"],
            value="Session",
            advanced=True,
        ),
        DictInput(name="index_params", display_name="Index Parameters", advanced=True),
        DictInput(name="search_params", display_name="Search Parameters", advanced=True),
        BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True),
        FloatInput(name="timeout", display_name="Timeout", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_milvus.vectorstores import Milvus as LangchainMilvus
        except ImportError as e:
            msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`."
            raise ImportError(msg) from e

        # The URI and token inputs take precedence over any values in connection_args.
        self.connection_args.update(uri=self.uri, token=self.password)
        milvus_store = LangchainMilvus(
            embedding_function=self.embedding,
            collection_name=self.collection_name,
            collection_description=self.collection_description,
            connection_args=self.connection_args,
            consistency_level=self.consistency_level,
            index_params=self.index_params,
            search_params=self.search_params,
            drop_old=self.drop_old,
            auto_id=True,
            primary_field=self.primary_field,
            text_field=self.text_field,
            vector_field=self.vector_field,
            timeout=self.timeout,
        )

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            milvus_store.add_documents(documents)

        return milvus_store

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
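The Milvus listing above merges the Connection URI and Token inputs into the Other Connection Arguments dictionary before creating the store. The following sketch shows the effect of that merge in plain Python. The helper name and the sample `db_name` entry are hypothetical, and unlike the component, which updates its dictionary in place, this version works on a copy.

```python
# Sketch of how uri and token override entries in connection_args.
# merge_connection_args and the sample values are illustrative assumptions.
def merge_connection_args(connection_args, uri, token):
    """Return a copy of connection_args with uri and token applied on top."""
    merged = dict(connection_args or {})
    merged.update(uri=uri, token=token)
    return merged


extra = {"db_name": "default", "uri": "http://stale-host:19530"}
merged = merge_connection_args(extra, "http://localhost:19530", "")
print(merged)
```

A `uri` supplied through the extra arguments is therefore replaced by the value of the dedicated URI input, and the token is always set, even when it is an empty string.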
MongoDB Atlas
This component creates a MongoDB Atlas Vector Store with search capabilities. For more information, see the MongoDB Atlas documentation.
Parameters
Name | Type | Description |
---|---|---|
mongodb_atlas_cluster_uri | SecretString | MongoDB Atlas Cluster URI |
db_name | String | Database name |
collection_name | String | Collection name |
index_name | String | Index name |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |

Name | Type | Description |
---|---|---|
vector_store | MongoDBAtlasVectorSearch | MongoDB Atlas vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
mongodb_atlas.py
import tempfile

import certifi
from langchain_community.vectorstores import MongoDBAtlasVectorSearch

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class MongoVectorStoreComponent(LCVectorStoreComponent):
    display_name = "MongoDB Atlas"
    description = "MongoDB Atlas Vector Store with search capabilities"
    name = "MongoDBAtlasVector"
    icon = "MongoDB"

    inputs = [
        SecretStrInput(name="mongodb_atlas_cluster_uri", display_name="MongoDB Atlas Cluster URI", required=True),
        BoolInput(name="enable_mtls", display_name="Enable mTLS", value=False, advanced=True, required=True),
        SecretStrInput(
            name="mongodb_atlas_client_cert",
            display_name="MongoDB Atlas Combined Client Certificate",
            required=False,
            info="Client Certificate combined with the private key in the following format:\n "
            "-----BEGIN PRIVATE KEY-----\n...\n -----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----\n"
            "...\n-----END CERTIFICATE-----\n",
        ),
        StrInput(name="db_name", display_name="Database Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> MongoDBAtlasVectorSearch:
        try:
            from pymongo import MongoClient
        except ImportError as e:
            msg = "Please install pymongo to use MongoDB Atlas Vector Store"
            raise ImportError(msg) from e

        # Create temporary files for the client certificate
        if self.enable_mtls:
            client_cert_path = None
            try:
                client_cert = self.mongodb_atlas_client_cert.replace(" ", "\n")
                client_cert = client_cert.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
                client_cert = client_cert.replace(
                    "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
                    "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
                )
                client_cert = client_cert.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
                with tempfile.NamedTemporaryFile(delete=False) as client_cert_file:
                    client_cert_file.write(client_cert.encode("utf-8"))
                    client_cert_path = client_cert_file.name

            except Exception as e:
                msg = f"Failed to write certificate to temporary file: {e}"
                raise ValueError(msg) from e

        try:
            mongo_client: MongoClient = (
                MongoClient(
                    self.mongodb_atlas_cluster_uri,
                    tls=True,
                    tlsCertificateKeyFile=client_cert_path,
                    tlsCAFile=certifi.where(),
                )
                if self.enable_mtls
                else MongoClient(self.mongodb_atlas_cluster_uri)
            )

            collection = mongo_client[self.db_name][self.collection_name]
            collection.drop()  # Drop collection to override the vector store
        except Exception as e:
            msg = f"Failed to connect to MongoDB Atlas: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            return MongoDBAtlasVectorSearch.from_documents(
                documents=documents, embedding=self.embedding, collection=collection, index_name=self.index_name
            )
        return MongoDBAtlasVectorSearch(
            embedding=self.embedding,
            collection=collection,
            index_name=self.index_name,
        )

    def search_documents(self) -> list[Data]:
        from bson.objectid import ObjectId

        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str):
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            for doc in docs:
                doc.metadata = {
                    key: str(value) if isinstance(value, ObjectId) else value for key, value in doc.metadata.items()
                }

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
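When mTLS is enabled, the listing above rewrites the pasted certificate before saving it to a temporary file. The replacements suggest that the secret field may deliver the PEM text with its line breaks turned into spaces, although that is an inference from the code rather than something the source states. The sketch below reproduces the same replacement sequence as a standalone function. The function name and the shortened placeholder key material are illustrative.

```python
# Sketch of the PEM clean-up performed before the certificate is written to disk.
# restore_pem_newlines and the placeholder payload are illustrative assumptions.
def restore_pem_newlines(flattened):
    # Turn every space into a newline, which also splits the header and footer lines...
    pem = flattened.replace(" ", "\n")
    # ...then stitch the header and footer lines back together.
    pem = pem.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
    pem = pem.replace(
        "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
        "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
    )
    pem = pem.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
    return pem


flattened = (
    "-----BEGIN PRIVATE KEY----- AAAA BBBB -----END PRIVATE KEY----- "
    "-----BEGIN CERTIFICATE----- CCCC DDDD -----END CERTIFICATE-----"
)
print(restore_pem_newlines(flattened))
```

The replacement strings only cover the `PRIVATE KEY` followed by `CERTIFICATE` layout described in the field's help text, so other PEM block types or a different block order would not be repaired by this sequence.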
OpenSearch
This component creates an OpenSearch Vector Store with search capabilities. For more information, see the OpenSearch vector store documentation.
Parameters
Name | Type | Description |
---|---|---|
opensearch_url | String | URL for OpenSearch cluster, for example https://192.168.1.1:9200. |
index_name | String | The index name where the vectors will be stored in OpenSearch cluster. |
search_query | Multiline | Enter a search query. Leave empty to retrieve all documents. |
ingest_data | Data | Data to be ingested into the vector store. |
embedding | Handle | Embedding model to use. |
search_type | Dropdown | Search type to use (similarity, similarity_score_threshold, or mmr). |
number_of_results | Integer | Number of results to return. |
search_score_threshold | Float | Minimum similarity score threshold for search results. |
username | String | Username for OpenSearch authentication. |
password | SecretString | Password for OpenSearch authentication. |
use_ssl | Boolean | Whether to use SSL for connection. |
verify_certs | Boolean | Whether to verify SSL certificates. |
hybrid_search_query | Multiline | Custom hybrid search query in JSON format. |

Name | Type | Description |
---|---|---|
vector_store | VectorStore | An instance of OpenSearchVectorSearch for storing and searching vectors. |
Component code
opensearch.py
import json
from typing import Any

from langchain_community.vectorstores import OpenSearchVectorSearch

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    BoolInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    name = "OpenSearch"
    icon = "OpenSearch"

    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Builds the OpenSearch Vector Store object."""
        try:
            from langchain_community.vectorstores import OpenSearchVectorSearch
        except ImportError as e:
            error_message = f"Failed to import required modules: {e}"
            self.log(error_message)
            raise ImportError(error_message) from e

        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Adds documents to the Vector Store."""
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                error_message = f"Expected Data object, got {type(_input)}"
                self.log(error_message)
                raise TypeError(error_message)

        if documents and self.embedding is not None:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                error_message = f"Error adding documents to Vector Store: {e}"
                self.log(error_message)
                raise RuntimeError(error_message) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents in the vector store or retrieve all documents if no query is provided."""
        try:
            vector_store = self.build_vector_store()

            query = query or ""

            if self.hybrid_search_query.strip():
                try:
                    hybrid_query = json.loads(self.hybrid_search_query)
                except json.JSONDecodeError as e:
                    error_message = f"Invalid hybrid search query JSON: {e}"
                    self.log(error_message)
                    raise ValueError(error_message) from e

                results = vector_store.client.search(index=self.index_name, body=hybrid_query)

                processed_results = []
                for hit in results.get("hits", {}).get("hits", []):
                    source = hit.get("_source", {})
                    text = source.get("text", "")
                    metadata = source.get("metadata", {})

                    if isinstance(text, dict):
                        text = text.get("text", "")

                    processed_results.append(
                        {
                            "page_content": text,
                            "metadata": metadata,
                        }
                    )
                return processed_results

            search_kwargs = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]
            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

        except Exception as e:
            error_message = f"Error during search: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        error_message = f"Error during search. Invalid search type: {self.search_type}"
        self.log(error_message)
        raise ValueError(error_message)

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        try:
            query = self.search_query.strip() if self.search_query else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    file_path=result["metadata"].get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
        except Exception as e:
            error_message = f"Error during document search: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        self.status = retrieved_data
        return retrieved_data
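The OpenSearch listing above accepts a Hybrid Search Query as a JSON string, sends it as the search body, and reads `text` and `metadata` from each hit's `_source`. The sketch below shows one way such a JSON string could be produced and how a response is flattened, using plain dictionaries instead of a live cluster. The query body is illustrative only: the `vector_field` name, the three-element placeholder vector, and the sample response are assumptions, and a real query vector must match the dimension of the index.

```python
import json

# Illustrative hybrid query body combining keyword matching and k-NN search.
# The field name "vector_field" and the placeholder vector are assumptions.
hybrid_query = {
    "size": 4,
    "query": {
        "bool": {
            "should": [
                {"match": {"text": "vector database"}},
                {"knn": {"vector_field": {"vector": [0.1, 0.2, 0.3], "k": 4}}},
            ]
        }
    },
}
# This string is what would be pasted into the Hybrid Search Query field.
hybrid_search_query = json.dumps(hybrid_query, indent=2)


def parse_hits(response):
    """Flatten a search response into page_content/metadata dictionaries."""
    results = []
    for hit in response.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        text = source.get("text", "")
        # Some documents may store the text as a nested object.
        if isinstance(text, dict):
            text = text.get("text", "")
        results.append({"page_content": text, "metadata": source.get("metadata", {})})
    return results


# Hand-written stand-in for a cluster response.
sample_response = {
    "hits": {
        "hits": [
            {"_source": {"text": "hello", "metadata": {"file_path": "a.txt"}}},
            {"_source": {"text": {"text": "nested"}}},
        ]
    }
}
print(parse_hits(sample_response))
```

When a hybrid query is supplied, the listing returns these parsed hits directly, so the Search Type and Number of Results inputs do not apply to that path and any result limit has to be set in the query body itself.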
PGVector
This component creates a PGVector Vector Store with search capabilities. For more information, see the PGVector documentation.
Parameters
Name | Type | Description |
---|---|---|
pg_server_url | SecretString | PostgreSQL server connection string |
collection_name | String | Table name for the vector store |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |

Name | Type | Description |
---|---|---|
vector_store | PGVector | PGVector vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
pgvector.py
from langchain_community.vectorstores import PGVector

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data
from langflow.utils.connection_string_parser import transform_connection_string


class PGVectorStoreComponent(LCVectorStoreComponent):
    display_name = "PGVector"
    description = "PGVector Vector Store with search capabilities"
    name = "pgvector"
    icon = "cpu"

    inputs = [
        SecretStrInput(name="pg_server_url", display_name="PostgreSQL Server Connection String", required=True),
        StrInput(name="collection_name", display_name="Table", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"], required=True),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> PGVector:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        connection_string_parsed = transform_connection_string(self.pg_server_url)

        if documents:
            pgvector = PGVector.from_documents(
                embedding=self.embedding,
                documents=documents,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )
        else:
            pgvector = PGVector.from_existing_index(
                embedding=self.embedding,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )

        return pgvector

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
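The PGVector listing above passes the connection string through `transform_connection_string` before use, but the helper's implementation is not shown in this document. As a rough illustration of why a connection string might need preprocessing, the sketch below percent-encodes the password so that characters such as `@` or `/` do not break URL parsing. This is an assumption about one plausible kind of transformation, not a description of what the Langflow helper does, and `encode_password` is a hypothetical name.

```python
from urllib.parse import quote


# Hypothetical helper: percent-encode the password in a
# scheme://user:password@host/db connection string.
def encode_password(connection_string):
    # Split at the last "@" so an "@" inside the password stays with the password.
    auth_part, host_part = connection_string.rsplit("@", 1)
    scheme_user, password = auth_part.rsplit(":", 1)
    return f"{scheme_user}:{quote(password, safe='')}@{host_part}"


print(encode_password("postgresql://user:p@ss/word@localhost:5432/db"))
```

This sketch only handles strings of the `user:password@host` form. A password containing `:` or a string without a password would be split incorrectly, so it should be read as an illustration rather than a general parser.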
Pinecone
This component creates a Pinecone Vector Store with search capabilities.
For more information, see the Pinecone documentation.
Parameters
Name | Type | Description |
---|---|---|
index_name | String | Name of the Pinecone index |
namespace | String | Namespace for the index |
distance_strategy | String | Strategy for calculating distance between vectors |
pinecone_api_key | SecretString | API key for Pinecone |
text_key | String | Key in the record to use as text |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |

Name | Type | Description |
---|---|---|
vector_store | Pinecone | Pinecone vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
pinecone.py
import numpy as np
from langchain_core.vectorstores import VectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DropdownInput, HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class PineconeVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Pinecone"
    description = "Pinecone Vector Store with search capabilities"
    name = "Pinecone"
    icon = "Pinecone"
    inputs = [
        StrInput(name="index_name", display_name="Index Name", required=True),
        StrInput(name="namespace", display_name="Namespace", info="Namespace for the index."),
        DropdownInput(
            name="distance_strategy",
            display_name="Distance Strategy",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        SecretStrInput(name="pinecone_api_key", display_name="Pinecone API Key", required=True),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="Key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> VectorStore:
        """Build and return a Pinecone vector store instance."""
        try:
            from langchain_pinecone import PineconeVectorStore
        except ImportError as e:
            msg = "langchain-pinecone is not installed. Please install it with `pip install langchain-pinecone`."
            raise ValueError(msg) from e

        try:
            from langchain_pinecone._utilities import DistanceStrategy

            # Wrap the embedding model to ensure float32 output
            wrapped_embeddings = Float32Embeddings(self.embedding)

            # Convert distance strategy
            distance_strategy = self.distance_strategy.replace(" ", "_").upper()
            distance_strategy = DistanceStrategy[distance_strategy]

            # Initialize Pinecone instance with wrapped embeddings
            pinecone = PineconeVectorStore(
                index_name=self.index_name,
                embedding=wrapped_embeddings,  # Use wrapped embeddings
                text_key=self.text_key,
                namespace=self.namespace,
                distance_strategy=distance_strategy,
                pinecone_api_key=self.pinecone_api_key,
            )
        except Exception as e:
            error_msg = "Error building Pinecone vector store"
            raise ValueError(error_msg) from e
        else:
            # Process documents if any
            documents = []
            if self.ingest_data:
                for doc in self.ingest_data:
                    if isinstance(doc, Data):
                        documents.append(doc.to_lc_document())
                    else:
                        documents.append(doc)

                if documents:
                    pinecone.add_documents(documents)

            return pinecone

    def search_documents(self) -> list[Data]:
        """Search documents in the vector store."""
        try:
            if not self.search_query or not isinstance(self.search_query, str) or not self.search_query.strip():
                return []

            vector_store = self.build_vector_store()
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
        except Exception as e:
            error_msg = "Error searching documents"
            raise ValueError(error_msg) from e
        else:
            data = docs_to_data(docs)
            self.status = data
            return data


class Float32Embeddings:
    """Wrapper class to ensure float32 embeddings."""

    def __init__(self, base_embeddings):
        self.base_embeddings = base_embeddings

    def embed_documents(self, texts):
        embeddings = self.base_embeddings.embed_documents(texts)
        if isinstance(embeddings, np.ndarray):
            return [[self._force_float32(x) for x in vec] for vec in embeddings]
        return [[self._force_float32(x) for x in vec] for vec in embeddings]

    def embed_query(self, text):
        embedding = self.base_embeddings.embed_query(text)
        if isinstance(embedding, np.ndarray):
            return [self._force_float32(x) for x in embedding]
        return [self._force_float32(x) for x in embedding]

    def _force_float32(self, value):
        """Convert any numeric type to Python float."""
        return float(np.float32(value))
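The `Float32Embeddings` wrapper in the listing above rounds every embedding value to single precision and returns it as a plain Python float. The sketch below shows the effect of that rounding using only the standard library, with `struct` standing in for `np.float32`. Both apply IEEE 754 single-precision rounding, but the stand-in, the `to_float32` name, and the `FakeEmbeddings` class are illustrative and not part of the component.

```python
import struct


def to_float32(value):
    """Round a number to the nearest float32 and return it as a Python float."""
    # Packing as a 4-byte float and unpacking again discards the extra precision.
    return struct.unpack("<f", struct.pack("<f", value))[0]


class FakeEmbeddings:
    """Hypothetical embedding model that returns ordinary Python floats."""

    def embed_query(self, text):
        return [0.1, 0.5, 1 / 3]


vector = [to_float32(x) for x in FakeEmbeddings().embed_query("hello")]
print(vector)
```

Values that are exactly representable in single precision, such as 0.5, pass through unchanged, while values such as 0.1 shift slightly. The result is still a list of Python floats, so it stays serializable like any other list of numbers.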
Qdrant
This component creates a Qdrant Vector Store with search capabilities. For more information, see the Qdrant documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name | String | Name of the Qdrant collection |
host | String | Qdrant server host |
port | Integer | Qdrant server port |
grpc_port | Integer | Qdrant gRPC port |
api_key | SecretString | API key for Qdrant |
prefix | String | Prefix for Qdrant |
timeout | Integer | Timeout for Qdrant operations |
path | String | Path for Qdrant |
url | String | URL for Qdrant |
distance_func | String | Distance function for vector similarity |
content_payload_key | String | Key for content payload |
metadata_payload_key | String | Key for metadata payload |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |

Name | Type | Description |
---|---|---|
vector_store | Qdrant | Qdrant vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
qdrant.py
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import Qdrant

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class QdrantVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Qdrant"
    description = "Qdrant Vector Store with search capabilities"
    icon = "Qdrant"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="host", display_name="Host", value="localhost", advanced=True),
        IntInput(name="port", display_name="Port", value=6333, advanced=True),
        IntInput(name="grpc_port", display_name="gRPC Port", value=6334, advanced=True),
        SecretStrInput(name="api_key", display_name="API Key", advanced=True),
        StrInput(name="prefix", display_name="Prefix", advanced=True),
        IntInput(name="timeout", display_name="Timeout", advanced=True),
        StrInput(name="path", display_name="Path", advanced=True),
        StrInput(name="url", display_name="URL", advanced=True),
        DropdownInput(
            name="distance_func",
            display_name="Distance Function",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        StrInput(name="content_payload_key", display_name="Content Payload Key", value="page_content", advanced=True),
        StrInput(name="metadata_payload_key", display_name="Metadata Payload Key", value="metadata", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Qdrant:
        qdrant_kwargs = {
            "collection_name": self.collection_name,
            "content_payload_key": self.content_payload_key,
            "metadata_payload_key": self.metadata_payload_key,
        }

        server_kwargs = {
            "host": self.host or None,
            "port": int(self.port),  # Ensure port is an integer
            "grpc_port": int(self.grpc_port),  # Ensure grpc_port is an integer
            "api_key": self.api_key,
            "prefix": self.prefix,
            # Ensure timeout is an integer
            "timeout": int(self.timeout) if self.timeout else None,
            "path": self.path or None,
            "url": self.url or None,
        }

        server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if not isinstance(self.embedding, Embeddings):
            msg = "Invalid embedding object"
            raise TypeError(msg)

        if documents:
            qdrant = Qdrant.from_documents(documents, embedding=self.embedding, **qdrant_kwargs, **server_kwargs)
        else:
            from qdrant_client import QdrantClient

            client = QdrantClient(**server_kwargs)
            qdrant = Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)

        return qdrant

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
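The connection handling in `build_vector_store` can be looked at in isolation. The helper below is a hypothetical stand-alone function (`build_server_kwargs` is not part of Langflow) that mirrors how the component coerces the ports to integers, turns blank `host`, `path`, and `url` values into `None`, and drops every `None` entry before the options are handed to the Qdrant client. It is a minimal sketch that assumes the same defaults as the inputs listed above.

```python
def build_server_kwargs(
    host="localhost",
    port=6333,
    grpc_port=6334,
    api_key=None,
    prefix=None,
    timeout=None,
    path=None,
    url=None,
):
    """Hypothetical helper mirroring the component's connection-option cleanup."""
    raw = {
        "host": host or None,  # blank host becomes None
        "port": int(port),  # ports are always coerced to int
        "grpc_port": int(grpc_port),
        "api_key": api_key,  # passed through as-is
        "prefix": prefix,  # passed through as-is
        "timeout": int(timeout) if timeout else None,
        "path": path or None,
        "url": url or None,
    }
    # Drop unset options so the client falls back to its own defaults.
    return {key: value for key, value in raw.items() if value is not None}


if __name__ == "__main__":
    print(build_server_kwargs(url="", timeout=0))
```

In this sketch, as in the component code, `api_key` and `prefix` are only dropped when they are `None`; an empty string is passed through unchanged.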
Redis
This component creates a Redis Vector Store with search capabilities. For more information, see the Redis documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
redis_server_url | SecretString | Redis server connection string |
redis_index_name | String | Name of the Redis index |
code | String | Custom code for Redis (advanced) |
schema | String | Schema for Redis index |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
number_of_results | Integer | Number of results to return in search |
embedding | Embeddings | Embedding function to use |
Outputs
Name | Type | Description |
---|---|---|
vector_store | Redis | Redis vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
redis.py
from pathlib import Path

from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.redis import Redis

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class RedisVectorStoreComponent(LCVectorStoreComponent):
    """A custom component for implementing a Vector Store using Redis."""

    display_name: str = "Redis"
    description: str = "Implementation of Vector Store using Redis"
    name = "Redis"
    icon = "Redis"

    inputs = [
        SecretStrInput(name="redis_server_url", display_name="Redis Server Connection String", required=True),
        StrInput(
            name="redis_index_name",
            display_name="Redis Index",
        ),
        StrInput(name="code", display_name="Code", advanced=True),
        StrInput(
            name="schema",
            display_name="Schema",
        ),
        *LCVectorStoreComponent.inputs,
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Redis:
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)
        # Dump the parsed documents to a text file in the working directory.
        Path("documents.txt").write_text(str(documents), encoding="utf-8")

        if not documents:
            if self.schema is None:
                msg = "If no documents are provided, a schema must be provided."
                raise ValueError(msg)
            redis_vs = Redis.from_existing_index(
                embedding=self.embedding,
                index_name=self.redis_index_name,
                schema=self.schema,
                key_prefix=None,
                redis_url=self.redis_server_url,
            )
        else:
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
            docs = text_splitter.split_documents(documents)
            redis_vs = Redis.from_documents(
                documents=docs,
                embedding=self.embedding,
                redis_url=self.redis_server_url,
                index_name=self.redis_index_name,
            )
        return redis_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
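The Redis component takes one of two paths depending on whether any documents were ingested. The sketch below isolates that decision so it can be read without a Redis server. `select_redis_mode` and the `"schema.yaml"` value are hypothetical names used only for illustration; the returned strings name the LangChain constructors the component calls on each path.

```python
def select_redis_mode(documents, schema):
    """Hypothetical helper: report which constructor path the component would take."""
    if not documents:
        # Without documents the component can only attach to an existing index,
        # and that path requires a schema.
        if schema is None:
            msg = "If no documents are provided, a schema must be provided."
            raise ValueError(msg)
        return "from_existing_index"
    # With documents the component splits them into chunks and builds the index.
    return "from_documents"


if __name__ == "__main__":
    print(select_redis_mode([], "schema.yaml"))
    print(select_redis_mode(["some document"], None))
```

Because the check is `schema is None`, an empty string still counts as a schema in this sketch, just as in the comparison it mirrors.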
Supabase
This component creates a connection to a Supabase Vector Store with search capabilities.
For more information, see the Supabase documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
supabase_url | String | URL of the Supabase instance |
supabase_service_key | SecretString | Service key for Supabase authentication |
table_name | String | Name of the table in Supabase |
query_name | String | Name of the query to use |
search_query | String | Query for similarity search |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use |
number_of_results | Integer | Number of results to return in search |
Outputs
Name | Type | Description |
---|---|---|
vector_store | SupabaseVectorStore | Supabase vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
supabase.py
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import Client, create_client

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class SupabaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Supabase"
    description = "Supabase Vector Store with search capabilities"
    name = "SupabaseVectorStore"
    icon = "Supabase"

    inputs = [
        StrInput(name="supabase_url", display_name="Supabase URL", required=True),
        SecretStrInput(name="supabase_service_key", display_name="Supabase Service Key", required=True),
        StrInput(name="table_name", display_name="Table Name", advanced=True),
        StrInput(name="query_name", display_name="Query Name"),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> SupabaseVectorStore:
        supabase: Client = create_client(self.supabase_url, supabase_key=self.supabase_service_key)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            supabase_vs = SupabaseVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                query_name=self.query_name,
                client=supabase,
                table_name=self.table_name,
            )
        else:
            supabase_vs = SupabaseVectorStore(
                client=supabase,
                embedding=self.embedding,
                table_name=self.table_name,
                query_name=self.query_name,
            )

        return supabase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
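Every `search_documents` method on this page guards the similarity search with the same three-part check on `search_query`. The sketch below pulls that guard into two hypothetical helpers (`has_usable_query` and `guarded_search` are illustrative names, not Langflow functions) and uses a stand-in search function so it runs without a database.

```python
def has_usable_query(search_query):
    """True only for a non-empty string that is more than whitespace."""
    return bool(search_query and isinstance(search_query, str) and search_query.strip())


def guarded_search(search_fn, search_query, k=4):
    """Call search_fn only when the query is usable; otherwise return an empty list."""
    if has_usable_query(search_query):
        return search_fn(query=search_query, k=k)
    return []


def fake_similarity_search(query, k):
    """Stand-in for a vector store's similarity_search, for illustration only."""
    return [f"{query}-{i}" for i in range(k)]


if __name__ == "__main__":
    print(guarded_search(fake_similarity_search, "vector stores", k=2))
    print(guarded_search(fake_similarity_search, "   "))
```

The default `k=4` matches the default of the `number_of_results` input.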
Upstash
This component creates an Upstash Vector Store with search capabilities. For more information, see the Upstash documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
index_url | String | The URL of the Upstash index |
index_token | SecretString | The token for the Upstash index |
text_key | String | The key in the record to use as text |
namespace | String | Namespace for the index |
search_query | String | Query for similarity search |
metadata_filter | String | Filters documents by metadata |
ingest_data | Data | Data to be ingested into the vector store |
embedding | Embeddings | Embedding function to use (optional) |
number_of_results | Integer | Number of results to return in search |
Outputs
Name | Type | Description |
---|---|---|
vector_store | UpstashVectorStore | Upstash vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
upstash.py
from langchain_community.vectorstores import UpstashVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class UpstashVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Upstash"
    description = "Upstash Vector Store with search capabilities"
    name = "Upstash"
    icon = "Upstash"

    inputs = [
        StrInput(
            name="index_url",
            display_name="Index URL",
            info="The URL of the Upstash index.",
            required=True,
        ),
        SecretStrInput(
            name="index_token",
            display_name="Index Token",
            info="The token for the Upstash index.",
            required=True,
        ),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="The key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Leave empty for default namespace.",
        ),
        *LCVectorStoreComponent.inputs,
        MultilineInput(
            name="metadata_filter",
            display_name="Metadata Filter",
            info="Filters documents by metadata. Look at the documentation for more information.",
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
            info="To use Upstash's embeddings, don't provide an embedding.",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> UpstashVectorStore:
        use_upstash_embedding = self.embedding is None

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            if use_upstash_embedding:
                upstash_vs = UpstashVectorStore(
                    embedding=use_upstash_embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
                upstash_vs.add_documents(documents)
            else:
                upstash_vs = UpstashVectorStore.from_documents(
                    documents=documents,
                    embedding=self.embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
        else:
            upstash_vs = UpstashVectorStore(
                embedding=self.embedding or use_upstash_embedding,
                text_key=self.text_key,
                index_url=self.index_url,
                index_token=self.index_token,
                namespace=self.namespace,
            )

        return upstash_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
                filter=self.metadata_filter,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
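The Upstash component treats a missing embedding as a request to use Upstash's own embeddings: when nothing is connected to the `embedding` input, it passes `True` in place of an embedding object. The sketch below isolates that expression. `resolve_embedding_argument` and `FakeEmbedding` are hypothetical names for illustration, and the function mirrors only the branch that runs when no documents are ingested.

```python
def resolve_embedding_argument(embedding):
    """Hypothetical helper: the value passed as `embedding` when nothing is ingested."""
    # True signals "no embedding connected", which the component forwards
    # to the vector store instead of an embedding object.
    use_upstash_embedding = embedding is None
    return embedding or use_upstash_embedding


class FakeEmbedding:
    """Stand-in for a connected embedding model, for illustration only."""


if __name__ == "__main__":
    print(resolve_embedding_argument(None))
    print(type(resolve_embedding_argument(FakeEmbedding())).__name__)
```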
Vectara
This component creates a Vectara Vector Store with search capabilities. For more information, see the Vectara documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
vectara_customer_id | String | Vectara customer ID |
vectara_corpus_id | String | Vectara corpus ID |
vectara_api_key | SecretString | Vectara API key |
embedding | Embeddings | Embedding function to use (optional) |
ingest_data | List[Document/Data] | Data to be ingested into the vector store |
search_query | String | Query for similarity search |
number_of_results | Integer | Number of results to return in search |
Outputs
Name | Type | Description |
---|---|---|
vector_store | Vectara | Vectara vector store instance |
search_results | List[Data] | Results of similarity search |
Component code
vectara.py
from typing import TYPE_CHECKING

from langchain_community.vectorstores import Vectara

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data

if TYPE_CHECKING:
    from langchain_community.vectorstores import Vectara


class VectaraVectorStoreComponent(LCVectorStoreComponent):
    """Vectara Vector Store with search capabilities."""

    display_name: str = "Vectara"
    description: str = "Vectara Vector Store with search capabilities"
    name = "Vectara"
    icon = "Vectara"

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        *LCVectorStoreComponent.inputs,
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> "Vectara":
        """Builds the Vectara object."""
        try:
            from langchain_community.vectorstores import Vectara
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(
            vectara_customer_id=self.vectara_customer_id,
            vectara_corpus_id=self.vectara_corpus_id,
            vectara_api_key=self.vectara_api_key,
        )

        self._add_documents_to_vector_store(vectara)
        return vectara

    def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
        """Adds documents to the Vector Store."""
        if not self.ingest_data:
            self.status = "No documents to add to Vectara"
            return

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            self.log(f"Adding {len(documents)} documents to Vectara.")
            vector_store.add_documents(documents)
            self.status = f"Added {len(documents)} documents to Vectara"
        else:
            self.log("No documents to add to Vectara.")
            self.status = "No valid documents to add to Vectara"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = f"Found {len(data)} results for the query: {self.search_query}"
            return data
        self.status = "No search query provided"
        return []
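Each vector store component on this page normalizes `ingest_data` the same way before adding documents: `Data` objects are converted with `to_lc_document()`, and anything else is passed through unchanged. The sketch below reproduces that loop with stand-in classes so it runs without Langflow or LangChain installed. The `Document` and `Data` dataclasses and `normalize_ingest_data` are hypothetical simplifications, not the real types.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Stand-in for a LangChain document (illustration only)."""

    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Data:
    """Stand-in for Langflow's Data object; only the method used here is modeled."""

    text: str

    def to_lc_document(self) -> Document:
        return Document(page_content=self.text)


def normalize_ingest_data(ingest_data):
    """Convert Data items to documents and pass other items through unchanged."""
    documents = []
    for item in ingest_data or []:  # tolerate None as "nothing to ingest"
        if isinstance(item, Data):
            documents.append(item.to_lc_document())
        else:
            documents.append(item)
    return documents


if __name__ == "__main__":
    print(normalize_ingest_data([Data("first chunk"), Document("second chunk")]))
```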
Vectara RAG
This component creates a Vectara RAG pipeline. For more information, see the Vectara documentation.
Parameters
Inputs
Name | Type | Description |
---|---|---|
vectara_customer_id | String | Vectara customer ID |
vectara_corpus_id | String | Vectara corpus ID |
vectara_api_key | SecretString | Vectara API key |
search_query | String | The query to receive an answer on |
lexical_interpolation | Float | Hybrid search factor |
filter | String | Metadata filters for narrowing search |
reranker | String | Type of reranker to use |
reranker_k | Integer | Number of results to rerank |
diversity_bias | Float | Diversity bias for MMR reranker |
max_results | Integer | Maximum results to summarize |
response_lang | String | Language code for response |
prompt | String | Name of the summarizer prompt |
Outputs
Name | Type | Description |
---|---|---|
answer | Message | Generated response from Vectara RAG |
Component code
vectara_rag.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.io import DropdownInput, FloatInput, IntInput, MessageTextInput, Output, SecretStrInput, StrInput
from langflow.schema.message import Message


class VectaraRagComponent(Component):
    display_name = "Vectara RAG"
    description = "Vectara's full end to end RAG"
    documentation = "https://docs.vectara.com/docs"
    icon = "Vectara"
    name = "VectaraRAG"
    SUMMARIZER_PROMPTS = [
        "vectara-summary-ext-24-05-sml",
        "vectara-summary-ext-24-05-med-omni",
        "vectara-summary-ext-24-05-large",
        "vectara-summary-ext-24-05-med",
        "vectara-summary-ext-v1.3.0",
    ]

    RERANKER_TYPES = ["mmr", "rerank_multilingual_v1", "none"]

    RESPONSE_LANGUAGES = [
        "auto",
        "eng",
        "spa",
        "fra",
        "zho",
        "deu",
        "hin",
        "ara",
        "por",
        "ita",
        "jpn",
        "kor",
        "rus",
        "tur",
        "fas",
        "vie",
        "tha",
        "heb",
        "nld",
        "ind",
        "pol",
        "ukr",
        "ron",
        "swe",
        "ces",
        "ell",
        "ben",
        "msa",
        "urd",
    ]

    field_order = ["vectara_customer_id", "vectara_corpus_id", "vectara_api_key", "search_query", "reranker"]

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        MessageTextInput(
            name="search_query",
            display_name="Search Query",
            info="The query to receive an answer on.",
            tool_mode=True,
        ),
        FloatInput(
            name="lexical_interpolation",
            display_name="Hybrid Search Factor",
            range_spec=RangeSpec(min=0.005, max=0.1, step=0.005),
            value=0.005,
            advanced=True,
            info="How much to weigh lexical scores compared to the embedding score. "
            "0 means lexical search is not used at all, and 1 means only lexical search is used.",
        ),
        MessageTextInput(
            name="filter",
            display_name="Metadata Filters",
            value="",
            advanced=True,
            info="The filter string to narrow the search to according to metadata attributes.",
        ),
        DropdownInput(
            name="reranker",
            display_name="Reranker Type",
            options=RERANKER_TYPES,
            value=RERANKER_TYPES[0],
            info="How to rerank the retrieved search results.",
        ),
        IntInput(
            name="reranker_k",
            display_name="Number of Results to Rerank",
            value=50,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
        ),
        FloatInput(
            name="diversity_bias",
            display_name="Diversity Bias",
            value=0.2,
            range_spec=RangeSpec(min=0, max=1, step=0.01),
            advanced=True,
            info="Ranges from 0 to 1, with higher values indicating greater diversity (only applies to MMR reranker).",
        ),
        IntInput(
            name="max_results",
            display_name="Max Results to Summarize",
            value=7,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
            info="The maximum number of search results to be available to the prompt.",
        ),
        DropdownInput(
            name="response_lang",
            display_name="Response Language",
            options=RESPONSE_LANGUAGES,
            value="eng",
            advanced=True,
            info="Use the ISO 639-1 or 639-3 language code or auto to automatically detect the language.",
        ),
        DropdownInput(
            name="prompt",
            display_name="Prompt Name",
            options=SUMMARIZER_PROMPTS,
            value=SUMMARIZER_PROMPTS[0],
            advanced=True,
            info="Only vectara-summary-ext-24-05-sml is for Growth customers; "
            "all other prompts are for Scale customers only.",
        ),
    ]

    outputs = [
        Output(name="answer", display_name="Answer", method="generate_response"),
    ]

    def generate_response(
        self,
    ) -> Message:
        text_output = ""

        try:
            from langchain_community.vectorstores import Vectara
            from langchain_community.vectorstores.vectara import RerankConfig, SummaryConfig, VectaraQueryConfig
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(self.vectara_customer_id, self.vectara_corpus_id, self.vectara_api_key)
        rerank_config = RerankConfig(self.reranker, self.reranker_k, self.diversity_bias)
        summary_config = SummaryConfig(
            is_enabled=True, max_results=self.max_results, response_lang=self.response_lang, prompt_name=self.prompt
        )
        config = VectaraQueryConfig(
            lambda_val=self.lexical_interpolation,
            filter=self.filter,
            summary_config=summary_config,
            rerank_config=rerank_config,
        )
        rag = vectara.as_rag(config)
        response = rag.invoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})

        text_output = response["answer"]

        return Message(text=text_output)
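The numeric inputs of the Vectara RAG component each declare a `RangeSpec`. This page does not say whether those bounds are enforced when a value is set programmatically, so the sketch below shows one way to check values before they reach the query configuration. `RANGES`, `RagSettings`, and `out_of_range` are hypothetical names; the bounds and defaults are copied from the component inputs above.

```python
from dataclasses import dataclass

# Bounds copied from the RangeSpec values of the component inputs.
RANGES = {
    "lexical_interpolation": (0.005, 0.1),
    "reranker_k": (1, 100),
    "diversity_bias": (0.0, 1.0),
    "max_results": (1, 100),
}


@dataclass
class RagSettings:
    """Hypothetical container for the numeric query settings; not part of Langflow."""

    lexical_interpolation: float = 0.005
    reranker_k: int = 50
    diversity_bias: float = 0.2
    max_results: int = 7

    def out_of_range(self):
        """Return the names of settings that fall outside their declared bounds."""
        return [
            name
            for name, (low, high) in RANGES.items()
            if not low <= getattr(self, name) <= high
        ]


if __name__ == "__main__":
    print(RagSettings().out_of_range())
    print(RagSettings(diversity_bias=1.5, reranker_k=0).out_of_range())
```

A caller could refuse to build the query configuration whenever `out_of_range()` returns a non-empty list.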