Vector Stores
Vector databases are used to store and search for vectors. They can be used to store embeddings, search for similar vectors, and perform other vector operations.
Astra DB Vector Store
This component implements a Vector Store using Astra DB Serverless with search capabilities.
Parameters
Name | Display Name | Info |
---|---|---|
collection_name |
Collection Name |
The name of the collection within Astra DB Serverless where the vectors will be stored (required) |
token |
Astra DB Application Token |
Authentication token for accessing Astra DB Serverless (required) |
api_endpoint |
API Endpoint |
API endpoint URL for the Astra DB Serverless service (required) |
search_input |
Search Input |
Query string for similarity search |
ingest_data |
Ingest Data |
Data to be ingested into the vector store |
namespace |
Namespace |
Optional namespace within Astra DB Serverless to use for the collection |
embedding_service |
Embedding Model or Astra Vectorize |
Determines whether to use an Embedding Model or Astra Vectorize for the collection |
embedding |
Embedding Model |
Embedding model to use for the collection (when using Embedding Model) |
provider |
Vectorize Provider |
Provider for Astra Vectorize (when using Astra Vectorize) |
metric |
Metric |
Optional distance metric for vector comparisons |
batch_size |
Batch Size |
Optional number of records to process in a single batch |
setup_mode |
Setup Mode |
Configuration mode for setting up the vector store (options: "Sync", "Off", default: "Sync") |
pre_delete_collection |
Pre Delete Collection |
Boolean flag to determine whether to delete the collection before creating a new one |
number_of_results |
Number of Results |
Number of results to return in similarity search (default: 4) |
search_type |
Search Type |
Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)") |
search_score_threshold |
Search Score Threshold |
Minimum similarity score threshold for search results |
search_filter |
Search Metadata Filter |
Optional dictionary of filters to apply to the search query |
Name | Display Name | Info |
---|---|---|
vector_store |
Vector Store |
Built Astra DB Serverless vector store |
search_results |
Search Results |
Results of the similarity search as a list of Data objects |
Component code
astradb.py
import os
from collections import defaultdict
import orjson
from astrapy import DataAPIClient
from astrapy.admin import parse_api_endpoint
from langchain_astradb import AstraDBVectorStore
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput
from langflow.io import (
BoolInput,
DataInput,
DropdownInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
from langflow.utils.version import get_version_info
# Vector store component for Astra DB, built on LCVectorStoreComponent.
class AstraDBVectorStoreComponent(LCVectorStoreComponent):
# Component metadata: display name, description, documentation link, internal name and icon.
display_name: str = "Astra DB"
description: str = "Implementation of Vector Store using Astra DB with search capabilities"
documentation: str = "https://docs.langflow.org/starter-projects-vector-store-rag"
name = "AstraDB"
icon: str = "AstraDB"
# Slot for a previously built store; presumably managed by check_cached_vector_store — confirm in the base module.
_cached_vector_store: AstraDBVectorStore | None = None
# Static fallback for Astra Vectorize providers, keyed by provider display name.
# Each value is [provider key, [model names]]; update_providers_mapping returns this mapping
# when the token/endpoint are missing or the live provider lookup fails.
VECTORIZE_PROVIDERS_MAPPING = defaultdict(
list,
{
"Azure OpenAI": [
"azureOpenAI",
["text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002"],
],
"Hugging Face - Dedicated": ["huggingfaceDedicated", ["endpoint-defined-model"]],
"Hugging Face - Serverless": [
"huggingface",
[
"sentence-transformers/all-MiniLM-L6-v2",
"intfloat/multilingual-e5-large",
"intfloat/multilingual-e5-large-instruct",
"BAAI/bge-small-en-v1.5",
"BAAI/bge-base-en-v1.5",
"BAAI/bge-large-en-v1.5",
],
],
"Jina AI": [
"jinaAI",
[
"jina-embeddings-v2-base-en",
"jina-embeddings-v2-base-de",
"jina-embeddings-v2-base-es",
"jina-embeddings-v2-base-code",
"jina-embeddings-v2-base-zh",
],
],
"Mistral AI": ["mistral", ["mistral-embed"]],
"Nvidia": ["nvidia", ["NV-Embed-QA"]],
"OpenAI": ["openai", ["text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002"]],
"Upstage": ["upstageAI", ["solar-embedding-1-large"]],
"Voyage AI": [
"voyageAI",
["voyage-large-2-instruct", "voyage-law-2", "voyage-code-2", "voyage-large-2", "voyage-2"],
],
},
)
# Declarative list of the component's input fields.
# update_build_config reacts to changes of collection_name and embedding_choice (and of the
# dynamically inserted embedding_provider / model fields); presumably the UI invokes it for
# fields flagged real_time_refresh / refresh_button — confirm against the Langflow input docs.
inputs = [
# `advanced` is computed once, when this list is evaluated, from the ASTRA_ENHANCED environment variable.
SecretStrInput(
name="token",
display_name="Astra DB Application Token",
info="Authentication token for accessing Astra DB.",
value="ASTRA_DB_APPLICATION_TOKEN",
required=True,
advanced=os.getenv("ASTRA_ENHANCED", "false").lower() == "true",
real_time_refresh=True,
),
# Labelled "Database" instead of "API Endpoint" when ASTRA_ENHANCED is "true".
SecretStrInput(
name="api_endpoint",
display_name="Database" if os.getenv("ASTRA_ENHANCED", "false").lower() == "true" else "API Endpoint",
info="API endpoint URL for the Astra DB service.",
value="ASTRA_DB_API_ENDPOINT",
required=True,
real_time_refresh=True,
),
# The options list is replaced in update_build_config with the collections found in the database.
DropdownInput(
name="collection_name",
display_name="Collection",
info="The name of the collection within Astra DB where the vectors will be stored.",
required=True,
refresh_button=True,
real_time_refresh=True,
options=["+ Create new collection"],
value="+ Create new collection",
),
# Required and non-advanced unless LANGFLOW_HOST is set; in that case it starts as an optional advanced field.
StrInput(
name="collection_name_new",
display_name="Collection Name",
info="Name of the new collection to create.",
advanced=os.getenv("LANGFLOW_HOST") is not None,
required=os.getenv("LANGFLOW_HOST") is None,
),
StrInput(
name="keyspace",
display_name="Keyspace",
info="Optional keyspace within Astra DB to use for the collection.",
advanced=True,
),
MultilineInput(
name="search_input",
display_name="Search Input",
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
advanced=True,
value=4,
),
# The display values below are translated to search-type identifiers by _map_search_type.
DropdownInput(
name="search_type",
display_name="Search Type",
info="Search type to use",
options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
value="Similarity",
advanced=True,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results. "
"(when using 'Similarity with score threshold')",
value=0,
advanced=True,
),
NestedDictInput(
name="advanced_search_filter",
display_name="Search Metadata Filter",
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
),
# Deprecated flat filter; _build_search_args still merges it into advanced_search_filter.
DictInput(
name="search_filter",
display_name="[DEPRECATED] Search Metadata Filter",
info="Deprecated: use advanced_search_filter. Optional dictionary of filters to apply to the search query.",
advanced=True,
list=True,
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
),
# Selecting "Astra Vectorize" makes update_build_config insert the embedding_provider dropdown.
DropdownInput(
name="embedding_choice",
display_name="Embedding Model or Astra Vectorize",
info="Determines whether to use Astra Vectorize for the collection.",
options=["Embedding Model", "Astra Vectorize"],
real_time_refresh=True,
value="Embedding Model",
),
HandleInput(
name="embedding_model",
display_name="Embedding Model",
input_types=["Embeddings"],
info="Allows an embedding model configuration.",
),
DropdownInput(
name="metric",
display_name="Metric",
info="Optional distance metric for vector comparisons in the vector store.",
options=["cosine", "dot_product", "euclidean"],
value="cosine",
advanced=True,
),
# The batch/concurrency inputs below are forwarded to AstraDBVectorStore as `value or None`.
IntInput(
name="batch_size",
display_name="Batch Size",
info="Optional number of data to process in a single batch.",
advanced=True,
),
IntInput(
name="bulk_insert_batch_concurrency",
display_name="Bulk Insert Batch Concurrency",
info="Optional concurrency level for bulk insert operations.",
advanced=True,
),
IntInput(
name="bulk_insert_overwrite_concurrency",
display_name="Bulk Insert Overwrite Concurrency",
info="Optional concurrency level for bulk insert operations that overwrite existing data.",
advanced=True,
),
IntInput(
name="bulk_delete_concurrency",
display_name="Bulk Delete Concurrency",
info="Optional concurrency level for bulk delete operations.",
advanced=True,
),
# build_vector_store upper-cases the selected value and looks it up in the SetupMode enum.
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the vector store, with options like 'Sync' or 'Off'.",
options=["Sync", "Off"],
advanced=True,
value="Sync",
),
BoolInput(
name="pre_delete_collection",
display_name="Pre Delete Collection",
info="Boolean flag to determine whether to delete the collection before creating a new one.",
advanced=True,
),
# Empty strings in the two metadata lists are dropped by build_vector_store before use.
StrInput(
name="metadata_indexing_include",
display_name="Metadata Indexing Include",
info="Optional list of metadata fields to include in the indexing.",
list=True,
advanced=True,
),
StrInput(
name="metadata_indexing_exclude",
display_name="Metadata Indexing Exclude",
info="Optional list of metadata fields to exclude from the indexing.",
list=True,
advanced=True,
),
StrInput(
name="collection_indexing_policy",
display_name="Collection Indexing Policy",
info='Optional JSON string for the "indexing" field of the collection. '
"See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
advanced=True,
),
]
def del_fields(self, build_config, field_list):
    """Remove every key listed in ``field_list`` from ``build_config`` in place.

    Keys that are absent are silently skipped. The same ``build_config``
    object is returned for convenience.
    """
    for name in field_list:
        build_config.pop(name, None)
    return build_config
def insert_in_dict(self, build_config, field_name, new_parameters):
    """Insert ``new_parameters`` into ``build_config`` directly after ``field_name``.

    The dictionary is rebuilt in place (same object, new key order). Each new
    entry is placed immediately after ``field_name``, so several entries end up
    in reverse order of ``new_parameters``. If ``field_name`` is missing, the
    entry is appended at the end. Returns ``build_config``.
    """
    for name, parameter in new_parameters.items():
        entries = list(build_config.items())
        keys = [key for key, _ in entries]
        # Position right after the anchor key, or the end when the anchor is absent.
        position = keys.index(field_name) + 1 if field_name in keys else len(entries)
        entries[position:position] = [(name, parameter)]
        # Rebuild the same dict object so callers holding a reference see the new order.
        build_config.clear()
        build_config.update(entries)
    return build_config
def update_providers_mapping(self):
    """Return the mapping of Vectorize provider display names to ``[provider key, models]``.

    The mapping is fetched from the database admin API. When the token or API
    endpoint is missing, or the lookup raises, the static
    ``VECTORIZE_PROVIDERS_MAPPING`` is returned instead (best effort).
    """
    # Without credentials there is nothing to query; use the static fallback.
    if not (self.token and self.api_endpoint):
        self.log("Astra DB token and API endpoint are required to fetch the list of Vectorize providers.")
        return self.VECTORIZE_PROVIDERS_MAPPING

    try:
        self.log("Dynamically updating list of Vectorize providers.")
        database_admin = DataAPIClient(token=self.token).get_admin().get_database_admin(self.api_endpoint)
        providers = database_admin.find_embedding_providers().as_dict()["embeddingProviders"]
        # Display name -> [provider key, model names]
        fetched = {
            info["displayName"]: [key, [model["name"] for model in info["models"]]]
            for key, info in providers.items()
        }
        # Sorted by display name
        return defaultdict(list, dict(sorted(fetched.items())))
    except Exception as e:  # noqa: BLE001
        self.log(f"Error fetching Vectorize providers: {e}")
        return self.VECTORIZE_PROVIDERS_MAPPING
def get_database(self):
    """Return a database handle for the configured endpoint, or ``None`` on failure.

    Any exception raised while creating the client or the database handle is
    logged and swallowed so callers can fall back gracefully.
    """
    try:
        api_client = DataAPIClient(token=self.token)
        database = api_client.get_database(self.api_endpoint, token=self.token)
    except Exception as e:  # noqa: BLE001
        self.log(f"Error getting database: {e}")
        return None
    return database
def _initialize_collection_options(self):
database = self.get_database()
if database is None:
return ["+ Create new collection"]
try:
collections = [collection.name for collection in database.list_collections()]
except Exception as e: # noqa: BLE001
self.log(f"Error fetching collections: {e}")
return ["+ Create new collection"]
return [*collections, "+ Create new collection"]
def get_collection_choice(self):
    """Return the effective collection name.

    When the dropdown is set to "+ Create new collection", the name typed into
    ``collection_name_new`` is used; otherwise the selected collection name.
    """
    selected = self.collection_name
    return self.collection_name_new if selected == "+ Create new collection" else selected
def get_collection_options(self):
    """Return the ``vector`` options of the chosen collection, or ``None``.

    ``None`` is returned when the database cannot be reached or when fetching
    the collection or its options raises (e.g. the collection does not exist yet).
    """
    database = self.get_database()
    if database is None:
        return None

    name = self.get_collection_choice()
    try:
        options = database.get_collection(name).options()
    except Exception:  # noqa: BLE001
        return None
    return options.vector
