Vector Stores

Vector databases store vector data, which backs AI workloads like chatbots and Retrieval Augmented Generation.

Vector database components establish connections to existing vector databases or create in-memory vector stores for storing and retrieving vector data.

Vector database components are distinct from memory components, which are built specifically for storing and retrieving chat messages from external databases.

Use a vector store component in a flow

Vector databases can be populated from within Langflow with document ingestion pipelines, like the following.

[Figure: vector store document ingestion]

This example uses the Astra DB vector store component. Your vector store component’s parameters and authentication may be different, but the document ingestion workflow is the same. A document is loaded from a local machine and chunked. The Astra DB vector store generates embeddings with the connected model component, and stores them in the connected Astra DB database.
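The load, chunk, embed, and store steps described above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not how the Astra DB component is implemented: `chunk_text`, `toy_embed`, and `ingest` are hypothetical helpers, the in-memory list stands in for a database collection, and the hashed bag-of-words vector stands in for a real embedding model.

```python
import hashlib
import math


def chunk_text(text: str, chunk_size: int = 200, overlap: int = 50) -> list[str]:
    """Split text into fixed-size character windows that overlap."""
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # the last window already reached the end of the text
    return chunks


def toy_embed(text: str, dim: int = 64) -> list[float]:
    """Hypothetical stand-in for an embedding model: hashed bag of words, L2-normalized."""
    vec = [0.0] * dim
    for word in text.lower().split():
        bucket = int(hashlib.sha256(word.encode("utf-8")).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    norm = math.sqrt(sum(v * v for v in vec))
    return [v / norm for v in vec] if norm else vec


def ingest(document: str, store: list[dict], source: str) -> int:
    """Chunk a document, embed each chunk, and append one record per chunk to the store."""
    chunks = chunk_text(document)
    for i, chunk in enumerate(chunks):
        store.append(
            {"text": chunk, "vector": toy_embed(chunk), "metadata": {"source": source, "chunk": i}}
        )
    return len(chunks)


# The list below plays the role of the vector database collection.
store: list[dict] = []
count = ingest("Langflow builds flows from components. " * 20, store, source="example.txt")
print(f"Stored {count} chunks")
```

Overlapping windows keep sentences that straddle a chunk boundary retrievable from either side, at the cost of storing some text twice.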

This vector data can then be retrieved for workloads like Retrieval Augmented Generation.

[Figure: vector store retrieval]

The user’s chat input is embedded and compared against the vectors stored during document ingestion in a similarity search. The results are output from the vector database component as a Data object and parsed into text. This text fills the {context} variable in the Prompt component, which informs the OpenAI model component’s responses.
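The retrieval path described above (embed the query, rank stored vectors by similarity, parse the results into text, fill `{context}`) can be sketched as follows. The names `toy_embed`, `similarity_search`, `build_prompt`, and `PROMPT_TEMPLATE` are assumptions made for illustration, and the hashed bag-of-words embedding is a stand-in for a real embedding model, so ranking quality here is not representative.

```python
import hashlib
import math
import re

PROMPT_TEMPLATE = "Answer using only this context:\n{context}\n\nQuestion: {question}"


def toy_embed(text: str, dim: int = 256) -> list[float]:
    """Hypothetical stand-in for an embedding model: hashed bag of words."""
    vec = [0.0] * dim
    for word in re.findall(r"[a-z0-9]+", text.lower()):
        bucket = int(hashlib.sha256(word.encode("utf-8")).hexdigest(), 16) % dim
        vec[bucket] += 1.0
    return vec


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 when either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def similarity_search(query: str, store: list[dict], k: int = 4) -> list[dict]:
    """Embed the query and return the k stored records most similar to it."""
    query_vec = toy_embed(query)
    ranked = sorted(store, key=lambda rec: cosine(query_vec, rec["vector"]), reverse=True)
    return ranked[:k]


def build_prompt(question: str, store: list[dict], k: int = 4) -> str:
    """Parse the search results into text and fill the {context} variable."""
    results = similarity_search(question, store, k)
    context = "\n".join(rec["text"] for rec in results)
    return PROMPT_TEMPLATE.format(context=context, question=question)


texts = [
    "Astra DB stores vectors in collections.",
    "Bananas are rich in potassium.",
    "Chunking splits documents into smaller pieces.",
]
store = [{"text": t, "vector": toy_embed(t)} for t in texts]
prompt = build_prompt("Where does Astra DB keep vectors?", store, k=1)
print(prompt)
```

The query must be embedded with the same model used at ingestion time; vectors from different models are not comparable.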

Alternatively, connect the vector database component’s Retriever port to a retriever tool, and then to an agent component. This enables the agent to use your vector database as a tool and make decisions based on the available data.

[Figure: vector store agent retrieval tool]
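Conceptually, exposing a retriever as a tool means wrapping the search function with a name and a description that an agent can choose from. The sketch below illustrates only that wrapping. The `Tool` dataclass, `make_retriever_tool`, and `keyword_search` are hypothetical, the keyword matcher replaces a real vector search, and the hard-coded `call` dictionary replaces the decision that a language model would make in a real agent.

```python
import re
from dataclasses import dataclass
from typing import Callable


@dataclass
class Tool:
    """Hypothetical tool record: what an agent sees (name, description) plus the callable."""

    name: str
    description: str
    run: Callable[[str], str]


def make_retriever_tool(search: Callable[[str], list[str]], name: str, description: str) -> Tool:
    """Wrap a search function so it takes a query string and returns one block of text."""

    def run(query: str) -> str:
        return "\n\n".join(search(query))

    return Tool(name=name, description=description, run=run)


def keyword_search(query: str) -> list[str]:
    """Stand-in for a vector store retriever: keyword overlap instead of embeddings."""
    docs = [
        "Astra DB stores vectors in collections.",
        "Langflow flows are built from components.",
    ]
    terms = set(re.findall(r"\w+", query.lower()))
    return [d for d in docs if terms & set(re.findall(r"\w+", d.lower()))]


tool = make_retriever_tool(
    keyword_search,
    name="astra_docs_search",
    description="Search the ingested documents for passages relevant to a question.",
)
tools = {tool.name: tool}

# In a real agent, the model picks the tool and writes its input; here both are hard-coded.
call = {"tool": "astra_docs_search", "input": "vector collections"}
observation = tools[call["tool"]].run(call["input"])
print(observation)
```

The description matters as much as the function: it is the only information the agent has when deciding whether the vector database is relevant to the current request.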

Astra DB Vector Store

This component implements a vector store backed by Astra DB Serverless, with similarity search capabilities.

Parameters

Inputs
| Name | Display Name | Info |
| --- | --- | --- |
| collection_name | Collection Name | The name of the collection within Astra DB Serverless where the vectors will be stored (required). |
| token | Astra DB Application Token | Authentication token for accessing Astra DB Serverless (required). |
| api_endpoint | API Endpoint | API endpoint URL for the Astra DB Serverless service (required). |
| search_input | Search Input | Query string for similarity search. |
| ingest_data | Ingest Data | Data to be ingested into the vector store. |
| namespace | Namespace | Optional namespace within Astra DB Serverless to use for the collection. |
| embedding_service | Embedding Model or Astra Vectorize | Determines whether to use an Embedding Model or Astra Vectorize for the collection. |
| embedding | Embedding Model | Allows an embedding model configuration (when using Embedding Model). |
| provider | Vectorize Provider | Provider for Astra Vectorize (when using Astra Vectorize). |
| metric | Metric | Optional distance metric for vector comparisons. |
| batch_size | Batch Size | Optional number of records to process in a single batch. |
| setup_mode | Setup Mode | Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync"). |
| pre_delete_collection | Pre Delete Collection | Boolean flag to determine whether to delete the collection before creating a new one. |
| number_of_results | Number of Results | Number of results to return in similarity search (default: 4). |
| search_type | Search Type | Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"). |
| search_score_threshold | Search Score Threshold | Minimum similarity score threshold for search results. |
| search_filter | Search Metadata Filter | Optional dictionary of filters to apply to the search query. |
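The three search_type options differ in how they pick results from the ranked candidates. The sketch below shows one common reading of each, using raw cosine similarity on toy 2-D vectors. The function names, the `lambda_mult` parameter, and the sample records are assumptions for illustration; the scores that Astra DB returns may be normalized differently, so thresholds are not directly transferable.

```python
import math


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity; returns 0.0 when either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def similarity(query: list[float], docs: list[dict], k: int = 4) -> list[dict]:
    """'Similarity': the k records closest to the query."""
    ranked = sorted(docs, key=lambda d: cosine(query, d["vector"]), reverse=True)
    return ranked[:k]


def similarity_with_threshold(
    query: list[float], docs: list[dict], k: int = 4, threshold: float = 0.0
) -> list[dict]:
    """'Similarity with score threshold': top k, minus anything scoring below the threshold."""
    return [d for d in similarity(query, docs, k) if cosine(query, d["vector"]) >= threshold]


def mmr(query: list[float], docs: list[dict], k: int = 4, lambda_mult: float = 0.5) -> list[dict]:
    """'MMR': greedily pick records that are relevant but not redundant with earlier picks."""
    candidates = list(docs)
    selected: list[dict] = []
    while candidates and len(selected) < k:

        def score(d: dict) -> float:
            relevance = cosine(query, d["vector"])
            redundancy = max((cosine(d["vector"], s["vector"]) for s in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(candidates, key=score)
        selected.append(best)
        candidates.remove(best)
    return selected


docs = [
    {"text": "intro", "vector": [1.0, 0.3]},
    {"text": "intro-copy", "vector": [1.0, 0.35]},  # near-duplicate of "intro"
    {"text": "pricing", "vector": [0.2, 1.0]},
    {"text": "unrelated", "vector": [-1.0, 0.0]},
]
query = [1.0, 1.0]

print([d["text"] for d in similarity(query, docs, k=2)])  # ['intro-copy', 'intro']
print([d["text"] for d in similarity_with_threshold(query, docs, k=4, threshold=0.85)])  # ['intro-copy', 'intro']
print([d["text"] for d in mmr(query, docs, k=2)])  # ['intro-copy', 'pricing']
```

Plain similarity returns both near-duplicates, while MMR replaces the second one with a less similar but non-redundant record. That trade-off is the reason to choose MMR when the ingested documents contain repeated passages.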

Outputs
| Name | Display Name | Info |
| --- | --- | --- |
| vector_store | Vector Store | Built Astra DB Serverless vector store. |
| search_results | Search Results | Results of the similarity search as a list of Data objects. |

Component code

astradb.py
from collections import defaultdict
from dataclasses import asdict, dataclass, field

from astrapy import AstraDBAdmin, DataAPIClient, Database
from astrapy.info import CollectionDescriptor
from langchain_astradb import AstraDBVectorStore, CollectionVectorServiceOptions

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import FloatInput, NestedDictInput
from langflow.io import (
    BoolInput,
    DropdownInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data
from langflow.utils.version import get_version_info


class AstraDBVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Astra DB"
    description: str = "Ingest and search documents in Astra DB"
    documentation: str = "https://docs.datastax.com/en/langflow/astra-components.html"
    name = "AstraDB"
    icon: str = "AstraDB"

    _cached_vector_store: AstraDBVectorStore | None = None

    @dataclass
    class NewDatabaseInput:
        functionality: str = "create"
        fields: dict[str, dict] = field(
            default_factory=lambda: {
                "data": {
                    "node": {
                        "name": "create_database",
                        "description": "",
                        "display_name": "Create new database",
                        "field_order": ["new_database_name", "cloud_provider", "region"],
                        "template": {
                            "new_database_name": StrInput(
                                name="new_database_name",
                                display_name="Name",
                                info="Name of the new database to create in Astra DB.",
                                required=True,
                            ),
                            "cloud_provider": DropdownInput(
                                name="cloud_provider",
                                display_name="Cloud provider",
                                info="Cloud provider for the new database.",
                                options=["Amazon Web Services", "Google Cloud Platform", "Microsoft Azure"],
                                required=True,
                                real_time_refresh=True,
                            ),
                            "region": DropdownInput(
                                name="region",
                                display_name="Region",
                                info="Region for the new database.",
                                options=[],
                                required=True,
                            ),
                        },
                    },
                }
            }
        )

    @dataclass
    class NewCollectionInput:
        functionality: str = "create"
        fields: dict[str, dict] = field(
            default_factory=lambda: {
                "data": {
                    "node": {
                        "name": "create_collection",
                        "description": "",
                        "display_name": "Create new collection",
                        "field_order": [
                            "new_collection_name",
                            "embedding_generation_provider",
                            "embedding_generation_model",
                            "dimension",
                        ],
                        "template": {
                            "new_collection_name": StrInput(
                                name="new_collection_name",
                                display_name="Name",
                                info="Name of the new collection to create in Astra DB.",
                                required=True,
                            ),
                            "embedding_generation_provider": DropdownInput(
                                name="embedding_generation_provider",
                                display_name="Embedding generation method",
                                info="Provider to use for generating embeddings.",
                                real_time_refresh=True,
                                required=True,
                                options=["Bring your own", "Nvidia"],
                            ),
                            "embedding_generation_model": DropdownInput(
                                name="embedding_generation_model",
                                display_name="Embedding model",
                                info="Model to use for generating embeddings.",
                                required=True,
                                options=[],
                            ),
                            "dimension": IntInput(
                                name="dimension",
                                display_name="Dimensions (Required only for `Bring your own`)",
                                info="Dimensions of the embeddings to generate.",
                                required=False,
                                value=1024,
                            ),
                        },
                    },
                }
            }
        )

    inputs = [
        SecretStrInput(
            name="token",
            display_name="Astra DB Application Token",
            info="Authentication token for accessing Astra DB.",
            value="ASTRA_DB_APPLICATION_TOKEN",
            required=True,
            real_time_refresh=True,
            input_types=[],
        ),
        StrInput(
            name="environment",
            display_name="Environment",
            info="The environment for the Astra DB API Endpoint.",
            advanced=True,
            real_time_refresh=True,
        ),
        DropdownInput(
            name="database_name",
            display_name="Database",
            info="The Database name for the Astra DB instance.",
            required=True,
            refresh_button=True,
            real_time_refresh=True,
            dialog_inputs=asdict(NewDatabaseInput()),
            combobox=True,
        ),
        StrInput(
            name="api_endpoint",
            display_name="Astra DB API Endpoint",
            info="The API Endpoint for the Astra DB instance. Supersedes database selection.",
            advanced=True,
        ),
        DropdownInput(
            name="collection_name",
            display_name="Collection",
            info="The name of the collection within Astra DB where the vectors will be stored.",
            required=True,
            refresh_button=True,
            real_time_refresh=True,
            dialog_inputs=asdict(NewCollectionInput()),
            combobox=True,
            advanced=True,
        ),
        StrInput(
            name="keyspace",
            display_name="Keyspace",
            info="Optional keyspace within Astra DB to use for the collection.",
            advanced=True,
        ),
        DropdownInput(
            name="embedding_choice",
            display_name="Embedding Model or Astra Vectorize",
            info="Choose an embedding model or use Astra Vectorize.",
            options=["Embedding Model", "Astra Vectorize"],
            value="Embedding Model",
            advanced=True,
            real_time_refresh=True,
        ),
        HandleInput(
            name="embedding_model",
            display_name="Embedding Model",
            input_types=["Embeddings"],
            info="Specify the Embedding Model. Not required for Astra Vectorize collections.",
            required=False,
        ),
        *LCVectorStoreComponent.inputs,
        IntInput(
            name="number_of_results",
            display_name="Number of Search Results",
            info="Number of search results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        NestedDictInput(
            name="advanced_search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
        ),
        BoolInput(
            name="autodetect_collection",
            display_name="Autodetect Collection",
            info="Boolean flag to determine whether to autodetect the collection.",
            advanced=True,
            value=True,
        ),
        StrInput(
            name="content_field",
            display_name="Content Field",
            info="Field to use as the text content field for the vector store.",
            advanced=True,
        ),
        StrInput(
            name="deletion_field",
            display_name="Deletion Based On Field",
            info="When this parameter is provided, documents in the target collection with "
            "metadata field values matching the input metadata field value will be deleted "
            "before new data is loaded.",
            advanced=True,
        ),
        BoolInput(
            name="ignore_invalid_documents",
            display_name="Ignore Invalid Documents",
            info="Boolean flag to determine whether to ignore invalid documents at runtime.",
            advanced=True,
        ),
        NestedDictInput(
            name="astradb_vectorstore_kwargs",
            display_name="AstraDBVectorStore Parameters",
            info="Optional dictionary of additional parameters for the AstraDBVectorStore.",
            advanced=True,
        ),
    ]

    @classmethod
    def map_cloud_providers(cls):
        # TODO: Programmatically fetch the regions for each cloud provider
        return {
            "Amazon Web Services": {
                "id": "aws",
                "regions": ["us-east-2", "ap-south-1", "eu-west-1"],
            },
            "Google Cloud Platform": {
                "id": "gcp",
                "regions": ["us-east1"],
            },
            "Microsoft Azure": {
                "id": "azure",
                "regions": ["westus3"],
            },
        }

    @classmethod
    def get_vectorize_providers(cls, token: str, environment: str | None = None, api_endpoint: str | None = None):
        try:
            # Get the admin object
            admin = AstraDBAdmin(token=token, environment=environment)
            db_admin = admin.get_database_admin(api_endpoint=api_endpoint)

            # Get the list of embedding providers
            embedding_providers = db_admin.find_embedding_providers().as_dict()

            vectorize_providers_mapping = {}
            # Map the provider display name to the provider key and models
            for provider_key, provider_data in embedding_providers["embeddingProviders"].items():
                # Get the provider display name and models
                display_name = provider_data["displayName"]
                models = [model["name"] for model in provider_data["models"]]

                # Build our mapping
                vectorize_providers_mapping[display_name] = [provider_key, models]

            # Sort the resulting dictionary
            return defaultdict(list, dict(sorted(vectorize_providers_mapping.items())))
        except Exception as e:
            msg = f"Error fetching vectorize providers: {e}"
            raise ValueError(msg) from e

    @classmethod
    async def create_database_api(
        cls,
        new_database_name: str,
        cloud_provider: str,
        region: str,
        token: str,
        environment: str | None = None,
        keyspace: str | None = None,
    ):
        client = DataAPIClient(token=token, environment=environment)

        # Get the admin object
        admin_client = client.get_admin(token=token)

        # Call the create database function
        return await admin_client.async_create_database(
            name=new_database_name,
            cloud_provider=cls.map_cloud_providers()[cloud_provider]["id"],
            region=region,
            keyspace=keyspace,
            wait_until_active=False,
        )

    @classmethod
    async def create_collection_api(
        cls,
        new_collection_name: str,
        token: str,
        api_endpoint: str,
        environment: str | None = None,
        keyspace: str | None = None,
        dimension: int | None = None,
        embedding_generation_provider: str | None = None,
        embedding_generation_model: str | None = None,
    ):
        # Create the data API client
        client = DataAPIClient(token=token, environment=environment)

        # Get the database object
        database = client.get_async_database(api_endpoint=api_endpoint, token=token)

        # Build vectorize options, if needed
        vectorize_options = None
        if not dimension:
            vectorize_options = CollectionVectorServiceOptions(
                provider=cls.get_vectorize_providers(
                    token=token, environment=environment, api_endpoint=api_endpoint
                ).get(embedding_generation_provider, [None, []])[0],
                model_name=embedding_generation_model,
            )

        # Create the collection
        return await database.create_collection(
            name=new_collection_name,
            keyspace=keyspace,
            dimension=dimension,
            service=vectorize_options,
        )

    @classmethod
    def get_database_list_static(cls, token: str, environment: str | None = None):
        client = DataAPIClient(token=token, environment=environment)

        # Get the admin object
        admin_client = client.get_admin(token=token)

        # Get the list of databases
        db_list = list(admin_client.list_databases())

        # Set the environment properly
        env_string = ""
        if environment and environment != "prod":
            env_string = f"-{environment}"

        # Generate the api endpoint for each database
        db_info_dict = {}
        for db in db_list:
            try:
                # Get the API endpoint for the database
                api_endpoint = f"https://{db.info.id}-{db.info.region}.apps.astra{env_string}.datastax.com"

                # Get the number of collections
                try:
                    num_collections = len(
                        list(
                            client.get_database(
                                api_endpoint=api_endpoint, token=token, keyspace=db.info.keyspace
                            ).list_collection_names(keyspace=db.info.keyspace)
                        )
                    )
                except Exception:  # noqa: BLE001
                    num_collections = 0
                    if db.status != "PENDING":
                        continue

                # Add the database to the dictionary
                db_info_dict[db.info.name] = {
                    "api_endpoint": api_endpoint,
                    "collections": num_collections,
                    "status": db.status if db.status != "ACTIVE" else None,
                }
            except Exception:  # noqa: BLE001, S110
                pass

        return db_info_dict

    def get_database_list(self):
        return self.get_database_list_static(token=self.token, environment=self.environment)

    @classmethod
    def get_api_endpoint_static(
        cls,
        token: str,
        environment: str | None = None,
        api_endpoint: str | None = None,
        database_name: str | None = None,
    ):
        # If the api_endpoint is set, return it
        if api_endpoint:
            return api_endpoint

        # Check if the database_name is like a url
        if database_name and database_name.startswith("https://"):
            return database_name

        # If the database is not set, nothing we can do.
        if not database_name:
            return None

        # Grab the database object
        db = cls.get_database_list_static(token=token, environment=environment).get(database_name)
        if not db:
            return None

        # Otherwise, get the URL from the database list
        return db.get("api_endpoint")

    def get_api_endpoint(self):
        return self.get_api_endpoint_static(
            token=self.token,
            environment=self.environment,
            api_endpoint=self.api_endpoint,
            database_name=self.database_name,
        )

    def get_keyspace(self):
        keyspace = self.keyspace

        if keyspace:
            return keyspace.strip()

        return None

    def get_database_object(self, api_endpoint: str | None = None):
        try:
            client = DataAPIClient(token=self.token, environment=self.environment)

            return client.get_database(
                api_endpoint=api_endpoint or self.get_api_endpoint(),
                token=self.token,
                keyspace=self.get_keyspace(),
            )
        except Exception as e:
            msg = f"Error fetching database object: {e}"
            raise ValueError(msg) from e

    def collection_data(self, collection_name: str, database: Database | None = None):
        try:
            if not database:
                client = DataAPIClient(token=self.token, environment=self.environment)

                database = client.get_database(
                    api_endpoint=self.get_api_endpoint(),
                    token=self.token,
                    keyspace=self.get_keyspace(),
                )

            collection = database.get_collection(collection_name, keyspace=self.get_keyspace())

            return collection.estimated_document_count()
        except Exception as e:  # noqa: BLE001
            self.log(f"Error checking collection data: {e}")

            return None

    def _initialize_database_options(self):
        try:
            return [
                {
                    "name": name,
                    "status": info["status"],
                    "collections": info["collections"],
                    "api_endpoint": info["api_endpoint"],
                    "icon": "data",
                }
                for name, info in self.get_database_list().items()
            ]
        except Exception as e:
            msg = f"Error fetching database options: {e}"
            raise ValueError(msg) from e

    @classmethod
    def get_provider_icon(cls, collection: CollectionDescriptor | None = None, provider_name: str | None = None) -> str:
        # Get the provider name from the collection
        provider_name = provider_name or (
            collection.options.vector.service.provider
            if collection and collection.options and collection.options.vector and collection.options.vector.service
            else None
        )

        # If there is no provider, use the vector store icon
        if not provider_name or provider_name == "bring your own":
            return "vectorstores"

        # Special case for certain models
        # TODO: Add more icons
        if provider_name == "nvidia":
            return "NVIDIA"
        if provider_name == "openai":
            return "OpenAI"

        # Title case on the provider for the icon if no special case
        return provider_name.title()

    def _initialize_collection_options(self, api_endpoint: str | None = None):
        # Nothing to generate if we don't have an API endpoint yet
        api_endpoint = api_endpoint or self.get_api_endpoint()
        if not api_endpoint:
            return []

        # Retrieve the database object
        database = self.get_database_object(api_endpoint=api_endpoint)

        # Get the list of collections
        collection_list = list(database.list_collections(keyspace=self.get_keyspace()))

        # Return the list of collections and metadata associated
        return [
            {
                "name": col.name,
                "records": self.collection_data(collection_name=col.name, database=database),
                "provider": (
                    col.options.vector.service.provider if col.options.vector and col.options.vector.service else None
                ),
                "icon": self.get_provider_icon(collection=col),
                "model": (
                    col.options.vector.service.model_name if col.options.vector and col.options.vector.service else None
                ),
            }
            for col in collection_list
        ]

    def reset_provider_options(self, build_config: dict):
        # Get the list of vectorize providers
        vectorize_providers = self.get_vectorize_providers(
            token=self.token,
            environment=self.environment,
            api_endpoint=build_config["api_endpoint"]["value"],
        )

        # Append a special case for Bring your own
        vectorize_providers["Bring your own"] = [None, ["Bring your own"]]

        # If the collection is set, allow user to see embedding options
        build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
            "embedding_generation_provider"
        ]["options"] = ["Bring your own", "Nvidia", *[key for key in vectorize_providers if key != "Nvidia"]]

        # For all not Bring your own or Nvidia providers, add metadata saying configure in Astra DB Portal
        provider_options = build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
            "embedding_generation_provider"
        ]["options"]

        # Go over each possible provider and add metadata to configure in Astra DB Portal
        for provider in provider_options:
            # Skip Bring your own and Nvidia, automatically configured
            if provider in ["Bring your own", "Nvidia"]:
                build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
                    "embedding_generation_provider"
                ]["options_metadata"].append({"icon": self.get_provider_icon(provider_name=provider.lower())})
                continue

            # Add metadata to configure in Astra DB Portal
            build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
                "embedding_generation_provider"
            ]["options_metadata"].append({" ": "Configure in Astra DB Portal"})

        # And allow the user to see the models based on a selected provider
        embedding_provider = build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
            "embedding_generation_provider"
        ]["value"]

        # Set the options for the embedding model based on the provider
        build_config["collection_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"][
            "embedding_generation_model"
        ]["options"] = vectorize_providers.get(embedding_provider, [[], []])[1]

        return build_config

    def reset_collection_list(self, build_config: dict):
        # Get the list of options we have based on the token provided
        collection_options = self._initialize_collection_options(api_endpoint=build_config["api_endpoint"]["value"])

        # If we retrieved options based on the token, show the dropdown
        build_config["collection_name"]["options"] = [col["name"] for col in collection_options]
        build_config["collection_name"]["options_metadata"] = [
            {k: v for k, v in col.items() if k not in ["name"]} for col in collection_options
        ]

        # Reset the selected collection
        if build_config["collection_name"]["value"] not in build_config["collection_name"]["options"]:
            build_config["collection_name"]["value"] = ""

        # If we have a database, collection name should not be advanced
        build_config["collection_name"]["advanced"] = not build_config["database_name"]["value"]

        return build_config

    def reset_database_list(self, build_config: dict):
        # Get the list of options we have based on the token provided
        database_options = self._initialize_database_options()

        # If we retrieved options based on the token, show the dropdown
        build_config["database_name"]["options"] = [db["name"] for db in database_options]
        build_config["database_name"]["options_metadata"] = [
            {k: v for k, v in db.items() if k not in ["name"]} for db in database_options
        ]

        # Reset the selected database
        if build_config["database_name"]["value"] not in build_config["database_name"]["options"]:
            build_config["database_name"]["value"] = ""
            build_config["api_endpoint"]["value"] = ""
            build_config["collection_name"]["advanced"] = True

        # If we have a token, database name should not be advanced
        build_config["database_name"]["advanced"] = not build_config["token"]["value"]

        return build_config

    def reset_build_config(self, build_config: dict):
        # Reset the list of databases we have based on the token provided
        build_config["database_name"]["options"] = []
        build_config["database_name"]["options_metadata"] = []
        build_config["database_name"]["value"] = ""
        build_config["database_name"]["advanced"] = True
        build_config["api_endpoint"]["value"] = ""

        # Reset the list of collections and metadata associated
        build_config["collection_name"]["options"] = []
        build_config["collection_name"]["options_metadata"] = []
        build_config["collection_name"]["value"] = ""
        build_config["collection_name"]["advanced"] = True

        return build_config

    async def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None):
        # Callback for database creation
        if field_name == "database_name" and isinstance(field_value, dict) and "new_database_name" in field_value:
            try:
                await self.create_database_api(
                    new_database_name=field_value["new_database_name"],
                    token=self.token,
                    keyspace=self.get_keyspace(),
                    environment=self.environment,
                    cloud_provider=field_value["cloud_provider"],
                    region=field_value["region"],
                )
            except Exception as e:
                msg = f"Error creating database: {e}"
                raise ValueError(msg) from e

            # Add the new database to the list of options
            build_config["database_name"]["options"] = build_config["database_name"]["options"] + [
                field_value["new_database_name"]
            ]
            build_config["database_name"]["options_metadata"] = build_config["database_name"]["options_metadata"] + [
                {"status": "PENDING"}
            ]

            return self.reset_collection_list(build_config)

        # This is the callback required to update the list of regions for a cloud provider
        if field_name == "database_name" and isinstance(field_value, dict) and "new_database_name" not in field_value:
            cloud_provider = field_value["cloud_provider"]
            build_config["database_name"]["dialog_inputs"]["fields"]["data"]["node"]["template"]["region"][
                "options"
            ] = self.map_cloud_providers()[cloud_provider]["regions"]

            return build_config

        # Callback for the creation of collections
        if field_name == "collection_name" and isinstance(field_value, dict) and "new_collection_name" in field_value:
            try:
                # Get the dimension if it's a BYO provider
                dimension = (
                    field_value["dimension"]
                    if field_value["embedding_generation_provider"] == "Bring your own"
                    else None
                )

                # Create the collection
                await self.create_collection_api(
                    new_collection_name=field_value["new_collection_name"],
                    token=self.token,
                    api_endpoint=build_config["api_endpoint"]["value"],
                    environment=self.environment,
                    keyspace=self.get_keyspace(),
                    dimension=dimension,
                    embedding_generation_provider=field_value["embedding_generation_provider"],
                    embedding_generation_model=field_value["embedding_generation_model"],
                )
            except Exception as e:
                msg = f"Error creating collection: {e}"
                raise ValueError(msg) from e

            # Add the new collection to the list of options
            build_config["collection_name"]["value"] = field_value["new_collection_name"]
            build_config["collection_name"]["options"].append(field_value["new_collection_name"])

            # Get the provider and model for the new collection
            generation_provider = field_value["embedding_generation_provider"]
            provider = generation_provider if generation_provider != "Bring your own" else None
            generation_model = field_value["embedding_generation_model"]
            model = generation_model if generation_model and generation_model != "Bring your own" else None

            # Set the embedding choice
            build_config["embedding_choice"]["value"] = "Astra Vectorize" if provider else "Embedding Model"
            build_config["embedding_model"]["advanced"] = bool(provider)

            # Add the new collection to the list of options
            icon = "NVIDIA" if provider == "Nvidia" else "vectorstores"
            build_config["collection_name"]["options_metadata"] = build_config["collection_name"][
                "options_metadata"
            ] + [{"records": 0, "provider": provider, "icon": icon, "model": model}]

            return build_config

        # Callback to update the model list based on the embedding provider
        if (
            field_name == "collection_name"
            and isinstance(field_value, dict)
            and "new_collection_name" not in field_value
        ):
            return self.reset_provider_options(build_config)

        # When the component first executes, this is the update refresh call
        first_run = field_name == "collection_name" and not field_value and not build_config["database_name"]["options"]

        # If the token has not been provided, simply return the empty build config
        if not self.token:
            return self.reset_build_config(build_config)

        # If this is the first execution of the component, reset and build database list
        if first_run or field_name in ["token", "environment"]:
            return self.reset_database_list(build_config)

        # Refresh the collection name options
        if field_name == "database_name" and not isinstance(field_value, dict):
            # If missing, refresh the database options
            if field_value not in build_config["database_name"]["options"]:
                build_config = await self.update_build_config(build_config, field_value=self.token, field_name="token")
                build_config["database_name"]["value"] = ""
            else:
                # Find the position of the selected database to align with metadata
                index_of_name = build_config["database_name"]["options"].index(field_value)

                # Initializing database condition
                pending = build_config["database_name"]["options_metadata"][index_of_name]["status"] == "PENDING"
                if pending:
                    # Await the recursive call so a build config dict is returned, not a coroutine
                    return await self.update_build_config(build_config, field_value=self.token, field_name="token")

                # Set the API endpoint based on the selected database
                build_config["api_endpoint"]["value"] = build_config["database_name"]["options_metadata"][
                    index_of_name
                ]["api_endpoint"]

                # Reset the provider options
                build_config = self.reset_provider_options(build_config)

            # Reset the list of collections we have based on the token provided
            return self.reset_collection_list(build_config)

        # Hide the embedding model option if the options_metadata provider is not null
        if field_name == "collection_name" and not isinstance(field_value, dict):
            # Assume we will be autodetecting the collection:
            build_config["autodetect_collection"]["value"] = True

            # Reload the collection list
            build_config = self.reset_collection_list(build_config)

            # Set the options for collection name to be the field value if it's a new collection
            if field_value and field_value not in build_config["collection_name"]["options"]:
                # Add the new collection to the list of options
                build_config["collection_name"]["options"].append(field_value)
                build_config["collection_name"]["options_metadata"].append(
                    {"records": 0, "provider": None, "icon": "", "model": None}
                )

                # Ensure that autodetect collection is set to False, since it's a new collection
                build_config["autodetect_collection"]["value"] = False

            # If nothing is selected, can't detect provider - return
            if not field_value:
                return build_config

            # Find the position of the selected collection to align with metadata
            index_of_name = build_config["collection_name"]["options"].index(field_value)
            value_of_provider = build_config["collection_name"]["options_metadata"][index_of_name]["provider"]

            # If we were able to determine the Vectorize provider, set it accordingly
            if value_of_provider:
                build_config["embedding_model"]["advanced"] = True
                build_config["embedding_choice"]["value"] = "Astra Vectorize"
            else:
                build_config["embedding_model"]["advanced"] = False
                build_config["embedding_choice"]["value"] = "Embedding Model"

            return build_config

        return build_config

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBVectorStore
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        # Get the embedding model and additional params
        embedding_params = (
            {"embedding": self.embedding_model}
            if self.embedding_model and self.embedding_choice == "Embedding Model"
            else {}
        )

        # Get the additional parameters
        additional_params = self.astradb_vectorstore_kwargs or {}

        # Get Langflow version and platform information
        __version__ = get_version_info()["version"]
        langflow_prefix = ""
        # if os.getenv("AWS_EXECUTION_ENV") == "AWS_ECS_FARGATE":  # TODO: More precise way of detecting
        #     langflow_prefix = "ds-"

        # Get the database object
        database = self.get_database_object()
        autodetect = self.collection_name in database.list_collection_names() and self.autodetect_collection

        # Bundle up the auto-detect parameters
        autodetect_params = {
            "autodetect_collection": autodetect,
            "content_field": (
                self.content_field
                if self.content_field and embedding_params
                else (
                    "page_content"
                    if embedding_params
                    and self.collection_data(collection_name=self.collection_name, database=database) == 0
                    else None
                )
            ),
            "ignore_invalid_documents": self.ignore_invalid_documents,
        }

        # Attempt to build the Vector Store object
        try:
            vector_store = AstraDBVectorStore(
                # Astra DB Authentication Parameters
                token=self.token,
                api_endpoint=database.api_endpoint,
                namespace=database.keyspace,
                collection_name=self.collection_name,
                environment=self.environment,
                # Astra DB Usage Tracking Parameters
                ext_callers=[(f"{langflow_prefix}langflow", __version__)],
                # Astra DB Vector Store Parameters
                **autodetect_params,
                **embedding_params,
                **additional_params,
            )
        except Exception as e:
            msg = f"Error initializing AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        # Add documents to the vector store
        self._add_documents_to_vector_store(vector_store)

        return vector_store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents and self.deletion_field:
            self.log(f"Preparing to delete existing documents by metadata field '{self.deletion_field}'.")
            try:
                database = self.get_database_object()
                collection = database.get_collection(self.collection_name, keyspace=database.keyspace)
                delete_values = list({doc.metadata[self.deletion_field] for doc in documents})
                self.log(f"Deleting documents where {self.deletion_field} matches {delete_values}.")
                collection.delete_many({f"metadata.{self.deletion_field}": {"$in": delete_values}})
            except Exception as e:
                msg = f"Error deleting documents from AstraDBVectorStore based on '{self.deletion_field}': {e}"
                raise ValueError(msg) from e

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        search_type_mapping = {
            "Similarity with score threshold": "similarity_score_threshold",
            "MMR (Max Marginal Relevance)": "mmr",
        }

        return search_type_mapping.get(self.search_type, "similarity")

    def _build_search_args(self):
        query = self.search_query if isinstance(self.search_query, str) and self.search_query.strip() else None

        if query:
            args = {
                "query": query,
                "search_type": self._map_search_type(),
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            }
        elif self.advanced_search_filter:
            args = {
                "n": self.number_of_results,
            }
        else:
            return {}

        filter_arg = self.advanced_search_filter or {}
        if filter_arg:
            args["filter"] = filter_arg

        return args

    def search_documents(self, vector_store=None) -> list[Data]:
        vector_store = vector_store or self.build_vector_store()

        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        try:
            search_args = self._build_search_args()
        except Exception as e:
            msg = f"Error in AstraDBVectorStore._build_search_args: {e}"
            raise ValueError(msg) from e

        if not search_args:
            self.log("No search input or filters provided. Skipping search.")
            return []

        docs = []
        search_method = "search" if "query" in search_args else "metadata_search"

        try:
            self.log(f"Calling vector_store.{search_method} with args: {search_args}")
            docs = getattr(vector_store, search_method)(**search_args)
        except Exception as e:
            msg = f"Error performing {search_method} in AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self.log(f"Retrieved documents: {len(docs)}")

        data = docs_to_data(docs)
        self.log(f"Converted documents to data: {len(data)}")
        self.status = data

        return data

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()

        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
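
The branching in `_build_search_args` and `search_documents` above can be summarized in a small standalone sketch. The function names `build_search_args` and `pick_search_method` and their parameters are hypothetical stand-ins for the component's attributes. The sketch only mirrors the selection logic and does not connect to Astra DB.

```python
def build_search_args(
    search_query,
    advanced_search_filter,
    number_of_results=4,
    search_type="similarity",
    score_threshold=0.0,
):
    """Illustrative mirror of the component's search-argument selection."""
    # Empty or whitespace-only queries count as "no query".
    query = search_query if isinstance(search_query, str) and search_query.strip() else None

    if query:
        # Vector search: these arguments go to vector_store.search(...).
        args = {
            "query": query,
            "search_type": search_type,
            "k": number_of_results,
            "score_threshold": score_threshold,
        }
    elif advanced_search_filter:
        # Filter-only search: these arguments go to vector_store.metadata_search(...).
        args = {"n": number_of_results}
    else:
        # Neither a query nor a filter: the component skips the search.
        return {}

    if advanced_search_filter:
        args["filter"] = advanced_search_filter
    return args


def pick_search_method(args):
    """Return the name of the vector store method that would be called, or None."""
    if not args:
        return None
    return "search" if "query" in args else "metadata_search"
```

With a non-empty query, the sketch selects `search` and passes `k`. With only a filter, it selects `metadata_search` and passes `n`. With neither, it returns an empty dictionary and no search runs.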

Astra DB Graph Vector Store

This component implements a Vector Store using Astra DB Serverless with graph capabilities.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| token | SecretString | Authentication token for accessing Astra DB. |
| api_endpoint | SecretString | API endpoint URL for the Astra DB service. |
| collection_name | String | The name of the collection within Astra DB where vectors will be stored. |
| embedding_model | Handle | Embedding model to use. |
| search_query | Multiline | Input text for searching documents. |
| ingest_data | Data | Data to be ingested into the vector store. |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | VectorStore | An instance of AstraDBGraphVectorStore for storing and searching vectors. |

Component code

astradb_graph.py
import os

import orjson
from astrapy.admin import parse_api_endpoint

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import (
    BoolInput,
    DictInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class AstraDBGraphVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Astra DB Graph"
    description: str = "Implementation of Graph Vector Store using Astra DB"
    name = "AstraDBGraph"
    icon: str = "AstraDB"

    inputs = [
        SecretStrInput(
            name="token",
            display_name="Astra DB Application Token",
            info="Authentication token for accessing Astra DB.",
            value="ASTRA_DB_APPLICATION_TOKEN",
            required=True,
            advanced=os.getenv("ASTRA_ENHANCED", "false").lower() == "true",
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="Database" if os.getenv("ASTRA_ENHANCED", "false").lower() == "true" else "API Endpoint",
            info="API endpoint URL for the Astra DB service.",
            value="ASTRA_DB_API_ENDPOINT",
            required=True,
        ),
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within Astra DB where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="metadata_incoming_links_key",
            display_name="Metadata incoming links key",
            info="Metadata key used for incoming links.",
            advanced=True,
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="keyspace",
            display_name="Keyspace",
            info="Optional keyspace within Astra DB to use for the collection.",
            advanced=True,
        ),
        HandleInput(
            name="embedding_model",
            display_name="Embedding Model",
            input_types=["Embeddings"],
            info="Allows an embedding model configuration.",
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            value="cosine",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync' or 'Off'.",
            options=["Sync", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
            value=False,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
            list=True,
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
            list=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info='Optional JSON string for the "indexing" field of the collection. '
            "See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=[
                "Similarity",
                "Similarity with score threshold",
                "MMR (Max Marginal Relevance)",
                "Graph Traversal",
                "MMR (Max Marginal Relevance) Graph Traversal",
            ],
            value="MMR (Max Marginal Relevance) Graph Traversal",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBGraphVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e

        try:
            self.log(f"Initializing Graph Vector Store {self.collection_name}")

            vector_store = AstraDBGraphVectorStore(
                embedding=self.embedding_model,
                collection_name=self.collection_name,
                metadata_incoming_links_key=self.metadata_incoming_links_key or "incoming_links",
                token=self.token,
                api_endpoint=self.api_endpoint,
                namespace=self.keyspace or None,
                environment=parse_api_endpoint(self.api_endpoint).environment if self.api_endpoint else None,
                metric=self.metric or None,
                batch_size=self.batch_size or None,
                bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
                bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
                bulk_delete_concurrency=self.bulk_delete_concurrency or None,
                setup_mode=setup_mode_value,
                pre_delete_collection=self.pre_delete_collection,
                metadata_indexing_include=[s for s in self.metadata_indexing_include if s] or None,
                metadata_indexing_exclude=[s for s in self.metadata_indexing_exclude if s] or None,
                collection_indexing_policy=orjson.loads(self.collection_indexing_policy.encode("utf-8"))
                if self.collection_indexing_policy
                else None,
            )
        except Exception as e:
            msg = f"Error initializing AstraDBGraphVectorStore: {e}"
            raise ValueError(msg) from e

        self.log(f"Vector Store initialized: {vector_store.astra_env.collection_name}")
        self._add_documents_to_vector_store(vector_store)

        return vector_store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBGraphVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        match self.search_type:
            case "Similarity":
                return "similarity"
            case "Similarity with score threshold":
                return "similarity_score_threshold"
            case "MMR (Max Marginal Relevance)":
                return "mmr"
            case "Graph Traversal":
                return "traversal"
            case "MMR (Max Marginal Relevance) Graph Traversal":
                return "mmr_traversal"
            case _:
                return "similarity"

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def search_documents(self, vector_store=None) -> list[Data]:
        if not vector_store:
            vector_store = self.build_vector_store()

        self.log("Searching for documents in AstraDBGraphVectorStore.")
        self.log(f"Search query: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)

                # Drop links from the metadata. At this point the links don't add any value for building the
                # context and haven't been restored to json which causes the conversion to fail.
                self.log("Removing links from metadata.")
                for doc in docs:
                    if "links" in doc.metadata:
                        doc.metadata.pop("links")

            except Exception as e:
                msg = f"Error performing search in AstraDBGraphVectorStore: {e}"
                raise ValueError(msg) from e

            self.log(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)

            self.log(f"Converted documents to data: {len(data)}")

            self.status = data
            return data
        self.log("No search input provided. Skipping search.")
        return []

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
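
The label-to-search-type mapping and the filter cleanup in the component code above can be tried in isolation with the following sketch. The names `GRAPH_SEARCH_TYPES`, `map_search_type`, and `clean_filter` are hypothetical. They reproduce the behavior of `_map_search_type` and `_build_search_args` without importing Langflow or `langchain_astradb`.

```python
# Dropdown labels from the component, mapped to the search-type strings it passes on.
GRAPH_SEARCH_TYPES = {
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
    "Graph Traversal": "traversal",
    "MMR (Max Marginal Relevance) Graph Traversal": "mmr_traversal",
}


def map_search_type(label):
    """Fall back to plain similarity search for unknown or missing labels."""
    return GRAPH_SEARCH_TYPES.get(label, "similarity")


def clean_filter(search_filter):
    """Keep only entries whose key and value are both truthy, as the component does."""
    return {k: v for k, v in (search_filter or {}).items() if k and v}
```

One consequence of the truthiness check is that filter values such as `0`, `False`, or an empty string are dropped along with empty keys, so a filter on those values would not be applied.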

Cassandra

This component creates a Cassandra Vector Store with search capabilities. For more information, see the Cassandra documentation.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| database_ref | String | Contact points for the database, or the Astra DB database ID. |
| username | String | Username for the database (leave empty for Astra DB). |
| token | SecretString | User password for the database, or the Astra DB token. |
| keyspace | String | Table keyspace, or the Astra DB namespace. |
| table_name | String | Name of the table, or the Astra DB collection. |
| ttl_seconds | Integer | Time-to-live, in seconds, for added texts. |
| batch_size | Integer | Number of records to process in a single batch. |
| setup_mode | String | Configuration mode for setting up the Cassandra table. |
| cluster_kwargs | Dict | Additional keyword arguments for the Cassandra cluster. |
| search_query | String | Query for similarity search. |
| ingest_data | Data | Data to be ingested into the vector store. |
| embedding | Embeddings | Embedding function to use. |
| number_of_results | Integer | Number of results to return in search. |
| search_type | String | Type of search to perform. |
| search_score_threshold | Float | Minimum similarity score for search results. |
| search_filter | Dict | Metadata filters for the search query. |
| body_search | String | Document textual search terms. |
| enable_body_search | Boolean | Flag to enable body search. |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | Cassandra | Cassandra vector store instance. |
| search_results | List[Data] | Results of similarity search. |
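
The `database_ref` input accepts either an Astra DB database ID or Cassandra contact points. The component code tells them apart by checking whether the value parses as a UUID. The following is a minimal sketch of that check using only the standard library. The function name `resolve_database_ref` and its return shape are hypothetical and exist only for illustration.

```python
from uuid import UUID


def resolve_database_ref(database_ref):
    """Classify a database reference as an Astra DB ID or Cassandra contact points."""
    try:
        # A value that parses as a UUID is treated as an Astra DB database ID.
        UUID(database_ref)
    except ValueError:
        # Anything else is treated as contact points. A comma-separated string
        # is split into a list; a single host is passed through unchanged.
        if "," in database_ref:
            return "cassandra", database_ref.split(",")
        return "cassandra", database_ref
    return "astra", database_ref
```

Because the split is on a bare comma, whitespace around hosts is preserved, so it is safest to enter contact points without spaces, for example `10.0.0.1,10.0.0.2`.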

Component code

cassandra.py
from langchain_community.vectorstores import Cassandra

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, DictInput, FloatInput
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra"
    description = "Cassandra Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/cassandra"
    name = "Cassandra"
    icon = "Cassandra"

    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        IntInput(
            name="ttl_seconds",
            display_name="TTL Seconds",
            info="Optional time-to-live for the added texts.",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            value=16,
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            list=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            list=True,
        ),
        MessageTextInput(
            name="body_search",
            display_name="Search Body",
            info="Document textual search terms to apply to the search query.",
            advanced=True,
        ),
        BoolInput(
            name="enable_body_search",
            display_name="Enable Body Search",
            info="Flag to enable body search. This must be enabled BEFORE the table is created.",
            value=False,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Cassandra:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e

        from uuid import UUID

        database_ref = self.database_ref

        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")

        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        body_index_options = [("index_analyzer", "STANDARD")] if self.enable_body_search else None

        if self.setup_mode == "Off":
            setup_mode = SetupMode.OFF
        elif self.setup_mode == "Sync":
            setup_mode = SetupMode.SYNC
        else:
            setup_mode = SetupMode.ASYNC

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            table = Cassandra.from_documents(
                documents=documents,
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                batch_size=self.batch_size,
                body_index_options=body_index_options,
            )
        else:
            self.log("No documents to add to the Vector Store.")
            table = Cassandra(
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                body_index_options=body_index_options,
                setup_mode=setup_mode,
            )
        return table

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                self.log(f"Search args: {search_args}")

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field named 'content'."
                    )
                    raise ValueError(msg) from e
                raise

            self.log(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        if self.body_search:
            if not self.enable_body_search:
                msg = "You should enable body search when creating the table to search the body field."
                raise ValueError(msg)
            args["body_search"] = self.body_search
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
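
The `_build_search_args` method in the listing above combines the result count, score threshold, metadata filter, and body search term into one dictionary. The following standalone sketch reproduces that logic outside Langflow so it can be tried in isolation. The function name `build_search_args` and its parameters are illustrative stand-ins for the component's attributes, not part of the Langflow API.

```python
from typing import Any, Optional


def build_search_args(
    number_of_results: int,
    score_threshold: float,
    search_filter: Optional[dict[str, Any]] = None,
    body_search: str = "",
    enable_body_search: bool = False,
) -> dict[str, Any]:
    """Illustrative stand-in for the component's _build_search_args method."""
    args: dict[str, Any] = {"k": number_of_results, "score_threshold": score_threshold}

    if search_filter:
        # Keep only entries whose key and value are both truthy.
        clean_filter = {k: v for k, v in search_filter.items() if k and v}
        if clean_filter:
            args["filter"] = clean_filter

    if body_search:
        # A body search term is rejected unless body search is enabled.
        if not enable_body_search:
            raise ValueError("Enable body search when creating the table to search the body field.")
        args["body_search"] = body_search

    return args


example_args = build_search_args(4, 0.0, {"source": "faq.md", "": "ignored", "lang": ""})
print(example_args)
```

Because the filter keeps only truthy values, a metadata filter value such as `0`, `False`, or an empty string is dropped along with blank keys.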

Cassandra Graph Vector Store

This component implements a Cassandra Graph Vector Store with search capabilities.

Parameters

Inputs
Name Display Name Info

database_ref

Contact Points / Astra Database ID

Contact points for the database or AstraDB database ID (required).

username

Username

Username for the database (leave empty for AstraDB).

token

Password / AstraDB Token

User password for the database or AstraDB token (required).

keyspace

Keyspace

Table Keyspace or AstraDB namespace (required).

table_name

Table Name

The name of the table or AstraDB collection where vectors will be stored (required).

setup_mode

Setup Mode

Configuration mode for setting up the Cassandra table (options: "Sync", "Off", default: "Sync").

cluster_kwargs

Cluster arguments

Optional dictionary of additional keyword arguments for the Cassandra cluster.

search_query

Search Query

Query string for similarity search.

ingest_data

Ingest Data

Data to be ingested into the vector store (list of Data objects).

embedding

Embedding

Embedding model to use.

number_of_results

Number of Results

Number of results to return in similarity search (default: 4).

search_type

Search Type

Search type to use (options: "Traversal", "MMR traversal", "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Traversal").

depth

Depth of traversal

The maximum depth of edges to traverse (for "Traversal" or "MMR traversal" search types, default: 1).

search_score_threshold

Search Score Threshold

Minimum similarity score threshold for search results (for "Similarity with score threshold" search type).

search_filter

Search Metadata Filter

Optional dictionary of filters to apply to the search query.

Outputs
Name Display Name Info

vector_store

Vector Store

Built Cassandra Graph vector store.

search_results

Search Results

Results of the similarity search as a list of Data objects.
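
The `database_ref` input accepts either an Astra DB database ID or one or more Cassandra contact points. Based on the component code, a value that parses as a UUID is treated as an Astra DB ID, and anything else is treated as contact points, split on commas when more than one is given. The sketch below mirrors that decision in a standalone function. The name `resolve_database_ref` is hypothetical and used only for illustration.

```python
from typing import Union
from uuid import UUID


def resolve_database_ref(database_ref: str) -> tuple[bool, Union[str, list[str]]]:
    """Return (is_astra, value) following the component's UUID check."""
    try:
        UUID(database_ref)
    except ValueError:
        # Not a UUID: treat the value as Cassandra contact points.
        if "," in database_ref:
            return False, database_ref.split(",")
        return False, database_ref
    # A valid UUID is taken to be an Astra DB database ID.
    return True, database_ref


print(resolve_database_ref("01234567-89ab-cdef-0123-456789abcdef"))
print(resolve_database_ref("10.0.0.1,10.0.0.2"))
```

The split does not trim whitespace, so a value such as `10.0.0.1, 10.0.0.2` would yield a second contact point with a leading space. Entering contact points without spaces avoids this.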

Component code

cassandra_graph.py
from uuid import UUID

from langchain_community.graph_vectorstores import CassandraGraphVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra Graph"
    description = "Cassandra Graph Vector Store"
    name = "CassandraGraph"
    icon = "Cassandra"

    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync' or 'Off'.",
            options=["Sync", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            list=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=[
                "Traversal",
                "MMR traversal",
                "Similarity",
                "Similarity with score threshold",
                "MMR (Max Marginal Relevance)",
            ],
            value="Traversal",
            advanced=True,
        ),
        IntInput(
            name="depth",
            display_name="Depth of traversal",
            info="The maximum depth of edges to traverse. (when using 'Traversal' or 'MMR traversal')",
            value=1,
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CassandraGraphVectorStore:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e

        database_ref = self.database_ref

        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")

        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        setup_mode = SetupMode.OFF if self.setup_mode == "Off" else SetupMode.SYNC

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            store = CassandraGraphVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
            )
        else:
            self.log("No documents to add to the Vector Store.")
            store = CassandraGraphVectorStore(
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
                setup_mode=setup_mode,
            )
        return store

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity":
            return "similarity"
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        if self.search_type == "MMR traversal":
            return "mmr_traversal"
        return "traversal"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        self.log(f"Search input: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                self.log(f"Search args: {search_args}")

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field named 'content'."
                    )
                    raise ValueError(msg) from e
                raise

            self.log(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
            "depth": self.depth,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
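
The mapping from the Search Type dropdown labels to search-type strings can also be expressed as a lookup table keyed by the exact option labels. This is a minimal sketch of that alternative, not the component's implementation. The names `SEARCH_TYPE_BY_LABEL` and `map_search_type` are hypothetical, and the fallback to `"traversal"` follows the default branch of `_map_search_type` in the listing above.

```python
# Keys are the dropdown labels exactly as they appear in the component's options list.
SEARCH_TYPE_BY_LABEL = {
    "Traversal": "traversal",
    "MMR traversal": "mmr_traversal",
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def map_search_type(label: str) -> str:
    """Translate a dropdown label, falling back to plain traversal."""
    return SEARCH_TYPE_BY_LABEL.get(label, "traversal")


for label in SEARCH_TYPE_BY_LABEL:
    print(f"{label!r} -> {map_search_type(label)!r}")
```

Keeping the labels in one table means a label is compared by exact string match in a single place, so a difference in capitalization between an option and its mapping is easier to notice.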

Chroma DB

This component creates a Chroma Vector Store with search capabilities. For more information, see the Chroma documentation.

Parameters

Inputs
Name Type Description

collection_name

String

The name of the Chroma collection. Default: "langflow".

persist_directory

String

The directory to persist the Chroma database.

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects).

embedding

Embeddings

The embedding function to use for the vector store.

chroma_server_cors_allow_origins

String

CORS allow origins for the Chroma server.

chroma_server_host

String

Host for the Chroma server.

chroma_server_http_port

Integer

HTTP port for the Chroma server.

chroma_server_grpc_port

Integer

gRPC port for the Chroma server.

chroma_server_ssl_enabled

Boolean

Enable SSL for the Chroma server.

allow_duplicates

Boolean

Allow duplicate documents in the vector store. If false, documents that are already in the vector store are not added again.

search_type

String

Type of search to perform: "Similarity" or "MMR".

number_of_results

Integer

Number of results to return from the search. Default: 10.

limit

Integer

Limit the number of records to compare when Allow Duplicates is False.
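
When `allow_duplicates` is false, the component code fetches up to `limit` stored records, strips their IDs, and skips any incoming item that matches one of them. The sketch below imitates that comparison with plain dictionaries in place of Langflow `Data` objects. The function `filter_new_records` and the record shape are assumptions made for illustration.

```python
from copy import deepcopy
from typing import Any


def filter_new_records(
    incoming: list[dict[str, Any]],
    stored: list[dict[str, Any]],
    allow_duplicates: bool = False,
) -> list[dict[str, Any]]:
    """Return the incoming records that should be added to the store."""
    if allow_duplicates:
        # Nothing is compared, so every incoming record is added.
        return list(incoming)

    stored_without_id = []
    for record in deepcopy(stored):
        # Stored records carry an ID that incoming records lack, so drop it first.
        record.pop("id", None)
        stored_without_id.append(record)

    return [record for record in incoming if record not in stored_without_id]


stored = [{"id": "a1", "text": "hello", "metadata": {"source": "x"}}]
incoming = [
    {"text": "hello", "metadata": {"source": "x"}},
    {"text": "new chunk", "metadata": {}},
]
print(filter_new_records(incoming, stored))
```

Since only the records returned within `limit` are compared, a duplicate that falls outside that window would presumably not be detected.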

Component code

chroma.py
from copy import deepcopy

from chromadb.config import Settings
from langchain_chroma import Chroma
from typing_extensions import override

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.base.vectorstores.utils import chroma_collection_to_data
from langflow.io import BoolInput, DropdownInput, HandleInput, IntInput, StrInput
from langflow.schema import Data


class ChromaVectorStoreComponent(LCVectorStoreComponent):
    """Chroma Vector Store with search capabilities."""

    display_name: str = "Chroma DB"
    description: str = "Chroma Vector Store with search capabilities"
    name = "Chroma"
    icon = "Chroma"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            value="langflow",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        StrInput(
            name="chroma_server_cors_allow_origins",
            display_name="Server CORS Allow Origins",
            advanced=True,
        ),
        StrInput(
            name="chroma_server_host",
            display_name="Server Host",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_http_port",
            display_name="Server HTTP Port",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_grpc_port",
            display_name="Server gRPC Port",
            advanced=True,
        ),
        BoolInput(
            name="chroma_server_ssl_enabled",
            display_name="Server SSL Enabled",
            advanced=True,
        ),
        BoolInput(
            name="allow_duplicates",
            display_name="Allow Duplicates",
            advanced=True,
            info="If false, will not add documents that are already in the Vector Store.",
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["Similarity", "MMR"],
            value="Similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=10,
        ),
        IntInput(
            name="limit",
            display_name="Limit",
            advanced=True,
            info="Limit the number of records to compare when Allow Duplicates is False.",
        ),
    ]

    @override
    @check_cached_vector_store
    def build_vector_store(self) -> Chroma:
        """Builds the Chroma object."""
        try:
            from chromadb import Client
            from langchain_chroma import Chroma
        except ImportError as e:
            msg = "Could not import Chroma integration package. Please install it with `pip install langchain-chroma`."
            raise ImportError(msg) from e
        # Chroma settings
        chroma_settings = None
        client = None
        if self.chroma_server_host:
            chroma_settings = Settings(
                chroma_server_cors_allow_origins=self.chroma_server_cors_allow_origins or [],
                chroma_server_host=self.chroma_server_host,
                chroma_server_http_port=self.chroma_server_http_port or None,
                chroma_server_grpc_port=self.chroma_server_grpc_port or None,
                chroma_server_ssl_enabled=self.chroma_server_ssl_enabled,
            )
            client = Client(settings=chroma_settings)

        # Check persist_directory and expand it if it is a relative path
        persist_directory = self.resolve_path(self.persist_directory) if self.persist_directory is not None else None

        chroma = Chroma(
            persist_directory=persist_directory,
            client=client,
            embedding_function=self.embedding,
            collection_name=self.collection_name,
        )

        self._add_documents_to_vector_store(chroma)
        self.status = chroma_collection_to_data(chroma.get(limit=self.limit))
        return chroma

    def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
        """Adds documents to the Vector Store."""
        if not self.ingest_data:
            self.status = ""
            return

        stored_documents_without_id = []
        if self.allow_duplicates:
            stored_data = []
        else:
            stored_data = chroma_collection_to_data(vector_store.get(limit=self.limit))
            for value in deepcopy(stored_data):
                del value.id
                stored_documents_without_id.append(value)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                if _input not in stored_documents_without_id:
                    documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents and self.embedding is not None:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            self.log("No documents to add to the Vector Store.")

Clickhouse

This component implements a Clickhouse Vector Store with search capabilities using the LangChain framework.

Parameters

Inputs
Name Display Name Info

host

hostname

Clickhouse server hostname (required, default: "localhost")

port

port

Clickhouse server port (required, default: 8123)

database

database

Clickhouse database name (required)

table

Table name

Clickhouse table name (required)

username

The ClickHouse user name.

Username for authentication (required)

password

The password for username.

Password for authentication (required)

index_type

index_type

Type of the index (options: "annoy", "vector_similarity", default: "annoy")

metric

metric

Metric to compute distance (options: "angular", "euclidean", "manhattan", "hamming", "dot", default: "angular")

secure

Use https/TLS

When enabled, connects over HTTPS/TLS and overrides values inferred from the interface or port arguments (default: false)

index_param

Param of the index

Index parameters as a comma-separated string (default: "100,'L2Distance'")

index_query_params

index query params

Additional index query parameters

search_query

Search Query

Query string for similarity search

ingest_data

Ingest Data

Data to be ingested into the vector store

embedding

Embedding

Embedding model to use

number_of_results

Number of Results

Number of results to return in similarity search (default: 4)

score_threshold

Score threshold

Threshold for similarity scores

Outputs
Name Display Name Info

vector_store

Vector Store

Built Clickhouse vector store

search_results

Search Results

Results of the similarity search as a list of Data objects
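
The `index_param` input is a single comma-separated string. According to the component code, it is split on commas before being passed to the Clickhouse settings, and `index_query_params` is forwarded only when it is set. The following sketch shows that step on its own. The helper name `build_index_kwargs` is hypothetical.

```python
from typing import Any, Optional


def build_index_kwargs(
    index_param: str,
    index_query_params: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
    """Build the optional settings keyword arguments from the two index inputs."""
    kwargs: dict[str, Any] = {}
    if index_param:
        # The string is split on commas; each piece stays a string.
        kwargs["index_param"] = index_param.split(",")
    if index_query_params:
        kwargs["index_query_params"] = index_query_params
    return kwargs


print(build_index_kwargs("100,'L2Distance'"))
```

Note that the split produces strings only, so a numeric entry such as `100` reaches the settings object as the string `"100"`.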

Component code

clickhouse.py
from langchain_community.vectorstores import Clickhouse, ClickhouseSettings

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, FloatInput
from langflow.io import (
    DictInput,
    DropdownInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Clickhouse"
    description = "Clickhouse Vector Store with search capabilities"
    name = "Clickhouse"
    icon = "Clickhouse"

    inputs = [
        StrInput(name="host", display_name="hostname", required=True, value="localhost"),
        IntInput(name="port", display_name="port", required=True, value=8123),
        StrInput(name="database", display_name="database", required=True),
        StrInput(name="table", display_name="Table name", required=True),
        StrInput(name="username", display_name="The ClickHouse user name.", required=True),
        SecretStrInput(name="password", display_name="The password for username.", required=True),
        DropdownInput(
            name="index_type",
            display_name="index_type",
            options=["annoy", "vector_similarity"],
            info="Type of the index.",
            value="annoy",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="metric",
            options=["angular", "euclidean", "manhattan", "hamming", "dot"],
            info="Metric to compute distance.",
            value="angular",
            advanced=True,
        ),
        BoolInput(
            name="secure",
            display_name="Use https/TLS. This overrides inferred values from the interface or port arguments.",
            value=False,
            advanced=True,
        ),
        StrInput(name="index_param", display_name="Param of the index", value="100,'L2Distance'", advanced=True),
        DictInput(name="index_query_params", display_name="index query params", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        FloatInput(name="score_threshold", display_name="Score threshold", advanced=True),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Clickhouse:
        try:
            import clickhouse_connect
        except ImportError as e:
            msg = (
                "Failed to import Clickhouse dependencies. "
                "Install it using `pip install langflow[clickhouse-connect] --pre`"
            )
            raise ImportError(msg) from e

        try:
            client = clickhouse_connect.get_client(
                host=self.host, port=self.port, username=self.username, password=self.password
            )
            client.command("SELECT 1")
        except Exception as e:
            msg = f"Failed to connect to Clickhouse: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        kwargs = {}
        if self.index_param:
            kwargs["index_param"] = self.index_param.split(",")
        if self.index_query_params:
            kwargs["index_query_params"] = self.index_query_params

        settings = ClickhouseSettings(
            table=self.table,
            database=self.database,
            host=self.host,
            index_type=self.index_type,
            metric=self.metric,
            password=self.password,
            port=self.port,
            secure=self.secure,
            username=self.username,
            **kwargs,
        )
        if documents:
            clickhouse_vs = Clickhouse.from_documents(documents=documents, embedding=self.embedding, config=settings)

        else:
            clickhouse_vs = Clickhouse(embedding=self.embedding, config=settings)

        return clickhouse_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            kwargs = {}
            if self.score_threshold:
                kwargs["score_threshold"] = self.score_threshold

            docs = vector_store.similarity_search(query=self.search_query, k=self.number_of_results, **kwargs)

            data = docs_to_data(docs)
            self.status = data
            return data
        return []

Couchbase

This component creates a Couchbase Vector Store with search capabilities. For more information, see the Couchbase documentation.

Parameters

Inputs
Name Type Description

couchbase_connection_string

SecretString

Couchbase Cluster connection string (required).

couchbase_username

String

Couchbase username (required).

couchbase_password

SecretString

Couchbase password (required).

bucket_name

String

Name of the Couchbase bucket (required).

scope_name

String

Name of the Couchbase scope (required).

collection_name

String

Name of the Couchbase collection (required).

index_name

String

Name of the Couchbase index (required).

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects).

embedding

Embeddings

The embedding function to use for the vector store.

number_of_results

Integer

Number of results to return from the search. Default: 4 (advanced).

Outputs
Name Type Description

vector_store

CouchbaseVectorStore

A Couchbase vector store instance configured with the specified parameters.
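
The component runs a similarity search only when `search_query` is a non-empty string, and otherwise returns an empty list. The sketch below shows that guard against a stand-in store. `FakeStore` is a hypothetical class that uses substring matching instead of vector similarity, so the example runs without a Couchbase cluster.

```python
class FakeStore:
    """Stand-in for a vector store; matches by substring, not by embedding."""

    def __init__(self, texts: list[str]) -> None:
        self.texts = texts

    def similarity_search(self, query: str, k: int) -> list[str]:
        matches = [text for text in self.texts if query.lower() in text.lower()]
        return matches[:k]


def run_search(store: FakeStore, search_query: object, number_of_results: int = 4) -> list[str]:
    """Search only when the query is a non-blank string, as the component does."""
    if search_query and isinstance(search_query, str) and search_query.strip():
        return store.similarity_search(query=search_query, k=number_of_results)
    return []


store = FakeStore(["Refund policy", "Shipping times", "Refund form"])
print(run_search(store, "refund"))
print(run_search(store, "   "))
```

A blank or missing query therefore never reaches the store, which avoids an unnecessary round trip to the database.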

Component code

couchbase.py
from datetime import timedelta

from langchain_community.vectorstores import CouchbaseVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Couchbase"
    description = "Couchbase Vector Store with search capabilities"
    name = "Couchbase"
    icon = "Couchbase"

    inputs = [
        SecretStrInput(
            name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True
        ),
        StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
        SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
        StrInput(name="bucket_name", display_name="Bucket Name", required=True),
        StrInput(name="scope_name", display_name="Scope Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CouchbaseVectorStore:
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
            raise ImportError(msg) from e

        try:
            auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
            options = ClusterOptions(auth)
            cluster = Cluster(self.couchbase_connection_string, options)

            cluster.wait_until_ready(timedelta(seconds=5))
        except Exception as e:
            msg = f"Failed to connect to Couchbase: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            couchbase_vs = CouchbaseVectorStore.from_documents(
                documents=documents,
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )

        else:
            couchbase_vs = CouchbaseVectorStore(
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )

        return couchbase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
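
The `build_vector_store` method above does two things: it normalizes `ingest_data` into documents, and then it either ingests them or attaches to the existing collection. The following is a minimal sketch of that control flow only. `FakeData` and `FakeStore` are hypothetical stand-ins for `langflow.schema.Data` and `CouchbaseVectorStore`, so nothing here connects to a cluster.

```python
class FakeData:
    """Hypothetical stand-in for langflow.schema.Data."""

    def __init__(self, text):
        self.text = text

    def to_lc_document(self):
        # The real method returns a LangChain Document; a dict is enough here.
        return {"page_content": self.text}


class FakeStore:
    """Hypothetical stand-in for a vector store; it only records documents."""

    def __init__(self, documents=None):
        self.documents = list(documents or [])

    @classmethod
    def from_documents(cls, documents):
        return cls(documents)


def normalize(ingest_data):
    """Convert Data-like inputs to documents and pass other inputs through."""
    documents = []
    for item in ingest_data or []:  # tolerate None, as the component does
        if isinstance(item, FakeData):
            documents.append(item.to_lc_document())
        else:
            documents.append(item)
    return documents


def build_store(ingest_data):
    documents = normalize(ingest_data)
    # Ingest when there is something to ingest; otherwise just connect.
    return FakeStore.from_documents(documents) if documents else FakeStore()
```

With no ingest data, the sketch returns an empty store, which mirrors the component connecting to an existing collection without writing to it.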

Elasticsearch

This component creates an Elasticsearch Vector Store with search capabilities. For more information, see the Elasticsearch vector store documentation.

Parameters

Inputs
Name Type Description

elasticsearch_url

String

URL for self-managed Elasticsearch deployments, such as http://localhost:9200.

cloud_id

SecretString

Elastic Cloud ID for cloud deployments.

index_name

String

The name of the index in the Elasticsearch cluster where vectors are stored.

search_query

Multiline

Search query for retrieving documents.

username

String

Elasticsearch username for authentication.

password

SecretString

Elasticsearch password for authentication.

ingest_data

Data

Data to be ingested into the vector store.

embedding

Handle

Embedding model to use.

search_type

Dropdown

Type of search to perform (similarity or mmr).

number_of_results

Integer

Number of results to return.

search_score_threshold

Float

Minimum similarity score threshold for search results.

api_key

SecretString

API Key for Elastic Cloud authentication.
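
The connection inputs interact: `elasticsearch_url` and `cloud_id` are mutually exclusive, and `api_key` can replace `username` and `password`. The sketch below restates those rules as a standalone function. The name `build_es_params` is hypothetical, and the dictionary keys are copied from the component code listed on this page, not from the Elasticsearch client documentation.

```python
def build_es_params(
    index_name,
    elasticsearch_url="",
    cloud_id="",
    username="",
    password="",
    api_key="",
):
    """Assemble connection keyword arguments following the component's rules."""
    if cloud_id and elasticsearch_url:
        raise ValueError("Provide either 'cloud_id' or 'elasticsearch_url', not both.")

    params = {
        "index_name": index_name,
        # Empty strings become None so that unused credentials are not passed.
        "es_user": username or None,
        "es_password": password or None,
    }
    if cloud_id:
        params["es_cloud_id"] = cloud_id
    else:
        params["es_url"] = elasticsearch_url
    if api_key:
        params["api_key"] = api_key
    return params
```

Because the component's `elasticsearch_url` input defaults to `http://localhost:9200`, the URL field has to be cleared when `cloud_id` is set. Otherwise the first check raises an error.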

Outputs
Name Type Description

vector_store

VectorStore

An instance of ElasticsearchStore for storing and searching vectors.

Component code

elasticsearch.py
from typing import Any

from langchain.schema import Document
from langchain_elasticsearch import ElasticsearchStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ElasticsearchVectorStoreComponent(LCVectorStoreComponent):
    """Elasticsearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "Elasticsearch"
    description: str = "Elasticsearch Vector Store with advanced, customizable search capabilities."
    name = "Elasticsearch"
    icon = "ElasticsearchStore"

    inputs = [
        StrInput(
            name="elasticsearch_url",
            display_name="Elasticsearch URL",
            value="http://localhost:9200",
            info="URL for self-managed Elasticsearch deployments (e.g., http://localhost:9200). "
            "Do not use with Elastic Cloud deployments, use Elastic Cloud ID instead.",
        ),
        SecretStrInput(
            name="cloud_id",
            display_name="Elastic Cloud ID",
            value="",
            info="Use this for Elastic Cloud deployments. Do not use together with 'Elasticsearch URL'.",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The name of the index in the Elasticsearch cluster where the vectors will be stored.",
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="username",
            display_name="Username",
            value="",
            advanced=False,
            info=(
                "Elasticsearch username (e.g., 'elastic'). "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="",
            advanced=False,
            info=(
                "Elasticsearch password for the specified user. "
                "Required for both local and Elastic Cloud setups unless API keys are used."
            ),
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        SecretStrInput(
            name="api_key",
            display_name="Elastic API Key",
            value="",
            advanced=True,
            info="API Key for Elastic Cloud authentication. If used, 'username' and 'password' are not required.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> ElasticsearchStore:
        """Builds the Elasticsearch Vector Store object."""
        if self.cloud_id and self.elasticsearch_url:
            msg = (
                "Both 'cloud_id' and 'elasticsearch_url' provided. "
                "Please use only one based on your deployment (Cloud or Local)."
            )
            raise ValueError(msg)

        es_params = {
            "index_name": self.index_name,
            "embedding": self.embedding,
            "es_user": self.username or None,
            "es_password": self.password or None,
        }

        if self.cloud_id:
            es_params["es_cloud_id"] = self.cloud_id
        else:
            es_params["es_url"] = self.elasticsearch_url

        if self.api_key:
            es_params["api_key"] = self.api_key

        elasticsearch = ElasticsearchStore(**es_params)

        # If documents are provided, add them to the store
        if self.ingest_data:
            documents = self._prepare_documents()
            if documents:
                elasticsearch.add_documents(documents)

        return elasticsearch

    def _prepare_documents(self) -> list[Document]:
        """Prepares documents from the input data to add to the vector store."""
        documents = []
        for data in self.ingest_data:
            if isinstance(data, Data):
                documents.append(data.to_lc_document())
            else:
                error_message = "Vector Store Inputs must be Data objects."
                self.log(error_message)
                raise TypeError(error_message)
        return documents

    def _add_documents_to_vector_store(self, vector_store: "ElasticsearchStore") -> None:
        """Adds documents to the Vector Store."""
        documents = self._prepare_documents()
        if documents and self.embedding:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            self.log("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents in the vector store or retrieve all documents if no query is provided."""
        vector_store = self.build_vector_store()
        search_kwargs = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if query:
            search_type = self.search_type.lower()
            if search_type not in {"similarity", "mmr"}:
                msg = f"Invalid search type: {self.search_type}"
                self.log(msg)
                raise ValueError(msg)
            try:
                if search_type == "similarity":
                    results = vector_store.similarity_search_with_score(query, **search_kwargs)
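                elif search_type == "mmr":
                    # MMR returns documents without scores, so pair each one with None
                    # to match the (doc, score) shape unpacked below.
                    docs = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                    results = [(doc, None) for doc in docs]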
            except Exception as e:
                msg = (
                    "Error occurred while querying the Elasticsearch VectorStore;"
                    " the VectorStore may not contain any data."
                )
                self.log(msg)
                raise ValueError(msg) from e
            return [
                {"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results
            ]
        results = self.get_all_documents(vector_store, **search_kwargs)
        return [{"page_content": doc.page_content, "metadata": doc.metadata, "score": score} for doc, score in results]

    def get_all_documents(self, vector_store: ElasticsearchStore, **kwargs) -> list[tuple[Document, float]]:
        """Retrieve all documents from the vector store."""
        client = vector_store.client
        index_name = self.index_name

        query = {
            "query": {"match_all": {}},
            "size": kwargs.get("k", self.number_of_results),
        }

        response = client.search(index=index_name, body=query)

        results = []
        for hit in response["hits"]["hits"]:
            doc = Document(
                page_content=hit["_source"].get("text", ""),
                metadata=hit["_source"].get("metadata", {}),
            )
            score = hit["_score"]
            results.append((doc, score))

        return results

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        results = self.search(self.search_query)
        retrieved_data = [
            Data(
                text=result["page_content"],
                file_path=result["metadata"].get("file_path", ""),
            )
            for result in results
        ]
        self.status = retrieved_data
        return retrieved_data

    def get_retriever_kwargs(self):
        """Get the keyword arguments for the retriever."""
        return {
            "search_type": self.search_type.lower(),
            "search_kwargs": {
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            },
        }
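
The `search_type` input switches between plain similarity search and MMR (maximal marginal relevance). As a rough illustration of the difference, here is a generic, pure-Python sketch of both strategies over small vectors. It is not the LangChain or Elasticsearch implementation, and the function names and the `lambda_mult` weighting are assumptions made for the example.

```python
import math


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def similarity_search(query, vectors, k=4):
    """Return indices of the k vectors most similar to the query."""
    ranked = sorted(range(len(vectors)), key=lambda i: cosine(query, vectors[i]), reverse=True)
    return ranked[:k]


def mmr_search(query, vectors, k=4, lambda_mult=0.5):
    """Greedy MMR: trade relevance to the query against redundancy with earlier picks."""
    selected = []
    candidates = list(range(len(vectors)))
    while candidates and len(selected) < k:

        def mmr_score(i):
            relevance = cosine(query, vectors[i])
            redundancy = max((cosine(vectors[i], vectors[j]) for j in selected), default=0.0)
            return lambda_mult * relevance - (1 - lambda_mult) * redundancy

        best = max(candidates, key=mmr_score)
        selected.append(best)
        candidates.remove(best)
    return selected


query = [1.0, 0.0]
vectors = [[1.0, 0.1], [1.0, 0.12], [1.0, -0.5]]  # the first two are near-duplicates
```

For this data, `similarity_search(query, vectors, k=2)` returns the two near-duplicates (`[0, 1]`), while `mmr_search(query, vectors, k=2)` returns `[0, 2]` because the second pick is penalized for overlapping with the first.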

FAISS

This component creates a FAISS Vector Store with search capabilities.

For more information, see the FAISS documentation.

Parameters

Inputs
Name Type Description

index_name

String

The name of the FAISS index. Default: "langflow_index".

persist_directory

String

Directory where the FAISS index is saved. Relative paths are resolved against the directory where Langflow is running.

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects or documents).

allow_dangerous_deserialization

Boolean

Set to True to allow loading pickle files from untrusted sources. Only enable this if you trust the source of the data. Default: True (advanced).

embedding

Embeddings

The embedding function to use for the vector store.

number_of_results

Integer

Number of results to return from the search. Default: 4 (advanced).
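
The `allow_dangerous_deserialization` input exists because unpickling is not a passive read: a pickle file can instruct the loader to call a function. The following standard-library sketch shows the mechanism with a harmless callable (`sum`). The `Payload` class is hypothetical and is not part of FAISS or Langflow.

```python
import pickle


class Payload:
    """Object whose pickled form tells the loader to call a function."""

    def __reduce__(self):
        # pickle stores "call sum([40, 2])" instead of the object's state.
        return (sum, ([40, 2],))


blob = pickle.dumps(Payload())
loaded = pickle.loads(blob)  # runs sum([40, 2]) while loading
```

Here `loaded` is the integer `42`, not a `Payload` instance. A file from an untrusted source could name a far less benign callable, which is why the flag should only be enabled for index files you created or trust.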

Outputs
Name Type Description

vector_store

FAISS

A FAISS vector store instance configured with the specified parameters.
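
When searching, the component looks for `<index_name>.faiss` inside the persist directory and loads it if present. Otherwise it builds a new index from the ingested data. The sketch below reproduces only that path logic with `pathlib`. The helper names are hypothetical, and an empty file stands in for a saved index.

```python
import tempfile
from pathlib import Path


def index_file(persist_directory, index_name="langflow_index"):
    """Return the path that is checked before deciding to load or rebuild."""
    base = Path(persist_directory).resolve() if persist_directory else Path()
    return base / f"{index_name}.faiss"


def load_or_build(persist_directory, index_name="langflow_index"):
    """Report which branch a search would take for this directory."""
    return "load" if index_file(persist_directory, index_name).exists() else "build"


with tempfile.TemporaryDirectory() as tmp:
    first = load_or_build(tmp)   # no index file yet
    index_file(tmp).touch()      # stand-in for a saved FAISS index
    second = load_or_build(tmp)  # the file now exists
```

In this run `first` is `"build"` and `second` is `"load"`. An empty `persist_directory` falls back to the current working directory.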

Component code

faiss.py
from pathlib import Path

from langchain_community.vectorstores import FAISS

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, HandleInput, IntInput, StrInput
from langflow.schema import Data


class FaissVectorStoreComponent(LCVectorStoreComponent):
    """FAISS Vector Store with search capabilities."""

    display_name: str = "FAISS"
    description: str = "FAISS Vector Store with search capabilities"
    name = "FAISS"
    icon = "FAISS"

    inputs = [
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow_index",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
            info="Path to save the FAISS index. It will be relative to where Langflow is running.",
        ),
        *LCVectorStoreComponent.inputs,
        BoolInput(
            name="allow_dangerous_deserialization",
            display_name="Allow Dangerous Deserialization",
            info="Set to True to allow loading pickle files from untrusted sources. "
            "Only enable this if you trust the source of the data.",
            advanced=True,
            value=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
    ]

    @staticmethod
    def resolve_path(path: str) -> str:
        """Resolve the path relative to the Langflow root.

        Args:
            path: The path to resolve
        Returns:
            str: The resolved path as a string
        """
        return str(Path(path).resolve())

    def get_persist_directory(self) -> Path:
        """Returns the resolved persist directory path or the current directory if not set."""
        if self.persist_directory:
            return Path(self.resolve_path(self.persist_directory))
        return Path()

    @check_cached_vector_store
    def build_vector_store(self) -> FAISS:
        """Builds the FAISS object."""
        path = self.get_persist_directory()
        path.mkdir(parents=True, exist_ok=True)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        faiss = FAISS.from_documents(documents=documents, embedding=self.embedding)
        faiss.save_local(str(path), self.index_name)
        return faiss

    def search_documents(self) -> list[Data]:
        """Search for documents in the FAISS vector store."""
        path = self.get_persist_directory()
        index_path = path / f"{self.index_name}.faiss"

        if not index_path.exists():
            vector_store = self.build_vector_store()
        else:
            vector_store = FAISS.load_local(
                folder_path=str(path),
                embeddings=self.embedding,
                index_name=self.index_name,
                allow_dangerous_deserialization=self.allow_dangerous_deserialization,
            )

        if not vector_store:
            msg = "Failed to load the FAISS index."
            raise ValueError(msg)

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            return docs_to_data(docs)
        return []

Hyper-Converged Database (HCD) Vector Store

This component implements a Vector Store using Hyper-Converged Database (HCD).

Parameters

Inputs
Name Display Name Info

collection_name

Collection Name

The name of the collection within HCD where the vectors will be stored (required)

username

HCD Username

Authentication username for accessing HCD (default: "hcd-superuser", required)

password

HCD Password

Authentication password for accessing HCD (required)

api_endpoint

HCD API Endpoint

API endpoint URL for the HCD service (required)

search_query

Search Input

Query string for similarity search

ingest_data

Ingest Data

Data to be ingested into the vector store

namespace

Namespace

Optional namespace within HCD to use for the collection (default: "default_namespace")

ca_certificate

CA Certificate

Optional CA certificate for TLS connections to HCD

metric

Metric

Optional distance metric for vector comparisons (options: "cosine", "dot_product", "euclidean")

batch_size

Batch Size

Optional number of records to process in a single batch

bulk_insert_batch_concurrency

Bulk Insert Batch Concurrency

Optional concurrency level for bulk insert operations

bulk_insert_overwrite_concurrency

Bulk Insert Overwrite Concurrency

Optional concurrency level for bulk insert operations that overwrite existing data

bulk_delete_concurrency

Bulk Delete Concurrency

Optional concurrency level for bulk delete operations

setup_mode

Setup Mode

Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync")

pre_delete_collection

Pre Delete Collection

Boolean flag to determine whether to delete the collection before creating a new one

metadata_indexing_include

Metadata Indexing Include

Optional list of metadata fields to include in the indexing

embedding

Embedding or Astra Vectorize

Allows either an embedding model or an Astra Vectorize configuration

metadata_indexing_exclude

Metadata Indexing Exclude

Optional list of metadata fields to exclude from the indexing

collection_indexing_policy

Collection Indexing Policy

Optional dictionary defining the indexing policy for the collection

number_of_results

Number of Results

Number of results to return in similarity search (default: 4)

search_type

Search Type

Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Similarity")

search_score_threshold

Search Score Threshold

Minimum similarity score threshold for search results (default: 0)

search_filter

Search Metadata Filter

Optional dictionary of filters to apply to the search query
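
The `metric` input selects how two vectors are compared. As a reminder of what the three option names refer to mathematically, here is a small standard-library sketch. How HCD turns these quantities into similarity scores is not covered here, and the function names are chosen for the example only.

```python
import math


def dot_product(a, b):
    return sum(x * y for x, y in zip(a, b))


def cosine(a, b):
    # Dot product of the two vectors after scaling each to unit length.
    return dot_product(a, b) / (math.sqrt(dot_product(a, a)) * math.sqrt(dot_product(b, b)))


def euclidean(a, b):
    # Straight-line distance; smaller means closer.
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


a, b = [3.0, 4.0], [6.0, 8.0]  # same direction, different length
```

For these two vectors the cosine is 1 (identical direction), the dot product is 50, and the Euclidean distance is 5, which shows that only cosine ignores vector length.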

Outputs
Name Display Name Info

vector_store

Vector Store

Built HCD vector store instance

search_results

Search Results

Results of similarity search as a list of Data objects
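
The `search_type`, `number_of_results`, `search_score_threshold`, and `search_filter` inputs are combined into retriever keyword arguments. The sketch below condenses the component's `_map_search_type` and `_build_search_args` helpers into one standalone function. The name `build_retriever_kwargs` is hypothetical.

```python
SEARCH_TYPES = {
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def build_retriever_kwargs(search_type, number_of_results=4, score_threshold=0.0, search_filter=None):
    """Translate the component's display values into retriever keyword arguments."""
    search_kwargs = {"k": number_of_results, "score_threshold": score_threshold}
    # Drop filter entries with an empty key or value, as the component does.
    clean_filter = {k: v for k, v in (search_filter or {}).items() if k and v}
    if clean_filter:
        search_kwargs["filter"] = clean_filter
    return {
        # Unknown labels fall back to plain similarity search.
        "search_type": SEARCH_TYPES.get(search_type, "similarity"),
        "search_kwargs": search_kwargs,
    }
```

Note that the `if k and v` test also discards falsy values such as `0` or `False`, so a filter on those values is dropped along with empty entries.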

Component code

hcd.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    BoolInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class HCDVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Hyper-Converged Database"
    description: str = "Implementation of Vector Store using Hyper-Converged Database (HCD) with search capabilities"
    name = "HCD"
    icon: str = "HCD"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within HCD where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="username",
            display_name="HCD Username",
            info="Authentication username for accessing HCD.",
            value="hcd-superuser",
            required=True,
        ),
        SecretStrInput(
            name="password",
            display_name="HCD Password",
            info="Authentication password for accessing HCD.",
            value="HCD_PASSWORD",
            required=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="HCD API Endpoint",
            info="API endpoint URL for the HCD service.",
            value="HCD_API_ENDPOINT",
            required=True,
        ),
        *LCVectorStoreComponent.inputs,
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Optional namespace within HCD to use for the collection.",
            value="default_namespace",
            advanced=True,
        ),
        MultilineInput(
            name="ca_certificate",
            display_name="CA Certificate",
            info="Optional CA certificate for TLS connections to HCD.",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding or Astra Vectorize",
            input_types=["Embeddings", "dict"],
            # TODO: This should be optional, but need to refactor langchain-astradb first.
            info="Allows either an embedding model or an Astra Vectorize configuration.",
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info="Optional dictionary defining the indexing policy for the collection.",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        try:
            from astrapy.authentication import UsernamePasswordTokenProvider
            from astrapy.constants import Environment
        except ImportError as e:
            msg = "Could not import astrapy integration package. Please install it with `pip install astrapy`."
            raise ImportError(msg) from e

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e

        if not isinstance(self.embedding, dict):
            embedding_dict = {"embedding": self.embedding}
        else:
            from astrapy.info import CollectionVectorServiceOptions

            dict_options = self.embedding.get("collection_vector_service_options", {})
            dict_options["authentication"] = {
                k: v for k, v in dict_options.get("authentication", {}).items() if k and v
            }
            dict_options["parameters"] = {k: v for k, v in dict_options.get("parameters", {}).items() if k and v}
            embedding_dict = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(dict_options)
            }
            collection_embedding_api_key = self.embedding.get("collection_embedding_api_key")
            if collection_embedding_api_key:
                embedding_dict["collection_embedding_api_key"] = collection_embedding_api_key

        token_provider = UsernamePasswordTokenProvider(self.username, self.password)
        vector_store_kwargs = {
            **embedding_dict,
            "collection_name": self.collection_name,
            "token": token_provider,
            "api_endpoint": self.api_endpoint,
            "namespace": self.namespace,
            "metric": self.metric or None,
            "batch_size": self.batch_size or None,
            "bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
            "bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
            "bulk_delete_concurrency": self.bulk_delete_concurrency or None,
            "setup_mode": setup_mode_value,
            "pre_delete_collection": self.pre_delete_collection or False,
            "environment": Environment.HCD,
        }

        if self.metadata_indexing_include:
            vector_store_kwargs["metadata_indexing_include"] = self.metadata_indexing_include
        elif self.metadata_indexing_exclude:
            vector_store_kwargs["metadata_indexing_exclude"] = self.metadata_indexing_exclude
        elif self.collection_indexing_policy:
            vector_store_kwargs["collection_indexing_policy"] = self.collection_indexing_policy

        try:
            vector_store = AstraDBVectorStore(**vector_store_kwargs)
        except Exception as e:
            msg = f"Error initializing AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self._add_documents_to_vector_store(vector_store)
        return vector_store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        self.log(f"Search query: {self.search_query}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except Exception as e:
                msg = f"Error performing search in AstraDBVectorStore: {e}"
                raise ValueError(msg) from e

            self.log(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            self.log(f"Converted documents to data: {len(data)}")
            self.status = data
            return data
        self.log("No search input provided. Skipping search.")
        return []

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
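
The component converts the `setup_mode` dropdown label into an enum member by upper-casing it and looking it up by name. Here is a minimal sketch of that lookup. The `SetupMode` class below is a hypothetical stand-in for the enum imported from `langchain_astradb`, and its member values are placeholders.

```python
from enum import Enum


class SetupMode(Enum):
    """Hypothetical stand-in for the SetupMode enum from langchain_astradb."""

    SYNC = 1
    ASYNC = 2
    OFF = 3


def parse_setup_mode(label, default="Sync"):
    """Look up an enum member by its upper-cased dropdown label."""
    try:
        return SetupMode[(label or default).upper()]
    except KeyError as e:
        msg = f"Invalid setup mode: {label}"
        raise ValueError(msg) from e
```

An empty label falls back to `"Sync"`, matching the first dropdown option, and an unknown label raises `ValueError` instead of a bare `KeyError`.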

Milvus

This component creates a Milvus Vector Store with search capabilities.

For more information, see the Milvus documentation.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| collection_name | String | Name of the Milvus collection (default: `langflow`) |
| collection_description | String | Description of the Milvus collection |
| uri | String | Connection URI for Milvus (default: `http://localhost:19530`) |
| password | SecretString | Token used to authenticate the connection; leave empty if no token is required |
| connection_args | Dict | Additional connection arguments |
| primary_field | String | Name of the primary field (default: `pk`) |
| text_field | String | Name of the text field (default: `text`) |
| vector_field | String | Name of the vector field (default: `vector`) |
| consistency_level | String | Consistency level for operations: Bounded, Session, Strong, or Eventual (default: `Session`) |
| index_params | Dict | Parameters for indexing |
| search_params | Dict | Parameters for searching |
| drop_old | Boolean | Whether to drop the existing collection (default: `False`) |
| timeout | Float | Timeout for operations |
| search_query | String | Query for similarity search |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Embeddings | Embedding function to use |
| number_of_results | Integer | Number of results to return in search (default: 4) |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | Milvus | Milvus vector store instance |
| search_results | List[Data] | Results of similarity search |

Component code

milvus.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    BoolInput,
    DictInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class MilvusVectorStoreComponent(LCVectorStoreComponent):
    """Milvus vector store with search capabilities."""

    display_name: str = "Milvus"
    description: str = "Milvus vector store with search capabilities"
    name = "Milvus"
    icon = "Milvus"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", value="langflow"),
        StrInput(name="collection_description", display_name="Collection Description", value=""),
        StrInput(
            name="uri",
            display_name="Connection URI",
            value="http://localhost:19530",
        ),
        SecretStrInput(
            name="password",
            display_name="Token",
            value="",
            info="Ignore this field if no token is required to make connection.",
        ),
        DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True),
        StrInput(name="primary_field", display_name="Primary Field Name", value="pk"),
        StrInput(name="text_field", display_name="Text Field Name", value="text"),
        StrInput(name="vector_field", display_name="Vector Field Name", value="vector"),
        DropdownInput(
            name="consistency_level",
            display_name="Consistency Level",
            options=["Bounded", "Session", "Strong", "Eventual"],
            value="Session",
            advanced=True,
        ),
        DictInput(name="index_params", display_name="Index Parameters", advanced=True),
        DictInput(name="search_params", display_name="Search Parameters", advanced=True),
        BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True),
        FloatInput(name="timeout", display_name="Timeout", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_milvus.vectorstores import Milvus as LangchainMilvus
        except ImportError as e:
            msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`."
            raise ImportError(msg) from e
        self.connection_args.update(uri=self.uri, token=self.password)
        milvus_store = LangchainMilvus(
            embedding_function=self.embedding,
            collection_name=self.collection_name,
            collection_description=self.collection_description,
            connection_args=self.connection_args,
            consistency_level=self.consistency_level,
            index_params=self.index_params,
            search_params=self.search_params,
            drop_old=self.drop_old,
            auto_id=True,
            primary_field=self.primary_field,
            text_field=self.text_field,
            vector_field=self.vector_field,
            timeout=self.timeout,
        )

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            milvus_store.add_documents(documents)

        return milvus_store

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
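
`build_vector_store` merges the **Connection URI** and **Token** fields into **Other Connection Arguments** before passing the result to the Milvus client. A minimal sketch of that merge with plain dictionaries follows. `merge_connection_args` and the `db_name` key are hypothetical, chosen only for illustration, and unlike the component this helper copies the dictionary instead of updating it in place.

```python
def merge_connection_args(connection_args, uri, token):
    """Hypothetical helper mirroring connection_args.update(uri=..., token=...)."""
    merged = dict(connection_args or {})  # copy so the caller's dict is not mutated
    merged.update(uri=uri, token=token)   # dedicated fields win over existing keys
    return merged


# Illustrative values only.
extra = {"db_name": "default", "uri": "http://stale-host:19530"}
merged = merge_connection_args(extra, uri="http://localhost:19530", token="")
print(merged)
```

Because `dict.update` overwrites existing keys, a `uri` or `token` entered under **Other Connection Arguments** is replaced by the values from the dedicated fields.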

MongoDB Atlas

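This component creates a MongoDB Atlas Vector Store with search capabilities. For more information, see the MongoDB Atlas documentation.

As the component code in this section shows, `build_vector_store` drops the target collection when it builds the vector store, so documents already in that collection are removed before new data is ingested.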

Parameters

Inputs
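| Name | Type | Description |
| --- | --- | --- |
| mongodb_atlas_cluster_uri | SecretString | MongoDB Atlas Cluster URI |
| enable_mtls | Boolean | Whether to connect with mutual TLS (mTLS) using a client certificate (default: `False`) |
| mongodb_atlas_client_cert | SecretString | Client certificate combined with its private key, used when mTLS is enabled |
| db_name | String | Database name |
| collection_name | String | Collection name |
| index_name | String | Index name |
| search_query | String | Query for similarity search |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Embeddings | Embedding function to use |
| number_of_results | Integer | Number of results to return in search (default: 4) |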

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | MongoDBAtlasVectorSearch | MongoDB Atlas vector store instance |
| search_results | List[Data] | Results of similarity search |

Component code

mongodb_atlas.py
import tempfile

import certifi
from langchain_community.vectorstores import MongoDBAtlasVectorSearch

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class MongoVectorStoreComponent(LCVectorStoreComponent):
    display_name = "MongoDB Atlas"
    description = "MongoDB Atlas Vector Store with search capabilities"
    name = "MongoDBAtlasVector"
    icon = "MongoDB"

    inputs = [
        SecretStrInput(name="mongodb_atlas_cluster_uri", display_name="MongoDB Atlas Cluster URI", required=True),
        BoolInput(name="enable_mtls", display_name="Enable mTLS", value=False, advanced=True, required=True),
        SecretStrInput(
            name="mongodb_atlas_client_cert",
            display_name="MongoDB Atlas Combined Client Certificate",
            required=False,
            info="Client Certificate combined with the private key in the following format:\n "
            "-----BEGIN PRIVATE KEY-----\n...\n -----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----\n"
            "...\n-----END CERTIFICATE-----\n",
        ),
        StrInput(name="db_name", display_name="Database Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> MongoDBAtlasVectorSearch:
        try:
            from pymongo import MongoClient
        except ImportError as e:
            msg = "Please install pymongo to use MongoDB Atlas Vector Store"
            raise ImportError(msg) from e

        # Create temporary files for the client certificate
        if self.enable_mtls:
            client_cert_path = None
            try:
                client_cert = self.mongodb_atlas_client_cert.replace(" ", "\n")
                client_cert = client_cert.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
                client_cert = client_cert.replace(
                    "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
                    "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
                )
                client_cert = client_cert.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
                with tempfile.NamedTemporaryFile(delete=False) as client_cert_file:
                    client_cert_file.write(client_cert.encode("utf-8"))
                    client_cert_path = client_cert_file.name

            except Exception as e:
                msg = f"Failed to write certificate to temporary file: {e}"
                raise ValueError(msg) from e

        try:
            mongo_client: MongoClient = (
                MongoClient(
                    self.mongodb_atlas_cluster_uri,
                    tls=True,
                    tlsCertificateKeyFile=client_cert_path,
                    tlsCAFile=certifi.where(),
                )
                if self.enable_mtls
                else MongoClient(self.mongodb_atlas_cluster_uri)
            )

            collection = mongo_client[self.db_name][self.collection_name]
            collection.drop()  # Drop collection to override the vector store
        except Exception as e:
            msg = f"Failed to connect to MongoDB Atlas: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            return MongoDBAtlasVectorSearch.from_documents(
                documents=documents, embedding=self.embedding, collection=collection, index_name=self.index_name
            )
        return MongoDBAtlasVectorSearch(
            embedding=self.embedding,
            collection=collection,
            index_name=self.index_name,
        )

    def search_documents(self) -> list[Data]:
        from bson.objectid import ObjectId

        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str):
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            for doc in docs:
                doc.metadata = {
                    key: str(value) if isinstance(value, ObjectId) else value for key, value in doc.metadata.items()
                }

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
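
The chain of `replace` calls in `build_vector_store` rebuilds a PEM bundle whose line breaks were turned into spaces, which can happen when a multi-line value is pasted into a single-line field. The sketch below repeats those steps in a hypothetical `normalize_pem` function, using placeholder strings rather than real key material.

```python
def normalize_pem(flattened: str) -> str:
    """Hypothetical helper repeating the replace steps from build_vector_store."""
    # Every space becomes a newline, which also splits the BEGIN/END markers.
    pem = flattened.replace(" ", "\n")
    # Restore the markers that legitimately contain spaces.
    pem = pem.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
    pem = pem.replace(
        "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
        "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
    )
    return pem.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")


# Placeholder payload, not real key material.
flattened = (
    "-----BEGIN PRIVATE KEY----- AAAA BBBB -----END PRIVATE KEY----- "
    "-----BEGIN CERTIFICATE----- CCCC -----END CERTIFICATE-----"
)
lines = normalize_pem(flattened).split("\n")
print(lines)
```

The private key has to come first, as in the format given in the field's help text: the `END PRIVATE KEY` and `BEGIN CERTIFICATE` markers are only restored as an adjacent pair.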

OpenSearch

This component creates an OpenSearch Vector Store with search capabilities. For more information, see the OpenSearch vector store documentation.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| opensearch_url | String | URL for the OpenSearch cluster, for example `https://192.168.1.1:9200` (default: `http://localhost:9200`) |
| index_name | String | Name of the index where the vectors are stored in the OpenSearch cluster (default: `langflow`) |
| search_query | Multiline | Search query. Leave empty to retrieve all documents. |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Handle | Embedding model to use |
| search_type | Dropdown | Search type to use: `similarity`, `similarity_score_threshold`, or `mmr` (default: `similarity`) |
| number_of_results | Integer | Number of results to return (default: 4) |
| search_score_threshold | Float | Minimum similarity score threshold for search results (default: 0.0) |
| username | String | Username for OpenSearch authentication |
| password | SecretString | Password for OpenSearch authentication |
| use_ssl | Boolean | Whether to use SSL for the connection (default: `True`) |
| verify_certs | Boolean | Whether to verify SSL certificates (default: `False`) |
| hybrid_search_query | Multiline | Custom hybrid search query in JSON format, combining vector similarity and keyword matching |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | VectorStore | An instance of OpenSearchVectorSearch for storing and searching vectors |

Component code

opensearch.py
import json
from typing import Any

from langchain_community.vectorstores import OpenSearchVectorSearch

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.io import (
    BoolInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class OpenSearchVectorStoreComponent(LCVectorStoreComponent):
    """OpenSearch Vector Store with advanced, customizable search capabilities."""

    display_name: str = "OpenSearch"
    description: str = "OpenSearch Vector Store with advanced, customizable search capabilities."
    name = "OpenSearch"
    icon = "OpenSearch"

    inputs = [
        StrInput(
            name="opensearch_url",
            display_name="OpenSearch URL",
            value="http://localhost:9200",
            info="URL for OpenSearch cluster (e.g. https://192.168.1.1:9200).",
        ),
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow",
            info="The index name where the vectors will be stored in OpenSearch cluster.",
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["similarity", "similarity_score_threshold", "mmr"],
            value="similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results.",
            value=0.0,
            advanced=True,
        ),
        StrInput(
            name="username",
            display_name="Username",
            value="admin",
            advanced=True,
        ),
        SecretStrInput(
            name="password",
            display_name="Password",
            value="admin",
            advanced=True,
        ),
        BoolInput(
            name="use_ssl",
            display_name="Use SSL",
            value=True,
            advanced=True,
        ),
        BoolInput(
            name="verify_certs",
            display_name="Verify Certificates",
            value=False,
            advanced=True,
        ),
        MultilineInput(
            name="hybrid_search_query",
            display_name="Hybrid Search Query",
            value="",
            advanced=True,
            info=(
                "Provide a custom hybrid search query in JSON format. This allows you to combine "
                "vector similarity and keyword matching."
            ),
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> OpenSearchVectorSearch:
        """Builds the OpenSearch Vector Store object."""
        try:
            from langchain_community.vectorstores import OpenSearchVectorSearch
        except ImportError as e:
            error_message = f"Failed to import required modules: {e}"
            self.log(error_message)
            raise ImportError(error_message) from e

        try:
            opensearch = OpenSearchVectorSearch(
                index_name=self.index_name,
                embedding_function=self.embedding,
                opensearch_url=self.opensearch_url,
                http_auth=(self.username, self.password),
                use_ssl=self.use_ssl,
                verify_certs=self.verify_certs,
                ssl_assert_hostname=False,
                ssl_show_warn=False,
            )
        except Exception as e:
            error_message = f"Failed to create OpenSearchVectorSearch instance: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        if self.ingest_data:
            self._add_documents_to_vector_store(opensearch)

        return opensearch

    def _add_documents_to_vector_store(self, vector_store: "OpenSearchVectorSearch") -> None:
        """Adds documents to the Vector Store."""
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                error_message = f"Expected Data object, got {type(_input)}"
                self.log(error_message)
                raise TypeError(error_message)

        if documents and self.embedding is not None:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                error_message = f"Error adding documents to Vector Store: {e}"
                self.log(error_message)
                raise RuntimeError(error_message) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def search(self, query: str | None = None) -> list[dict[str, Any]]:
        """Search for similar documents in the vector store or retrieve all documents if no query is provided."""
        try:
            vector_store = self.build_vector_store()

            query = query or ""

            if self.hybrid_search_query.strip():
                try:
                    hybrid_query = json.loads(self.hybrid_search_query)
                except json.JSONDecodeError as e:
                    error_message = f"Invalid hybrid search query JSON: {e}"
                    self.log(error_message)
                    raise ValueError(error_message) from e

                results = vector_store.client.search(index=self.index_name, body=hybrid_query)

                processed_results = []
                for hit in results.get("hits", {}).get("hits", []):
                    source = hit.get("_source", {})
                    text = source.get("text", "")
                    metadata = source.get("metadata", {})

                    if isinstance(text, dict):
                        text = text.get("text", "")

                    processed_results.append(
                        {
                            "page_content": text,
                            "metadata": metadata,
                        }
                    )
                return processed_results

            search_kwargs = {"k": self.number_of_results}
            search_type = self.search_type.lower()

            if search_type == "similarity":
                results = vector_store.similarity_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]
            if search_type == "similarity_score_threshold":
                search_kwargs["score_threshold"] = self.search_score_threshold
                results = vector_store.similarity_search_with_relevance_scores(query, **search_kwargs)
                return [
                    {
                        "page_content": doc.page_content,
                        "metadata": doc.metadata,
                        "score": score,
                    }
                    for doc, score in results
                ]
            if search_type == "mmr":
                results = vector_store.max_marginal_relevance_search(query, **search_kwargs)
                return [{"page_content": doc.page_content, "metadata": doc.metadata} for doc in results]

        except Exception as e:
            error_message = f"Error during search: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        error_message = f"Error during search. Invalid search type: {self.search_type}"
        self.log(error_message)
        raise ValueError(error_message)

    def search_documents(self) -> list[Data]:
        """Search for documents in the vector store based on the search input.

        If no search input is provided, retrieve all documents.
        """
        try:
            query = self.search_query.strip() if self.search_query else None
            results = self.search(query)
            retrieved_data = [
                Data(
                    file_path=result["metadata"].get("file_path", ""),
                    text=result["page_content"],
                )
                for result in results
            ]
        except Exception as e:
            error_message = f"Error during document search: {e}"
            self.log(error_message)
            raise RuntimeError(error_message) from e

        self.status = retrieved_data
        return retrieved_data
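
When **Hybrid Search Query** is set, `search` sends the parsed JSON body directly to the OpenSearch client and flattens each hit into `page_content` and `metadata`. The sketch below is illustrative only: the query body is one possible shape, `vector_field` is an assumed field name that has to match your index mapping, the vector is shortened to three dimensions, and `fake_response` stands in for a real cluster response. `process_hits` is a hypothetical function that mirrors the loop in `search`.

```python
import json

# Example value for the Hybrid Search Query field (field names are assumptions).
hybrid_search_query = json.dumps(
    {
        "size": 4,
        "query": {
            "bool": {
                "should": [
                    {"match": {"text": "refund policy"}},
                    {"knn": {"vector_field": {"vector": [0.1, 0.2, 0.3], "k": 4}}},
                ]
            }
        },
    }
)


def process_hits(response: dict) -> list[dict]:
    """Hypothetical helper mirroring how search() flattens raw hits."""
    processed = []
    for hit in response.get("hits", {}).get("hits", []):
        source = hit.get("_source", {})
        text = source.get("text", "")
        if isinstance(text, dict):  # some documents nest the text one level down
            text = text.get("text", "")
        processed.append({"page_content": text, "metadata": source.get("metadata", {})})
    return processed


body = json.loads(hybrid_search_query)  # raises json.JSONDecodeError on invalid JSON

# Stand-in for the client's response, not real cluster output.
fake_response = {
    "hits": {
        "hits": [
            {"_source": {"text": "Refunds take 5 days.", "metadata": {"file_path": "faq.md"}}},
            {"_source": {"text": {"text": "Nested text."}}},
        ]
    }
}
results = process_hits(fake_response)
print(results)
```

In the listing above, this branch returns before `number_of_results` and `search_type` are read, so the query body itself controls how many hits come back.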

PGVector

This component creates a PGVector Vector Store with search capabilities. For more information, see the PGVector documentation.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| pg_server_url | SecretString | PostgreSQL server connection string |
| collection_name | String | Table name for the vector store |
| search_query | String | Query for similarity search |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Embeddings | Embedding function to use |
| number_of_results | Integer | Number of results to return in search |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | PGVector | PGVector vector store instance |
| search_results | List[Data] | Results of similarity search |

Component code

pgvector.py
from langchain_community.vectorstores import PGVector

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data
from langflow.utils.connection_string_parser import transform_connection_string


class PGVectorStoreComponent(LCVectorStoreComponent):
    display_name = "PGVector"
    description = "PGVector Vector Store with search capabilities"
    name = "pgvector"
    icon = "cpu"

    inputs = [
        SecretStrInput(name="pg_server_url", display_name="PostgreSQL Server Connection String", required=True),
        StrInput(name="collection_name", display_name="Table", required=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"], required=True),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> PGVector:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        connection_string_parsed = transform_connection_string(self.pg_server_url)

        if documents:
            pgvector = PGVector.from_documents(
                embedding=self.embedding,
                documents=documents,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )
        else:
            pgvector = PGVector.from_existing_index(
                embedding=self.embedding,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )

        return pgvector

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
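
Several `search_documents` methods on this page start with the same guard: the search only runs when the query is a string that is still non-empty after whitespace is stripped, and otherwise an empty list is returned. A minimal sketch of that check, with `should_search` as a hypothetical helper name:

```python
def should_search(search_query) -> bool:
    """Hypothetical helper: the guard used at the top of search_documents."""
    return bool(search_query and isinstance(search_query, str) and search_query.strip())


# Map each candidate input to whether a search would run.
candidates = ["pricing", "", "   ", None, 42]
outcomes = {repr(query): should_search(query) for query in candidates}
print(outcomes)
```

The MongoDB Atlas listing omits the `.strip()` call, so a whitespace-only query still triggers a search there.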

Pinecone

This component creates a Pinecone Vector Store with search capabilities.

For more information, see the Pinecone documentation.

Parameters

Inputs
| Name | Type | Description |
| --- | --- | --- |
| index_name | String | Name of the Pinecone index |
| namespace | String | Namespace for the index |
| distance_strategy | String | Strategy for calculating distance between vectors: Cosine, Euclidean, or Dot Product (default: Cosine) |
| pinecone_api_key | SecretString | API key for Pinecone |
| text_key | String | Key in the record to use as text (default: `text`) |
| search_query | String | Query for similarity search |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Embeddings | Embedding function to use |
| number_of_results | Integer | Number of results to return in search (default: 4) |

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | Pinecone | Pinecone vector store instance |
| search_results | List[Data] | Results of similarity search |

Component code

pinecone.py
import numpy as np
from langchain_core.vectorstores import VectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DropdownInput, HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class PineconeVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Pinecone"
    description = "Pinecone Vector Store with search capabilities"
    name = "Pinecone"
    icon = "Pinecone"
    inputs = [
        StrInput(name="index_name", display_name="Index Name", required=True),
        StrInput(name="namespace", display_name="Namespace", info="Namespace for the index."),
        DropdownInput(
            name="distance_strategy",
            display_name="Distance Strategy",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        SecretStrInput(name="pinecone_api_key", display_name="Pinecone API Key", required=True),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="Key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> VectorStore:
        """Build and return a Pinecone vector store instance."""
        try:
            from langchain_pinecone import PineconeVectorStore
        except ImportError as e:
            msg = "langchain-pinecone is not installed. Please install it with `pip install langchain-pinecone`."
            raise ValueError(msg) from e

        try:
            from langchain_pinecone._utilities import DistanceStrategy

            # Wrap the embedding model to ensure float32 output
            wrapped_embeddings = Float32Embeddings(self.embedding)

            # Convert distance strategy
            distance_strategy = self.distance_strategy.replace(" ", "_").upper()
            distance_strategy = DistanceStrategy[distance_strategy]

            # Initialize Pinecone instance with wrapped embeddings
            pinecone = PineconeVectorStore(
                index_name=self.index_name,
                embedding=wrapped_embeddings,  # Use wrapped embeddings
                text_key=self.text_key,
                namespace=self.namespace,
                distance_strategy=distance_strategy,
                pinecone_api_key=self.pinecone_api_key,
            )
        except Exception as e:
            error_msg = "Error building Pinecone vector store"
            raise ValueError(error_msg) from e
        else:
            # Process documents if any
            documents = []
            if self.ingest_data:
                for doc in self.ingest_data:
                    if isinstance(doc, Data):
                        documents.append(doc.to_lc_document())
                    else:
                        documents.append(doc)

                if documents:
                    pinecone.add_documents(documents)

            return pinecone

    def search_documents(self) -> list[Data]:
        """Search documents in the vector store."""
        try:
            if not self.search_query or not isinstance(self.search_query, str) or not self.search_query.strip():
                return []

            vector_store = self.build_vector_store()
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
        except Exception as e:
            error_msg = "Error searching documents"
            raise ValueError(error_msg) from e
        else:
            data = docs_to_data(docs)
            self.status = data
            return data


class Float32Embeddings:
    """Wrapper class to ensure float32 embeddings."""

    def __init__(self, base_embeddings):
        self.base_embeddings = base_embeddings

    def embed_documents(self, texts):
        embeddings = self.base_embeddings.embed_documents(texts)
        # Iterating handles lists and NumPy arrays alike, so no type check is needed.
        return [[self._force_float32(x) for x in vec] for vec in embeddings]

    def embed_query(self, text):
        embedding = self.base_embeddings.embed_query(text)
        # Iterating handles lists and NumPy arrays alike, so no type check is needed.
        return [self._force_float32(x) for x in embedding]

    def _force_float32(self, value):
        """Convert any numeric type to Python float."""
        return float(np.float32(value))
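
`Float32Embeddings` rounds every component of an embedding to 32-bit precision and returns plain Python floats. The sketch below reproduces that effect without NumPy, using `struct` as a stand-in for `np.float32`. `to_float32`, `FakeEmbeddings`, and `Float32Wrapper` are hypothetical names for illustration, not part of Langflow.

```python
import struct


def to_float32(value) -> float:
    """Round a number to float32 precision and return a Python float."""
    # Packing as a 4-byte float and unpacking again drops the extra precision.
    return struct.unpack("f", struct.pack("f", float(value)))[0]


class FakeEmbeddings:
    """Stand-in embedding model that returns fixed vectors."""

    def embed_documents(self, texts):
        return [[0.1, 0.5, 1.0] for _ in texts]

    def embed_query(self, text):
        return [0.1, 0.5, 1.0]


class Float32Wrapper:
    """Same idea as Float32Embeddings, built on to_float32."""

    def __init__(self, base_embeddings):
        self.base_embeddings = base_embeddings

    def embed_documents(self, texts):
        vectors = self.base_embeddings.embed_documents(texts)
        return [[to_float32(x) for x in vec] for vec in vectors]

    def embed_query(self, text):
        return [to_float32(x) for x in self.base_embeddings.embed_query(text)]


wrapped = Float32Wrapper(FakeEmbeddings())
query_vector = wrapped.embed_query("hello")
doc_vectors = wrapped.embed_documents(["a", "b"])
print(query_vector)
```

Values such as 0.5 and 1.0 are exactly representable in 32 bits and pass through unchanged, while 0.1 shifts slightly because its nearest 32-bit value differs from its 64-bit one.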

Qdrant

This component creates a Qdrant Vector Store with search capabilities. For more information, see the Qdrant documentation.

Parameters

Inputs
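| Name | Type | Description |
| --- | --- | --- |
| collection_name | String | Name of the Qdrant collection |
| host | String | Qdrant server host (default: `localhost`) |
| port | Integer | Qdrant server port (default: 6333) |
| grpc_port | Integer | Qdrant gRPC port (default: 6334) |
| api_key | SecretString | API key for Qdrant |
| prefix | String | Prefix added to the Qdrant REST URL path |
| timeout | Integer | Timeout for Qdrant operations |
| path | String | Path for local Qdrant storage |
| url | String | URL of the Qdrant server |
| distance_func | String | Distance function for vector similarity: Cosine, Euclidean, or Dot Product (default: Cosine) |
| content_payload_key | String | Payload key that holds the document content (default: `page_content`) |
| metadata_payload_key | String | Payload key that holds the document metadata (default: `metadata`) |
| search_query | String | Query for similarity search |
| ingest_data | Data | Data to be ingested into the vector store |
| embedding | Embeddings | Embedding function to use |
| number_of_results | Integer | Number of results to return in search (default: 4) |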

Outputs
| Name | Type | Description |
| --- | --- | --- |
| vector_store | Qdrant | Qdrant vector store instance |
| search_results | List[Data] | Results of similarity search |

Component code

qdrant.py
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import Qdrant

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    DropdownInput,
    HandleInput,
    IntInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class QdrantVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Qdrant"
    description = "Qdrant Vector Store with search capabilities"
    icon = "Qdrant"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="host", display_name="Host", value="localhost", advanced=True),
        IntInput(name="port", display_name="Port", value=6333, advanced=True),
        IntInput(name="grpc_port", display_name="gRPC Port", value=6334, advanced=True),
        SecretStrInput(name="api_key", display_name="API Key", advanced=True),
        StrInput(name="prefix", display_name="Prefix", advanced=True),
        IntInput(name="timeout", display_name="Timeout", advanced=True),
        StrInput(name="path", display_name="Path", advanced=True),
        StrInput(name="url", display_name="URL", advanced=True),
        DropdownInput(
            name="distance_func",
            display_name="Distance Function",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        StrInput(name="content_payload_key", display_name="Content Payload Key", value="page_content", advanced=True),
        StrInput(name="metadata_payload_key", display_name="Metadata Payload Key", value="metadata", advanced=True),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Qdrant:
        qdrant_kwargs = {
            "collection_name": self.collection_name,
            "content_payload_key": self.content_payload_key,
            "metadata_payload_key": self.metadata_payload_key,
        }

        server_kwargs = {
            "host": self.host or None,
            "port": int(self.port),  # Ensure port is an integer
            "grpc_port": int(self.grpc_port),  # Ensure grpc_port is an integer
            "api_key": self.api_key,
            "prefix": self.prefix,
            # Ensure timeout is an integer
            "timeout": int(self.timeout) if self.timeout else None,
            "path": self.path or None,
            "url": self.url or None,
        }

        server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if not isinstance(self.embedding, Embeddings):
            msg = "Invalid embedding object"
            raise TypeError(msg)

        if documents:
            qdrant = Qdrant.from_documents(documents, embedding=self.embedding, **qdrant_kwargs, **server_kwargs)
        else:
            from qdrant_client import QdrantClient

            client = QdrantClient(**server_kwargs)
            qdrant = Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)

        return qdrant

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
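
The way build_vector_store assembles the connection arguments can be looked at in isolation. The following is a minimal sketch, not the component itself: build_server_kwargs is a hypothetical helper name, the URL and API key are placeholder values, and the function only mirrors how the code above coerces the ports and timeout to integers and drops every entry whose value is None.

```python
def build_server_kwargs(host="localhost", port=6333, grpc_port=6334, api_key=None,
                        prefix=None, timeout=None, path=None, url=None):
    """Hypothetical helper mirroring the server_kwargs logic of the Qdrant component."""
    server_kwargs = {
        "host": host or None,          # empty string becomes None
        "port": int(port),             # ports may arrive as strings
        "grpc_port": int(grpc_port),
        "api_key": api_key,
        "prefix": prefix,
        "timeout": int(timeout) if timeout else None,
        "path": path or None,
        "url": url or None,
    }
    # Drop unset options so that only explicit values are passed on.
    return {k: v for k, v in server_kwargs.items() if v is not None}


# Defaults only: a local server reached through host and port.
local = build_server_kwargs(port="6333", path="", url="")

# Placeholder remote settings (assumed values, for illustration only).
remote = build_server_kwargs(host="", url="https://qdrant.example.com", api_key="secret", timeout="30")

print(local)
print(remote)
```

In this sketch, blank host, path, and url values are omitted from the result, while api_key and prefix are only omitted when they are None, which matches the dictionary built in the component code.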

Redis

This component creates a Redis Vector Store with search capabilities. For more information, see the Redis documentation.

Parameters

Inputs
Name | Type | Description
redis_server_url | SecretString | Redis server connection string (required)
redis_index_name | String | Name of the Redis index
code | String | Custom code for Redis (advanced)
schema | String | Schema for the Redis index; required when no documents are ingested
search_query | String | Query for similarity search
ingest_data | Data | Data to be ingested into the vector store
number_of_results | Integer | Number of results to return in search (default: 4)
embedding | Embeddings | Embedding function to use

Outputs
Name | Type | Description
vector_store | Redis | Redis vector store instance
search_results | List[Data] | Results of similarity search

Component code

redis.py
from pathlib import Path

from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.redis import Redis

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class RedisVectorStoreComponent(LCVectorStoreComponent):
    """A custom component for implementing a Vector Store using Redis."""

    display_name: str = "Redis"
    description: str = "Implementation of Vector Store using Redis"
    name = "Redis"
    icon = "Redis"

    inputs = [
        SecretStrInput(name="redis_server_url", display_name="Redis Server Connection String", required=True),
        StrInput(
            name="redis_index_name",
            display_name="Redis Index",
        ),
        StrInput(name="code", display_name="Code", advanced=True),
        StrInput(
            name="schema",
            display_name="Schema",
        ),
        *LCVectorStoreComponent.inputs,
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Redis:
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)
        # Debug aid: dump the ingested documents to a local file.
        Path("documents.txt").write_text(str(documents), encoding="utf-8")

        if not documents:
            if not self.schema:
                msg = "If no documents are provided, a schema must be provided."
                raise ValueError(msg)
            redis_vs = Redis.from_existing_index(
                embedding=self.embedding,
                index_name=self.redis_index_name,
                schema=self.schema,
                key_prefix=None,
                redis_url=self.redis_server_url,
            )
        else:
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
            docs = text_splitter.split_documents(documents)
            redis_vs = Redis.from_documents(
                documents=docs,
                embedding=self.embedding,
                redis_url=self.redis_server_url,
                index_name=self.redis_index_name,
            )
        return redis_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
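
The branch that build_vector_store takes depends on whether any documents were ingested. The sketch below isolates that decision under stated assumptions: choose_redis_mode is a hypothetical helper, the schema file name is a placeholder, and an empty schema string is treated the same as a missing schema. It does not connect to Redis.

```python
def choose_redis_mode(documents, schema):
    """Hypothetical helper: report how the Redis store would be built."""
    if documents:
        # Documents present: they are split into chunks and indexed.
        return "from_documents"
    if not schema:
        # Nothing to ingest and no schema to describe an existing index.
        raise ValueError("If no documents are provided, a schema must be provided.")
    # No documents, but a schema: attach to an index that already exists.
    return "from_existing_index"


mode_with_docs = choose_redis_mode(["some document"], schema=None)
mode_without_docs = choose_redis_mode([], schema="redis_schema.yaml")  # placeholder name

try:
    choose_redis_mode([], schema=None)
except ValueError as exc:
    error_message = str(exc)

print(mode_with_docs, mode_without_docs, error_message)
```

When documents are provided, the component splits them with a chunk size of 1000 characters and no overlap before indexing, so the schema input only matters for the existing-index path.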

Supabase

This component creates a connection to a Supabase Vector Store with search capabilities.

For more information, see the Supabase documentation.

Parameters

Inputs
Name | Type | Description
supabase_url | String | URL of the Supabase instance (required)
supabase_service_key | SecretString | Service key for Supabase authentication (required)
table_name | String | Name of the table in Supabase
query_name | String | Name of the database function to use for similarity queries
search_query | String | Query for similarity search
ingest_data | Data | Data to be ingested into the vector store
embedding | Embeddings | Embedding function to use
number_of_results | Integer | Number of results to return in search (default: 4)

Outputs
Name | Type | Description
vector_store | SupabaseVectorStore | Supabase vector store instance
search_results | List[Data] | Results of similarity search

Component code

supabase.py
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import Client, create_client

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data


class SupabaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Supabase"
    description = "Supabase Vector Store with search capabilities"
    name = "SupabaseVectorStore"
    icon = "Supabase"

    inputs = [
        StrInput(name="supabase_url", display_name="Supabase URL", required=True),
        SecretStrInput(name="supabase_service_key", display_name="Supabase Service Key", required=True),
        StrInput(name="table_name", display_name="Table Name", advanced=True),
        StrInput(name="query_name", display_name="Query Name"),
        *LCVectorStoreComponent.inputs,
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> SupabaseVectorStore:
        supabase: Client = create_client(self.supabase_url, supabase_key=self.supabase_service_key)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            supabase_vs = SupabaseVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                query_name=self.query_name,
                client=supabase,
                table_name=self.table_name,
            )
        else:
            supabase_vs = SupabaseVectorStore(
                client=supabase,
                embedding=self.embedding,
                table_name=self.table_name,
                query_name=self.query_name,
            )

        return supabase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
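
Each vector store component on this page normalizes ingest_data the same way before building the store: Data objects are converted to documents, and anything else is passed through unchanged. The sketch below reproduces that loop with stand-in classes. Document and Data here are simplified assumptions for illustration, not the real LangChain or Langflow classes, and normalize_inputs is a hypothetical name.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Stand-in for a LangChain Document (assumption, illustration only)."""
    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Data:
    """Stand-in for Langflow's Data object (assumption, illustration only)."""
    text: str
    metadata: dict = field(default_factory=dict)

    def to_lc_document(self) -> Document:
        return Document(page_content=self.text, metadata=self.metadata)


def normalize_inputs(ingest_data):
    """Convert Data items to documents and pass everything else through."""
    documents = []
    for item in ingest_data or []:  # tolerate None as "nothing to ingest"
        if isinstance(item, Data):
            documents.append(item.to_lc_document())
        else:
            documents.append(item)
    return documents


docs = normalize_inputs([Data("hello", {"source": "a.txt"}), Document("world")])
print([d.page_content for d in docs])
```

An empty result from this step is what makes the Supabase component connect to the existing table instead of calling from_documents.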

Upstash

This component creates an Upstash Vector Store with search capabilities. For more information, see the Upstash documentation.

Parameters

Inputs
Name | Type | Description
index_url | String | URL of the Upstash index (required)
index_token | SecretString | Token for the Upstash index (required)
text_key | String | Key in the record to use as text (default: text)
namespace | String | Namespace for the index; leave empty for the default namespace
search_query | String | Query for similarity search
metadata_filter | String | Filters documents by metadata
ingest_data | Data | Data to be ingested into the vector store
embedding | Embeddings | Embedding function to use (optional); leave unconnected to use Upstash's embeddings
number_of_results | Integer | Number of results to return in search (default: 4)

Outputs
Name | Type | Description
vector_store | UpstashVectorStore | Upstash vector store instance
search_results | List[Data] | Results of similarity search

Component code

upstash.py
from langchain_community.vectorstores import UpstashVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class UpstashVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Upstash"
    description = "Upstash Vector Store with search capabilities"
    name = "Upstash"
    icon = "Upstash"

    inputs = [
        StrInput(
            name="index_url",
            display_name="Index URL",
            info="The URL of the Upstash index.",
            required=True,
        ),
        SecretStrInput(
            name="index_token",
            display_name="Index Token",
            info="The token for the Upstash index.",
            required=True,
        ),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="The key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Leave empty for default namespace.",
        ),
        *LCVectorStoreComponent.inputs,
        MultilineInput(
            name="metadata_filter",
            display_name="Metadata Filter",
            info="Filters documents by metadata. Look at the documentation for more information.",
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
            info="To use Upstash's embeddings, don't provide an embedding.",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> UpstashVectorStore:
        use_upstash_embedding = self.embedding is None

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            if use_upstash_embedding:
                upstash_vs = UpstashVectorStore(
                    embedding=use_upstash_embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
                upstash_vs.add_documents(documents)
            else:
                upstash_vs = UpstashVectorStore.from_documents(
                    documents=documents,
                    embedding=self.embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
        else:
            upstash_vs = UpstashVectorStore(
                embedding=self.embedding or use_upstash_embedding,
                text_key=self.text_key,
                index_url=self.index_url,
                index_token=self.index_token,
                namespace=self.namespace,
            )

        return upstash_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
                filter=self.metadata_filter,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
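
The embedding input is optional for Upstash: when nothing is connected, the component passes True in place of an embedding object, which the code above uses to request Upstash's own embeddings. The sketch below isolates that selection. resolve_upstash_embedding is a hypothetical helper name, and the object used as an embedding is a plain placeholder rather than a real Embeddings instance.

```python
def resolve_upstash_embedding(embedding=None):
    """Hypothetical helper: pick the value passed as the embedding argument."""
    use_upstash_embedding = embedding is None
    # With no embedding connected, fall back to the boolean flag (True).
    return embedding or use_upstash_embedding


placeholder_embedding = object()  # stands in for a connected Embeddings component

hosted = resolve_upstash_embedding()
custom = resolve_upstash_embedding(placeholder_embedding)
print(hosted, custom is placeholder_embedding)
```

In the component, the same value is used whether the store is built for ingestion or only for search, so a flow should connect either an embedding component or none for both paths.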

Vectara

This component creates a Vectara Vector Store with search capabilities. For more information, see the Vectara documentation.

Parameters

Inputs
Name | Type | Description
vectara_customer_id | String | Vectara customer ID (required)
vectara_corpus_id | String | Vectara corpus ID (required)
vectara_api_key | SecretString | Vectara API key (required)
embedding | Embeddings | Embedding function to use (optional)
ingest_data | List[Document/Data] | Data to be ingested into the vector store
search_query | String | Query for similarity search
number_of_results | Integer | Number of results to return in search (default: 4)

Outputs
Name | Type | Description
vector_store | Vectara | Vectara vector store instance
search_results | List[Data] | Results of similarity search

Component code

vectara.py
from typing import TYPE_CHECKING

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, SecretStrInput, StrInput
from langflow.schema import Data

if TYPE_CHECKING:
    from langchain_community.vectorstores import Vectara


class VectaraVectorStoreComponent(LCVectorStoreComponent):
    """Vectara Vector Store with search capabilities."""

    display_name: str = "Vectara"
    description: str = "Vectara Vector Store with search capabilities"
    name = "Vectara"
    icon = "Vectara"

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        *LCVectorStoreComponent.inputs,
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> "Vectara":
        """Builds the Vectara object."""
        try:
            from langchain_community.vectorstores import Vectara
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(
            vectara_customer_id=self.vectara_customer_id,
            vectara_corpus_id=self.vectara_corpus_id,
            vectara_api_key=self.vectara_api_key,
        )

        self._add_documents_to_vector_store(vectara)
        return vectara

    def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
        """Adds documents to the Vector Store."""
        if not self.ingest_data:
            self.status = "No documents to add to Vectara"
            return

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            self.log(f"Adding {len(documents)} documents to Vectara.")
            vector_store.add_documents(documents)
            self.status = f"Added {len(documents)} documents to Vectara"
        else:
            self.log("No documents to add to Vectara.")
            self.status = "No valid documents to add to Vectara"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = f"Found {len(data)} results for the query: {self.search_query}"
            return data
        self.status = "No search query provided"
        return []
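
The search method only queries the store when the search query is a non-blank string, and it records a status message either way. The following sketch shows that guard on its own. run_search and fake_search are hypothetical names, and the in-memory substring match is only a stand-in for a real similarity search.

```python
def run_search(search_query, search_fn, k=4):
    """Hypothetical helper: call search_fn only for a non-blank string query."""
    if search_query and isinstance(search_query, str) and search_query.strip():
        results = search_fn(search_query, k)
        return results, f"Found {len(results)} results for the query: {search_query}"
    return [], "No search query provided"


def fake_search(query, k):
    """Stand-in for similarity search: substring match over a tiny corpus."""
    corpus = ["vector stores", "vector search", "chat memory"]
    return [text for text in corpus if query.lower() in text][:k]


hits, status = run_search("vector", fake_search)
empty, empty_status = run_search("   ", fake_search)
print(hits, status)
print(empty, empty_status)
```

A query made only of whitespace is treated like a missing query, so no request is sent in that case.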

Vectara RAG

This component creates a Vectara RAG pipeline. For more information, see the Vectara documentation.

Parameters

Inputs
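Name | Type | Description
vectara_customer_id | String | Vectara customer ID (required)
vectara_corpus_id | String | Vectara corpus ID (required)
vectara_api_key | SecretString | Vectara API key (required)
search_query | String | The query to answer
lexical_interpolation | Float | Hybrid search factor: how much to weigh lexical scores against embedding scores (default: 0.005)
filter | String | Metadata filter string for narrowing the search
reranker | String | Type of reranker to use: mmr (default), rerank_multilingual_v1, or none
reranker_k | Integer | Number of results to rerank (default: 50)
diversity_bias | Float | Diversity bias for the MMR reranker, from 0 to 1 (default: 0.2)
max_results | Integer | Maximum number of search results to summarize (default: 7)
response_lang | String | Language code for the response, or auto to detect it (default: eng)
prompt | String | Name of the summarizer prompt (default: vectara-summary-ext-24-05-sml)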

Outputs
Name | Type | Description
answer | Message | Generated response from Vectara RAG

Component code

vectara_rag.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.io import DropdownInput, FloatInput, IntInput, MessageTextInput, Output, SecretStrInput, StrInput
from langflow.schema.message import Message


class VectaraRagComponent(Component):
    display_name = "Vectara RAG"
    description = "Vectara's full end to end RAG"
    documentation = "https://docs.vectara.com/docs"
    icon = "Vectara"
    name = "VectaraRAG"
    SUMMARIZER_PROMPTS = [
        "vectara-summary-ext-24-05-sml",
        "vectara-summary-ext-24-05-med-omni",
        "vectara-summary-ext-24-05-large",
        "vectara-summary-ext-24-05-med",
        "vectara-summary-ext-v1.3.0",
    ]

    RERANKER_TYPES = ["mmr", "rerank_multilingual_v1", "none"]

    RESPONSE_LANGUAGES = [
        "auto",
        "eng",
        "spa",
        "fra",
        "zho",
        "deu",
        "hin",
        "ara",
        "por",
        "ita",
        "jpn",
        "kor",
        "rus",
        "tur",
        "fas",
        "vie",
        "tha",
        "heb",
        "nld",
        "ind",
        "pol",
        "ukr",
        "ron",
        "swe",
        "ces",
        "ell",
        "ben",
        "msa",
        "urd",
    ]

    field_order = ["vectara_customer_id", "vectara_corpus_id", "vectara_api_key", "search_query", "reranker"]

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        MessageTextInput(
            name="search_query",
            display_name="Search Query",
            info="The query to receive an answer on.",
            tool_mode=True,
        ),
        FloatInput(
            name="lexical_interpolation",
            display_name="Hybrid Search Factor",
            range_spec=RangeSpec(min=0.005, max=0.1, step=0.005),
            value=0.005,
            advanced=True,
            info="How much to weigh lexical scores compared to the embedding score. "
            "0 means lexical search is not used at all, and 1 means only lexical search is used.",
        ),
        MessageTextInput(
            name="filter",
            display_name="Metadata Filters",
            value="",
            advanced=True,
            info="The filter string to narrow the search to according to metadata attributes.",
        ),
        DropdownInput(
            name="reranker",
            display_name="Reranker Type",
            options=RERANKER_TYPES,
            value=RERANKER_TYPES[0],
            info="How to rerank the retrieved search results.",
        ),
        IntInput(
            name="reranker_k",
            display_name="Number of Results to Rerank",
            value=50,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
        ),
        FloatInput(
            name="diversity_bias",
            display_name="Diversity Bias",
            value=0.2,
            range_spec=RangeSpec(min=0, max=1, step=0.01),
            advanced=True,
            info="Ranges from 0 to 1, with higher values indicating greater diversity (only applies to MMR reranker).",
        ),
        IntInput(
            name="max_results",
            display_name="Max Results to Summarize",
            value=7,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
            info="The maximum number of search results to be available to the prompt.",
        ),
        DropdownInput(
            name="response_lang",
            display_name="Response Language",
            options=RESPONSE_LANGUAGES,
            value="eng",
            advanced=True,
            info="Use the ISO 639-1 or 639-3 language code or auto to automatically detect the language.",
        ),
        DropdownInput(
            name="prompt",
            display_name="Prompt Name",
            options=SUMMARIZER_PROMPTS,
            value=SUMMARIZER_PROMPTS[0],
            advanced=True,
            info="Only vectara-summary-ext-24-05-sml is for Growth customers; "
            "all other prompts are for Scale customers only.",
        ),
    ]

    outputs = [
        Output(name="answer", display_name="Answer", method="generate_response"),
    ]

    def generate_response(
        self,
    ) -> Message:
        text_output = ""

        try:
            from langchain_community.vectorstores import Vectara
            from langchain_community.vectorstores.vectara import RerankConfig, SummaryConfig, VectaraQueryConfig
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(self.vectara_customer_id, self.vectara_corpus_id, self.vectara_api_key)
        rerank_config = RerankConfig(self.reranker, self.reranker_k, self.diversity_bias)
        summary_config = SummaryConfig(
            is_enabled=True, max_results=self.max_results, response_lang=self.response_lang, prompt_name=self.prompt
        )
        config = VectaraQueryConfig(
            lambda_val=self.lexical_interpolation,
            filter=self.filter,
            summary_config=summary_config,
            rerank_config=rerank_config,
        )
        rag = vectara.as_rag(config)
        response = rag.invoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})

        text_output = response["answer"]

        return Message(text=text_output)
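
The advanced inputs of this component carry defaults and allowed ranges that are easy to lose track of. The sketch below collects them in one place. RagSettings is a hypothetical container, not part of Langflow or Vectara. The defaults and ranges are copied from the inputs defined above, and the explicit range checks are an assumption added for illustration, since the component itself declares ranges without validating them in code.

```python
from dataclasses import dataclass

RERANKER_TYPES = ("mmr", "rerank_multilingual_v1", "none")


@dataclass(frozen=True)
class RagSettings:
    """Hypothetical container for the advanced inputs, using the component defaults."""
    lexical_interpolation: float = 0.005
    reranker: str = "mmr"
    reranker_k: int = 50
    diversity_bias: float = 0.2
    max_results: int = 7
    response_lang: str = "eng"
    prompt: str = "vectara-summary-ext-24-05-sml"

    def __post_init__(self):
        # Range checks mirror the RangeSpec values declared on the inputs.
        if self.reranker not in RERANKER_TYPES:
            raise ValueError(f"Unknown reranker: {self.reranker}")
        if not 1 <= self.reranker_k <= 100:
            raise ValueError("reranker_k must be between 1 and 100")
        if not 0 <= self.diversity_bias <= 1:
            raise ValueError("diversity_bias must be between 0 and 1")
        if not 1 <= self.max_results <= 100:
            raise ValueError("max_results must be between 1 and 100")


defaults = RagSettings()
custom = RagSettings(reranker="none", max_results=3, response_lang="auto")
print(defaults)
print(custom)
```

In the component, these values feed the rerank, summary, and query configuration objects that are passed to the Vectara RAG chain.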