def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None):
# Refresh the collection name options
build_config["collection_name"]["options"] = self._initialize_collection_options()
# If the collection name is set to "+ Create new collection", show embedding choice
if field_name == "collection_name" and field_value == "+ Create new collection":
build_config["embedding_choice"]["advanced"] = False
build_config["embedding_choice"]["value"] = "Embedding Model"
build_config["embedding_model"]["advanced"] = False
build_config["collection_name_new"]["advanced"] = False
build_config["collection_name_new"]["required"] = True
# But if it's not, hide embedding choice
elif field_name == "collection_name" and field_value != "+ Create new collection":
build_config["embedding_choice"]["advanced"] = True
build_config["collection_name_new"]["advanced"] = True
build_config["collection_name_new"]["required"] = False
build_config["collection_name_new"]["value"] = ""
# Get the collection options for the selected collection
collection_options = self.get_collection_options()
# If the collection options are available (DB exists), show the advanced options
if collection_options:
build_config["embedding_choice"]["advanced"] = True
if collection_options.service:
self.del_fields(
build_config,
[
"embedding_provider",
"model",
"z_01_model_parameters",
"z_02_api_key_name",
"z_03_provider_api_key",
"z_04_authentication",
],
)
build_config["embedding_model"]["advanced"] = True
build_config["embedding_choice"]["value"] = "Astra Vectorize"
else:
build_config["embedding_model"]["advanced"] = False
build_config["embedding_provider"]["advanced"] = False
build_config["embedding_choice"]["value"] = "Embedding Model"
elif field_name == "embedding_choice":
if field_value == "Astra Vectorize":
build_config["embedding_model"]["advanced"] = True
# Update the providers mapping
vectorize_providers = self.update_providers_mapping()
new_parameter = DropdownInput(
name="embedding_provider",
display_name="Embedding Provider",
options=vectorize_providers.keys(),
value="",
required=True,
real_time_refresh=True,
).to_dict()
self.insert_in_dict(build_config, "embedding_choice", {"embedding_provider": new_parameter})
else:
build_config["embedding_model"]["advanced"] = False
self.del_fields(
build_config,
[
"embedding_provider",
"model",
"z_01_model_parameters",
"z_02_api_key_name",
"z_03_provider_api_key",
"z_04_authentication",
],
)
elif field_name == "embedding_provider":
self.del_fields(
build_config,
["model", "z_01_model_parameters", "z_02_api_key_name", "z_03_provider_api_key", "z_04_authentication"],
)
# Update the providers mapping
vectorize_providers = self.update_providers_mapping()
model_options = vectorize_providers[field_value][1]
new_parameter = DropdownInput(
name="model",
display_name="Model",
info="The embedding model to use for the selected provider. Each provider has a different set of "
"models available (full list at "
"https://docs.datastax.com/en/astra-db-serverless/databases/embedding-generation.html):\n\n"
f"{', '.join(model_options)}",
options=model_options,
value=None,
required=True,
real_time_refresh=True,
).to_dict()
self.insert_in_dict(build_config, "embedding_provider", {"model": new_parameter})
elif field_name == "model":
self.del_fields(
build_config,
["z_01_model_parameters", "z_02_api_key_name", "z_03_provider_api_key", "z_04_authentication"],
)
new_parameter_1 = DictInput(
name="z_01_model_parameters",
display_name="Model Parameters",
list=True,
).to_dict()
new_parameter_2 = MessageTextInput(
name="z_02_api_key_name",
display_name="API Key Name",
info="The name of the embeddings provider API key stored on Astra. "
"If set, it will override the 'ProviderKey' in the authentication parameters.",
).to_dict()
new_parameter_3 = SecretStrInput(
load_from_db=False,
name="z_03_provider_api_key",
display_name="Provider API Key",
info="An alternative to the Astra Authentication that passes an API key for the provider "
"with each request to Astra DB. "
"This may be used when Vectorize is configured for the collection, "
"but no corresponding provider secret is stored within Astra's key management system.",
).to_dict()
new_parameter_4 = DictInput(
name="z_04_authentication",
display_name="Authentication Parameters",
list=True,
).to_dict()
self.insert_in_dict(
build_config,
"model",
{
"z_01_model_parameters": new_parameter_1,
"z_02_api_key_name": new_parameter_2,
"z_03_provider_api_key": new_parameter_3,
"z_04_authentication": new_parameter_4,
},
)
return build_config
def build_vectorize_options(self, **kwargs):
    """Assemble the Astra Vectorize options for creating or opening a collection.

    Values set on the component take precedence; ``kwargs`` of the same name
    (``embedding_provider``, ``model``, ``z_01_model_parameters``,
    ``z_02_api_key_name``, ``z_03_provider_api_key``, ``z_04_authentication``)
    act as fallbacks.

    Returns:
        A dict with ``collection_vector_service_options`` (provider, modelName,
        authentication, parameters) and ``collection_embedding_api_key``.
    """
    # The Vectorize fields are created dynamically, so they may not exist as attributes yet.
    for attribute in (
        "embedding_provider",
        "model",
        "z_01_model_parameters",
        "z_02_api_key_name",
        "z_03_provider_api_key",
        "z_04_authentication",
    ):
        if not hasattr(self, attribute):
            setattr(self, attribute, None)

    # Fetch values from kwargs if any self.* attributes are None
    provider_mapping = self.update_providers_mapping()
    # An unknown provider, or an empty entry, must not raise IndexError.
    provider_entry = provider_mapping.get(self.embedding_provider) or [None]
    provider_value = provider_entry[0] or kwargs.get("embedding_provider")
    model_name = self.model or kwargs.get("model")
    # Tolerate an explicit None for the kwarg; a fresh dict is built so no input is mutated.
    authentication = {**(self.z_04_authentication or {}), **(kwargs.get("z_04_authentication") or {})}
    parameters = self.z_01_model_parameters or kwargs.get("z_01_model_parameters", {})

    # Set the API key name if provided
    api_key_name = self.z_02_api_key_name or kwargs.get("z_02_api_key_name")
    provider_key = self.z_03_provider_api_key or kwargs.get("z_03_provider_api_key")
    if api_key_name:
        authentication["providerKey"] = api_key_name
    if authentication:
        # Authentication parameters take precedence over a per-request provider API key.
        provider_key = None
        # Keep only the part before the first dot; the key may legitimately be absent
        # when authentication carries other parameters only.
        stored_key_name = authentication.get("providerKey")
        if isinstance(stored_key_name, str):
            authentication["providerKey"] = stored_key_name.split(".")[0]

    # Set authentication and parameters to None if no values are provided
    if not authentication:
        authentication = None
    if not parameters:
        parameters = None

    return {
        # must match astrapy.info.CollectionVectorServiceOptions
        "collection_vector_service_options": {
            "provider": provider_value,
            "modelName": model_name,
            "authentication": authentication,
            "parameters": parameters,
        },
        "collection_embedding_api_key": provider_key,
    }
@check_cached_vector_store
def build_vector_store(self, vectorize_options=None):
    """Create the ``AstraDBVectorStore`` for this component and ingest any provided data.

    Args:
        vectorize_options: Optional pre-built result of ``build_vectorize_options``;
            only used when "Astra Vectorize" is selected for a new collection.

    Returns:
        The initialised vector store, after ``ingest_data`` has been added to it.

    Raises:
        ImportError: If ``langchain-astradb`` is not installed.
        ValueError: If the setup mode is invalid, or if creating the store fails
            (including an unparsable ``collection_indexing_policy`` JSON string).
    """
    try:
        from langchain_astradb import AstraDBVectorStore
        from langchain_astradb.utils.astradb import SetupMode
    except ImportError as e:
        msg = (
            "Could not import langchain Astra DB integration package. "
            "Please install it with `pip install langchain-astradb`."
        )
        raise ImportError(msg) from e

    try:
        if not self.setup_mode:
            self.setup_mode = self._inputs["setup_mode"].options[0]

        setup_mode_value = SetupMode[self.setup_mode.upper()]
    except KeyError as e:
        msg = f"Invalid setup mode: {self.setup_mode}"
        raise ValueError(msg) from e

    metric_value = self.metric or None
    autodetect = False

    if self.embedding_choice == "Embedding Model":
        embedding_dict = {"embedding": self.embedding_model}
    # Use autodetect if the collection name is NOT set to "+ Create new collection"
    elif self.collection_name != "+ Create new collection":
        autodetect = True
        metric_value = None
        setup_mode_value = None
        embedding_dict = {}
    else:
        from astrapy.info import CollectionVectorServiceOptions

        # Grab the collection options if available
        collection_options = self.get_collection_options()
        service = (collection_options.service if collection_options else None) or None

        # Prefer the component's own authentication, then the collection's, then an empty dict.
        authentication = getattr(self, "z_04_authentication", {}) or (
            service.authentication if service and service.authentication else {}
        )

        # Build the vectorize options dictionary
        dict_options = vectorize_options or self.build_vectorize_options(
            embedding_provider=(getattr(self, "embedding_provider", None) or (service.provider if service else None)),
            model=(getattr(self, "model", None) or (service.model_name if service else None)),
            z_01_model_parameters=(
                getattr(self, "z_01_model_parameters", None) or (service.parameters if service else None)
            ),
            z_02_api_key_name=(
                getattr(self, "z_02_api_key_name", None) or (authentication.get("apiKey") if authentication else None)
            ),
            z_03_provider_api_key=(
                getattr(self, "z_03_provider_api_key", None)
                or (authentication.get("providerKey") if authentication else None)
            ),
            z_04_authentication=authentication,
        )

        # Set the embedding dictionary
        embedding_dict = {
            "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(
                dict_options.get("collection_vector_service_options")
            ),
            "collection_embedding_api_key": dict_options.get("collection_embedding_api_key"),
        }

    # Get Langflow version and platform information
    langflow_version = get_version_info()["version"]
    langflow_prefix = ""
    if os.getenv("LANGFLOW_HOST") is not None:
        langflow_prefix = "ds-"

    try:
        vector_store = AstraDBVectorStore(
            token=self.token,
            api_endpoint=self.api_endpoint,
            namespace=self.keyspace or None,
            collection_name=self.get_collection_choice(),
            autodetect_collection=autodetect,
            environment=(
                parse_api_endpoint(getattr(self, "api_endpoint", None)).environment
                if getattr(self, "api_endpoint", None)
                else None
            ),
            metric=metric_value,
            batch_size=self.batch_size or None,
            bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
            bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
            bulk_delete_concurrency=self.bulk_delete_concurrency or None,
            setup_mode=setup_mode_value,
            pre_delete_collection=self.pre_delete_collection,
            # Drop empty strings; tolerate the lists being unset.
            metadata_indexing_include=[s for s in (self.metadata_indexing_include or []) if s] or None,
            metadata_indexing_exclude=[s for s in (self.metadata_indexing_exclude or []) if s] or None,
            # The input is a JSON *string*; parse it into the mapping the store expects
            # (dumps() would produce the bytes of a quoted string instead).
            collection_indexing_policy=orjson.loads(self.collection_indexing_policy)
            if self.collection_indexing_policy
            else None,
            ext_callers=[(f"{langflow_prefix}langflow", langflow_version)],
            **embedding_dict,
        )
    except Exception as e:
        msg = f"Error initializing AstraDBVectorStore: {e}"
        raise ValueError(msg) from e

    self._add_documents_to_vector_store(vector_store)

    return vector_store
def _add_documents_to_vector_store(self, vector_store) -> None:
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
msg = "Vector Store Inputs must be Data objects."
raise TypeError(msg)
if documents:
self.log(f"Adding {len(documents)} documents to the Vector Store.")
try:
vector_store.add_documents(documents)
except Exception as e:
msg = f"Error adding documents to AstraDBVectorStore: {e}"
raise ValueError(msg) from e
else:
self.log("No documents to add to the Vector Store.")
def _map_search_type(self) -> str:
if self.search_type == "Similarity with score threshold":
return "similarity_score_threshold"
if self.search_type == "MMR (Max Marginal Relevance)":
return "mmr"
return "similarity"
def _build_search_args(self):
query = self.search_input if isinstance(self.search_input, str) and self.search_input.strip() else None
search_filter = (
{k: v for k, v in self.search_filter.items() if k and v and k.strip()} if self.search_filter else None
)
if query:
args = {
"query": query,
"search_type": self._map_search_type(),
"k": self.number_of_results,
"score_threshold": self.search_score_threshold,
}
elif self.advanced_search_filter or search_filter:
args = {
"n": self.number_of_results,
}
else:
return {}
filter_arg = self.advanced_search_filter or {}
if search_filter:
self.log(self.log(f"`search_filter` is deprecated. Use `advanced_search_filter`. Cleaned: {search_filter}"))
filter_arg.update(search_filter)
if filter_arg:
args["filter"] = filter_arg
return args
def search_documents(self, vector_store=None) -> list[Data]:
vector_store = vector_store or self.build_vector_store()
self.log(f"Search input: {self.search_input}")
self.log(f"Search type: {self.search_type}")
self.log(f"Number of results: {self.number_of_results}")
try:
search_args = self._build_search_args()
except Exception as e:
msg = f"Error in AstraDBVectorStore._build_search_args: {e}"
raise ValueError(msg) from e
if not search_args:
self.log("No search input or filters provided. Skipping search.")
return []
docs = []
search_method = "search" if "query" in search_args else "metadata_search"
try:
self.log(f"Calling vector_store.{search_method} with args: {search_args}")
docs = getattr(vector_store, search_method)(**search_args)
except Exception as e:
msg = f"Error performing {search_method} in AstraDBVectorStore: {e}"
raise ValueError(msg) from e
self.log(f"Retrieved documents: {len(docs)}")
data = docs_to_data(docs)
self.log(f"Converted documents to data: {len(data)}")
self.status = data
return data
def get_retriever_kwargs(self):
    """Return the ``search_type`` and ``search_kwargs`` used to configure a retriever."""
    search_kwargs = self._build_search_args()
    search_type = self._map_search_type()
    return {"search_type": search_type, "search_kwargs": search_kwargs}
Cassandra
This component creates a Cassandra Vector Store with search capabilities. For more information, see the Cassandra documentation.
Parameters
Name | Type | Description |
---|---|---|
database_ref |
String |
Contact points for the database or AstraDB database ID |
username |
String |
Username for the database (leave empty for AstraDB) |
token |
SecretString |
User password for the database or AstraDB token |
keyspace |
String |
Table Keyspace or AstraDB namespace |
table_name |
String |
Name of the table or AstraDB collection |
ttl_seconds |
Integer |
Time-to-live for added texts |
batch_size |
Integer |
Number of records to process in a single batch |
setup_mode |
String |
Configuration mode for setting up the Cassandra table |
cluster_kwargs |
Dict |
Additional keyword arguments for the Cassandra cluster |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
search_type |
String |
Type of search to perform |
search_score_threshold |
Float |
Minimum similarity score for search results |
search_filter |
Dict |
Metadata filters for search query |
body_search |
String |
Document textual search terms |
enable_body_search |
Boolean |
Flag to enable body search |
Name | Type | Description |
---|---|---|
vector_store |
Cassandra |
Cassandra vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
cassandra.py
from langchain_community.vectorstores import Cassandra
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, DictInput, FloatInput
from langflow.io import (
DataInput,
DropdownInput,
HandleInput,
IntInput,
MessageTextInput,
MultilineInput,
SecretStrInput,
)
from langflow.schema import Data
# Vector store component for Cassandra (or Astra DB via its database ID), built on LCVectorStoreComponent.
class CassandraVectorStoreComponent(LCVectorStoreComponent):
# Component metadata: display name, description, documentation link, internal name and icon.
display_name = "Cassandra"
description = "Cassandra Vector Store with search capabilities"
documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/cassandra"
name = "Cassandra"
icon = "Cassandra"
# Declarative list of the component's input fields.
inputs = [
# build_vector_store treats a value that parses as a UUID as an Astra database ID;
# any other value is used as contact points (split on "," when a comma is present).
MessageTextInput(
name="database_ref",
display_name="Contact Points / Astra Database ID",
info="Contact points for the database (or AstraDB database ID)",
required=True,
),
MessageTextInput(
name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
),
# Passed as `token` for Astra and as `password` for a regular cluster.
SecretStrInput(
name="token",
display_name="Password / AstraDB Token",
info="User password for the database (or AstraDB token).",
required=True,
),
MessageTextInput(
name="keyspace",
display_name="Keyspace",
info="Table Keyspace (or AstraDB namespace).",
required=True,
),
MessageTextInput(
name="table_name",
display_name="Table Name",
info="The name of the table (or AstraDB collection) where vectors will be stored.",
required=True,
),
# Forwarded as `ttl_seconds or None`, so 0 or an empty value passes None.
IntInput(
name="ttl_seconds",
display_name="TTL Seconds",
info="Optional time-to-live for the added texts.",
advanced=True,
),
IntInput(
name="batch_size",
display_name="Batch Size",
info="Optional number of data to process in a single batch.",
value=16,
advanced=True,
),
# build_vector_store maps "Off" and "Sync" explicitly; any other value is treated as async.
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
options=["Sync", "Async", "Off"],
value="Sync",
advanced=True,
),
DictInput(
name="cluster_kwargs",
display_name="Cluster arguments",
info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
advanced=True,
is_list=True,
),
MultilineInput(name="search_query", display_name="Search Query"),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
# The display values below are translated to search-type identifiers by _map_search_type.
DropdownInput(
name="search_type",
display_name="Search Type",
info="Search type to use",
options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
value="Similarity",
advanced=True,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results. "
"(when using 'Similarity with score threshold')",
value=0,
advanced=True,
),
# Entries with an empty key or value are dropped by _build_search_args.
DictInput(
name="search_filter",
display_name="Search Metadata Filter",
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
is_list=True,
),
# Only usable when enable_body_search is set; _build_search_args raises ValueError otherwise.
MessageTextInput(
name="body_search",
display_name="Search Body",
info="Document textual search terms to apply to the search query.",
advanced=True,
),
BoolInput(
name="enable_body_search",
display_name="Enable Body Search",
info="Flag to enable body search. This must be enabled BEFORE the table is created.",
value=False,
advanced=True,
),
]
@check_cached_vector_store
def build_vector_store(self) -> Cassandra:
    """Initialise cassio and return a ``Cassandra`` vector store.

    ``database_ref`` is treated as an Astra database ID when it parses as a
    UUID; otherwise it is used as contact points (split on commas when one is
    present). When ``ingest_data`` is non-empty the store is created from those
    documents; otherwise an empty store handle is returned.

    Raises:
        ImportError: If the ``cassio`` integration cannot be imported.
    """
    try:
        import cassio
        from langchain_community.utilities.cassandra import SetupMode
    except ImportError as e:
        msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
        raise ImportError(msg) from e

    from uuid import UUID

    connection_target = self.database_ref
    try:
        UUID(self.database_ref)
    except ValueError:
        is_astra = False
        if "," in self.database_ref:
            # use a copy because we can't change the type of the parameter
            connection_target = self.database_ref.split(",")
    else:
        is_astra = True

    if is_astra:
        cassio.init(
            database_id=connection_target,
            token=self.token,
            cluster_kwargs=self.cluster_kwargs,
        )
    else:
        cassio.init(
            contact_points=connection_target,
            username=self.username,
            password=self.token,
            cluster_kwargs=self.cluster_kwargs,
        )

    # Data objects are converted to documents; anything else is passed through unchanged.
    documents = [
        item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []
    ]

    body_index_options = [("index_analyzer", "STANDARD")] if self.enable_body_search else None

    # "Off" and "Sync" are matched explicitly; every other value means async setup.
    setup_mode = {"Off": SetupMode.OFF, "Sync": SetupMode.SYNC}.get(self.setup_mode, SetupMode.ASYNC)

    if not documents:
        logger.debug("No documents to add to the Vector Store.")
        return Cassandra(
            embedding=self.embedding,
            table_name=self.table_name,
            keyspace=self.keyspace,
            ttl_seconds=self.ttl_seconds or None,
            body_index_options=body_index_options,
            setup_mode=setup_mode,
        )

    logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
    # NOTE(review): setup_mode is not forwarded on this path, unlike the constructor call above — confirm intended.
    return Cassandra.from_documents(
        documents=documents,
        embedding=self.embedding,
        table_name=self.table_name,
        keyspace=self.keyspace,
        ttl_seconds=self.ttl_seconds or None,
        batch_size=self.batch_size,
        body_index_options=body_index_options,
    )
def _map_search_type(self) -> str:
if self.search_type == "Similarity with score threshold":
return "similarity_score_threshold"
if self.search_type == "MMR (Max Marginal Relevance)":
return "mmr"
return "similarity"
def search_documents(self) -> list[Data]:
    """Search the Cassandra vector store with the configured query.

    The vector store is always built first. The search itself only runs when
    ``search_query`` is a non-blank string.

    Returns:
        The retrieved documents converted with ``docs_to_data`` (also stored
        in ``self.status``), or an empty list when there is no usable query.

    Raises:
        ValueError: If the search raises a ``KeyError`` that mentions ``content``.
        KeyError: Any other ``KeyError`` from the search is re-raised unchanged.
    """
    store = self.build_vector_store()

    logger.debug(f"Search input: {self.search_query}")
    logger.debug(f"Search type: {self.search_type}")
    logger.debug(f"Number of results: {self.number_of_results}")

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        return []

    try:
        mapped_type = self._map_search_type()
        search_args = self._build_search_args()
        logger.debug(f"Search args: {search_args}")
        found = store.search(query=query, search_type=mapped_type, **search_args)
    except KeyError as e:
        if "content" not in str(e):
            raise
        msg = (
            "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
            "Your collection does not contain a field name 'content'."
        )
        raise ValueError(msg) from e

    logger.debug(f"Retrieved documents: {len(found)}")
    results = docs_to_data(found)
    self.status = results
    return results
def _build_search_args(self):
    """Assemble the keyword arguments forwarded to the vector store search.

    Returns:
        A dict that always contains ``k`` and ``score_threshold``. ``filter``
        is added when ``search_filter`` has at least one entry whose key and
        value are both truthy, and ``body_search`` is added when set.

    Raises:
        ValueError: If ``body_search`` is set while ``enable_body_search`` is off.
    """
    search_args = {
        "k": self.number_of_results,
        "score_threshold": self.search_score_threshold,
    }

    raw_filter = self.search_filter
    if raw_filter:
        # Drop entries with an empty key or an empty value.
        usable_filter = {key: value for key, value in raw_filter.items() if key and value}
        if usable_filter:
            search_args["filter"] = usable_filter

    if not self.body_search:
        return search_args

    if not self.enable_body_search:
        msg = "You should enable body search when creating the table to search the body field."
        raise ValueError(msg)

    search_args["body_search"] = self.body_search
    return search_args
def get_retriever_kwargs(self):
    """Return the search type and search kwargs as a dict for building a retriever.

    Returns:
        A dict with ``search_type`` (from ``_map_search_type``) and
        ``search_kwargs`` (from ``_build_search_args``).
    """
    # Build the arguments first so validation errors surface before anything else.
    retriever_search_kwargs = self._build_search_args()
    retriever_search_type = self._map_search_type()
    return dict(search_type=retriever_search_type, search_kwargs=retriever_search_kwargs)
Cassandra Graph Vector Store
This component implements a Cassandra Graph Vector Store with search capabilities.
Parameters
Name | Display Name | Info |
---|---|---|
database_ref |
Contact Points / Astra Database ID |
Contact points for the database or AstraDB database ID (required) |
username |
Username |
Username for the database (leave empty for AstraDB) |
token |
Password / AstraDB Token |
User password for the database or AstraDB token (required) |
keyspace |
Keyspace |
Table Keyspace or AstraDB namespace (required) |
table_name |
Table Name |
The name of the table or AstraDB collection where vectors will be stored (required) |
setup_mode |
Setup Mode |
Configuration mode for setting up the Cassandra table (options: "Sync", "Off", default: "Sync") |
cluster_kwargs |
Cluster arguments |
Optional dictionary of additional keyword arguments for the Cassandra cluster |
search_query |
Search Query |
Query string for similarity search |
ingest_data |
Ingest Data |
Data to be ingested into the vector store (list of Data objects) |
embedding |
Embedding |
Embedding model to use |
number_of_results |
Number of Results |
Number of results to return in similarity search (default: 4) |
search_type |
Search Type |
Search type to use (options: "Traversal", "MMR traversal", "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Traversal") |
depth |
Depth of traversal |
The maximum depth of edges to traverse (for "Traversal" or "MMR traversal" search types, default: 1) |
search_score_threshold |
Search Score Threshold |
Minimum similarity score threshold for search results (for "Similarity with score threshold" search type) |
search_filter |
Search Metadata Filter |
Optional dictionary of filters to apply to the search query |
Name | Display Name | Info |
---|---|---|
vector_store |
Vector Store |
Built Cassandra Graph vector store |
search_results |
Search Results |
Results of the similarity search as a list of Data objects |
Component code
cassandra_graph.py
from uuid import UUID
from langchain_community.graph_vectorstores import CassandraGraphVectorStore
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
DataInput,
DropdownInput,
HandleInput,
IntInput,
MessageTextInput,
MultilineInput,
SecretStrInput,
)
from langflow.schema import Data
class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
display_name = "Cassandra Graph"
description = "Cassandra Graph Vector Store"
documentation = "https://python.langchain.com/v0.2/api_reference/community/graph_vectorstores.html"
name = "CassandraGraph"
icon = "Cassandra"
inputs = [
MessageTextInput(
name="database_ref",
display_name="Contact Points / Astra Database ID",
info="Contact points for the database (or AstraDB database ID)",
required=True,
),
MessageTextInput(
name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
),
SecretStrInput(
name="token",
display_name="Password / AstraDB Token",
info="User password for the database (or AstraDB token).",
required=True,
),
MessageTextInput(
name="keyspace",
display_name="Keyspace",
info="Table Keyspace (or AstraDB namespace).",
required=True,
),
MessageTextInput(
name="table_name",
display_name="Table Name",
info="The name of the table (or AstraDB collection) where vectors will be stored.",
required=True,
),
DropdownInput(
name="setup_mode",
display_name="Setup Mode",
info="Configuration mode for setting up the Cassandra table, with options like 'Sync' or 'Off'.",
options=["Sync", "Off"],
value="Sync",
advanced=True,
),
DictInput(
name="cluster_kwargs",
display_name="Cluster arguments",
info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
advanced=True,
is_list=True,
),
MultilineInput(name="search_query", display_name="Search Query"),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
DropdownInput(
name="search_type",
display_name="Search Type",
info="Search type to use",
options=[
"Traversal",
"MMR traversal",
"Similarity",
"Similarity with score threshold",
"MMR (Max Marginal Relevance)",
],
value="Traversal",
advanced=True,
),
IntInput(
name="depth",
display_name="Depth of traversal",
info="The maximum depth of edges to traverse. (when using 'Traversal' or 'MMR traversal')",
value=1,
advanced=True,
),
FloatInput(
name="search_score_threshold",
display_name="Search Score Threshold",
info="Minimum similarity score threshold for search results. "
"(when using 'Similarity with score threshold')",
value=0,
advanced=True,
),
DictInput(
name="search_filter",
display_name="Search Metadata Filter",
info="Optional dictionary of filters to apply to the search query.",
advanced=True,
is_list=True,
),
]
@check_cached_vector_store
def build_vector_store(self) -> CassandraGraphVectorStore:
    """Initialise cassio and return the Cassandra graph vector store.

    ``database_ref`` is treated as an Astra database ID when it parses as a
    UUID; otherwise it is passed as Cassandra contact points, split into a
    list when it contains commas. Entries of ``ingest_data`` that are ``Data``
    objects are converted to LangChain documents; other entries are passed
    through unchanged.

    Returns:
        A store created with ``CassandraGraphVectorStore.from_documents`` when
        there are documents to ingest, otherwise a plain
        ``CassandraGraphVectorStore`` instance.

    Raises:
        ImportError: If cassio or the LangChain Cassandra utilities cannot be imported.
    """
    try:
        import cassio
        from langchain_community.utilities.cassandra import SetupMode
    except ImportError as e:
        msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
        raise ImportError(msg) from e

    try:
        UUID(self.database_ref)
    except ValueError:
        uses_astra = False
    else:
        uses_astra = True

    if uses_astra:
        init_kwargs = {"database_id": self.database_ref, "token": self.token}
    else:
        # Work on a local value because the parameter itself must keep its type.
        contact_points = self.database_ref
        if "," in contact_points:
            contact_points = contact_points.split(",")
        init_kwargs = {
            "contact_points": contact_points,
            "username": self.username,
            "password": self.token,
        }
    cassio.init(**init_kwargs, cluster_kwargs=self.cluster_kwargs)

    docs = [item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []]

    if self.setup_mode == "Off":
        setup_mode = SetupMode.OFF
    else:
        setup_mode = SetupMode.SYNC

    shared_kwargs = {
        "embedding": self.embedding,
        "node_table": self.table_name,
        "keyspace": self.keyspace,
    }

    if docs:
        logger.debug(f"Adding {len(docs)} documents to the Vector Store.")
        # NOTE: setup_mode is only forwarded on the no-documents path below.
        return CassandraGraphVectorStore.from_documents(documents=docs, **shared_kwargs)

    logger.debug("No documents to add to the Vector Store.")
    return CassandraGraphVectorStore(setup_mode=setup_mode, **shared_kwargs)
def _map_search_type(self) -> str:
    """Translate the selected search-type label into the key passed to ``search``.

    Returns:
        The search-type key for the selected label. Any unrecognised value
        (including the default ``"Traversal"``) maps to ``"traversal"``.
    """
    key_by_label = {
        "Similarity": "similarity",
        "Similarity with score threshold": "similarity_score_threshold",
        "MMR (Max Marginal Relevance)": "mmr",
        # The dropdown option is spelled "MMR traversal". The capitalised
        # spelling was the only one recognised before, so it stays accepted.
        "MMR traversal": "mmr_traversal",
        "MMR Traversal": "mmr_traversal",
    }
    return key_by_label.get(self.search_type, "traversal")
def search_documents(self) -> list[Data]:
    """Search the Cassandra graph vector store with the configured query.

    The vector store is always built first. The search itself only runs when
    ``search_query`` is a non-blank string.

    Returns:
        The retrieved documents converted with ``docs_to_data`` (also stored
        in ``self.status``), or an empty list when there is no usable query.

    Raises:
        ValueError: If the search raises a ``KeyError`` that mentions ``content``.
        KeyError: Any other ``KeyError`` from the search is re-raised unchanged.
    """
    graph_store = self.build_vector_store()

    logger.debug(f"Search input: {self.search_query}")
    logger.debug(f"Search type: {self.search_type}")
    logger.debug(f"Number of results: {self.number_of_results}")

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        return []

    try:
        mapped_type = self._map_search_type()
        search_args = self._build_search_args()
        logger.debug(f"Search args: {search_args}")
        found = graph_store.search(query=query, search_type=mapped_type, **search_args)
    except KeyError as e:
        if "content" not in str(e):
            raise
        msg = (
            "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
            "Your collection does not contain a field name 'content'."
        )
        raise ValueError(msg) from e

    logger.debug(f"Retrieved documents: {len(found)}")
    results = docs_to_data(found)
    self.status = results
    return results
def _build_search_args(self):
    """Assemble the keyword arguments forwarded to the graph vector store search.

    Returns:
        A dict that always contains ``k``, ``score_threshold`` and ``depth``.
        ``filter`` is added when ``search_filter`` has at least one entry
        whose key and value are both truthy.
    """
    search_args = {
        "k": self.number_of_results,
        "score_threshold": self.search_score_threshold,
        "depth": self.depth,
    }

    raw_filter = self.search_filter
    if not raw_filter:
        return search_args

    # Drop entries with an empty key or an empty value.
    usable_filter = {key: value for key, value in raw_filter.items() if key and value}
    if usable_filter:
        search_args["filter"] = usable_filter
    return search_args
def get_retriever_kwargs(self):
    """Return the search type and search kwargs as a dict for building a retriever.

    Returns:
        A dict with ``search_type`` (from ``_map_search_type``) and
        ``search_kwargs`` (from ``_build_search_args``).
    """
    # The arguments are built before the search type is mapped.
    retriever_search_kwargs = self._build_search_args()
    retriever_search_type = self._map_search_type()
    return dict(search_type=retriever_search_type, search_kwargs=retriever_search_kwargs)
Chroma DB
This component creates a Chroma Vector Store with search capabilities. For more information, see the Chroma documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name |
String |
The name of the Chroma collection. Default: "langflow". |
persist_directory |
String |
The directory to persist the Chroma database. |
search_query |
String |
The query to search for in the vector store. |
ingest_data |
Data |
The data to ingest into the vector store (list of Data objects). |
embedding |
Embeddings |
The embedding function to use for the vector store. |
chroma_server_cors_allow_origins |
String |
CORS allow origins for the Chroma server. |
chroma_server_host |
String |
Host for the Chroma server. |
chroma_server_http_port |
Integer |
HTTP port for the Chroma server. |
chroma_server_grpc_port |
Integer |
gRPC port for the Chroma server. |
chroma_server_ssl_enabled |
Boolean |
Enable SSL for the Chroma server. |
allow_duplicates |
Boolean |
Allow duplicate documents in the vector store. |
search_type |
String |
Type of search to perform: "Similarity" or "MMR". |
number_of_results |
Integer |
Number of results to return from the search. Default: 10. |
limit |
Integer |
Limit the number of records to compare when Allow Duplicates is False. |
Component code
chroma.py
from copy import deepcopy
from chromadb.config import Settings
from langchain_chroma import Chroma
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.base.vectorstores.utils import chroma_collection_to_data
from langflow.io import BoolInput, DataInput, DropdownInput, HandleInput, IntInput, MultilineInput, StrInput
from langflow.schema import Data
class ChromaVectorStoreComponent(LCVectorStoreComponent):
"""Chroma Vector Store with search capabilities."""
display_name: str = "Chroma DB"
description: str = "Chroma Vector Store with search capabilities"
documentation = "https://python.langchain.com/docs/integrations/vectorstores/chroma"
name = "Chroma"
icon = "Chroma"
inputs = [
StrInput(
name="collection_name",
display_name="Collection Name",
value="langflow",
),
StrInput(
name="persist_directory",
display_name="Persist Directory",
),
MultilineInput(
name="search_query",
display_name="Search Query",
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
StrInput(
name="chroma_server_cors_allow_origins",
display_name="Server CORS Allow Origins",
advanced=True,
),
StrInput(
name="chroma_server_host",
display_name="Server Host",
advanced=True,
),
IntInput(
name="chroma_server_http_port",
display_name="Server HTTP Port",
advanced=True,
),
IntInput(
name="chroma_server_grpc_port",
display_name="Server gRPC Port",
advanced=True,
),
BoolInput(
name="chroma_server_ssl_enabled",
display_name="Server SSL Enabled",
advanced=True,
),
BoolInput(
name="allow_duplicates",
display_name="Allow Duplicates",
advanced=True,
info="If false, will not add documents that are already in the Vector Store.",
),
DropdownInput(
name="search_type",
display_name="Search Type",
options=["Similarity", "MMR"],
value="Similarity",
advanced=True,
),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
advanced=True,
value=10,
),
IntInput(
name="limit",
display_name="Limit",
advanced=True,
info="Limit the number of records to compare when Allow Duplicates is False.",
),
]
@check_cached_vector_store
def build_vector_store(self) -> Chroma:
    """Create the Chroma store, ingest pending data, and publish its contents as status.

    A Chroma client is only created when ``chroma_server_host`` is set;
    otherwise the store is built without a client.

    Returns:
        The ``Chroma`` vector store for ``collection_name``.

    Raises:
        ImportError: If ``chromadb`` or ``langchain_chroma`` cannot be imported.
    """
    try:
        from chromadb import Client
        from langchain_chroma import Chroma
    except ImportError as e:
        msg = "Could not import Chroma integration package. Please install it with `pip install langchain-chroma`."
        raise ImportError(msg) from e

    # Only build server settings and a client when a server host is configured.
    server_client = None
    if self.chroma_server_host:
        server_settings = Settings(
            chroma_server_cors_allow_origins=self.chroma_server_cors_allow_origins or [],
            chroma_server_host=self.chroma_server_host,
            chroma_server_http_port=self.chroma_server_http_port or None,
            chroma_server_grpc_port=self.chroma_server_grpc_port or None,
            chroma_server_ssl_enabled=self.chroma_server_ssl_enabled,
        )
        server_client = Client(settings=server_settings)

    # Resolve the persist directory through the component's path helper when one is given.
    if self.persist_directory is None:
        storage_dir = None
    else:
        storage_dir = self.resolve_path(self.persist_directory)

    store = Chroma(
        persist_directory=storage_dir,
        client=server_client,
        embedding_function=self.embedding,
        collection_name=self.collection_name,
    )

    self._add_documents_to_vector_store(store)
    self.status = chroma_collection_to_data(store.get(limit=self.limit))
    return store
def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
    """Add the pending ``ingest_data`` to the vector store.

    When ``allow_duplicates`` is off, inputs equal to an already stored
    record (compared with the stored record's ``id`` removed) are skipped.
    Nothing is added when no documents remain or no embedding is configured.

    Args:
        vector_store: The Chroma store that receives the documents.

    Raises:
        TypeError: If an entry of ``ingest_data`` is not a ``Data`` object.
    """
    if not self.ingest_data:
        self.status = ""
        return

    if self.allow_duplicates:
        existing_records = []
    else:
        existing_records = chroma_collection_to_data(vector_store.get(limit=self.limit))

    # Compare against copies without ids so the originals stay untouched.
    known_without_id = []
    for record in deepcopy(existing_records):
        del record.id
        known_without_id.append(record)

    pending = []
    for item in self.ingest_data or []:
        if not isinstance(item, Data):
            msg = "Vector Store Inputs must be Data objects."
            raise TypeError(msg)
        if item not in known_without_id:
            pending.append(item.to_lc_document())

    if not pending or self.embedding is None:
        logger.debug("No documents to add to the Vector Store.")
        return

    logger.debug(f"Adding {len(pending)} documents to the Vector Store.")
    vector_store.add_documents(pending)
Clickhouse
This component implements a Clickhouse Vector Store with search capabilities using the LangChain framework.
Parameters
Name | Display Name | Info |
---|---|---|
host |
hostname |
Clickhouse server hostname (required, default: "localhost") |
port |
port |
Clickhouse server port (required, default: 8123) |
database |
database |
Clickhouse database name (required) |
table |
Table name |
Clickhouse table name (required) |
username |
The ClickHouse user name. |
Username for authentication (required) |
password |
The password for username. |
Password for authentication (required) |
index_type |
index_type |
Type of the index (options: "annoy", "vector_similarity", default: "annoy") |
metric |
metric |
Metric to compute distance (options: "angular", "euclidean", "manhattan", "hamming", "dot", default: "angular") |
secure |
Use https/TLS |
Overrides inferred values from the interface or port arguments (default: false) |
index_param |
Param of the index |
Index parameters (default: "'L2Distance',100") |
index_query_params |
index query params |
Additional index query parameters |
search_query |
Search Query |
Query string for similarity search |
ingest_data |
Ingest Data |
Data to be ingested into the vector store |
embedding |
Embedding |
Embedding model to use |
number_of_results |
Number of Results |
Number of results to return in similarity search (default: 4) |
score_threshold |
Score threshold |
Threshold for similarity scores |
Name | Display Name | Info |
---|---|---|
vector_store |
Vector Store |
Built Clickhouse vector store |
search_results |
Search Results |
Results of the similarity search as a list of Data objects |
Component code
clickhouse.py
from langchain_community.vectorstores import Clickhouse, ClickhouseSettings
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, FloatInput
from langflow.io import (
DataInput,
DictInput,
DropdownInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
display_name = "Clickhouse"
description = "Clickhouse Vector Store with search capabilities"
documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/clickhouse/"
name = "Clickhouse"
icon = "Clickhouse"
inputs = [
StrInput(name="host", display_name="hostname", required=True, value="localhost"),
IntInput(name="port", display_name="port", required=True, value=8123),
StrInput(name="database", display_name="database", required=True),
StrInput(name="table", display_name="Table name", required=True),
StrInput(name="username", display_name="The ClickHouse user name.", required=True),
SecretStrInput(name="password", display_name="The password for username.", required=True),
DropdownInput(
name="index_type",
display_name="index_type",
options=["annoy", "vector_similarity"],
info="Type of the index.",
value="annoy",
advanced=True,
),
DropdownInput(
name="metric",
display_name="metric",
options=["angular", "euclidean", "manhattan", "hamming", "dot"],
info="Metric to compute distance.",
value="angular",
advanced=True,
),
BoolInput(
name="secure",
display_name="Use https/TLS. This overrides inferred values from the interface or port arguments.",
value=False,
advanced=True,
),
StrInput(name="index_param", display_name="Param of the index", value="'L2Distance',100", advanced=True),
DictInput(name="index_query_params", display_name="index query params", advanced=True),
MultilineInput(name="search_query", display_name="Search Query"),
DataInput(name="ingest_data", display_name="Ingest Data", is_list=True),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
FloatInput(name="score_threshold", display_name="Score threshold", advanced=True),
]
@check_cached_vector_store
def build_vector_store(self) -> Clickhouse:
    """Check connectivity to ClickHouse and build the vector store.

    A short-lived probe client runs ``SELECT 1`` before the store is created.
    The probe uses the configured port and TLS setting, so it tests the same
    endpoint the store will connect to.

    Returns:
        A store created with ``Clickhouse.from_documents`` when there are
        documents to ingest, otherwise a plain ``Clickhouse`` instance.

    Raises:
        ImportError: If ``clickhouse_connect`` cannot be imported.
        ValueError: If creating the probe client or running the probe query fails.
    """
    try:
        import clickhouse_connect
    except ImportError as e:
        msg = (
            "Failed to import Clickhouse dependencies. "
            "Install it using `pip install langflow[clickhouse-connect] --pre`"
        )
        raise ImportError(msg) from e

    probe_kwargs = {"host": self.host, "username": self.username, "password": self.password}
    # Forward port/secure only when set, so unset values keep the client's own defaults.
    if self.port:
        probe_kwargs["port"] = self.port
    if self.secure:
        probe_kwargs["secure"] = self.secure

    try:
        client = clickhouse_connect.get_client(**probe_kwargs)
        try:
            client.command("SELECT 1")
        finally:
            # The probe client is not used again, so release it immediately.
            client.close()
    except Exception as e:
        msg = f"Failed to connect to Clickhouse: {e}"
        raise ValueError(msg) from e

    documents = [item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []]

    # Optional index settings are only passed when the user provided them.
    kwargs = {}
    if self.index_param:
        kwargs["index_param"] = self.index_param.split(",")
    if self.index_query_params:
        kwargs["index_query_params"] = self.index_query_params

    settings = ClickhouseSettings(
        table=self.table,
        database=self.database,
        host=self.host,
        index_type=self.index_type,
        metric=self.metric,
        password=self.password,
        port=self.port,
        secure=self.secure,
        username=self.username,
        **kwargs,
    )

    if documents:
        return Clickhouse.from_documents(documents=documents, embedding=self.embedding, config=settings)
    return Clickhouse(embedding=self.embedding, config=settings)
def search_documents(self) -> list[Data]:
    """Run a similarity search against the Clickhouse vector store.

    The vector store is always built first. The search only runs when
    ``search_query`` is a non-blank string, and ``score_threshold`` is only
    forwarded when it is truthy.

    Returns:
        The retrieved documents converted with ``docs_to_data`` (also stored
        in ``self.status``), or an empty list when there is no usable query.
    """
    store = self.build_vector_store()

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        return []

    extra_args = {"score_threshold": self.score_threshold} if self.score_threshold else {}
    found = store.similarity_search(query=query, k=self.number_of_results, **extra_args)

    results = docs_to_data(found)
    self.status = results
    return results
Couchbase
This component creates a Couchbase Vector Store with search capabilities. For more information, see the Couchbase documentation.
Parameters
Name | Type | Description |
---|---|---|
couchbase_connection_string |
SecretString |
Couchbase Cluster connection string (required). |
couchbase_username |
String |
Couchbase username (required). |
couchbase_password |
SecretString |
Couchbase password (required). |
bucket_name |
String |
Name of the Couchbase bucket (required). |
scope_name |
String |
Name of the Couchbase scope (required). |
collection_name |
String |
Name of the Couchbase collection (required). |
index_name |
String |
Name of the Couchbase index (required). |
search_query |
String |
The query to search for in the vector store. |
ingest_data |
Data |
The data to ingest into the vector store (list of Data objects). |
embedding |
Embeddings |
The embedding function to use for the vector store. |
number_of_results |
Integer |
Number of results to return from the search. Default: 4 (advanced). |
Name | Type | Description |
---|---|---|
vector_store |
CouchbaseVectorStore |
A Couchbase vector store instance configured with the specified parameters. |
Component code
couchbase.py
from datetime import timedelta
from langchain_community.vectorstores import CouchbaseVectorStore
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
display_name = "Couchbase"
description = "Couchbase Vector Store with search capabilities"
documentation = "https://python.langchain.com/v0.1/docs/integrations/document_loaders/couchbase/"
name = "Couchbase"
icon = "Couchbase"
inputs = [
SecretStrInput(
name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True
),
StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
StrInput(name="bucket_name", display_name="Bucket Name", required=True),
StrInput(name="scope_name", display_name="Scope Name", required=True),
StrInput(name="collection_name", display_name="Collection Name", required=True),
StrInput(name="index_name", display_name="Index Name", required=True),
MultilineInput(name="search_query", display_name="Search Query"),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
]
@check_cached_vector_store
def build_vector_store(self) -> CouchbaseVectorStore:
    """Connect to the Couchbase cluster and build the vector store.

    The cluster connection is awaited for up to five seconds before the
    store is created.

    Returns:
        A store created with ``CouchbaseVectorStore.from_documents`` when
        there are documents to ingest, otherwise a plain
        ``CouchbaseVectorStore`` instance.

    Raises:
        ImportError: If the Couchbase SDK cannot be imported.
        ValueError: If authenticating, connecting, or waiting for the cluster fails.
    """
    try:
        from couchbase.auth import PasswordAuthenticator
        from couchbase.cluster import Cluster
        from couchbase.options import ClusterOptions
    except ImportError as e:
        msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
        raise ImportError(msg) from e

    try:
        authenticator = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
        cluster = Cluster(self.couchbase_connection_string, ClusterOptions(authenticator))
        cluster.wait_until_ready(timedelta(seconds=5))
    except Exception as e:
        msg = f"Failed to connect to Couchbase: {e}"
        raise ValueError(msg) from e

    docs = [item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []]

    # Both construction paths take the same location and embedding settings.
    store_kwargs = {
        "cluster": cluster,
        "bucket_name": self.bucket_name,
        "scope_name": self.scope_name,
        "collection_name": self.collection_name,
        "embedding": self.embedding,
        "index_name": self.index_name,
    }

    if docs:
        return CouchbaseVectorStore.from_documents(documents=docs, **store_kwargs)
    return CouchbaseVectorStore(**store_kwargs)
def search_documents(self) -> list[Data]:
    """Run a similarity search against the Couchbase vector store.

    The vector store is always built first. The search only runs when
    ``search_query`` is a non-blank string.

    Returns:
        The retrieved documents converted with ``docs_to_data`` (also stored
        in ``self.status``), or an empty list when there is no usable query.
    """
    store = self.build_vector_store()

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        return []

    found = store.similarity_search(query=query, k=self.number_of_results)
    results = docs_to_data(found)
    self.status = results
    return results
FAISS
This component creates a FAISS Vector Store with search capabilities.
For more information, see the FAISS documentation.
Parameters
Name | Type | Description |
---|---|---|
index_name |
String |
The name of the FAISS index. Default: "langflow_index". |
persist_directory |
String |
Path to save the FAISS index. It will be relative to where Langflow is running. |
search_query |
String |
The query to search for in the vector store. |
ingest_data |
Data |
The data to ingest into the vector store (list of Data objects or documents). |
allow_dangerous_deserialization |
Boolean |
Set to True to allow loading pickle files from untrusted sources. Only enable this if you trust the source of the data. Default: True (advanced). |
embedding |
Embeddings |
The embedding function to use for the vector store. |
number_of_results |
Integer |
Number of results to return from the search. Default: 4 (advanced). |
Name | Type | Description |
---|---|---|
vector_store |
FAISS |
A FAISS vector store instance configured with the specified parameters. |
Component code
faiss.py
from langchain_community.vectorstores import FAISS
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, DataInput, HandleInput, IntInput, MultilineInput, StrInput
from langflow.schema import Data
class FaissVectorStoreComponent(LCVectorStoreComponent):
"""FAISS Vector Store with search capabilities."""
display_name: str = "FAISS"
description: str = "FAISS Vector Store with search capabilities"
documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/faiss"
name = "FAISS"
icon = "FAISS"
inputs = [
StrInput(
name="index_name",
display_name="Index Name",
value="langflow_index",
),
StrInput(
name="persist_directory",
display_name="Persist Directory",
info="Path to save the FAISS index. It will be relative to where Langflow is running.",
),
MultilineInput(
name="search_query",
display_name="Search Query",
),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
BoolInput(
name="allow_dangerous_deserialization",
display_name="Allow Dangerous Deserialization",
info="Set to True to allow loading pickle files from untrusted sources. "
"Only enable this if you trust the source of the data.",
advanced=True,
value=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
advanced=True,
value=4,
),
]
@check_cached_vector_store
def build_vector_store(self) -> FAISS:
    """Build a FAISS index from the ingested data and save it locally.

    Returns:
        The ``FAISS`` store created from the ingested documents.

    Raises:
        ValueError: If ``persist_directory`` is not set.
    """
    if not self.persist_directory:
        msg = "Folder path is required to save the FAISS index."
        raise ValueError(msg)
    index_dir = self.resolve_path(self.persist_directory)

    # Data objects become LangChain documents; anything else is passed through.
    docs = [item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []]

    store = FAISS.from_documents(documents=docs, embedding=self.embedding)
    store.save_local(str(index_dir), self.index_name)
    return store
def search_documents(self) -> list[Data]:
    """Load the saved FAISS index and run a similarity search on it.

    The index is loaded from ``persist_directory`` on every call, before the
    query is checked. The search only runs when ``search_query`` is a
    non-blank string.

    Returns:
        The retrieved documents converted with ``docs_to_data``, or an empty
        list when there is no usable query.

    Raises:
        ValueError: If ``persist_directory`` is not set or the loaded store is falsy.
    """
    if not self.persist_directory:
        msg = "Folder path is required to load the FAISS index."
        raise ValueError(msg)
    index_dir = self.resolve_path(self.persist_directory)

    # NOTE(review): the component describes this flag as allowing pickle files
    # to be loaded; only enable it for index files from a trusted source.
    store = FAISS.load_local(
        folder_path=index_dir,
        embeddings=self.embedding,
        index_name=self.index_name,
        allow_dangerous_deserialization=self.allow_dangerous_deserialization,
    )
    if not store:
        msg = "Failed to load the FAISS index."
        raise ValueError(msg)

    logger.debug(f"Search input: {self.search_query}")
    logger.debug(f"Number of results: {self.number_of_results}")

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        logger.debug("No search input provided. Skipping search.")
        return []

    found = store.similarity_search(query=query, k=self.number_of_results)
    logger.debug(f"Retrieved documents: {len(found)}")

    results = docs_to_data(found)
    logger.debug(f"Converted documents to data: {len(results)}")
    logger.debug(results)
    return results
Hyper-Converged Database (HCD) Vector Store
This component implements a Vector Store using Hyper-Converged Database (HCD).
Parameters
Name | Display Name | Info |
---|---|---|
collection_name |
Collection Name |
The name of the collection within HCD where the vectors will be stored (required) |
username |
HCD Username |
Authentication username for accessing HCD (default: "hcd-superuser", required) |
password |
HCD Password |
Authentication password for accessing HCD (required) |
api_endpoint |
HCD API Endpoint |
API endpoint URL for the HCD service (required) |
search_input |
Search Input |
Query string for similarity search |
ingest_data |
Ingest Data |
Data to be ingested into the vector store |
namespace |
Namespace |
Optional namespace within HCD to use for the collection (default: "default_namespace") |
ca_certificate |
CA Certificate |
Optional CA certificate for TLS connections to HCD |
metric |
Metric |
Optional distance metric for vector comparisons (options: "cosine", "dot_product", "euclidean") |
batch_size |
Batch Size |
Optional number of records to process in a single batch |
bulk_insert_batch_concurrency |
Bulk Insert Batch Concurrency |
Optional concurrency level for bulk insert operations |
bulk_insert_overwrite_concurrency |
Bulk Insert Overwrite Concurrency |
Optional concurrency level for bulk insert operations that overwrite existing data |
bulk_delete_concurrency |
Bulk Delete Concurrency |
Optional concurrency level for bulk delete operations |
setup_mode |
Setup Mode |
Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync") |
pre_delete_collection |
Pre Delete Collection |
Boolean flag to determine whether to delete the collection before creating a new one |
metadata_indexing_include |
Metadata Indexing Include |
Optional list of metadata fields to include in the indexing |
embedding |
Embedding or Astra Vectorize |
Allows either an embedding model or an Astra Vectorize configuration |
metadata_indexing_exclude |
Metadata Indexing Exclude |
Optional list of metadata fields to exclude from the indexing |
collection_indexing_policy |
Collection Indexing Policy |
Optional dictionary defining the indexing policy for the collection |
number_of_results |
Number of Results |
Number of results to return in similarity search (default: 4) |
search_type |
Search Type |
Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Similarity") |
search_score_threshold |
Search Score Threshold |
Minimum similarity score threshold for search results (default: 0) |
search_filter |
Search Metadata Filter |
Optional dictionary of filters to apply to the search query |
Name | Display Name | Info |
---|---|---|
vector_store |
Vector Store |
Built HCD vector store instance |
search_results |
Search Results |
Results of similarity search as a list of Data objects |
Component code
hcd.py
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
BoolInput,
DataInput,
DropdownInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class HCDVectorStoreComponent(LCVectorStoreComponent):
    """Vector store component for Hyper-Converged Database (HCD).

    The store is created through ``langchain_astradb.AstraDBVectorStore`` using
    ``Environment.HCD`` and a username/password token provider. Ingest data is
    added when the store is built; searches go through ``vector_store.search``.
    """

    display_name: str = "Hyper-Converged Database"
    description: str = "Implementation of Vector Store using Hyper-Converged Database (HCD) with search capabilities"
    documentation: str = "https://python.langchain.com/docs/integrations/vectorstores/astradb"
    name = "HCD"
    icon: str = "HCD"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within HCD where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="username",
            display_name="HCD Username",
            info="Authentication username for accessing HCD.",
            value="hcd-superuser",
            required=True,
        ),
        SecretStrInput(
            name="password",
            display_name="HCD Password",
            info="Authentication password for accessing HCD.",
            value="HCD_PASSWORD",
            required=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="HCD API Endpoint",
            info="API endpoint URL for the HCD service.",
            value="HCD_API_ENDPOINT",
            required=True,
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Optional namespace within HCD to use for the collection.",
            value="default_namespace",
            advanced=True,
        ),
        # NOTE(review): this input is declared but not read anywhere in this class.
        MultilineInput(
            name="ca_certificate",
            display_name="CA Certificate",
            info="Optional CA certificate for TLS connections to HCD.",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding or Astra Vectorize",
            input_types=["Embeddings", "dict"],
            # TODO: This should be optional, but need to refactor langchain-astradb first.
            info="Allows either an embedding model or an Astra Vectorize configuration.",
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info="Optional dictionary defining the indexing policy for the collection.",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        """Create the HCD-backed vector store and ingest any provided data.

        Returns:
            The ``AstraDBVectorStore`` instance configured for ``Environment.HCD``.

        Raises:
            ImportError: If ``langchain-astradb`` or ``astrapy`` cannot be imported.
            ValueError: If the setup mode is not a ``SetupMode`` member, or if
                constructing the store or adding documents fails.
            TypeError: If an ingest item is not a ``Data`` object.
        """
        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as exc:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from exc

        try:
            from astrapy.authentication import UsernamePasswordTokenProvider
            from astrapy.constants import Environment
        except ImportError as exc:
            msg = "Could not import astrapy integration package. Please install it with `pip install astrapy`."
            raise ImportError(msg) from exc

        try:
            # Fall back to the first dropdown option when no mode is set.
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]
            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as exc:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from exc

        if isinstance(self.embedding, dict):
            # A dict is treated as a vectorize-style configuration.
            from astrapy.info import CollectionVectorServiceOptions

            service_options = self.embedding.get("collection_vector_service_options", {})
            # Strip entries with empty keys or values from both sub-sections.
            for section in ("authentication", "parameters"):
                service_options[section] = {
                    key: value for key, value in service_options.get(section, {}).items() if key and value
                }
            embedding_kwargs = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(service_options)
            }
            embedding_api_key = self.embedding.get("collection_embedding_api_key")
            if embedding_api_key:
                embedding_kwargs["collection_embedding_api_key"] = embedding_api_key
        else:
            embedding_kwargs = {"embedding": self.embedding}

        store_kwargs = {
            **embedding_kwargs,
            "collection_name": self.collection_name,
            "token": UsernamePasswordTokenProvider(self.username, self.password),
            "api_endpoint": self.api_endpoint,
            "namespace": self.namespace,
            "metric": self.metric or None,
            "batch_size": self.batch_size or None,
            "bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
            "bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
            "bulk_delete_concurrency": self.bulk_delete_concurrency or None,
            "setup_mode": setup_mode_value,
            "pre_delete_collection": self.pre_delete_collection or False,
            "environment": Environment.HCD,
        }

        # Only the first non-empty indexing option is forwarded.
        for option in ("metadata_indexing_include", "metadata_indexing_exclude", "collection_indexing_policy"):
            option_value = getattr(self, option)
            if option_value:
                store_kwargs[option] = option_value
                break

        try:
            store = AstraDBVectorStore(**store_kwargs)
        except Exception as exc:
            msg = f"Error initializing AstraDBVectorStore: {exc}"
            raise ValueError(msg) from exc

        self._add_documents_to_vector_store(store)
        return store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        """Convert ``ingest_data`` to LangChain documents and add them to the store.

        Raises:
            TypeError: If any ingest item is not a ``Data`` object.
            ValueError: If ``vector_store.add_documents`` fails.
        """
        documents = []
        for item in self.ingest_data or []:
            if not isinstance(item, Data):
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)
            documents.append(item.to_lc_document())

        if not documents:
            logger.debug("No documents to add to the Vector Store.")
            return

        logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
        try:
            vector_store.add_documents(documents)
        except Exception as exc:
            msg = f"Error adding documents to AstraDBVectorStore: {exc}"
            raise ValueError(msg) from exc

    def _map_search_type(self) -> str:
        """Translate the dropdown label into the search type string passed to the store."""
        selected = self.search_type
        if selected == "Similarity with score threshold":
            mapped = "similarity_score_threshold"
        elif selected == "MMR (Max Marginal Relevance)":
            mapped = "mmr"
        else:
            mapped = "similarity"
        return mapped

    def _build_search_args(self):
        """Assemble the keyword arguments for a search: ``k``, ``score_threshold`` and an optional ``filter``."""
        search_args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }
        if self.search_filter:
            # Entries with an empty key or value are dropped.
            usable_filter = {key: value for key, value in self.search_filter.items() if key and value}
            if usable_filter:
                search_args["filter"] = usable_filter
        return search_args

    def search_documents(self) -> list[Data]:
        """Search the store with ``search_input`` and return the hits as ``Data`` objects.

        Returns an empty list when ``search_input`` is missing, not a string,
        or only whitespace.

        Raises:
            ValueError: If preparing or running the search fails.
        """
        vector_store = self.build_vector_store()

        logger.debug(f"Search input: {self.search_input}")
        logger.debug(f"Search type: {self.search_type}")
        logger.debug(f"Number of results: {self.number_of_results}")

        has_query = bool(self.search_input and isinstance(self.search_input, str) and self.search_input.strip())
        if not has_query:
            logger.debug("No search input provided. Skipping search.")
            return []

        try:
            mapped_type = self._map_search_type()
            extra_args = self._build_search_args()
            docs = vector_store.search(query=self.search_input, search_type=mapped_type, **extra_args)
        except Exception as exc:
            msg = f"Error performing search in AstraDBVectorStore: {exc}"
            raise ValueError(msg) from exc

        logger.debug(f"Retrieved documents: {len(docs)}")
        data = docs_to_data(docs)
        logger.debug(f"Converted documents to data: {len(data)}")
        self.status = data
        return data

    def get_retriever_kwargs(self):
        """Return the ``search_type`` and ``search_kwargs`` used to configure a retriever."""
        search_kwargs = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_kwargs,
        }
Milvus
This component creates a Milvus Vector Store with search capabilities.
For more information, see the Milvus documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name |
String |
Name of the Milvus collection |
collection_description |
String |
Description of the Milvus collection |
uri |
String |
Connection URI for Milvus |
password |
SecretString |
Connection token or password (leave empty if none is required) |
connection_args |
Dict |
Additional connection arguments |
primary_field |
String |
Name of the primary field |
text_field |
String |
Name of the text field |
vector_field |
String |
Name of the vector field |
consistency_level |
String |
Consistency level for operations |
index_params |
Dict |
Parameters for indexing |
search_params |
Dict |
Parameters for searching |
drop_old |
Boolean |
Whether to drop old collection |
timeout |
Float |
Timeout for operations |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
Milvus |
Milvus vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
milvus.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
BoolInput,
DataInput,
DictInput,
DropdownInput,
FloatInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class MilvusVectorStoreComponent(LCVectorStoreComponent):
    """Milvus vector store with search capabilities."""

    display_name: str = "Milvus"
    description: str = "Milvus vector store with search capabilities"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/milvus"
    name = "Milvus"
    icon = "Milvus"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", value="langflow"),
        StrInput(name="collection_description", display_name="Collection Description", value=""),
        StrInput(
            name="uri",
            display_name="Connection URI",
            value="http://localhost:19530",
        ),
        SecretStrInput(
            name="password",
            display_name="Token",
            value="",
            info="Ignore this field if no token is required to make connection.",
        ),
        DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True),
        StrInput(name="primary_field", display_name="Primary Field Name", value="pk"),
        StrInput(name="text_field", display_name="Text Field Name", value="text"),
        StrInput(name="vector_field", display_name="Vector Field Name", value="vector"),
        DropdownInput(
            name="consistency_level",
            display_name="Consistency Level",
            options=["Bounded", "Session", "Strong", "Eventual"],
            value="Session",
            advanced=True,
        ),
        DictInput(name="index_params", display_name="Index Parameters", advanced=True),
        DictInput(name="search_params", display_name="Search Parameters", advanced=True),
        BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True),
        FloatInput(name="timeout", display_name="Timeout", advanced=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        """Create the Milvus vector store and ingest any provided data.

        Returns:
            The ``langchain_milvus`` ``Milvus`` store built from the component inputs.

        Raises:
            ImportError: If ``langchain-milvus`` is not installed.
        """
        try:
            from langchain_milvus.vectorstores import Milvus as LangchainMilvus
        except ImportError as e:
            msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`."
            raise ImportError(msg) from e

        # Build a fresh dict instead of updating ``self.connection_args`` in place, so the
        # component input is not mutated and an unset (None) input does not fail.
        connection_args = {**(self.connection_args or {}), "uri": self.uri, "token": self.password}

        milvus_store = LangchainMilvus(
            embedding_function=self.embedding,
            collection_name=self.collection_name,
            collection_description=self.collection_description,
            connection_args=connection_args,
            consistency_level=self.consistency_level,
            index_params=self.index_params,
            search_params=self.search_params,
            drop_old=self.drop_old,
            auto_id=True,
            primary_field=self.primary_field,
            text_field=self.text_field,
            vector_field=self.vector_field,
            timeout=self.timeout,
        )

        # ``Data`` items are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []
        ]
        if documents:
            milvus_store.add_documents(documents)

        return milvus_store

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query`` and return the hits as ``Data`` objects.

        Returns an empty list when the query is missing, not a string, or only whitespace.
        """
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            data = docs_to_data(docs)
            self.status = data
            return data
        return []
MongoDB Atlas
This component creates a MongoDB Atlas Vector Store with search capabilities. For more information, see the MongoDB Atlas documentation.
Parameters
Name | Type | Description |
---|---|---|
mongodb_atlas_cluster_uri |
SecretString |
MongoDB Atlas Cluster URI |
db_name |
String |
Database name |
collection_name |
String |
Collection name |
index_name |
String |
Index name |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
MongoDBAtlasVectorSearch |
MongoDB Atlas vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
mongodb_atlas.py
import tempfile
import certifi
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class MongoVectorStoreComponent(LCVectorStoreComponent):
    """MongoDB Atlas vector store component with similarity search.

    When ingest data is supplied, the target collection is dropped and rebuilt
    from that data. When no data is supplied, the existing collection is left
    untouched so it can be searched.
    """

    display_name = "MongoDB Atlas"
    description = "MongoDB Atlas Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/mongodb_atlas"
    name = "MongoDBAtlasVector"
    icon = "MongoDB"

    inputs = [
        SecretStrInput(name="mongodb_atlas_cluster_uri", display_name="MongoDB Atlas Cluster URI", required=True),
        BoolInput(name="enable_mtls", display_name="Enable mTLS", value=False, advanced=True, required=True),
        SecretStrInput(
            name="mongodb_atlas_client_cert",
            display_name="MongoDB Atlas Combined Client Certificate",
            required=False,
            info="Client Certificate combined with the private key in the following format:\n "
            "-----BEGIN PRIVATE KEY-----\n...\n -----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----\n"
            "...\n-----END CERTIFICATE-----\n",
        ),
        StrInput(name="db_name", display_name="Database Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> MongoDBAtlasVectorSearch:
        """Connect to MongoDB Atlas and return a vector store over the configured collection.

        Returns:
            A ``MongoDBAtlasVectorSearch`` built from the ingest data when there is any,
            otherwise one wrapping the existing collection.

        Raises:
            ImportError: If ``pymongo`` is not installed.
            ValueError: If the client certificate cannot be written to a temporary
                file, or if creating the client / preparing the collection fails.
        """
        try:
            from pymongo import MongoClient
        except ImportError as e:
            msg = "Please install pymongo to use MongoDB Atlas Vector Store"
            raise ImportError(msg) from e

        # ``Data`` items are converted to LangChain documents; anything else is passed through.
        # Done up front so we know whether the collection has to be rebuilt.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []
        ]

        client_cert_path = None
        if self.enable_mtls:
            # Write the combined key + certificate to a temporary file for the client.
            # NOTE: the file is created with delete=False and is not removed here.
            try:
                # The certificate input may arrive with newlines turned into spaces:
                # turn every space back into a newline, then restore the spaces that
                # belong inside the BEGIN/END marker lines.
                client_cert = self.mongodb_atlas_client_cert.replace(" ", "\n")
                client_cert = client_cert.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
                client_cert = client_cert.replace(
                    "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
                    "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
                )
                client_cert = client_cert.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
                with tempfile.NamedTemporaryFile(delete=False) as client_cert_file:
                    client_cert_file.write(client_cert.encode("utf-8"))
                    client_cert_path = client_cert_file.name
            except Exception as e:
                msg = f"Failed to write certificate to temporary file: {e}"
                raise ValueError(msg) from e

        try:
            mongo_client: MongoClient = (
                MongoClient(
                    self.mongodb_atlas_cluster_uri,
                    tls=True,
                    tlsCertificateKeyFile=client_cert_path,
                    tlsCAFile=certifi.where(),
                )
                if self.enable_mtls
                else MongoClient(self.mongodb_atlas_cluster_uri)
            )
            collection = mongo_client[self.db_name][self.collection_name]
            if documents:
                # Replace the stored vectors with the new ingest data. Dropping
                # unconditionally would wipe the collection on search-only runs.
                collection.drop()
        except Exception as e:
            msg = f"Failed to connect to MongoDB Atlas: {e}"
            raise ValueError(msg) from e

        if documents:
            return MongoDBAtlasVectorSearch.from_documents(
                documents=documents, embedding=self.embedding, collection=collection, index_name=self.index_name
            )
        return MongoDBAtlasVectorSearch(
            embedding=self.embedding,
            collection=collection,
            index_name=self.index_name,
        )

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query`` and return the hits as ``Data`` objects.

        ``ObjectId`` metadata values are converted to strings before the documents
        are turned into ``Data``. Returns an empty list when the query is missing,
        not a string, or only whitespace.
        """
        from bson.objectid import ObjectId

        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            for doc in docs:
                doc.metadata = {
                    key: str(value) if isinstance(value, ObjectId) else value for key, value in doc.metadata.items()
                }
            data = docs_to_data(docs)
            self.status = data
            return data
        return []
PGVector
This component creates a PGVector Vector Store with search capabilities. For more information, see the PGVector documentation.
Parameters
Name | Type | Description |
---|---|---|
pg_server_url |
SecretString |
PostgreSQL server connection string |
collection_name |
String |
Table name for the vector store |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
PGVector |
PGVector vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
pgvector.py
from langchain_community.vectorstores import PGVector
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
from langflow.utils.connection_string_parser import transform_connection_string
class PGVectorStoreComponent(LCVectorStoreComponent):
    """PGVector vector store component with similarity search."""

    display_name = "PGVector"
    description = "PGVector Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pgvector/"
    name = "pgvector"
    icon = "cpu"

    # The ``embedding`` input is declared once; it was previously listed twice.
    inputs = [
        SecretStrInput(name="pg_server_url", display_name="PostgreSQL Server Connection String", required=True),
        StrInput(name="collection_name", display_name="Table", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingestion Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> PGVector:
        """Return a ``PGVector`` store for the configured collection.

        The store is created with ``PGVector.from_documents`` when there is
        ingest data, and with ``PGVector.from_existing_index`` otherwise.
        """
        # ``Data`` items are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item for item in self.ingest_data or []
        ]

        connection_string_parsed = transform_connection_string(self.pg_server_url)

        if documents:
            return PGVector.from_documents(
                embedding=self.embedding,
                documents=documents,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )
        return PGVector.from_existing_index(
            embedding=self.embedding,
            collection_name=self.collection_name,
            connection_string=connection_string_parsed,
        )

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``search_query`` and return the hits as ``Data`` objects.

        Returns an empty list when the query is missing, not a string, or only whitespace.
        """
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            data = docs_to_data(docs)
            self.status = data
            return data
        return []
Pinecone
This component creates a Pinecone Vector Store with search capabilities.
For more information, see the Pinecone documentation.
Parameters
Name | Type | Description |
---|---|---|
index_name |
String |
Name of the Pinecone index |
namespace |
String |
Namespace for the index |
distance_strategy |
String |
Strategy for calculating distance between vectors |
pinecone_api_key |
SecretString |
API key for Pinecone |
text_key |
String |
Key in the record to use as text |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
Pinecone |
Pinecone vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
pinecone.py
import numpy as np
from langchain_pinecone import Pinecone
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, DropdownInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class PineconeVectorStoreComponent(LCVectorStoreComponent):
    """Pinecone vector store component with similarity search."""

    display_name = "Pinecone"
    description = "Pinecone Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pinecone/"
    name = "Pinecone"
    icon = "Pinecone"

    inputs = [
        StrInput(name="index_name", display_name="Index Name", required=True),
        StrInput(name="namespace", display_name="Namespace", info="Namespace for the index."),
        DropdownInput(
            name="distance_strategy",
            display_name="Distance Strategy",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        SecretStrInput(name="pinecone_api_key", display_name="Pinecone API Key", required=True),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="Key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Pinecone:
        """Build and return a Pinecone vector store instance.

        Any provided ingest data is added to the store after it is created.

        Raises:
            ValueError: If the store cannot be constructed, including when the
                selected distance strategy cannot be resolved.
        """
        try:
            from langchain_pinecone._utilities import DistanceStrategy

            # Wrap the embedding model to ensure float32 output
            wrapped_embeddings = Float32Embeddings(self.embedding)

            # Convert the dropdown label to an enum member name. The normalized label is
            # tried first, so any label that already resolved keeps resolving to the same member.
            # NOTE(review): the aliases assume the enum spells these members
            # EUCLIDEAN_DISTANCE / MAX_INNER_PRODUCT; confirm against langchain-pinecone.
            strategy_name = self.distance_strategy.replace(" ", "_").upper()
            strategy_aliases = {
                "EUCLIDEAN": "EUCLIDEAN_DISTANCE",
                "DOT_PRODUCT": "MAX_INNER_PRODUCT",
            }
            strategy_members = DistanceStrategy.__members__
            for candidate in (strategy_name, strategy_aliases.get(strategy_name)):
                if candidate in strategy_members:
                    distance_strategy = strategy_members[candidate]
                    break
            else:
                raise KeyError(strategy_name)

            # Initialize Pinecone instance with wrapped embeddings
            pinecone = Pinecone(
                index_name=self.index_name,
                embedding=wrapped_embeddings,  # Use wrapped embeddings
                text_key=self.text_key,
                namespace=self.namespace,
                distance_strategy=distance_strategy,
                pinecone_api_key=self.pinecone_api_key,
            )
        except Exception as e:
            error_msg = "Error building Pinecone vector store"
            raise ValueError(error_msg) from e
        else:
            # ``Data`` items are converted to LangChain documents; anything else is passed through.
            documents = [
                doc.to_lc_document() if isinstance(doc, Data) else doc for doc in self.ingest_data or []
            ]
            if documents:
                pinecone.add_documents(documents)

            return pinecone

    def search_documents(self) -> list[Data]:
        """Search documents in the vector store.

        Returns an empty list, without building the store, when the query is
        missing, not a string, or only whitespace.

        Raises:
            ValueError: If building the store or running the search fails.
        """
        try:
            if not self.search_query or not isinstance(self.search_query, str) or not self.search_query.strip():
                return []

            vector_store = self.build_vector_store()
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
        except Exception as e:
            error_msg = "Error searching documents"
            raise ValueError(error_msg) from e
        else:
            data = docs_to_data(docs)
            self.status = data
            return data
class Float32Embeddings:
    """Wrapper class to ensure float32 embeddings.

    Every component returned by the wrapped embedding model is rounded to
    float32 precision and handed back as a plain Python ``float``. Lists and
    NumPy arrays are handled the same way, since both are simply iterated.
    """

    def __init__(self, base_embeddings):
        """Store the embedding model whose output should be converted."""
        self.base_embeddings = base_embeddings

    def embed_documents(self, texts):
        """Embed ``texts`` and return one list of float32-rounded floats per text."""
        embeddings = self.base_embeddings.embed_documents(texts)
        return [[self._force_float32(x) for x in vec] for vec in embeddings]

    def embed_query(self, text):
        """Embed ``text`` and return its vector as a list of float32-rounded floats."""
        embedding = self.base_embeddings.embed_query(text)
        return [self._force_float32(x) for x in embedding]

    def _force_float32(self, value):
        """Round ``value`` to float32 precision and return it as a Python float."""
        return float(np.float32(value))
Qdrant
This component creates a Qdrant Vector Store with search capabilities. For more information, see the Qdrant documentation.
Parameters
Name | Type | Description |
---|---|---|
collection_name |
String |
Name of the Qdrant collection |
host |
String |
Qdrant server host |
port |
Integer |
Qdrant server port |
grpc_port |
Integer |
Qdrant gRPC port |
api_key |
SecretString |
API key for Qdrant |
prefix |
String |
Prefix for Qdrant |
timeout |
Integer |
Timeout for Qdrant operations |
path |
String |
Path for Qdrant |
url |
String |
URL for Qdrant |
distance_func |
String |
Distance function for vector similarity |
content_payload_key |
String |
Key for content payload |
metadata_payload_key |
String |
Key for metadata payload |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
Qdrant |
Qdrant vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
qdrant.py
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import Qdrant
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
DataInput,
DropdownInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class QdrantVectorStoreComponent(LCVectorStoreComponent):
display_name = "Qdrant"
description = "Qdrant Vector Store with search capabilities"
documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/qdrant"
icon = "Qdrant"
inputs = [
StrInput(name="collection_name", display_name="Collection Name", required=True),
StrInput(name="host", display_name="Host", value="localhost", advanced=True),
IntInput(name="port", display_name="Port", value=6333, advanced=True),
IntInput(name="grpc_port", display_name="gRPC Port", value=6334, advanced=True),
SecretStrInput(name="api_key", display_name="API Key", advanced=True),
StrInput(name="prefix", display_name="Prefix", advanced=True),
IntInput(name="timeout", display_name="Timeout", advanced=True),
StrInput(name="path", display_name="Path", advanced=True),
StrInput(name="url", display_name="URL", advanced=True),
DropdownInput(
name="distance_func",
display_name="Distance Function",
options=["Cosine", "Euclidean", "Dot Product"],
value="Cosine",
advanced=True,
),
StrInput(name="content_payload_key", display_name="Content Payload Key", value="page_content", advanced=True),
StrInput(name="metadata_payload_key", display_name="Metadata Payload Key", value="metadata", advanced=True),
MultilineInput(name="search_query", display_name="Search Query"),
DataInput(
name="ingest_data",
display_name="Ingest Data",
is_list=True,
),
HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
IntInput(
name="number_of_results",
display_name="Number of Results",
info="Number of results to return.",
value=4,
advanced=True,
),
]
@check_cached_vector_store
def build_vector_store(self) -> Qdrant:
qdrant_kwargs = {
"collection_name": self.collection_name,
"content_payload_key": self.content_payload_key,
"metadata_payload_key": self.metadata_payload_key,
}
server_kwargs = {
"host": self.host or None,
"port": int(self.port), # Ensure port is an integer
"grpc_port": int(self.grpc_port), # Ensure grpc_port is an integer
"api_key": self.api_key,
"prefix": self.prefix,
# Ensure timeout is an integer
"timeout": int(self.timeout) if self.timeout else None,
"path": self.path or None,
"url": self.url or None,
}
server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}
documents = []
for _input in self.ingest_data or []:
if isinstance(_input, Data):
documents.append(_input.to_lc_document())
else:
documents.append(_input)
if not isinstance(self.embedding, Embeddings):
msg = "Invalid embedding object"
raise TypeError(msg)
if documents:
qdrant = Qdrant.from_documents(documents, embedding=self.embedding, **qdrant_kwargs, **server_kwargs)
else:
from qdrant_client import QdrantClient
client = QdrantClient(**server_kwargs)
qdrant = Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)
return qdrant
def search_documents(self) -> list[Data]:
    """Run a similarity search for ``self.search_query``.

    The vector store is always built first, even when no query is given.

    Returns:
        The matching documents converted with ``docs_to_data``, or an empty
        list when the query is empty, not a string, or only whitespace.
        On a successful search the results are also stored in ``self.status``.
    """
    store = self.build_vector_store()

    query = self.search_query
    if not (query and isinstance(query, str) and query.strip()):
        return []

    matches = store.similarity_search(
        query=query,
        k=self.number_of_results,
    )
    results = docs_to_data(matches)
    self.status = results
    return results
Redis
This component creates a Redis Vector Store with search capabilities. For more information, see the Redis documentation.
Parameters
Name | Type | Description |
---|---|---|
redis_server_url |
SecretString |
Redis server connection string |
redis_index_name |
String |
Name of the Redis index |
code |
String |
Custom code for Redis (advanced) |
schema |
String |
Schema for Redis index |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
number_of_results |
Integer |
Number of results to return in search |
embedding |
Embeddings |
Embedding function to use |
Name | Type | Description |
---|---|---|
vector_store |
Redis |
Redis vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
redis.py
from pathlib import Path
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.redis import Redis
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class RedisVectorStoreComponent(LCVectorStoreComponent):
    """A custom component for implementing a Vector Store using Redis."""

    display_name: str = "Redis"
    description: str = "Implementation of Vector Store using Redis"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/redis"
    name = "Redis"
    icon = "Redis"

    inputs = [
        SecretStrInput(name="redis_server_url", display_name="Redis Server Connection String", required=True),
        StrInput(
            name="redis_index_name",
            display_name="Redis Index",
        ),
        StrInput(name="code", display_name="Code", advanced=True),
        StrInput(
            name="schema",
            display_name="Schema",
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Redis:
        """Build the Redis vector store.

        With ingest data, the documents are split into chunks and indexed via
        ``Redis.from_documents``. Without ingest data, the component attaches
        to an existing index via ``Redis.from_existing_index``, which needs a
        schema.

        Returns:
            The ``Redis`` vector store instance.

        Raises:
            ValueError: If there is no ingest data and no schema was provided.
        """
        # Data objects are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        if not documents:
            # Reject a blank schema as well as None: a string input that was
            # never filled in is presumably "" rather than None (confirm
            # against the input defaults), and an empty schema cannot
            # describe an existing index.
            if not self.schema:
                msg = "If no documents are provided, a schema must be provided."
                raise ValueError(msg)
            return Redis.from_existing_index(
                embedding=self.embedding,
                index_name=self.redis_index_name,
                schema=self.schema,
                key_prefix=None,
                redis_url=self.redis_server_url,
            )

        # Chunk the documents before indexing them.
        text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
        chunks = text_splitter.split_documents(documents)
        return Redis.from_documents(
            documents=chunks,
            embedding=self.embedding,
            redis_url=self.redis_server_url,
            index_name=self.redis_index_name,
        )

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``self.search_query``.

        Returns:
            The matching documents converted with ``docs_to_data``, or an
            empty list when the query is empty, not a string, or only
            whitespace. On a successful search the results are also stored
            in ``self.status``.
        """
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            data = docs_to_data(docs)
            self.status = data
            return data
        return []
Supabase
This component creates a connection to a Supabase Vector Store with search capabilities.
For more information, see the Supabase documentation.
Parameters
Name | Type | Description |
---|---|---|
supabase_url |
String |
URL of the Supabase instance |
supabase_service_key |
SecretString |
Service key for Supabase authentication |
table_name |
String |
Name of the table in Supabase |
query_name |
String |
Name of the query to use |
search_query |
String |
Query for similarity search |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
SupabaseVectorStore |
Supabase vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
supabase.py
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import Client, create_client
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
class SupabaseVectorStoreComponent(LCVectorStoreComponent):
    """Langflow component exposing a Supabase vector store with similarity search."""

    display_name = "Supabase"
    description = "Supabase Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/supabase/"
    name = "SupabaseVectorStore"
    icon = "Supabase"

    inputs = [
        # Connection settings.
        StrInput(name="supabase_url", display_name="Supabase URL", required=True),
        SecretStrInput(name="supabase_service_key", display_name="Supabase Service Key", required=True),
        StrInput(name="table_name", display_name="Table Name", advanced=True),
        StrInput(name="query_name", display_name="Query Name"),
        # Search and ingestion settings.
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> SupabaseVectorStore:
        """Create the Supabase client and the vector store on top of it.

        Returns:
            A ``SupabaseVectorStore`` built with ``from_documents`` when
            ingest data is present, otherwise constructed directly against
            the configured table and query.
        """
        client: Client = create_client(self.supabase_url, supabase_key=self.supabase_service_key)

        # Data objects are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        if not documents:
            return SupabaseVectorStore(
                client=client,
                embedding=self.embedding,
                table_name=self.table_name,
                query_name=self.query_name,
            )

        return SupabaseVectorStore.from_documents(
            documents=documents,
            embedding=self.embedding,
            query_name=self.query_name,
            client=client,
            table_name=self.table_name,
        )

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``self.search_query``.

        Returns:
            The matching documents converted with ``docs_to_data``, or an
            empty list when the query is empty, not a string, or only
            whitespace. On a successful search the results are also stored
            in ``self.status``.
        """
        store = self.build_vector_store()

        query = self.search_query
        if not (query and isinstance(query, str) and query.strip()):
            return []

        matches = store.similarity_search(
            query=query,
            k=self.number_of_results,
        )
        results = docs_to_data(matches)
        self.status = results
        return results
Upstash
This component creates an Upstash Vector Store with search capabilities. For more information, see the Upstash documentation.
Parameters
Name | Type | Description |
---|---|---|
index_url |
String |
The URL of the Upstash index |
index_token |
SecretString |
The token for the Upstash index |
text_key |
String |
The key in the record to use as text |
namespace |
String |
Namespace for the index |
search_query |
String |
Query for similarity search |
metadata_filter |
String |
Filters documents by metadata |
ingest_data |
Data |
Data to be ingested into the vector store |
embedding |
Embeddings |
Embedding function to use (optional) |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
UpstashVectorStore |
Upstash vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
upstash.py
from langchain_community.vectorstores import UpstashVectorStore
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
DataInput,
HandleInput,
IntInput,
MultilineInput,
SecretStrInput,
StrInput,
)
from langflow.schema import Data
class UpstashVectorStoreComponent(LCVectorStoreComponent):
    """Langflow component exposing an Upstash vector store with similarity search."""

    display_name = "Upstash"
    description = "Upstash Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/upstash/"
    name = "Upstash"
    icon = "Upstash"

    inputs = [
        StrInput(
            name="index_url",
            display_name="Index URL",
            info="The URL of the Upstash index.",
            required=True,
        ),
        SecretStrInput(
            name="index_token",
            display_name="Index Token",
            info="The token for the Upstash index.",
            required=True,
        ),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="The key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Leave empty for default namespace.",
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        MultilineInput(
            name="metadata_filter",
            display_name="Metadata Filter",
            info="Filters documents by metadata. Look at the documentation for more information.",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
            info="To use Upstash's embeddings, don't provide an embedding.",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> UpstashVectorStore:
        """Build the Upstash vector store, ingesting documents when provided.

        When no embedding is connected (``self.embedding is None``), ``True``
        is passed as the ``embedding`` argument instead of an embedding
        object; per the input's help text this selects Upstash's own
        embeddings.

        Returns:
            The ``UpstashVectorStore`` instance.
        """
        use_upstash_embedding = self.embedding is None

        # Data objects are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        # Index settings shared by every construction path below.
        index_options = {
            "text_key": self.text_key,
            "index_url": self.index_url,
            "index_token": self.index_token,
            "namespace": self.namespace,
        }

        if not documents:
            # Nothing to ingest: just connect to the index.
            return UpstashVectorStore(
                embedding=self.embedding or use_upstash_embedding,
                **index_options,
            )

        if use_upstash_embedding:
            # Create the store first, then add the documents to it.
            store = UpstashVectorStore(embedding=use_upstash_embedding, **index_options)
            store.add_documents(documents)
            return store

        return UpstashVectorStore.from_documents(
            documents=documents,
            embedding=self.embedding,
            **index_options,
        )

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``self.search_query`` with the metadata filter.

        Returns:
            The matching documents converted with ``docs_to_data``, or an
            empty list when the query is empty, not a string, or only
            whitespace. On a successful search the results are also stored
            in ``self.status``.
        """
        store = self.build_vector_store()

        query = self.search_query
        if not (query and isinstance(query, str) and query.strip()):
            return []

        matches = store.similarity_search(
            query=query,
            k=self.number_of_results,
            filter=self.metadata_filter,
        )
        results = docs_to_data(matches)
        self.status = results
        return results
Vectara
This component creates a Vectara Vector Store with search capabilities. For more information, see the Vectara documentation.
Parameters
Name | Type | Description |
---|---|---|
vectara_customer_id |
String |
Vectara customer ID |
vectara_corpus_id |
String |
Vectara corpus ID |
vectara_api_key |
SecretString |
Vectara API key |
embedding |
Embeddings |
Embedding function to use (optional) |
ingest_data |
List[Document/Data] |
Data to be ingested into the vector store |
search_query |
String |
Query for similarity search |
number_of_results |
Integer |
Number of results to return in search |
Name | Type | Description |
---|---|---|
vector_store |
Vectara |
Vectara vector store instance |
search_results |
List[Data] |
Results of similarity search |
Component code
vectara.py
from typing import TYPE_CHECKING
from langchain_community.vectorstores import Vectara
from loguru import logger
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, MessageTextInput, SecretStrInput, StrInput
from langflow.schema import Data
if TYPE_CHECKING:
from langchain_community.vectorstores import Vectara
class VectaraVectorStoreComponent(LCVectorStoreComponent):
    """Vectara Vector Store with search capabilities."""

    display_name: str = "Vectara"
    description: str = "Vectara Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/vectara"
    name = "Vectara"
    icon = "Vectara"

    inputs = [
        # Vectara account and corpus credentials.
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        HandleInput(
            name="ingest_data",
            display_name="Ingest Data",
            input_types=["Document", "Data"],
            is_list=True,
        ),
        MessageTextInput(
            name="search_query",
            display_name="Search Query",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> "Vectara":
        """Create the Vectara store and ingest any provided documents.

        Returns:
            The ``Vectara`` vector store instance.

        Raises:
            ImportError: If ``Vectara`` cannot be imported from
                ``langchain_community``.
        """
        try:
            from langchain_community.vectorstores import Vectara
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        store = Vectara(
            vectara_customer_id=self.vectara_customer_id,
            vectara_corpus_id=self.vectara_corpus_id,
            vectara_api_key=self.vectara_api_key,
        )
        self._add_documents_to_vector_store(store)
        return store

    def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
        """Add the ingest data to ``vector_store`` and record the outcome in ``self.status``.

        Args:
            vector_store: The Vectara store that receives the documents.
        """
        if not self.ingest_data:
            self.status = "No documents to add to Vectara"
            return

        # Data objects are converted to LangChain documents; anything else is passed through.
        documents = [
            item.to_lc_document() if isinstance(item, Data) else item
            for item in self.ingest_data or []
        ]

        if not documents:
            logger.debug("No documents to add to Vectara.")
            self.status = "No valid documents to add to Vectara"
            return

        logger.debug(f"Adding {len(documents)} documents to Vectara.")
        vector_store.add_documents(documents)
        self.status = f"Added {len(documents)} documents to Vectara"

    def search_documents(self) -> list[Data]:
        """Run a similarity search for ``self.search_query``.

        Returns:
            The matching documents converted with ``docs_to_data``, or an
            empty list when the query is empty, not a string, or only
            whitespace. ``self.status`` is set to a summary message in
            either case.
        """
        store = self.build_vector_store()

        query = self.search_query
        if not (query and isinstance(query, str) and query.strip()):
            self.status = "No search query provided"
            return []

        matches = store.similarity_search(
            query=query,
            k=self.number_of_results,
        )
        results = docs_to_data(matches)
        self.status = f"Found {len(results)} results for the query: {self.search_query}"
        return results
Vectara RAG
This component creates a Vectara RAG pipeline. For more information, see the Vectara documentation.
Parameters
Name | Type | Description |
---|---|---|
vectara_customer_id |
String |
Vectara customer ID |
vectara_corpus_id |
String |
Vectara corpus ID |
vectara_api_key |
SecretString |
Vectara API key |
search_query |
String |
The query to receive an answer on |
lexical_interpolation |
Float |
Hybrid search factor |
filter |
String |
Metadata filters for narrowing search |
reranker |
String |
Type of reranker to use |
reranker_k |
Integer |
Number of results to rerank |
diversity_bias |
Float |
Diversity bias for MMR reranker |
max_results |
Integer |
Maximum results to summarize |
response_lang |
String |
Language code for response |
prompt |
String |
Name of the summarizer prompt |
Name | Type | Description |
---|---|---|
answer |
Message |
Generated response from Vectara RAG |
Component code
vectara_rag.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.io import DropdownInput, FloatInput, IntInput, MessageTextInput, Output, SecretStrInput, StrInput
from langflow.schema.message import Message
class VectaraRagComponent(Component):
    """Langflow component that answers a query through Vectara's RAG pipeline."""

    display_name = "Vectara RAG"
    description = "Vectara's full end to end RAG"
    documentation = "https://docs.vectara.com/docs"
    icon = "Vectara"
    name = "VectaraRAG"

    # Summarizer prompt names offered in the "Prompt Name" dropdown.
    SUMMARIZER_PROMPTS = [
        "vectara-summary-ext-24-05-sml",
        "vectara-summary-ext-24-05-med-omni",
        "vectara-summary-ext-24-05-large",
        "vectara-summary-ext-24-05-med",
        "vectara-summary-ext-v1.3.0",
    ]

    # Reranker choices offered in the "Reranker Type" dropdown.
    RERANKER_TYPES = ["mmr", "rerank_multilingual_v1", "none"]

    # Language codes offered in the "Response Language" dropdown.
    RESPONSE_LANGUAGES = [
        "auto", "eng", "spa", "fra", "zho", "deu", "hin", "ara", "por", "ita",
        "jpn", "kor", "rus", "tur", "fas", "vie", "tha", "heb", "nld", "ind",
        "pol", "ukr", "ron", "swe", "ces", "ell", "ben", "msa", "urd",
    ]

    field_order = ["vectara_customer_id", "vectara_corpus_id", "vectara_api_key", "search_query", "reranker"]

    inputs = [
        # Vectara account and corpus credentials.
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        # Query and retrieval settings.
        MessageTextInput(name="search_query", display_name="Search Query", info="The query to receive an answer on."),
        FloatInput(
            name="lexical_interpolation",
            display_name="Hybrid Search Factor",
            range_spec=RangeSpec(min=0.005, max=0.1, step=0.005),
            value=0.005,
            advanced=True,
            info="How much to weigh lexical scores compared to the embedding score. "
            "0 means lexical search is not used at all, and 1 means only lexical search is used.",
        ),
        MessageTextInput(
            name="filter",
            display_name="Metadata Filters",
            value="",
            advanced=True,
            info="The filter string to narrow the search to according to metadata attributes.",
        ),
        # Reranking settings.
        DropdownInput(
            name="reranker",
            display_name="Reranker Type",
            options=RERANKER_TYPES,
            value=RERANKER_TYPES[0],
            info="How to rerank the retrieved search results.",
        ),
        IntInput(
            name="reranker_k",
            display_name="Number of Results to Rerank",
            value=50,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
        ),
        FloatInput(
            name="diversity_bias",
            display_name="Diversity Bias",
            value=0.2,
            range_spec=RangeSpec(min=0, max=1, step=0.01),
            advanced=True,
            info="Ranges from 0 to 1, with higher values indicating greater diversity (only applies to MMR reranker).",
        ),
        # Summarization settings.
        IntInput(
            name="max_results",
            display_name="Max Results to Summarize",
            value=7,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
            info="The maximum number of search results to be available to the prompt.",
        ),
        DropdownInput(
            name="response_lang",
            display_name="Response Language",
            options=RESPONSE_LANGUAGES,
            value="eng",
            advanced=True,
            info="Use the ISO 639-1 or 639-3 language code or auto to automatically detect the language.",
        ),
        DropdownInput(
            name="prompt",
            display_name="Prompt Name",
            options=SUMMARIZER_PROMPTS,
            value=SUMMARIZER_PROMPTS[0],
            advanced=True,
            info="Only vectara-summary-ext-24-05-sml is for Growth customers; "
            "all other prompts are for Scale customers only.",
        ),
    ]

    outputs = [
        Output(name="answer", display_name="Answer", method="generate_response"),
    ]

    def generate_response(self) -> Message:
        """Query Vectara's RAG pipeline and return its answer.

        Returns:
            A ``Message`` whose text is the ``"answer"`` entry of the RAG
            response.

        Raises:
            ImportError: If the Vectara classes cannot be imported from
                ``langchain_community``.
        """
        try:
            from langchain_community.vectorstores import Vectara
            from langchain_community.vectorstores.vectara import RerankConfig, SummaryConfig, VectaraQueryConfig
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        store = Vectara(self.vectara_customer_id, self.vectara_corpus_id, self.vectara_api_key)

        # Assemble the query configuration from the reranking and summary settings.
        reranking = RerankConfig(self.reranker, self.reranker_k, self.diversity_bias)
        summary = SummaryConfig(
            is_enabled=True,
            max_results=self.max_results,
            response_lang=self.response_lang,
            prompt_name=self.prompt,
        )
        query_config = VectaraQueryConfig(
            lambda_val=self.lexical_interpolation,
            filter=self.filter,
            summary_config=summary,
            rerank_config=reranking,
        )

        pipeline = store.as_rag(query_config)
        result = pipeline.invoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})
        return Message(text=result["answer"])