Vector Stores

Vector databases store vector embeddings and retrieve them by similarity. Use them to persist embeddings, find the stored vectors closest to a query vector, and perform related vector operations.
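
As a rough illustration of what searching for similar vectors means, the following minimal sketch ranks a few stored vectors by cosine similarity to a query vector. It uses only the Python standard library. The `cosine_similarity` and `top_k` helpers and the toy vectors are illustrative assumptions, not part of any Langflow component.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Return the cosine of the angle between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # Treat zero vectors as dissimilar to everything.
    return dot / (norm_a * norm_b)


def top_k(query, store, k=4):
    """Return the k (doc_id, score) pairs most similar to the query."""
    scored = [(doc_id, cosine_similarity(query, vec)) for doc_id, vec in store.items()]
    return sorted(scored, key=lambda pair: pair[1], reverse=True)[:k]


# Toy "embeddings" (hypothetical values chosen for illustration).
store = {
    "doc-a": [1.0, 0.0],
    "doc-b": [0.7, 0.7],
    "doc-c": [0.0, 1.0],
}

results = top_k([1.0, 0.1], store, k=2)
print([doc_id for doc_id, _ in results])
```

A real vector store replaces the linear scan with an index, but the ranking idea is the same.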

Astra DB Vector Store

This component implements a Vector Store using Astra DB Serverless with search capabilities.

Parameters

Inputs
| Name | Display Name | Info |
|---|---|---|
| collection_name | Collection Name | The name of the collection within Astra DB Serverless where the vectors will be stored (required) |
| token | Astra DB Application Token | Authentication token for accessing Astra DB Serverless (required) |
| api_endpoint | API Endpoint | API endpoint URL for the Astra DB Serverless service (required) |
| search_input | Search Input | Query string for similarity search |
| ingest_data | Ingest Data | Data to be ingested into the vector store |
| namespace | Namespace | Optional namespace within Astra DB Serverless to use for the collection |
| embedding_service | Embedding Model or Astra Vectorize | Determines whether to use an Embedding Model or Astra Vectorize for the collection |
| embedding | Embedding Model | Allows an embedding model configuration (when using Embedding Model) |
| provider | Vectorize Provider | Provider for Astra Vectorize (when using Astra Vectorize) |
| metric | Metric | Optional distance metric for vector comparisons |
| batch_size | Batch Size | Optional number of data to process in a single batch |
| setup_mode | Setup Mode | Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync") |
| pre_delete_collection | Pre Delete Collection | Boolean flag to determine whether to delete the collection before creating a new one |
| number_of_results | Number of Results | Number of results to return in similarity search (default: 4) |
| search_type | Search Type | Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)") |
| search_score_threshold | Search Score Threshold | Minimum similarity score threshold for search results |
| search_filter | Search Metadata Filter | Optional dictionary of filters to apply to the search query |
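
To make the interaction between `search_input`, `search_type`, `number_of_results`, `search_score_threshold`, and `search_filter` concrete, here is a simplified, standalone sketch of how these inputs might be combined into search arguments. The function names `map_search_type` and `build_search_args` are illustrative stand-ins, and the argument keys (`query`, `k`, `n`, `score_threshold`, `filter`) are assumptions modeled on the component code rather than a documented API.

```python
SEARCH_TYPES = {
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def map_search_type(display_name: str) -> str:
    """Translate the UI label into a search-type key, defaulting to similarity."""
    return SEARCH_TYPES.get(display_name, "similarity")


def build_search_args(search_input, search_type="Similarity", number_of_results=4,
                      score_threshold=0.0, search_filter=None):
    """Combine the search inputs into one keyword-argument dictionary."""
    query = search_input.strip() if isinstance(search_input, str) else ""
    # Drop filter entries with empty keys or values.
    cleaned = {k: v for k, v in (search_filter or {}).items() if k and k.strip() and v}

    if query:
        args = {"query": query, "search_type": map_search_type(search_type),
                "k": number_of_results, "score_threshold": score_threshold}
    elif cleaned:
        # No query text: fall back to a metadata-only lookup.
        args = {"n": number_of_results}
    else:
        return {}  # Nothing to search for.

    if cleaned:
        args["filter"] = cleaned
    return args


print(build_search_args("what is a vector store?", search_filter={"source": "docs"}))
print(build_search_args("", search_filter={"source": "docs"}))
print(build_search_args("   "))
```

With neither a query nor a usable filter, the sketch returns an empty dictionary, which a caller can treat as a signal to skip the search.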

Outputs
| Name | Display Name | Info |
|---|---|---|
| vector_store | Vector Store | Built Astra DB Serverless vector store |
| search_results | Search Results | Results of the similarity search as a list of Data objects |

Component code

astradb.py
import os
from collections import defaultdict

import orjson
from astrapy import DataAPIClient
from astrapy.admin import parse_api_endpoint
from langchain_astradb import AstraDBVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput, MessageTextInput, NestedDictInput
from langflow.io import (
    BoolInput,
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data
from langflow.utils.version import get_version_info


class AstraDBVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Astra DB"
    description: str = "Implementation of Vector Store using Astra DB with search capabilities"
    documentation: str = "https://docs.langflow.org/starter-projects-vector-store-rag"
    name = "AstraDB"
    icon: str = "AstraDB"

    _cached_vector_store: AstraDBVectorStore | None = None

    VECTORIZE_PROVIDERS_MAPPING = defaultdict(
        list,
        {
            "Azure OpenAI": [
                "azureOpenAI",
                ["text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002"],
            ],
            "Hugging Face - Dedicated": ["huggingfaceDedicated", ["endpoint-defined-model"]],
            "Hugging Face - Serverless": [
                "huggingface",
                [
                    "sentence-transformers/all-MiniLM-L6-v2",
                    "intfloat/multilingual-e5-large",
                    "intfloat/multilingual-e5-large-instruct",
                    "BAAI/bge-small-en-v1.5",
                    "BAAI/bge-base-en-v1.5",
                    "BAAI/bge-large-en-v1.5",
                ],
            ],
            "Jina AI": [
                "jinaAI",
                [
                    "jina-embeddings-v2-base-en",
                    "jina-embeddings-v2-base-de",
                    "jina-embeddings-v2-base-es",
                    "jina-embeddings-v2-base-code",
                    "jina-embeddings-v2-base-zh",
                ],
            ],
            "Mistral AI": ["mistral", ["mistral-embed"]],
            "Nvidia": ["nvidia", ["NV-Embed-QA"]],
            "OpenAI": ["openai", ["text-embedding-3-small", "text-embedding-3-large", "text-embedding-ada-002"]],
            "Upstage": ["upstageAI", ["solar-embedding-1-large"]],
            "Voyage AI": [
                "voyageAI",
                ["voyage-large-2-instruct", "voyage-law-2", "voyage-code-2", "voyage-large-2", "voyage-2"],
            ],
        },
    )

    inputs = [
        SecretStrInput(
            name="token",
            display_name="Astra DB Application Token",
            info="Authentication token for accessing Astra DB.",
            value="ASTRA_DB_APPLICATION_TOKEN",
            required=True,
            advanced=os.getenv("ASTRA_ENHANCED", "false").lower() == "true",
            real_time_refresh=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="Database" if os.getenv("ASTRA_ENHANCED", "false").lower() == "true" else "API Endpoint",
            info="API endpoint URL for the Astra DB service.",
            value="ASTRA_DB_API_ENDPOINT",
            required=True,
            real_time_refresh=True,
        ),
        DropdownInput(
            name="collection_name",
            display_name="Collection",
            info="The name of the collection within Astra DB where the vectors will be stored.",
            required=True,
            refresh_button=True,
            real_time_refresh=True,
            options=["+ Create new collection"],
            value="+ Create new collection",
        ),
        StrInput(
            name="collection_name_new",
            display_name="Collection Name",
            info="Name of the new collection to create.",
            advanced=os.getenv("LANGFLOW_HOST") is not None,
            required=os.getenv("LANGFLOW_HOST") is None,
        ),
        StrInput(
            name="keyspace",
            display_name="Keyspace",
            info="Optional keyspace within Astra DB to use for the collection.",
            advanced=True,
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        NestedDictInput(
            name="advanced_search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="[DEPRECATED] Search Metadata Filter",
            info="Deprecated: use advanced_search_filter. Optional dictionary of filters to apply to the search query.",
            advanced=True,
            list=True,
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
        ),
        DropdownInput(
            name="embedding_choice",
            display_name="Embedding Model or Astra Vectorize",
            info="Determines whether to use Astra Vectorize for the collection.",
            options=["Embedding Model", "Astra Vectorize"],
            real_time_refresh=True,
            value="Embedding Model",
        ),
        HandleInput(
            name="embedding_model",
            display_name="Embedding Model",
            input_types=["Embeddings"],
            info="Allows an embedding model configuration.",
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            value="cosine",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync' or 'Off'.",
            options=["Sync", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            list=True,
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            list=True,
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info='Optional JSON string for the "indexing" field of the collection. '
            "See https://docs.datastax.com/en/astra-db-serverless/api-reference/collections.html#the-indexing-option",
            advanced=True,
        ),
    ]

    def del_fields(self, build_config, field_list):
        for field in field_list:
            if field in build_config:
                del build_config[field]

        return build_config

    def insert_in_dict(self, build_config, field_name, new_parameters):
        # Insert the new key-value pair after the found key
        for new_field_name, new_parameter in new_parameters.items():
            # Get all the items as a list of tuples (key, value)
            items = list(build_config.items())

            # Find the index of the key to insert after
            idx = len(items)
            for i, (key, _) in enumerate(items):
                if key == field_name:
                    idx = i + 1
                    break

            items.insert(idx, (new_field_name, new_parameter))

            # Clear the original dictionary and update with the modified items
            build_config.clear()
            build_config.update(items)

        return build_config

    def update_providers_mapping(self):
        # If we don't have token or api_endpoint, we can't fetch the list of providers
        if not self.token or not self.api_endpoint:
            self.log("Astra DB token and API endpoint are required to fetch the list of Vectorize providers.")

            return self.VECTORIZE_PROVIDERS_MAPPING

        try:
            self.log("Dynamically updating list of Vectorize providers.")

            # Get the admin object
            client = DataAPIClient(token=self.token)
            admin = client.get_admin()

            # Get the embedding providers
            db_admin = admin.get_database_admin(self.api_endpoint)
            embedding_providers = db_admin.find_embedding_providers().as_dict()

            vectorize_providers_mapping = {}

            # Map the provider display name to the provider key and models
            for provider_key, provider_data in embedding_providers["embeddingProviders"].items():
                display_name = provider_data["displayName"]
                models = [model["name"] for model in provider_data["models"]]

                vectorize_providers_mapping[display_name] = [provider_key, models]

            # Sort the resulting dictionary
            return defaultdict(list, dict(sorted(vectorize_providers_mapping.items())))
        except Exception as e:  # noqa: BLE001
            self.log(f"Error fetching Vectorize providers: {e}")

            return self.VECTORIZE_PROVIDERS_MAPPING

    def get_database(self):
        try:
            client = DataAPIClient(token=self.token)

            return client.get_database(
                self.api_endpoint,
                token=self.token,
            )
        except Exception as e:  # noqa: BLE001
            self.log(f"Error getting database: {e}")

            return None

    def _initialize_collection_options(self):
        database = self.get_database()
        if database is None:
            return ["+ Create new collection"]

        try:
            collections = [collection.name for collection in database.list_collections()]
        except Exception as e:  # noqa: BLE001
            self.log(f"Error fetching collections: {e}")

            return ["+ Create new collection"]

        return [*collections, "+ Create new collection"]

    def get_collection_choice(self):
        collection_name = self.collection_name
        if collection_name == "+ Create new collection":
            return self.collection_name_new

        return collection_name

    def get_collection_options(self):
        # Only get the options if the collection exists
        database = self.get_database()
        if database is None:
            return None

        collection_name = self.get_collection_choice()

        try:
            collection = database.get_collection(collection_name)
            collection_options = collection.options()
        except Exception:  # noqa: BLE001
            return None

        return collection_options.vector

    def update_build_config(self, build_config: dict, field_value: str, field_name: str | None = None):
        # Refresh the collection name options
        build_config["collection_name"]["options"] = self._initialize_collection_options()

        # If the collection name is set to "+ Create new collection", show embedding choice
        if field_name == "collection_name" and field_value == "+ Create new collection":
            build_config["embedding_choice"]["advanced"] = False
            build_config["embedding_choice"]["value"] = "Embedding Model"
            build_config["embedding_model"]["advanced"] = False

            build_config["collection_name_new"]["advanced"] = False
            build_config["collection_name_new"]["required"] = True

        # But if it's not, hide embedding choice
        elif field_name == "collection_name" and field_value != "+ Create new collection":
            build_config["embedding_choice"]["advanced"] = True

            build_config["collection_name_new"]["advanced"] = True
            build_config["collection_name_new"]["required"] = False
            build_config["collection_name_new"]["value"] = ""

        # Get the collection options for the selected collection
        collection_options = self.get_collection_options()

        # If the collection options are available (DB exists), show the advanced options
        if collection_options:
            build_config["embedding_choice"]["advanced"] = True

            if collection_options.service:
                self.del_fields(
                    build_config,
                    [
                        "embedding_provider",
                        "model",
                        "z_01_model_parameters",
                        "z_02_api_key_name",
                        "z_03_provider_api_key",
                        "z_04_authentication",
                    ],
                )

                build_config["embedding_model"]["advanced"] = True
                build_config["embedding_choice"]["value"] = "Astra Vectorize"
            else:
                build_config["embedding_model"]["advanced"] = False
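                # "embedding_provider" is only inserted dynamically, so it may be absent
                if "embedding_provider" in build_config:
                    build_config["embedding_provider"]["advanced"] = False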
                build_config["embedding_choice"]["value"] = "Embedding Model"

        elif field_name == "embedding_choice":
            if field_value == "Astra Vectorize":
                build_config["embedding_model"]["advanced"] = True

                # Update the providers mapping
                vectorize_providers = self.update_providers_mapping()

                new_parameter = DropdownInput(
                    name="embedding_provider",
                    display_name="Embedding Provider",
                    options=list(vectorize_providers.keys()),
                    value="",
                    required=True,
                    real_time_refresh=True,
                ).to_dict()

                self.insert_in_dict(build_config, "embedding_choice", {"embedding_provider": new_parameter})
            else:
                build_config["embedding_model"]["advanced"] = False

                self.del_fields(
                    build_config,
                    [
                        "embedding_provider",
                        "model",
                        "z_01_model_parameters",
                        "z_02_api_key_name",
                        "z_03_provider_api_key",
                        "z_04_authentication",
                    ],
                )

        elif field_name == "embedding_provider":
            self.del_fields(
                build_config,
                ["model", "z_01_model_parameters", "z_02_api_key_name", "z_03_provider_api_key", "z_04_authentication"],
            )

            # Update the providers mapping
            vectorize_providers = self.update_providers_mapping()
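            # Fall back to an empty list if the provider is unknown (e.g. an empty selection)
            provider_entry = vectorize_providers.get(field_value)
            model_options = provider_entry[1] if provider_entry else []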

            new_parameter = DropdownInput(
                name="model",
                display_name="Model",
                info="The embedding model to use for the selected provider. Each provider has a different set of "
                "models available (full list at "
                "https://docs.datastax.com/en/astra-db-serverless/databases/embedding-generation.html):\n\n"
                f"{', '.join(model_options)}",
                options=model_options,
                value=None,
                required=True,
                real_time_refresh=True,
            ).to_dict()

            self.insert_in_dict(build_config, "embedding_provider", {"model": new_parameter})

        elif field_name == "model":
            self.del_fields(
                build_config,
                ["z_01_model_parameters", "z_02_api_key_name", "z_03_provider_api_key", "z_04_authentication"],
            )

            new_parameter_1 = DictInput(
                name="z_01_model_parameters",
                display_name="Model Parameters",
                list=True,
            ).to_dict()

            new_parameter_2 = MessageTextInput(
                name="z_02_api_key_name",
                display_name="API Key Name",
                info="The name of the embeddings provider API key stored on Astra. "
                "If set, it will override the 'ProviderKey' in the authentication parameters.",
            ).to_dict()

            new_parameter_3 = SecretStrInput(
                load_from_db=False,
                name="z_03_provider_api_key",
                display_name="Provider API Key",
                info="An alternative to the Astra Authentication that passes an API key for the provider "
                "with each request to Astra DB. "
                "This may be used when Vectorize is configured for the collection, "
                "but no corresponding provider secret is stored within Astra's key management system.",
            ).to_dict()

            new_parameter_4 = DictInput(
                name="z_04_authentication",
                display_name="Authentication Parameters",
                list=True,
            ).to_dict()

            self.insert_in_dict(
                build_config,
                "model",
                {
                    "z_01_model_parameters": new_parameter_1,
                    "z_02_api_key_name": new_parameter_2,
                    "z_03_provider_api_key": new_parameter_3,
                    "z_04_authentication": new_parameter_4,
                },
            )

        return build_config

    def build_vectorize_options(self, **kwargs):
        for attribute in [
            "embedding_provider",
            "model",
            "z_01_model_parameters",
            "z_02_api_key_name",
            "z_03_provider_api_key",
            "z_04_authentication",
        ]:
            if not hasattr(self, attribute):
                setattr(self, attribute, None)

        # Fetch values from kwargs if any self.* attributes are None
        provider_mapping = self.update_providers_mapping()
        provider_value = provider_mapping.get(self.embedding_provider, [None])[0] or kwargs.get("embedding_provider")
        model_name = self.model or kwargs.get("model")
        authentication = {**(self.z_04_authentication or {}), **(kwargs.get("z_04_authentication") or {})}
        parameters = self.z_01_model_parameters or kwargs.get("z_01_model_parameters", {})

        # Set the API key name if provided
        api_key_name = self.z_02_api_key_name or kwargs.get("z_02_api_key_name")
        provider_key = self.z_03_provider_api_key or kwargs.get("z_03_provider_api_key")
        if api_key_name:
            authentication["providerKey"] = api_key_name
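        if authentication:
            provider_key = None
            # Only normalize "providerKey" when it is present, to avoid a KeyError
            if authentication.get("providerKey"):
                authentication["providerKey"] = authentication["providerKey"].split(".")[0]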

        # Set authentication and parameters to None if no values are provided
        if not authentication:
            authentication = None
        if not parameters:
            parameters = None

        return {
            # must match astrapy.info.CollectionVectorServiceOptions
            "collection_vector_service_options": {
                "provider": provider_value,
                "modelName": model_name,
                "authentication": authentication,
                "parameters": parameters,
            },
            "collection_embedding_api_key": provider_key,
        }

    @check_cached_vector_store
    def build_vector_store(self, vectorize_options=None):
        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e

        metric_value = self.metric or None
        autodetect = False

        if self.embedding_choice == "Embedding Model":
            embedding_dict = {"embedding": self.embedding_model}
        # Use autodetect if the collection name is NOT set to "+ Create new collection"
        elif self.collection_name != "+ Create new collection":
            autodetect = True
            metric_value = None
            setup_mode_value = None
            embedding_dict = {}
        else:
            from astrapy.info import CollectionVectorServiceOptions

            # Grab the collection options if available
            collection_options = self.get_collection_options()

            # Ensure collection_options and its nested attributes are handled safely
            authentication = getattr(self, "z_04_authentication", {}) or (
                collection_options.service.authentication
                if collection_options and collection_options.service and collection_options.service.authentication
                else {}
            )

            # Build the vectorize options dictionary
            dict_options = vectorize_options or self.build_vectorize_options(
                embedding_provider=(
                    getattr(self, "embedding_provider", None)
                    or (
                        collection_options.service.provider
                        if collection_options and collection_options.service
                        else None
                    )
                ),
                model=(
                    getattr(self, "model", None)
                    or (
                        collection_options.service.model_name
                        if collection_options and collection_options.service
                        else None
                    )
                ),
                z_01_model_parameters=(
                    getattr(self, "z_01_model_parameters", None)
                    or (
                        collection_options.service.parameters
                        if collection_options and collection_options.service
                        else None
                    )
                ),
                z_02_api_key_name=(
                    getattr(self, "z_02_api_key_name", None)
                    or (authentication.get("apiKey") if authentication else None)
                ),
                z_03_provider_api_key=(
                    getattr(self, "z_03_provider_api_key", None)
                    or (authentication.get("providerKey") if authentication else None)
                ),
                z_04_authentication=authentication,
            )

            # Set the embedding dictionary
            embedding_dict = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(
                    dict_options.get("collection_vector_service_options")
                ),
                "collection_embedding_api_key": dict_options.get("collection_embedding_api_key"),
            }

        # Get Langflow version and platform information
        __version__ = get_version_info()["version"]
        langflow_prefix = ""
        if os.getenv("LANGFLOW_HOST") is not None:
            langflow_prefix = "ds-"

        try:
            vector_store = AstraDBVectorStore(
                token=self.token,
                api_endpoint=self.api_endpoint,
                namespace=self.keyspace or None,
                collection_name=self.get_collection_choice(),
                autodetect_collection=autodetect,
                environment=(
                    parse_api_endpoint(getattr(self, "api_endpoint", None)).environment
                    if getattr(self, "api_endpoint", None)
                    else None
                ),
                metric=metric_value,
                batch_size=self.batch_size or None,
                bulk_insert_batch_concurrency=self.bulk_insert_batch_concurrency or None,
                bulk_insert_overwrite_concurrency=self.bulk_insert_overwrite_concurrency or None,
                bulk_delete_concurrency=self.bulk_delete_concurrency or None,
                setup_mode=setup_mode_value,
                pre_delete_collection=self.pre_delete_collection,
                metadata_indexing_include=[s for s in self.metadata_indexing_include if s] or None,
                metadata_indexing_exclude=[s for s in self.metadata_indexing_exclude if s] or None,
                # The policy arrives as a JSON string, so parse it into a dict
                collection_indexing_policy=orjson.loads(self.collection_indexing_policy)
                if self.collection_indexing_policy
                else None,
                ext_callers=[(f"{langflow_prefix}langflow", __version__)],
                **embedding_dict,
            )
        except Exception as e:
            msg = f"Error initializing AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self._add_documents_to_vector_store(vector_store)

        return vector_store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents:
            self.log(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            self.log("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def _build_search_args(self):
        query = self.search_input if isinstance(self.search_input, str) and self.search_input.strip() else None
        search_filter = (
            {k: v for k, v in self.search_filter.items() if k and v and k.strip()} if self.search_filter else None
        )

        if query:
            args = {
                "query": query,
                "search_type": self._map_search_type(),
                "k": self.number_of_results,
                "score_threshold": self.search_score_threshold,
            }
        elif self.advanced_search_filter or search_filter:
            args = {
                "n": self.number_of_results,
            }
        else:
            return {}

        filter_arg = self.advanced_search_filter or {}

        if search_filter:
            self.log(f"`search_filter` is deprecated. Use `advanced_search_filter`. Cleaned: {search_filter}")
            filter_arg.update(search_filter)

        if filter_arg:
            args["filter"] = filter_arg

        return args

    def search_documents(self, vector_store=None) -> list[Data]:
        vector_store = vector_store or self.build_vector_store()

        self.log(f"Search input: {self.search_input}")
        self.log(f"Search type: {self.search_type}")
        self.log(f"Number of results: {self.number_of_results}")

        try:
            search_args = self._build_search_args()
        except Exception as e:
            msg = f"Error in AstraDBVectorStore._build_search_args: {e}"
            raise ValueError(msg) from e

        if not search_args:
            self.log("No search input or filters provided. Skipping search.")
            return []

        docs = []
        search_method = "search" if "query" in search_args else "metadata_search"

        try:
            self.log(f"Calling vector_store.{search_method} with args: {search_args}")
            docs = getattr(vector_store, search_method)(**search_args)
        except Exception as e:
            msg = f"Error performing {search_method} in AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self.log(f"Retrieved documents: {len(docs)}")

        data = docs_to_data(docs)
        self.log(f"Converted documents to data: {len(data)}")
        self.status = data
        return data

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
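
The branching in `_build_search_args` and `search_documents` can be sketched as a standalone function. This is a minimal illustration rather than the component's code: `choose_search_call` and its parameters are hypothetical names, and the function only mirrors the decision shown above. A non-empty query selects `search`, a filter without a query selects `metadata_search`, and an empty input skips the search.

```python
from typing import Any, Optional


def choose_search_call(
    query: Optional[str],
    advanced_filter: Optional[dict] = None,
    legacy_filter: Optional[dict] = None,
    k: int = 4,
    search_type: str = "similarity",
    score_threshold: float = 0.0,
) -> tuple[Optional[str], dict[str, Any]]:
    """Return (method_name, kwargs), or (None, {}) when there is nothing to search for."""
    # Treat whitespace-only or non-string queries as "no query".
    query = query if isinstance(query, str) and query.strip() else None
    # Drop empty keys and empty values from the deprecated flat filter.
    cleaned = {key: v for key, v in (legacy_filter or {}).items() if key and v and key.strip()}

    if query:
        method = "search"
        args: dict[str, Any] = {
            "query": query,
            "search_type": search_type,
            "k": k,
            "score_threshold": score_threshold,
        }
    elif advanced_filter or cleaned:
        method = "metadata_search"
        args = {"n": k}
    else:
        return None, {}

    # Merge into a new dict so the caller's filter is not modified (a choice made in this sketch).
    merged = {**(advanced_filter or {}), **cleaned}
    if merged:
        args["filter"] = merged
    return method, args


if __name__ == "__main__":
    print(choose_search_call("what is a vector store?"))
    print(choose_search_call("   ", legacy_filter={"source": "docs", "": "ignored"}))
    print(choose_search_call(None))
```

Returning the method name together with its keyword arguments keeps the two call shapes (`k` for a query search, `n` for a metadata-only search) visible in one place.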

Cassandra

This component creates a Cassandra Vector Store with search capabilities. For more information, see the Cassandra documentation.

Parameters

Inputs
Name Type Description

database_ref

String

Contact points for the database or AstraDB database ID

username

String

Username for the database (leave empty for AstraDB)

token

SecretString

User password for the database or AstraDB token

keyspace

String

Table Keyspace or AstraDB namespace

table_name

String

Name of the table or AstraDB collection

ttl_seconds

Integer

Time-to-live, in seconds, for added texts

batch_size

Integer

Number of documents to process in a single batch

setup_mode

String

Configuration mode for setting up the Cassandra table

cluster_kwargs

Dict

Additional keyword arguments for the Cassandra cluster

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

search_type

String

Type of search to perform

search_score_threshold

Float

Minimum similarity score for search results

search_filter

Dict

Metadata filters for search query

body_search

String

Document textual search terms

enable_body_search

Boolean

Flag to enable body search

Outputs
Name Type Description

vector_store

Cassandra

Cassandra vector store instance

search_results

List[Data]

Results of similarity search
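
The `database_ref` input accepts either contact points or an Astra DB database ID in the same field. One way to tell them apart is to test whether the value parses as a UUID. The sketch below assumes that convention: `parse_database_ref` is a hypothetical helper, and trimming whitespace around comma-separated hosts is an addition made here for illustration.

```python
from typing import Union
from uuid import UUID


def parse_database_ref(database_ref: str) -> tuple[bool, Union[str, list[str]]]:
    """Return (is_astra, value) for a database reference string."""
    try:
        # A value that parses as a UUID is treated as an Astra DB database ID.
        UUID(database_ref)
    except ValueError:
        # Anything else is treated as one or more contact points.
        if "," in database_ref:
            hosts = [part.strip() for part in database_ref.split(",") if part.strip()]
            return False, hosts
        return False, database_ref
    return True, database_ref


if __name__ == "__main__":
    print(parse_database_ref("01234567-89ab-cdef-0123-456789abcdef"))
    print(parse_database_ref("10.0.0.1, 10.0.0.2"))
    print(parse_database_ref("localhost"))
```

With this convention, `username` is only relevant for the contact-points case, which matches the note that it can be left empty for AstraDB.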

Component code

cassandra.py
from langchain_community.vectorstores import Cassandra
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, DictInput, FloatInput
from langflow.io import (
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    MultilineInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra"
    description = "Cassandra Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/cassandra"
    name = "Cassandra"
    icon = "Cassandra"

    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        IntInput(
            name="ttl_seconds",
            display_name="TTL Seconds",
            info="Optional time-to-live for the added texts.",
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            value=16,
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            is_list=True,
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
        MessageTextInput(
            name="body_search",
            display_name="Search Body",
            info="Document textual search terms to apply to the search query.",
            advanced=True,
        ),
        BoolInput(
            name="enable_body_search",
            display_name="Enable Body Search",
            info="Flag to enable body search. This must be enabled BEFORE the table is created.",
            value=False,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Cassandra:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e

        from uuid import UUID

        database_ref = self.database_ref

        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")

        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        body_index_options = [("index_analyzer", "STANDARD")] if self.enable_body_search else None

        if self.setup_mode == "Off":
            setup_mode = SetupMode.OFF
        elif self.setup_mode == "Sync":
            setup_mode = SetupMode.SYNC
        else:
            setup_mode = SetupMode.ASYNC

        if documents:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            table = Cassandra.from_documents(
                documents=documents,
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                batch_size=self.batch_size,
                body_index_options=body_index_options,
            )
        else:
            logger.debug("No documents to add to the Vector Store.")
            table = Cassandra(
                embedding=self.embedding,
                table_name=self.table_name,
                keyspace=self.keyspace,
                ttl_seconds=self.ttl_seconds or None,
                body_index_options=body_index_options,
                setup_mode=setup_mode,
            )
        return table

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        logger.debug(f"Search input: {self.search_query}")
        logger.debug(f"Search type: {self.search_type}")
        logger.debug(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                logger.debug(f"Search args: {search_args}")

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field name 'content'."
                    )
                    raise ValueError(msg) from e
                raise

            logger.debug(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        if self.body_search:
            if not self.enable_body_search:
                msg = "You should enable body search when creating the table to search the body field."
                raise ValueError(msg)
            args["body_search"] = self.body_search
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
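
The way `_build_search_args` and `get_retriever_kwargs` assemble their output can be reproduced without a database connection. The sketch below is a simplified stand-in: `build_search_kwargs` and `retriever_kwargs` are hypothetical names, and plain function parameters take the place of the component's inputs. It shows the two behaviors worth noting: empty filter entries are dropped, and a body search is rejected unless body search was enabled.

```python
from typing import Any, Optional


def build_search_kwargs(
    k: int = 4,
    score_threshold: float = 0.0,
    search_filter: Optional[dict] = None,
    body_search: Optional[str] = None,
    enable_body_search: bool = False,
) -> dict[str, Any]:
    """Assemble keyword arguments for a vector store search call."""
    kwargs: dict[str, Any] = {"k": k, "score_threshold": score_threshold}

    # Keep only filter entries that have both a key and a value.
    clean_filter = {key: value for key, value in (search_filter or {}).items() if key and value}
    if clean_filter:
        kwargs["filter"] = clean_filter

    if body_search:
        # Body search relies on an index that must exist on the table already.
        if not enable_body_search:
            raise ValueError("Body search requires a table created with body search enabled.")
        kwargs["body_search"] = body_search
    return kwargs


def retriever_kwargs(search_type: str, **options: Any) -> dict[str, Any]:
    """Wrap the search arguments in the shape a retriever configuration expects."""
    return {"search_type": search_type, "search_kwargs": build_search_kwargs(**options)}


if __name__ == "__main__":
    print(build_search_kwargs(search_filter={"lang": "en", "empty": ""}))
    print(retriever_kwargs("mmr", k=2, body_search="vector", enable_body_search=True))
```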

Cassandra Graph Vector Store

This component implements a Cassandra Graph Vector Store with search capabilities.

Parameters

Inputs
Name Display Name Info

database_ref

Contact Points / Astra Database ID

Contact points for the database or AstraDB database ID (required)

username

Username

Username for the database (leave empty for AstraDB)

token

Password / AstraDB Token

User password for the database or AstraDB token (required)

keyspace

Keyspace

Table Keyspace or AstraDB namespace (required)

table_name

Table Name

The name of the table or AstraDB collection where vectors will be stored (required)

setup_mode

Setup Mode

Configuration mode for setting up the Cassandra table (options: "Sync", "Off", default: "Sync")

cluster_kwargs

Cluster arguments

Optional dictionary of additional keyword arguments for the Cassandra cluster

search_query

Search Query

Query string for similarity search

ingest_data

Ingest Data

Data to be ingested into the vector store (list of Data objects)

embedding

Embedding

Embedding model to use

number_of_results

Number of Results

Number of results to return in similarity search (default: 4)

search_type

Search Type

Search type to use (options: "Traversal", "MMR traversal", "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Traversal")

depth

Depth of traversal

The maximum depth of edges to traverse (for "Traversal" or "MMR traversal" search types, default: 1)

search_score_threshold

Search Score Threshold

Minimum similarity score threshold for search results (for "Similarity with score threshold" search type)

search_filter

Search Metadata Filter

Optional dictionary of filters to apply to the search query

Outputs
Name Display Name Info

vector_store

Vector Store

Built Cassandra Graph vector store

search_results

Search Results

Results of the similarity search as a list of Data objects

Component code

cassandra_graph.py
from uuid import UUID

from langchain_community.graph_vectorstores import CassandraGraphVectorStore
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MessageTextInput,
    MultilineInput,
    SecretStrInput,
)
from langflow.schema import Data


class CassandraGraphVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Cassandra Graph"
    description = "Cassandra Graph Vector Store"
    documentation = "https://python.langchain.com/v0.2/api_reference/community/graph_vectorstores.html"
    name = "CassandraGraph"
    icon = "Cassandra"

    inputs = [
        MessageTextInput(
            name="database_ref",
            display_name="Contact Points / Astra Database ID",
            info="Contact points for the database (or AstraDB database ID)",
            required=True,
        ),
        MessageTextInput(
            name="username", display_name="Username", info="Username for the database (leave empty for AstraDB)."
        ),
        SecretStrInput(
            name="token",
            display_name="Password / AstraDB Token",
            info="User password for the database (or AstraDB token).",
            required=True,
        ),
        MessageTextInput(
            name="keyspace",
            display_name="Keyspace",
            info="Table Keyspace (or AstraDB namespace).",
            required=True,
        ),
        MessageTextInput(
            name="table_name",
            display_name="Table Name",
            info="The name of the table (or AstraDB collection) where vectors will be stored.",
            required=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the Cassandra table, with options like 'Sync' or 'Off'.",
            options=["Sync", "Off"],
            value="Sync",
            advanced=True,
        ),
        DictInput(
            name="cluster_kwargs",
            display_name="Cluster arguments",
            info="Optional dictionary of additional keyword arguments for the Cassandra cluster.",
            advanced=True,
            is_list=True,
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=[
                "Traversal",
                "MMR traversal",
                "Similarity",
                "Similarity with score threshold",
                "MMR (Max Marginal Relevance)",
            ],
            value="Traversal",
            advanced=True,
        ),
        IntInput(
            name="depth",
            display_name="Depth of traversal",
            info="The maximum depth of edges to traverse. (when using 'Traversal' or 'MMR traversal')",
            value=1,
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CassandraGraphVectorStore:
        try:
            import cassio
            from langchain_community.utilities.cassandra import SetupMode
        except ImportError as e:
            msg = "Could not import cassio integration package. Please install it with `pip install cassio`."
            raise ImportError(msg) from e

        database_ref = self.database_ref

        try:
            UUID(self.database_ref)
            is_astra = True
        except ValueError:
            is_astra = False
            if "," in self.database_ref:
                # use a copy because we can't change the type of the parameter
                database_ref = self.database_ref.split(",")

        if is_astra:
            cassio.init(
                database_id=database_ref,
                token=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        else:
            cassio.init(
                contact_points=database_ref,
                username=self.username,
                password=self.token,
                cluster_kwargs=self.cluster_kwargs,
            )
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        setup_mode = SetupMode.OFF if self.setup_mode == "Off" else SetupMode.SYNC

        if documents:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            store = CassandraGraphVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
            )
        else:
            logger.debug("No documents to add to the Vector Store.")
            store = CassandraGraphVectorStore(
                embedding=self.embedding,
                node_table=self.table_name,
                keyspace=self.keyspace,
                setup_mode=setup_mode,
            )
        return store

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity":
            return "similarity"
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        if self.search_type == "MMR traversal":
            return "mmr_traversal"
        return "traversal"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        logger.debug(f"Search input: {self.search_query}")
        logger.debug(f"Search type: {self.search_type}")
        logger.debug(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                logger.debug(f"Search args: {search_args}")

                docs = vector_store.search(query=self.search_query, search_type=search_type, **search_args)
            except KeyError as e:
                if "content" in str(e):
                    msg = (
                        "You should ingest data through Langflow (or LangChain) to query it in Langflow. "
                        "Your collection does not contain a field name 'content'."
                    )
                    raise ValueError(msg) from e
                raise

            logger.debug(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            self.status = data
            return data
        return []

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
            "depth": self.depth,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
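
`_map_search_type` translates the dropdown label into the search type string passed to the vector store. Because the comparison is an exact string match, a label only maps correctly when its spelling and capitalization agree with the dropdown option. A lookup table is one way to keep the labels and their values side by side. The sketch below is illustrative only: `SEARCH_TYPE_MAP` and `map_search_type` are hypothetical names, and the case-insensitive matching is an addition made here, not behavior of the component.

```python
# Dropdown labels (as listed in the inputs table) and the search type each one selects.
SEARCH_TYPE_MAP = {
    "Traversal": "traversal",
    "MMR traversal": "mmr_traversal",
    "Similarity": "similarity",
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}

# Pre-compute a case-insensitive view of the table.
_NORMALIZED = {label.casefold(): value for label, value in SEARCH_TYPE_MAP.items()}


def map_search_type(label: str, default: str = "traversal") -> str:
    """Map a dropdown label to a search type, falling back to the default."""
    return _NORMALIZED.get(label.strip().casefold(), default)


if __name__ == "__main__":
    for label in ("Traversal", "MMR traversal", "MMR Traversal", "Similarity"):
        print(label, "->", map_search_type(label))
```

Falling back to `"traversal"` for unrecognized labels matches the default of the dropdown.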

Chroma DB

This component creates a Chroma Vector Store with search capabilities. For more information, see the Chroma documentation.

Parameters

Inputs
Name Type Description

collection_name

String

The name of the Chroma collection. Default: "langflow".

persist_directory

String

The directory to persist the Chroma database.

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects).

embedding

Embeddings

The embedding function to use for the vector store.

chroma_server_cors_allow_origins

String

CORS allow origins for the Chroma server.

chroma_server_host

String

Host for the Chroma server.

chroma_server_http_port

Integer

HTTP port for the Chroma server.

chroma_server_grpc_port

Integer

gRPC port for the Chroma server.

chroma_server_ssl_enabled

Boolean

Enable SSL for the Chroma server.

allow_duplicates

Boolean

Allow duplicate documents in the vector store.

search_type

String

Type of search to perform: "Similarity" or "MMR".

number_of_results

Integer

Number of results to return from the search. Default: 10.

limit

Integer

Limit the number of records to compare when Allow Duplicates is False.

Component code

chroma.py
from copy import deepcopy

from chromadb.config import Settings
from langchain_chroma import Chroma
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.base.vectorstores.utils import chroma_collection_to_data
from langflow.io import BoolInput, DataInput, DropdownInput, HandleInput, IntInput, MultilineInput, StrInput
from langflow.schema import Data


class ChromaVectorStoreComponent(LCVectorStoreComponent):
    """Chroma Vector Store with search capabilities."""

    display_name: str = "Chroma DB"
    description: str = "Chroma Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/chroma"
    name = "Chroma"
    icon = "Chroma"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            value="langflow",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
        ),
        MultilineInput(
            name="search_query",
            display_name="Search Query",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        StrInput(
            name="chroma_server_cors_allow_origins",
            display_name="Server CORS Allow Origins",
            advanced=True,
        ),
        StrInput(
            name="chroma_server_host",
            display_name="Server Host",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_http_port",
            display_name="Server HTTP Port",
            advanced=True,
        ),
        IntInput(
            name="chroma_server_grpc_port",
            display_name="Server gRPC Port",
            advanced=True,
        ),
        BoolInput(
            name="chroma_server_ssl_enabled",
            display_name="Server SSL Enabled",
            advanced=True,
        ),
        BoolInput(
            name="allow_duplicates",
            display_name="Allow Duplicates",
            advanced=True,
            info="If false, will not add documents that are already in the Vector Store.",
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            options=["Similarity", "MMR"],
            value="Similarity",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=10,
        ),
        IntInput(
            name="limit",
            display_name="Limit",
            advanced=True,
            info="Limit the number of records to compare when Allow Duplicates is False.",
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Chroma:
        """Builds the Chroma object."""
        try:
            from chromadb import Client
            from langchain_chroma import Chroma
        except ImportError as e:
            msg = "Could not import Chroma integration package. Please install it with `pip install langchain-chroma`."
            raise ImportError(msg) from e
        # Chroma settings
        chroma_settings = None
        client = None
        if self.chroma_server_host:
            chroma_settings = Settings(
                chroma_server_cors_allow_origins=self.chroma_server_cors_allow_origins or [],
                chroma_server_host=self.chroma_server_host,
                chroma_server_http_port=self.chroma_server_http_port or None,
                chroma_server_grpc_port=self.chroma_server_grpc_port or None,
                chroma_server_ssl_enabled=self.chroma_server_ssl_enabled,
            )
            client = Client(settings=chroma_settings)

        # Check persist_directory and expand it if it is a relative path
        persist_directory = self.resolve_path(self.persist_directory) if self.persist_directory is not None else None

        chroma = Chroma(
            persist_directory=persist_directory,
            client=client,
            embedding_function=self.embedding,
            collection_name=self.collection_name,
        )

        self._add_documents_to_vector_store(chroma)
        self.status = chroma_collection_to_data(chroma.get(limit=self.limit))
        return chroma

    def _add_documents_to_vector_store(self, vector_store: "Chroma") -> None:
        """Adds documents to the Vector Store."""
        if not self.ingest_data:
            self.status = ""
            return

        _stored_documents_without_id = []
        if self.allow_duplicates:
            stored_data = []
        else:
            stored_data = chroma_collection_to_data(vector_store.get(limit=self.limit))
            for value in deepcopy(stored_data):
                del value.id
                _stored_documents_without_id.append(value)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                if _input not in _stored_documents_without_id:
                    documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents and self.embedding is not None:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            vector_store.add_documents(documents)
        else:
            logger.debug("No documents to add to the Vector Store.")
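
The duplicate check in `_add_documents_to_vector_store` compares each incoming record with the stored records after removing the stored `id`. The sketch below reproduces that idea with plain dictionaries in place of Langflow `Data` objects. `filter_new_records` is a hypothetical name, and the dictionary layout is an assumption made for illustration.

```python
from copy import deepcopy


def filter_new_records(
    incoming: list[dict],
    stored: list[dict],
    allow_duplicates: bool = False,
) -> list[dict]:
    """Return the incoming records that should be added to the store."""
    if allow_duplicates:
        return list(incoming)

    # Stored records carry a store-assigned "id"; remove it on a copy so that
    # the comparison only considers content and metadata.
    stored_without_id = []
    for record in deepcopy(stored):
        record.pop("id", None)
        stored_without_id.append(record)

    return [record for record in incoming if record not in stored_without_id]


if __name__ == "__main__":
    stored = [{"id": "1", "text": "a", "source": "x"}]
    incoming = [{"text": "a", "source": "x"}, {"text": "b", "source": "x"}]
    print(filter_new_records(incoming, stored))
    print(filter_new_records(incoming, stored, allow_duplicates=True))
```

In the component, the stored records come from `vector_store.get(limit=self.limit)`, so when `limit` is set only that many stored records take part in the comparison.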

Clickhouse

This component implements a Clickhouse Vector Store with search capabilities using the LangChain framework.

Parameters

Inputs
Name Display Name Info

host

hostname

Clickhouse server hostname (required, default: "localhost")

port

port

Clickhouse server port (required, default: 8123)

database

database

Clickhouse database name (required)

table

Table name

Clickhouse table name (required)

username

The ClickHouse user name.

Username for authentication (required)

password

The password for username.

Password for authentication (required)

index_type

index_type

Type of the index (options: "annoy", "vector_similarity", default: "annoy")

metric

metric

Metric to compute distance (options: "angular", "euclidean", "manhattan", "hamming", "dot", default: "angular")

secure

Use https/TLS

Whether to connect over HTTPS/TLS. Overrides values inferred from the interface or port arguments (default: false)

index_param

Param of the index

Index parameters (default: "'L2Distance',100")

index_query_params

index query params

Additional index query parameters

search_query

Search Query

Query string for similarity search

ingest_data

Ingest Data

Data to be ingested into the vector store

embedding

Embedding

Embedding model to use

number_of_results

Number of Results

Number of results to return in similarity search (default: 4)

score_threshold

Score threshold

Threshold for similarity scores

Outputs
Name Display Name Info

vector_store

Vector Store

Built Clickhouse vector store

search_results

Search Results

Results of the similarity search as a list of Data objects

Component code

clickhouse.py
from langchain_community.vectorstores import Clickhouse, ClickhouseSettings

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.inputs import BoolInput, FloatInput
from langflow.io import (
    DataInput,
    DictInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class ClickhouseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Clickhouse"
    description = "Clickhouse Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/clickhouse/"
    name = "Clickhouse"
    icon = "Clickhouse"

    inputs = [
        StrInput(name="host", display_name="hostname", required=True, value="localhost"),
        IntInput(name="port", display_name="port", required=True, value=8123),
        StrInput(name="database", display_name="database", required=True),
        StrInput(name="table", display_name="Table name", required=True),
        StrInput(name="username", display_name="The ClickHouse user name.", required=True),
        SecretStrInput(name="password", display_name="The password for username.", required=True),
        DropdownInput(
            name="index_type",
            display_name="index_type",
            options=["annoy", "vector_similarity"],
            info="Type of the index.",
            value="annoy",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="metric",
            options=["angular", "euclidean", "manhattan", "hamming", "dot"],
            info="Metric to compute distance.",
            value="angular",
            advanced=True,
        ),
        BoolInput(
            name="secure",
            display_name="Use https/TLS. This overrides inferred values from the interface or port arguments.",
            value=False,
            advanced=True,
        ),
        StrInput(name="index_param", display_name="Param of the index", value="'L2Distance',100", advanced=True),
        DictInput(name="index_query_params", display_name="index query params", advanced=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(name="ingest_data", display_name="Ingest Data", is_list=True),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        FloatInput(name="score_threshold", display_name="Score threshold", advanced=True),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Clickhouse:
        try:
            import clickhouse_connect
        except ImportError as e:
            msg = (
                "Failed to import Clickhouse dependencies. "
                "Install it using `pip install langflow[clickhouse-connect] --pre`"
            )
            raise ImportError(msg) from e

        try:
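            # Use the configured port and TLS setting for the connectivity check as well.
            client = clickhouse_connect.get_client(
                host=self.host,
                port=self.port,
                username=self.username,
                password=self.password,
                secure=self.secure,
            )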
            client.command("SELECT 1")
        except Exception as e:
            msg = f"Failed to connect to Clickhouse: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        kwargs = {}
        if self.index_param:
            kwargs["index_param"] = self.index_param.split(",")
        if self.index_query_params:
            kwargs["index_query_params"] = self.index_query_params

        settings = ClickhouseSettings(
            table=self.table,
            database=self.database,
            host=self.host,
            index_type=self.index_type,
            metric=self.metric,
            password=self.password,
            port=self.port,
            secure=self.secure,
            username=self.username,
            **kwargs,
        )
        if documents:
            clickhouse_vs = Clickhouse.from_documents(documents=documents, embedding=self.embedding, config=settings)

        else:
            clickhouse_vs = Clickhouse(embedding=self.embedding, config=settings)

        return clickhouse_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            kwargs = {}
            if self.score_threshold:
                kwargs["score_threshold"] = self.score_threshold

            docs = vector_store.similarity_search(query=self.search_query, k=self.number_of_results, **kwargs)

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
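
The search_documents method above only runs a search when the query is a non-empty string, and it only forwards score_threshold when the value is truthy. A minimal sketch of that control flow follows. StubVectorStore is a hypothetical stand-in that records its arguments, so no ClickHouse server or LangChain install is assumed.

```python
class StubVectorStore:
    """Hypothetical stand-in that records the arguments it receives."""

    def __init__(self):
        self.calls = []

    def similarity_search(self, query, k, **kwargs):
        self.calls.append({"query": query, "k": k, **kwargs})
        return [f"doc-{i}" for i in range(k)]


def search(store, search_query, number_of_results=4, score_threshold=None):
    # Skip the search for None, non-string, empty, or whitespace-only queries.
    if search_query and isinstance(search_query, str) and search_query.strip():
        kwargs = {}
        # None and 0 are both falsy, so neither is forwarded to the store.
        if score_threshold:
            kwargs["score_threshold"] = score_threshold
        return store.similarity_search(query=search_query, k=number_of_results, **kwargs)
    return []


store = StubVectorStore()
print(search(store, "   "))  # []
print(search(store, "vector databases", number_of_results=2, score_threshold=0.5))
# ['doc-0', 'doc-1']
```

One consequence of the truthiness check is that a threshold of exactly 0 behaves the same as leaving the field empty.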

Couchbase

This component creates a Couchbase Vector Store with search capabilities. For more information, see the Couchbase documentation.

Parameters

Inputs
Name Type Description

couchbase_connection_string

SecretString

Couchbase Cluster connection string (required).

couchbase_username

String

Couchbase username (required).

couchbase_password

SecretString

Couchbase password (required).

bucket_name

String

Name of the Couchbase bucket (required).

scope_name

String

Name of the Couchbase scope (required).

collection_name

String

Name of the Couchbase collection (required).

index_name

String

Name of the Couchbase index (required).

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects).

embedding

Embeddings

The embedding function to use for the vector store.

number_of_results

Integer

Number of results to return from the search. Default: 4 (advanced).

Outputs
Name Type Description

vector_store

CouchbaseVectorStore

A Couchbase vector store instance configured with the specified parameters.

Component code

couchbase.py
from datetime import timedelta

from langchain_community.vectorstores import CouchbaseVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data


class CouchbaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Couchbase"
    description = "Couchbase Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.1/docs/integrations/document_loaders/couchbase/"
    name = "Couchbase"
    icon = "Couchbase"

    inputs = [
        SecretStrInput(
            name="couchbase_connection_string", display_name="Couchbase Cluster connection string", required=True
        ),
        StrInput(name="couchbase_username", display_name="Couchbase username", required=True),
        SecretStrInput(name="couchbase_password", display_name="Couchbase password", required=True),
        StrInput(name="bucket_name", display_name="Bucket Name", required=True),
        StrInput(name="scope_name", display_name="Scope Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> CouchbaseVectorStore:
        try:
            from couchbase.auth import PasswordAuthenticator
            from couchbase.cluster import Cluster
            from couchbase.options import ClusterOptions
        except ImportError as e:
            msg = "Failed to import Couchbase dependencies. Install it using `pip install langflow[couchbase] --pre`"
            raise ImportError(msg) from e

        try:
            auth = PasswordAuthenticator(self.couchbase_username, self.couchbase_password)
            options = ClusterOptions(auth)
            cluster = Cluster(self.couchbase_connection_string, options)

            cluster.wait_until_ready(timedelta(seconds=5))
        except Exception as e:
            msg = f"Failed to connect to Couchbase: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            couchbase_vs = CouchbaseVectorStore.from_documents(
                documents=documents,
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )

        else:
            couchbase_vs = CouchbaseVectorStore(
                cluster=cluster,
                bucket_name=self.bucket_name,
                scope_name=self.scope_name,
                collection_name=self.collection_name,
                embedding=self.embedding,
                index_name=self.index_name,
            )

        return couchbase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
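
Before building the store, the component converts every Data item in ingest_data to a LangChain document and passes anything else through unchanged. It then calls from_documents when there is something to ingest, or the plain constructor otherwise. The sketch below mirrors that logic. The Data and Document classes are hypothetical stand-ins for the Langflow and LangChain types, reduced to the fields the example needs.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Hypothetical stand-in for a LangChain document."""
    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Data:
    """Hypothetical stand-in for Langflow's Data object."""
    text: str
    metadata: dict = field(default_factory=dict)

    def to_lc_document(self) -> Document:
        return Document(page_content=self.text, metadata=self.metadata)


def normalize_inputs(ingest_data):
    documents = []
    for item in ingest_data or []:  # None is treated as an empty list
        if isinstance(item, Data):
            documents.append(item.to_lc_document())
        else:
            documents.append(item)  # passed through unchanged
    return documents


def construction_path(documents):
    # Ingest-and-build when documents exist, otherwise attach to the existing index.
    return "from_documents" if documents else "constructor"


docs = normalize_inputs([Data(text="hello", metadata={"source": "a.txt"})])
print(docs[0].page_content, construction_path(docs))  # hello from_documents
print(construction_path(normalize_inputs(None)))      # constructor
```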

FAISS

This component creates a FAISS Vector Store with search capabilities.

For more information, see the FAISS documentation.

Parameters

Inputs
Name Type Description

index_name

String

The name of the FAISS index. Default: "langflow_index".

persist_directory

String

Directory where the FAISS index is saved and loaded from, relative to where Langflow is running. The component raises an error if this is empty.

search_query

String

The query to search for in the vector store.

ingest_data

Data

The data to ingest into the vector store (list of Data objects or documents).

allow_dangerous_deserialization

Boolean

Set to True to allow loading pickle files from untrusted sources. Only enable this if you trust the source of the data. Default: True (advanced).

embedding

Embeddings

The embedding function to use for the vector store.

number_of_results

Integer

Number of results to return from the search. Default: 4 (advanced).

Outputs
Name Type Description

vector_store

FAISS

A FAISS vector store instance configured with the specified parameters.
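
The allow_dangerous_deserialization flag exists because the saved index includes a pickle file, and unpickling can call arbitrary functions. The sketch below is a harmless illustration using only the standard library: the pickled object asks the loader to call len("abc"), and the loader does so. The Payload class is hypothetical, and a hostile file could name any importable callable in the same position.

```python
import pickle


class Payload:
    """Object whose pickled form tells the loader to call a function."""

    def __reduce__(self):
        # On load, pickle calls len("abc") and returns the result
        # instead of rebuilding a Payload instance.
        return (len, ("abc",))


blob = pickle.dumps(Payload())
loaded = pickle.loads(blob)
print(type(loaded).__name__, loaded)  # int 3
```

The loaded value is the result of the call, not the original object, which is why index files should only be loaded from sources you control.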

Component code

faiss.py
from langchain_community.vectorstores import FAISS
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, DataInput, HandleInput, IntInput, MultilineInput, StrInput
from langflow.schema import Data


class FaissVectorStoreComponent(LCVectorStoreComponent):
    """FAISS Vector Store with search capabilities."""

    display_name: str = "FAISS"
    description: str = "FAISS Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/faiss"
    name = "FAISS"
    icon = "FAISS"

    inputs = [
        StrInput(
            name="index_name",
            display_name="Index Name",
            value="langflow_index",
        ),
        StrInput(
            name="persist_directory",
            display_name="Persist Directory",
            info="Path to save the FAISS index. It will be relative to where Langflow is running.",
        ),
        MultilineInput(
            name="search_query",
            display_name="Search Query",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        BoolInput(
            name="allow_dangerous_deserialization",
            display_name="Allow Dangerous Deserialization",
            info="Set to True to allow loading pickle files from untrusted sources. "
            "Only enable this if you trust the source of the data.",
            advanced=True,
            value=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> FAISS:
        """Builds the FAISS object."""
        if not self.persist_directory:
            msg = "Folder path is required to save the FAISS index."
            raise ValueError(msg)
        path = self.resolve_path(self.persist_directory)

        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        faiss = FAISS.from_documents(documents=documents, embedding=self.embedding)
        faiss.save_local(str(path), self.index_name)

        return faiss

    def search_documents(self) -> list[Data]:
        """Search for documents in the FAISS vector store."""
        if not self.persist_directory:
            msg = "Folder path is required to load the FAISS index."
            raise ValueError(msg)
        path = self.resolve_path(self.persist_directory)

        vector_store = FAISS.load_local(
            folder_path=path,
            embeddings=self.embedding,
            index_name=self.index_name,
            allow_dangerous_deserialization=self.allow_dangerous_deserialization,
        )

        if not vector_store:
            msg = "Failed to load the FAISS index."
            raise ValueError(msg)

        logger.debug(f"Search input: {self.search_query}")
        logger.debug(f"Number of results: {self.number_of_results}")

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            logger.debug(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            logger.debug(f"Converted documents to data: {len(data)}")
            logger.debug(data)
            return data  # Return the search results data
        logger.debug("No search input provided. Skipping search.")
        return []
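
Note that build_vector_store writes the index with save_local, while search_documents reads it back from the same folder with load_local, so a search only works after an ingest has run. LangChain's FAISS wrapper typically writes two files per index, named after index_name with the .faiss and .pkl extensions. The sketch below computes those paths and checks for them. Both helper names are hypothetical, and the sketch uses a plain Path rather than reproducing Langflow's resolve_path.

```python
from pathlib import Path


def expected_index_files(persist_directory: str, index_name: str = "langflow_index") -> list:
    """Return the two files a saved index is expected to consist of."""
    folder = Path(persist_directory)
    return [folder / f"{index_name}.faiss", folder / f"{index_name}.pkl"]


def index_is_saved(persist_directory: str, index_name: str = "langflow_index") -> bool:
    """True only when both files exist, i.e. an ingest has already run."""
    return all(p.is_file() for p in expected_index_files(persist_directory, index_name))


print([p.name for p in expected_index_files("vector_store")])
# ['langflow_index.faiss', 'langflow_index.pkl']
```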

Hyper-Converged Database (HCD) Vector Store

This component implements a Vector Store using Hyper-Converged Database (HCD).

Parameters

Inputs
Name Display Name Info

collection_name

Collection Name

The name of the collection within HCD where the vectors will be stored (required)

username

HCD Username

Authentication username for accessing HCD (default: "hcd-superuser", required)

password

HCD Password

Authentication password for accessing HCD (required)

api_endpoint

HCD API Endpoint

API endpoint URL for the HCD service (required)

search_input

Search Input

Query string for similarity search

ingest_data

Ingest Data

Data to be ingested into the vector store

namespace

Namespace

Optional namespace within HCD to use for the collection (default: "default_namespace")

ca_certificate

CA Certificate

Optional CA certificate for TLS connections to HCD

metric

Metric

Optional distance metric for vector comparisons (options: "cosine", "dot_product", "euclidean")

batch_size

Batch Size

Optional number of records to process in a single batch

bulk_insert_batch_concurrency

Bulk Insert Batch Concurrency

Optional concurrency level for bulk insert operations

bulk_insert_overwrite_concurrency

Bulk Insert Overwrite Concurrency

Optional concurrency level for bulk insert operations that overwrite existing data

bulk_delete_concurrency

Bulk Delete Concurrency

Optional concurrency level for bulk delete operations

setup_mode

Setup Mode

Configuration mode for setting up the vector store (options: "Sync", "Async", "Off", default: "Sync")

pre_delete_collection

Pre Delete Collection

Boolean flag to determine whether to delete the collection before creating a new one

metadata_indexing_include

Metadata Indexing Include

Optional list of metadata fields to include in the indexing

embedding

Embedding or Astra Vectorize

Allows either an embedding model or an Astra Vectorize configuration

metadata_indexing_exclude

Metadata Indexing Exclude

Optional list of metadata fields to exclude from the indexing

collection_indexing_policy

Collection Indexing Policy

Optional dictionary defining the indexing policy for the collection

number_of_results

Number of Results

Number of results to return in similarity search (default: 4)

search_type

Search Type

Search type to use (options: "Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)", default: "Similarity")

search_score_threshold

Search Score Threshold

Minimum similarity score threshold for search results, used with the "Similarity with score threshold" search type (default: 0)

search_filter

Search Metadata Filter

Optional dictionary of filters to apply to the search query

Outputs
Name Display Name Info

vector_store

Vector Store

Built HCD vector store instance

search_results

Search Results

Results of similarity search as a list of Data objects
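
Two of these inputs interact in ways the table does not show. The setup_mode label is upper-cased and looked up in an enum, with an empty value falling back to the first option. The three indexing inputs are checked in an if/elif chain, so at most one of them is forwarded. The sketch below assumes a local SetupMode enum as a stand-in for the one in langchain_astradb, and both function names are hypothetical.

```python
from enum import Enum


class SetupMode(Enum):
    """Hypothetical stand-in for langchain_astradb's SetupMode enum."""
    SYNC = 1
    ASYNC = 2
    OFF = 3


def resolve_setup_mode(label, options=("Sync", "Async", "Off")):
    label = label or options[0]  # an empty value falls back to the first option
    try:
        return SetupMode[label.upper()]
    except KeyError as e:
        raise ValueError(f"Invalid setup mode: {label}") from e


def pick_indexing_option(include=None, exclude=None, policy=None):
    # First match wins: include, then exclude, then the full policy.
    if include:
        return {"metadata_indexing_include": include}
    if exclude:
        return {"metadata_indexing_exclude": exclude}
    if policy:
        return {"collection_indexing_policy": policy}
    return {}


print(resolve_setup_mode("Async"))                           # SetupMode.ASYNC
print(pick_indexing_option(include="title", exclude="raw"))  # {'metadata_indexing_include': 'title'}
```

In practice this means that setting metadata_indexing_include causes the other two indexing inputs to be ignored.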

Component code

hcd.py
from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers import docs_to_data
from langflow.inputs import DictInput, FloatInput
from langflow.io import (
    BoolInput,
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class HCDVectorStoreComponent(LCVectorStoreComponent):
    display_name: str = "Hyper-Converged Database"
    description: str = "Implementation of Vector Store using Hyper-Converged Database (HCD) with search capabilities"
    documentation: str = "https://python.langchain.com/docs/integrations/vectorstores/astradb"
    name = "HCD"
    icon: str = "HCD"

    inputs = [
        StrInput(
            name="collection_name",
            display_name="Collection Name",
            info="The name of the collection within HCD where the vectors will be stored.",
            required=True,
        ),
        StrInput(
            name="username",
            display_name="HCD Username",
            info="Authentication username for accessing HCD.",
            value="hcd-superuser",
            required=True,
        ),
        SecretStrInput(
            name="password",
            display_name="HCD Password",
            info="Authentication password for accessing HCD.",
            value="HCD_PASSWORD",
            required=True,
        ),
        SecretStrInput(
            name="api_endpoint",
            display_name="HCD API Endpoint",
            info="API endpoint URL for the HCD service.",
            value="HCD_API_ENDPOINT",
            required=True,
        ),
        MultilineInput(
            name="search_input",
            display_name="Search Input",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Optional namespace within HCD to use for the collection.",
            value="default_namespace",
            advanced=True,
        ),
        MultilineInput(
            name="ca_certificate",
            display_name="CA Certificate",
            info="Optional CA certificate for TLS connections to HCD.",
            advanced=True,
        ),
        DropdownInput(
            name="metric",
            display_name="Metric",
            info="Optional distance metric for vector comparisons in the vector store.",
            options=["cosine", "dot_product", "euclidean"],
            advanced=True,
        ),
        IntInput(
            name="batch_size",
            display_name="Batch Size",
            info="Optional number of data to process in a single batch.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_batch_concurrency",
            display_name="Bulk Insert Batch Concurrency",
            info="Optional concurrency level for bulk insert operations.",
            advanced=True,
        ),
        IntInput(
            name="bulk_insert_overwrite_concurrency",
            display_name="Bulk Insert Overwrite Concurrency",
            info="Optional concurrency level for bulk insert operations that overwrite existing data.",
            advanced=True,
        ),
        IntInput(
            name="bulk_delete_concurrency",
            display_name="Bulk Delete Concurrency",
            info="Optional concurrency level for bulk delete operations.",
            advanced=True,
        ),
        DropdownInput(
            name="setup_mode",
            display_name="Setup Mode",
            info="Configuration mode for setting up the vector store, with options like 'Sync', 'Async', or 'Off'.",
            options=["Sync", "Async", "Off"],
            advanced=True,
            value="Sync",
        ),
        BoolInput(
            name="pre_delete_collection",
            display_name="Pre Delete Collection",
            info="Boolean flag to determine whether to delete the collection before creating a new one.",
            advanced=True,
        ),
        StrInput(
            name="metadata_indexing_include",
            display_name="Metadata Indexing Include",
            info="Optional list of metadata fields to include in the indexing.",
            advanced=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding or Astra Vectorize",
            input_types=["Embeddings", "dict"],
            # TODO: This should be optional, but need to refactor langchain-astradb first.
            info="Allows either an embedding model or an Astra Vectorize configuration.",
        ),
        StrInput(
            name="metadata_indexing_exclude",
            display_name="Metadata Indexing Exclude",
            info="Optional list of metadata fields to exclude from the indexing.",
            advanced=True,
        ),
        StrInput(
            name="collection_indexing_policy",
            display_name="Collection Indexing Policy",
            info="Optional dictionary defining the indexing policy for the collection.",
            advanced=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            advanced=True,
            value=4,
        ),
        DropdownInput(
            name="search_type",
            display_name="Search Type",
            info="Search type to use",
            options=["Similarity", "Similarity with score threshold", "MMR (Max Marginal Relevance)"],
            value="Similarity",
            advanced=True,
        ),
        FloatInput(
            name="search_score_threshold",
            display_name="Search Score Threshold",
            info="Minimum similarity score threshold for search results. "
            "(when using 'Similarity with score threshold')",
            value=0,
            advanced=True,
        ),
        DictInput(
            name="search_filter",
            display_name="Search Metadata Filter",
            info="Optional dictionary of filters to apply to the search query.",
            advanced=True,
            is_list=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_astradb import AstraDBVectorStore
            from langchain_astradb.utils.astradb import SetupMode
        except ImportError as e:
            msg = (
                "Could not import langchain Astra DB integration package. "
                "Please install it with `pip install langchain-astradb`."
            )
            raise ImportError(msg) from e

        try:
            from astrapy.authentication import UsernamePasswordTokenProvider
            from astrapy.constants import Environment
        except ImportError as e:
            msg = "Could not import astrapy integration package. Please install it with `pip install astrapy`."
            raise ImportError(msg) from e

        try:
            if not self.setup_mode:
                self.setup_mode = self._inputs["setup_mode"].options[0]

            setup_mode_value = SetupMode[self.setup_mode.upper()]
        except KeyError as e:
            msg = f"Invalid setup mode: {self.setup_mode}"
            raise ValueError(msg) from e

        if not isinstance(self.embedding, dict):
            embedding_dict = {"embedding": self.embedding}
        else:
            from astrapy.info import CollectionVectorServiceOptions

            dict_options = self.embedding.get("collection_vector_service_options", {})
            dict_options["authentication"] = {
                k: v for k, v in dict_options.get("authentication", {}).items() if k and v
            }
            dict_options["parameters"] = {k: v for k, v in dict_options.get("parameters", {}).items() if k and v}
            embedding_dict = {
                "collection_vector_service_options": CollectionVectorServiceOptions.from_dict(dict_options)
            }
            collection_embedding_api_key = self.embedding.get("collection_embedding_api_key")
            if collection_embedding_api_key:
                embedding_dict["collection_embedding_api_key"] = collection_embedding_api_key

        token_provider = UsernamePasswordTokenProvider(self.username, self.password)
        vector_store_kwargs = {
            **embedding_dict,
            "collection_name": self.collection_name,
            "token": token_provider,
            "api_endpoint": self.api_endpoint,
            "namespace": self.namespace,
            "metric": self.metric or None,
            "batch_size": self.batch_size or None,
            "bulk_insert_batch_concurrency": self.bulk_insert_batch_concurrency or None,
            "bulk_insert_overwrite_concurrency": self.bulk_insert_overwrite_concurrency or None,
            "bulk_delete_concurrency": self.bulk_delete_concurrency or None,
            "setup_mode": setup_mode_value,
            "pre_delete_collection": self.pre_delete_collection or False,
            "environment": Environment.HCD,
        }

        if self.metadata_indexing_include:
            vector_store_kwargs["metadata_indexing_include"] = self.metadata_indexing_include
        elif self.metadata_indexing_exclude:
            vector_store_kwargs["metadata_indexing_exclude"] = self.metadata_indexing_exclude
        elif self.collection_indexing_policy:
            vector_store_kwargs["collection_indexing_policy"] = self.collection_indexing_policy

        try:
            vector_store = AstraDBVectorStore(**vector_store_kwargs)
        except Exception as e:
            msg = f"Error initializing AstraDBVectorStore: {e}"
            raise ValueError(msg) from e

        self._add_documents_to_vector_store(vector_store)
        return vector_store

    def _add_documents_to_vector_store(self, vector_store) -> None:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                msg = "Vector Store Inputs must be Data objects."
                raise TypeError(msg)

        if documents:
            logger.debug(f"Adding {len(documents)} documents to the Vector Store.")
            try:
                vector_store.add_documents(documents)
            except Exception as e:
                msg = f"Error adding documents to AstraDBVectorStore: {e}"
                raise ValueError(msg) from e
        else:
            logger.debug("No documents to add to the Vector Store.")

    def _map_search_type(self) -> str:
        if self.search_type == "Similarity with score threshold":
            return "similarity_score_threshold"
        if self.search_type == "MMR (Max Marginal Relevance)":
            return "mmr"
        return "similarity"

    def _build_search_args(self):
        args = {
            "k": self.number_of_results,
            "score_threshold": self.search_score_threshold,
        }

        if self.search_filter:
            clean_filter = {k: v for k, v in self.search_filter.items() if k and v}
            if len(clean_filter) > 0:
                args["filter"] = clean_filter
        return args

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        logger.debug(f"Search input: {self.search_input}")
        logger.debug(f"Search type: {self.search_type}")
        logger.debug(f"Number of results: {self.number_of_results}")

        if self.search_input and isinstance(self.search_input, str) and self.search_input.strip():
            try:
                search_type = self._map_search_type()
                search_args = self._build_search_args()

                docs = vector_store.search(query=self.search_input, search_type=search_type, **search_args)
            except Exception as e:
                msg = f"Error performing search in AstraDBVectorStore: {e}"
                raise ValueError(msg) from e

            logger.debug(f"Retrieved documents: {len(docs)}")

            data = docs_to_data(docs)
            logger.debug(f"Converted documents to data: {len(data)}")
            self.status = data
            return data
        logger.debug("No search input provided. Skipping search.")
        return []

    def get_retriever_kwargs(self):
        search_args = self._build_search_args()
        return {
            "search_type": self._map_search_type(),
            "search_kwargs": search_args,
        }
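
The search helpers above translate the display label of search_type into a LangChain search type and drop empty keys and values from the metadata filter. A minimal standalone sketch of that translation is shown below. It uses a dictionary lookup instead of the chained if statements, and the function names are hypothetical rather than part of the component.

```python
SEARCH_TYPE_MAP = {
    "Similarity with score threshold": "similarity_score_threshold",
    "MMR (Max Marginal Relevance)": "mmr",
}


def map_search_type(label):
    # Any other label, including "Similarity", maps to plain similarity search.
    return SEARCH_TYPE_MAP.get(label, "similarity")


def build_search_args(number_of_results=4, score_threshold=0, search_filter=None):
    args = {"k": number_of_results, "score_threshold": score_threshold}
    if search_filter:
        # Drop entries with an empty key or an empty value.
        clean = {k: v for k, v in search_filter.items() if k and v}
        if clean:
            args["filter"] = clean
    return args


def retriever_kwargs(label, **search_inputs):
    return {"search_type": map_search_type(label), "search_kwargs": build_search_args(**search_inputs)}


print(retriever_kwargs("MMR (Max Marginal Relevance)", search_filter={"lang": "en", "": "x"}))
# {'search_type': 'mmr', 'search_kwargs': {'k': 4, 'score_threshold': 0, 'filter': {'lang': 'en'}}}
```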

Milvus

This component creates a Milvus Vector Store with search capabilities.

For more information, see the Milvus documentation.

Parameters

Inputs
Name Type Description

collection_name

String

Name of the Milvus collection (default: "langflow")

collection_description

String

Description of the Milvus collection

uri

String

Connection URI for Milvus (default: "http://localhost:19530")

password

SecretString

Connection token, shown as "Token" in the component; leave empty if no token is required

connection_args

Dict

Additional connection arguments

primary_field

String

Name of the primary field (default: "pk")

text_field

String

Name of the text field (default: "text")

vector_field

String

Name of the vector field (default: "vector")

consistency_level

String

Consistency level for operations (options: "Bounded", "Session", "Strong", "Eventual", default: "Session")

index_params

Dict

Parameters for indexing

search_params

Dict

Parameters for searching

drop_old

Boolean

Whether to drop the existing collection before creating a new one (default: false)

timeout

Float

Timeout for operations

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

Milvus

Milvus vector store instance

search_results

List[Data]

Results of similarity search

Component code

milvus.py
from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    BoolInput,
    DataInput,
    DictInput,
    DropdownInput,
    FloatInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class MilvusVectorStoreComponent(LCVectorStoreComponent):
    """Milvus vector store with search capabilities."""

    display_name: str = "Milvus"
    description: str = "Milvus vector store with search capabilities"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/milvus"
    name = "Milvus"
    icon = "Milvus"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", value="langflow"),
        StrInput(name="collection_description", display_name="Collection Description", value=""),
        StrInput(
            name="uri",
            display_name="Connection URI",
            value="http://localhost:19530",
        ),
        SecretStrInput(
            name="password",
            display_name="Token",
            value="",
            info="Ignore this field if no token is required to make connection.",
        ),
        DictInput(name="connection_args", display_name="Other Connection Arguments", advanced=True),
        StrInput(name="primary_field", display_name="Primary Field Name", value="pk"),
        StrInput(name="text_field", display_name="Text Field Name", value="text"),
        StrInput(name="vector_field", display_name="Vector Field Name", value="vector"),
        DropdownInput(
            name="consistency_level",
            display_name="Consistency Level",
            options=["Bounded", "Session", "Strong", "Eventual"],
            value="Session",
            advanced=True,
        ),
        DictInput(name="index_params", display_name="Index Parameters", advanced=True),
        DictInput(name="search_params", display_name="Search Parameters", advanced=True),
        BoolInput(name="drop_old", display_name="Drop Old Collection", value=False, advanced=True),
        FloatInput(name="timeout", display_name="Timeout", advanced=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self):
        try:
            from langchain_milvus.vectorstores import Milvus as LangchainMilvus
        except ImportError as e:
            msg = "Could not import Milvus integration package. Please install it with `pip install langchain-milvus`."
            raise ImportError(msg) from e
        self.connection_args.update(uri=self.uri, token=self.password)
        milvus_store = LangchainMilvus(
            embedding_function=self.embedding,
            collection_name=self.collection_name,
            collection_description=self.collection_description,
            connection_args=self.connection_args,
            consistency_level=self.consistency_level,
            index_params=self.index_params,
            search_params=self.search_params,
            drop_old=self.drop_old,
            auto_id=True,
            primary_field=self.primary_field,
            text_field=self.text_field,
            vector_field=self.vector_field,
            timeout=self.timeout,
        )

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            milvus_store.add_documents(documents)

        return milvus_store

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
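
The `build_vector_store` method above folds the `uri` and `password` inputs into `connection_args` before handing them to the store. A minimal sketch of that merge with plain dictionaries follows. `merge_connection_args` is a hypothetical helper, not part of Langflow, and `db_name` is only an illustrative extra key. Unlike the component, which updates its dictionary in place, the sketch works on a copy.

```python
def merge_connection_args(extra_args, uri, token):
    """Return connection arguments with uri and token applied on top.

    Hypothetical helper for illustration only; the component calls
    dict.update() directly on its own connection_args.
    """
    merged = dict(extra_args or {})  # copy so the caller's dict is untouched
    merged.update(uri=uri, token=token)  # dedicated inputs are applied last
    return merged


extra = {"uri": "http://ignored:19530", "db_name": "default"}
merged = merge_connection_args(extra, "http://localhost:19530", "")
print(merged)
```

Because the dedicated inputs are applied last, a `uri` or `token` key supplied through Other Connection Arguments is overridden by the Connection URI and Token fields.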

MongoDB Atlas

This component creates a MongoDB Atlas Vector Store with search capabilities. For more information, see the MongoDB Atlas documentation.

Parameters

Inputs
Name Type Description

mongodb_atlas_cluster_uri

SecretString

MongoDB Atlas Cluster URI

enable_mtls

Boolean

Whether to connect using mutual TLS (default: False)

mongodb_atlas_client_cert

SecretString

Client certificate combined with its private key, used when mTLS is enabled

db_name

String

Database name

collection_name

String

Collection name

index_name

String

Index name

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

MongoDBAtlasVectorSearch

MongoDB Atlas vector store instance

search_results

List[Data]

Results of similarity search

Component code

mongodb_atlas.py
import tempfile

import certifi
from langchain_community.vectorstores import MongoDBAtlasVectorSearch

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import BoolInput, DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data


class MongoVectorStoreComponent(LCVectorStoreComponent):
    display_name = "MongoDB Atlas"
    description = "MongoDB Atlas Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/mongodb_atlas"
    name = "MongoDBAtlasVector"
    icon = "MongoDB"

    inputs = [
        SecretStrInput(name="mongodb_atlas_cluster_uri", display_name="MongoDB Atlas Cluster URI", required=True),
        BoolInput(name="enable_mtls", display_name="Enable mTLS", value=False, advanced=True, required=True),
        SecretStrInput(
            name="mongodb_atlas_client_cert",
            display_name="MongoDB Atlas Combined Client Certificate",
            required=False,
            info="Client Certificate combined with the private key in the following format:\n "
            "-----BEGIN PRIVATE KEY-----\n...\n -----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----\n"
            "...\n-----END CERTIFICATE-----\n",
        ),
        StrInput(name="db_name", display_name="Database Name", required=True),
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="index_name", display_name="Index Name", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> MongoDBAtlasVectorSearch:
        try:
            from pymongo import MongoClient
        except ImportError as e:
            msg = "Please install pymongo to use MongoDB Atlas Vector Store"
            raise ImportError(msg) from e

        # Create temporary files for the client certificate
        if self.enable_mtls:
            client_cert_path = None
            try:
                client_cert = self.mongodb_atlas_client_cert.replace(" ", "\n")
                client_cert = client_cert.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
                client_cert = client_cert.replace(
                    "-----END\nPRIVATE\nKEY-----\n-----BEGIN\nCERTIFICATE-----",
                    "-----END PRIVATE KEY-----\n-----BEGIN CERTIFICATE-----",
                )
                client_cert = client_cert.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
                with tempfile.NamedTemporaryFile(delete=False) as client_cert_file:
                    client_cert_file.write(client_cert.encode("utf-8"))
                    client_cert_path = client_cert_file.name

            except Exception as e:
                msg = f"Failed to write certificate to temporary file: {e}"
                raise ValueError(msg) from e

        try:
            mongo_client: MongoClient = (
                MongoClient(
                    self.mongodb_atlas_cluster_uri,
                    tls=True,
                    tlsCertificateKeyFile=client_cert_path,
                    tlsCAFile=certifi.where(),
                )
                if self.enable_mtls
                else MongoClient(self.mongodb_atlas_cluster_uri)
            )

            collection = mongo_client[self.db_name][self.collection_name]
            # Caution: this drops the whole collection whenever the store is built,
            # so any documents already stored in it are removed.
            collection.drop()
        except Exception as e:
            msg = f"Failed to connect to MongoDB Atlas: {e}"
            raise ValueError(msg) from e

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            return MongoDBAtlasVectorSearch.from_documents(
                documents=documents, embedding=self.embedding, collection=collection, index_name=self.index_name
            )
        return MongoDBAtlasVectorSearch(
            embedding=self.embedding,
            collection=collection,
            index_name=self.index_name,
        )

    def search_documents(self) -> list[Data]:
        from bson.objectid import ObjectId

        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str):
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
            for doc in docs:
                doc.metadata = {
                    key: str(value) if isinstance(value, ObjectId) else value for key, value in doc.metadata.items()
                }

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
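
When mTLS is enabled, the component rewrites the combined certificate text before writing it to a temporary file: every space becomes a newline, and the marker lines that were split by that replacement are then restored. This suggests the pasted value may arrive with its line breaks flattened to spaces, which is an inference from the code rather than something stated here. The sketch below shows the idea on placeholder content. `normalize_pem` is a hypothetical helper, and it repairs each marker separately, a small simplification of the paired replacement used in the component.

```python
def normalize_pem(flattened: str) -> str:
    """Rebuild line breaks in a PEM bundle whose newlines became spaces."""
    pem = flattened.replace(" ", "\n")
    # The blanket replacement also splits the marker lines, so repair them.
    pem = pem.replace("-----BEGIN\nPRIVATE\nKEY-----", "-----BEGIN PRIVATE KEY-----")
    pem = pem.replace("-----END\nPRIVATE\nKEY-----", "-----END PRIVATE KEY-----")
    pem = pem.replace("-----BEGIN\nCERTIFICATE-----", "-----BEGIN CERTIFICATE-----")
    pem = pem.replace("-----END\nCERTIFICATE-----", "-----END CERTIFICATE-----")
    return pem


# Placeholder body text (AAAA, BBBB, CCCC), not real key material.
flattened = (
    "-----BEGIN PRIVATE KEY----- AAAA BBBB -----END PRIVATE KEY----- "
    "-----BEGIN CERTIFICATE----- CCCC -----END CERTIFICATE-----"
)
restored = normalize_pem(flattened)
print(restored)
```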

PGVector

This component creates a PGVector Vector Store with search capabilities. For more information, see the PGVector documentation.

Parameters

Inputs
Name Type Description

pg_server_url

SecretString

PostgreSQL server connection string

collection_name

String

Table name for the vector store

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

PGVector

PGVector vector store instance

search_results

List[Data]

Results of similarity search

Component code

pgvector.py
from langchain_community.vectorstores import PGVector

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data
from langflow.utils.connection_string_parser import transform_connection_string


class PGVectorStoreComponent(LCVectorStoreComponent):
    display_name = "PGVector"
    description = "PGVector Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pgvector/"
    name = "pgvector"
    icon = "cpu"

    inputs = [
        SecretStrInput(name="pg_server_url", display_name="PostgreSQL Server Connection String", required=True),
        StrInput(name="collection_name", display_name="Table", required=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingestion Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> PGVector:
        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        connection_string_parsed = transform_connection_string(self.pg_server_url)

        if documents:
            pgvector = PGVector.from_documents(
                embedding=self.embedding,
                documents=documents,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )
        else:
            pgvector = PGVector.from_existing_index(
                embedding=self.embedding,
                collection_name=self.collection_name,
                connection_string=connection_string_parsed,
            )

        return pgvector

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
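
The `search_documents` method above only queries the store when the search input is a non-empty string that contains something other than whitespace; otherwise it returns an empty list. The guard can be sketched on its own as follows. `should_search` is a hypothetical name used for illustration.

```python
def should_search(query) -> bool:
    """Mirror the guard applied before similarity_search is called."""
    return bool(query and isinstance(query, str) and query.strip())


for candidate in ["vector stores", "   ", "", None, 42]:
    print(repr(candidate), should_search(candidate))
```

Only the first candidate passes the guard; whitespace, an empty string, `None`, and non-string values all skip the search.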

Pinecone

This component creates a Pinecone Vector Store with search capabilities.

For more information, see the Pinecone documentation.

Parameters

Inputs
Name Type Description

index_name

String

Name of the Pinecone index

namespace

String

Namespace for the index

distance_strategy

String

Strategy for calculating distance between vectors

pinecone_api_key

SecretString

API key for Pinecone

text_key

String

Key in the record to use as text

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

Pinecone

Pinecone vector store instance

search_results

List[Data]

Results of similarity search

Component code

pinecone.py
import numpy as np
from langchain_pinecone import Pinecone

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, DropdownInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data


class PineconeVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Pinecone"
    description = "Pinecone Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/pinecone/"
    name = "Pinecone"
    icon = "Pinecone"
    inputs = [
        StrInput(name="index_name", display_name="Index Name", required=True),
        StrInput(name="namespace", display_name="Namespace", info="Namespace for the index."),
        DropdownInput(
            name="distance_strategy",
            display_name="Distance Strategy",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        SecretStrInput(name="pinecone_api_key", display_name="Pinecone API Key", required=True),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="Key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Pinecone:
        """Build and return a Pinecone vector store instance."""
        try:
            from langchain_pinecone._utilities import DistanceStrategy

            # Wrap the embedding model to ensure float32 output
            wrapped_embeddings = Float32Embeddings(self.embedding)

            # Convert distance strategy
            distance_strategy = self.distance_strategy.replace(" ", "_").upper()
            distance_strategy = DistanceStrategy[distance_strategy]

            # Initialize Pinecone instance with wrapped embeddings
            pinecone = Pinecone(
                index_name=self.index_name,
                embedding=wrapped_embeddings,  # Use wrapped embeddings
                text_key=self.text_key,
                namespace=self.namespace,
                distance_strategy=distance_strategy,
                pinecone_api_key=self.pinecone_api_key,
            )
        except Exception as e:
            error_msg = "Error building Pinecone vector store"
            raise ValueError(error_msg) from e
        else:
            # Process documents if any
            documents = []
            if self.ingest_data:
                for doc in self.ingest_data:
                    if isinstance(doc, Data):
                        documents.append(doc.to_lc_document())
                    else:
                        documents.append(doc)

                if documents:
                    pinecone.add_documents(documents)

            return pinecone

    def search_documents(self) -> list[Data]:
        """Search documents in the vector store."""
        try:
            if not self.search_query or not isinstance(self.search_query, str) or not self.search_query.strip():
                return []

            vector_store = self.build_vector_store()
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )
        except Exception as e:
            error_msg = "Error searching documents"
            raise ValueError(error_msg) from e
        else:
            data = docs_to_data(docs)
            self.status = data
            return data


class Float32Embeddings:
    """Wrapper class to ensure float32 embeddings."""

    def __init__(self, base_embeddings):
        self.base_embeddings = base_embeddings

    def embed_documents(self, texts):
        embeddings = self.base_embeddings.embed_documents(texts)
        # Lists and NumPy arrays iterate the same way, so one path covers both.
        return [[self._force_float32(x) for x in vec] for vec in embeddings]

    def embed_query(self, text):
        embedding = self.base_embeddings.embed_query(text)
        return [self._force_float32(x) for x in embedding]

    def _force_float32(self, value):
        """Round a numeric value to float32 precision and return it as a Python float."""
        return float(np.float32(value))
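
The `Float32Embeddings` wrapper rounds every embedding component to float32 precision and returns plain Python floats. The sketch below illustrates that rounding using only the standard library: it swaps `np.float32` for `struct`, which gives the same result for values inside the float32 range. `to_float32` and `coerce_vector` are hypothetical names, not part of the component.

```python
import struct


def to_float32(value) -> float:
    """Round a number to float32 precision and return a Python float."""
    # Packing as a 4-byte IEEE 754 float and unpacking again applies
    # float32 rounding; the result is an ordinary Python float.
    return struct.unpack("f", struct.pack("f", float(value)))[0]


def coerce_vector(vector):
    """Apply float32 rounding to every component of one embedding."""
    return [to_float32(x) for x in vector]


vector = coerce_vector([0.5, 0.1])
print(vector)
```

Values that float32 can represent exactly, such as 0.5, are unchanged, while values such as 0.1 shift slightly to the nearest float32.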

Qdrant

This component creates a Qdrant Vector Store with search capabilities. For more information, see the Qdrant documentation.

Parameters

Inputs
Name Type Description

collection_name

String

Name of the Qdrant collection

host

String

Qdrant server host

port

Integer

Qdrant server port

grpc_port

Integer

Qdrant gRPC port

api_key

SecretString

API key for Qdrant

prefix

String

Prefix added to the REST URL path of the Qdrant server

timeout

Integer

Timeout for Qdrant operations

path

String

Path to on-disk storage when running Qdrant in local mode

url

String

URL of the Qdrant server (alternative to host and port)

distance_func

String

Distance function for vector similarity

content_payload_key

String

Key for content payload

metadata_payload_key

String

Key for metadata payload

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

Qdrant

Qdrant vector store instance

search_results

List[Data]

Results of similarity search

Component code

qdrant.py
from langchain.embeddings.base import Embeddings
from langchain_community.vectorstores import Qdrant

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    DataInput,
    DropdownInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class QdrantVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Qdrant"
    description = "Qdrant Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/qdrant"
    icon = "Qdrant"

    inputs = [
        StrInput(name="collection_name", display_name="Collection Name", required=True),
        StrInput(name="host", display_name="Host", value="localhost", advanced=True),
        IntInput(name="port", display_name="Port", value=6333, advanced=True),
        IntInput(name="grpc_port", display_name="gRPC Port", value=6334, advanced=True),
        SecretStrInput(name="api_key", display_name="API Key", advanced=True),
        StrInput(name="prefix", display_name="Prefix", advanced=True),
        IntInput(name="timeout", display_name="Timeout", advanced=True),
        StrInput(name="path", display_name="Path", advanced=True),
        StrInput(name="url", display_name="URL", advanced=True),
        DropdownInput(
            name="distance_func",
            display_name="Distance Function",
            options=["Cosine", "Euclidean", "Dot Product"],
            value="Cosine",
            advanced=True,
        ),
        StrInput(name="content_payload_key", display_name="Content Payload Key", value="page_content", advanced=True),
        StrInput(name="metadata_payload_key", display_name="Metadata Payload Key", value="metadata", advanced=True),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Qdrant:
        qdrant_kwargs = {
            "collection_name": self.collection_name,
            "content_payload_key": self.content_payload_key,
            "metadata_payload_key": self.metadata_payload_key,
        }

        server_kwargs = {
            "host": self.host or None,
            "port": int(self.port),  # Ensure port is an integer
            "grpc_port": int(self.grpc_port),  # Ensure grpc_port is an integer
            "api_key": self.api_key,
            "prefix": self.prefix,
            # Ensure timeout is an integer
            "timeout": int(self.timeout) if self.timeout else None,
            "path": self.path or None,
            "url": self.url or None,
        }

        server_kwargs = {k: v for k, v in server_kwargs.items() if v is not None}
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if not isinstance(self.embedding, Embeddings):
            msg = "Invalid embedding object"
            raise TypeError(msg)

        if documents:
            qdrant = Qdrant.from_documents(documents, embedding=self.embedding, **qdrant_kwargs, **server_kwargs)
        else:
            from qdrant_client import QdrantClient

            client = QdrantClient(**server_kwargs)
            qdrant = Qdrant(embeddings=self.embedding, client=client, **qdrant_kwargs)

        return qdrant

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
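
The `build_vector_store` method above assembles the client arguments, turns empty values into `None`, and then drops every `None` entry so that only populated settings reach the Qdrant client. A minimal standalone sketch of that filtering follows. `build_server_kwargs` is a hypothetical helper that mirrors the dictionary logic only.

```python
def build_server_kwargs(host, port, grpc_port, api_key, prefix, timeout, path, url):
    """Collect client settings and drop the ones that are unset."""
    candidates = {
        "host": host or None,
        "port": int(port),  # ports may arrive as strings
        "grpc_port": int(grpc_port),
        "api_key": api_key,
        "prefix": prefix,
        "timeout": int(timeout) if timeout else None,
        "path": path or None,
        "url": url or None,
    }
    # Keep only the entries that carry a value.
    return {key: value for key, value in candidates.items() if value is not None}


kwargs = build_server_kwargs(
    host="localhost", port="6333", grpc_port=6334,
    api_key=None, prefix=None, timeout=0, path="", url="",
)
print(kwargs)
```

Note that `api_key` and `prefix` are passed through unchanged, so an empty string for either would survive the filter.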

Redis

This component creates a Redis Vector Store with search capabilities. For more information, see the Redis documentation.

Parameters

Inputs
Name Type Description

redis_server_url

SecretString

Redis server connection string

redis_index_name

String

Name of the Redis index

code

String

Custom code for Redis (advanced)

schema

String

Schema for Redis index

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

number_of_results

Integer

Number of results to return in search

embedding

Embeddings

Embedding function to use

Outputs
Name Type Description

vector_store

Redis

Redis vector store instance

search_results

List[Data]

Results of similarity search

Component code

redis.py
from pathlib import Path

from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores.redis import Redis

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data


class RedisVectorStoreComponent(LCVectorStoreComponent):
    """A custom component for implementing a Vector Store using Redis."""

    display_name: str = "Redis"
    description: str = "Implementation of Vector Store using Redis"
    documentation = "https://python.langchain.com/docs/integrations/vectorstores/redis"
    name = "Redis"
    icon = "Redis"

    inputs = [
        SecretStrInput(name="redis_server_url", display_name="Redis Server Connection String", required=True),
        StrInput(
            name="redis_index_name",
            display_name="Redis Index",
        ),
        StrInput(name="code", display_name="Code", advanced=True),
        StrInput(
            name="schema",
            display_name="Schema",
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> Redis:
        documents = []

        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)
        Path("documents.txt").write_text(str(documents), encoding="utf-8")

        if not documents:
            if not self.schema:
                msg = "If no documents are provided, a schema must be provided."
                raise ValueError(msg)
            redis_vs = Redis.from_existing_index(
                embedding=self.embedding,
                index_name=self.redis_index_name,
                schema=self.schema,
                key_prefix=None,
                redis_url=self.redis_server_url,
            )
        else:
            text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
            docs = text_splitter.split_documents(documents)
            redis_vs = Redis.from_documents(
                documents=docs,
                embedding=self.embedding,
                redis_url=self.redis_server_url,
                index_name=self.redis_index_name,
            )
        return redis_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
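
The Redis component chooses between two construction paths: when documents are supplied it splits them and builds a new index, and when none are supplied it connects to an existing index, which requires a schema. The decision can be sketched as follows. `choose_build_path` is a hypothetical function that returns a label instead of creating a store, and this sketch treats an empty string the same as a missing schema.

```python
def choose_build_path(documents, schema):
    """Return which construction path applies for the given inputs."""
    if documents:
        return "from_documents"
    if not schema:
        # Without documents there is nothing to infer the index layout from.
        raise ValueError("If no documents are provided, a schema must be provided.")
    return "from_existing_index"


print(choose_build_path(["some document"], None))
print(choose_build_path([], "redis_schema.yaml"))
```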

Supabase

This component creates a connection to a Supabase Vector Store with search capabilities.

For more information, see the Supabase documentation.

Parameters

Inputs
Name Type Description

supabase_url

String

URL of the Supabase instance

supabase_service_key

SecretString

Service key for Supabase authentication

table_name

String

Name of the table in Supabase

query_name

String

Name of the Postgres function used for similarity matching

search_query

String

Query for similarity search

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use

number_of_results

Integer

Number of results to return in search

Outputs
Name Type Description

vector_store

SupabaseVectorStore

Supabase vector store instance

search_results

List[Data]

Results of similarity search

Component code

supabase.py
from langchain_community.vectorstores import SupabaseVectorStore
from supabase.client import Client, create_client

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, SecretStrInput, StrInput
from langflow.schema import Data


class SupabaseVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Supabase"
    description = "Supabase Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/supabase/"
    name = "SupabaseVectorStore"
    icon = "Supabase"

    inputs = [
        StrInput(name="supabase_url", display_name="Supabase URL", required=True),
        SecretStrInput(name="supabase_service_key", display_name="Supabase Service Key", required=True),
        StrInput(name="table_name", display_name="Table Name", advanced=True),
        StrInput(name="query_name", display_name="Query Name"),
        MultilineInput(name="search_query", display_name="Search Query"),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(name="embedding", display_name="Embedding", input_types=["Embeddings"]),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> SupabaseVectorStore:
        supabase: Client = create_client(self.supabase_url, supabase_key=self.supabase_service_key)

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            supabase_vs = SupabaseVectorStore.from_documents(
                documents=documents,
                embedding=self.embedding,
                query_name=self.query_name,
                client=supabase,
                table_name=self.table_name,
            )
        else:
            supabase_vs = SupabaseVectorStore(
                client=supabase,
                embedding=self.embedding,
                table_name=self.table_name,
                query_name=self.query_name,
            )

        return supabase_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
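
The guard in search_documents means a search only runs when the query is a non-blank string; otherwise the component returns an empty list. The following is a minimal sketch of that behavior, not the component itself: FakeStore and has_usable_query are hypothetical stand-ins introduced for illustration, and the word-overlap ranking only imitates a real similarity search.

```python
from dataclasses import dataclass, field


@dataclass
class FakeStore:
    """Hypothetical stand-in for a vector store; ranks texts by shared words."""

    texts: list[str] = field(default_factory=list)

    def similarity_search(self, query: str, k: int = 4) -> list[str]:
        query_words = set(query.lower().split())
        # Stable sort: texts with equal overlap keep their insertion order.
        ranked = sorted(
            self.texts,
            key=lambda text: len(query_words & set(text.lower().split())),
            reverse=True,
        )
        return ranked[:k]


def has_usable_query(search_query: object) -> bool:
    """Mirror the component's guard: a non-empty, non-blank string."""
    return (
        bool(search_query)
        and isinstance(search_query, str)
        and bool(search_query.strip())
    )


def search_documents(store: FakeStore, search_query: object, number_of_results: int = 4) -> list[str]:
    """Return up to number_of_results matches, or [] when there is no usable query."""
    if not has_usable_query(search_query):
        return []
    return store.similarity_search(query=search_query, k=number_of_results)


store = FakeStore(["red apple pie", "green apple", "blue sky"])
print(search_documents(store, "apple pie", number_of_results=1))
print(search_documents(store, "   "))
```

In a flow, this is why leaving Search Query empty still builds the vector store and ingests data, but produces no search results.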

Upstash

This component creates an Upstash Vector Store with search capabilities. For more information, see the Upstash documentation.

Parameters

Inputs
Name Type Description

index_url

String

The URL of the Upstash index

index_token

SecretString

The token for the Upstash index

text_key

String

The key in the record to use as text (default: "text")

namespace

String

Namespace for the index; leave empty to use the default namespace

search_query

String

Query for similarity search

metadata_filter

String

Filters documents by metadata

ingest_data

Data

Data to be ingested into the vector store

embedding

Embeddings

Embedding function to use (optional); leave unconnected to use Upstash's embeddings

number_of_results

Integer

Number of results to return in search (default: 4)

Outputs
Name Type Description

vector_store

UpstashVectorStore

Upstash vector store instance

search_results

List[Data]

Results of similarity search

Component code

upstash.py
from langchain_community.vectorstores import UpstashVectorStore

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import (
    DataInput,
    HandleInput,
    IntInput,
    MultilineInput,
    SecretStrInput,
    StrInput,
)
from langflow.schema import Data


class UpstashVectorStoreComponent(LCVectorStoreComponent):
    display_name = "Upstash"
    description = "Upstash Vector Store with search capabilities"
    documentation = "https://python.langchain.com/v0.2/docs/integrations/vectorstores/upstash/"
    name = "Upstash"
    icon = "Upstash"

    inputs = [
        StrInput(
            name="index_url",
            display_name="Index URL",
            info="The URL of the Upstash index.",
            required=True,
        ),
        SecretStrInput(
            name="index_token",
            display_name="Index Token",
            info="The token for the Upstash index.",
            required=True,
        ),
        StrInput(
            name="text_key",
            display_name="Text Key",
            info="The key in the record to use as text.",
            value="text",
            advanced=True,
        ),
        StrInput(
            name="namespace",
            display_name="Namespace",
            info="Leave empty for default namespace.",
        ),
        MultilineInput(name="search_query", display_name="Search Query"),
        MultilineInput(
            name="metadata_filter",
            display_name="Metadata Filter",
            info="Filters documents by metadata. Look at the documentation for more information.",
        ),
        DataInput(
            name="ingest_data",
            display_name="Ingest Data",
            is_list=True,
        ),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
            info="To use Upstash's embeddings, don't provide an embedding.",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> UpstashVectorStore:
        use_upstash_embedding = self.embedding is None

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            if use_upstash_embedding:
                upstash_vs = UpstashVectorStore(
                    embedding=use_upstash_embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
                upstash_vs.add_documents(documents)
            else:
                upstash_vs = UpstashVectorStore.from_documents(
                    documents=documents,
                    embedding=self.embedding,
                    text_key=self.text_key,
                    index_url=self.index_url,
                    index_token=self.index_token,
                    namespace=self.namespace,
                )
        else:
            upstash_vs = UpstashVectorStore(
                embedding=self.embedding or use_upstash_embedding,
                text_key=self.text_key,
                index_url=self.index_url,
                index_token=self.index_token,
                namespace=self.namespace,
            )

        return upstash_vs

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
                filter=self.metadata_filter,
            )

            data = docs_to_data(docs)
            self.status = data
            return data
        return []
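
The branching in build_vector_store depends on two things: whether an embedding model is connected and whether there is data to ingest. The sketch below restates that decision as a small pure function so the three paths are easy to compare. plan_upstash_build is a hypothetical helper written for illustration; it does not call Upstash and only reports what the component would pass as the embedding argument and which ingestion step it would take.

```python
def plan_upstash_build(embedding, documents):
    """Return (embedding_argument, ingestion_step) as the component would choose them.

    embedding: a connected embedding model, or None when nothing is connected.
    documents: the list of documents to ingest (may be empty).
    """
    # With no embedding connected, the component passes True so that
    # Upstash's own embeddings are used.
    use_upstash_embedding = embedding is None

    if documents:
        if use_upstash_embedding:
            return True, "constructor + add_documents"
        return embedding, "from_documents"
    return embedding or use_upstash_embedding, "constructor only"


model = object()  # placeholder for any embedding model instance
print(plan_upstash_build(None, ["doc"]))
print(plan_upstash_build(model, [])[1])
```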

Vectara

This component creates a Vectara Vector Store with search capabilities. For more information, see the Vectara documentation.

Parameters

Inputs
Name Type Description

vectara_customer_id

String

Vectara customer ID

vectara_corpus_id

String

Vectara corpus ID

vectara_api_key

SecretString

Vectara API key

embedding

Embeddings

Embedding function to use (optional)

ingest_data

List[Document/Data]

Data to be ingested into the vector store

search_query

String

Query for similarity search

number_of_results

Integer

Number of results to return in search (default: 4)

Outputs
Name Type Description

vector_store

Vectara

Vectara vector store instance

search_results

List[Data]

Results of similarity search

Component code

vectara.py
from typing import TYPE_CHECKING

from loguru import logger

from langflow.base.vectorstores.model import LCVectorStoreComponent, check_cached_vector_store
from langflow.helpers.data import docs_to_data
from langflow.io import HandleInput, IntInput, MessageTextInput, SecretStrInput, StrInput
from langflow.schema import Data

if TYPE_CHECKING:
    from langchain_community.vectorstores import Vectara


class VectaraVectorStoreComponent(LCVectorStoreComponent):
    """Vectara Vector Store with search capabilities."""

    display_name: str = "Vectara"
    description: str = "Vectara Vector Store with search capabilities"
    documentation = "https://python.langchain.com/docs/modules/data_connection/vectorstores/integrations/vectara"
    name = "Vectara"
    icon = "Vectara"

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        HandleInput(
            name="embedding",
            display_name="Embedding",
            input_types=["Embeddings"],
        ),
        HandleInput(
            name="ingest_data",
            display_name="Ingest Data",
            input_types=["Document", "Data"],
            is_list=True,
        ),
        MessageTextInput(
            name="search_query",
            display_name="Search Query",
        ),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=4,
            advanced=True,
        ),
    ]

    @check_cached_vector_store
    def build_vector_store(self) -> "Vectara":
        """Builds the Vectara object."""
        try:
            from langchain_community.vectorstores import Vectara
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(
            vectara_customer_id=self.vectara_customer_id,
            vectara_corpus_id=self.vectara_corpus_id,
            vectara_api_key=self.vectara_api_key,
        )

        self._add_documents_to_vector_store(vectara)
        return vectara

    def _add_documents_to_vector_store(self, vector_store: "Vectara") -> None:
        """Adds documents to the Vector Store."""
        if not self.ingest_data:
            self.status = "No documents to add to Vectara"
            return

        documents = []
        for _input in self.ingest_data or []:
            if isinstance(_input, Data):
                documents.append(_input.to_lc_document())
            else:
                documents.append(_input)

        if documents:
            logger.debug(f"Adding {len(documents)} documents to Vectara.")
            vector_store.add_documents(documents)
            self.status = f"Added {len(documents)} documents to Vectara"
        else:
            logger.debug("No documents to add to Vectara.")
            self.status = "No valid documents to add to Vectara"

    def search_documents(self) -> list[Data]:
        vector_store = self.build_vector_store()

        if self.search_query and isinstance(self.search_query, str) and self.search_query.strip():
            docs = vector_store.similarity_search(
                query=self.search_query,
                k=self.number_of_results,
            )

            data = docs_to_data(docs)
            self.status = f"Found {len(data)} results for the query: {self.search_query}"
            return data
        self.status = "No search query provided"
        return []
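
The ingest_data input accepts both Document and Data items, and the component converts any Data item with to_lc_document() before adding it to the store. The following sketch isolates that normalization step. The Document and Data classes here are simplified, hypothetical stand-ins for the LangChain and Langflow types, and to_documents is a helper name chosen for this example.

```python
from dataclasses import dataclass, field


@dataclass
class Document:
    """Hypothetical stand-in for a LangChain Document."""

    page_content: str
    metadata: dict = field(default_factory=dict)


@dataclass
class Data:
    """Hypothetical stand-in for langflow.schema.Data."""

    text: str
    metadata: dict = field(default_factory=dict)

    def to_lc_document(self) -> Document:
        return Document(page_content=self.text, metadata=dict(self.metadata))


def to_documents(ingest_data):
    """Convert Data items to Documents; pass other items through unchanged."""
    documents = []
    for item in ingest_data or []:  # tolerate None, as the component does
        if isinstance(item, Data):
            documents.append(item.to_lc_document())
        else:
            documents.append(item)
    return documents


mixed = [Data("first note", {"source": "a"}), Document("second note")]
for doc in to_documents(mixed):
    print(type(doc).__name__, doc.page_content)
```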

Vectara RAG

This component creates a Vectara RAG pipeline. For more information, see the Vectara documentation.

Parameters

Inputs
Name Type Description

vectara_customer_id

String

Vectara customer ID

vectara_corpus_id

String

Vectara corpus ID

vectara_api_key

SecretString

Vectara API key

search_query

String

The query to answer

lexical_interpolation

Float

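Hybrid search factor: how much to weigh lexical scores against the embedding score (0 means lexical search is not used, 1 means only lexical search is used)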

filter

String

Metadata filters for narrowing search

reranker

String

Type of reranker to use (options: "mmr", "rerank_multilingual_v1", "none")

reranker_k

Integer

Number of results to rerank (default: 50)

diversity_bias

Float

Diversity bias for the MMR reranker, from 0 to 1; higher values give more diverse results (default: 0.2)

max_results

Integer

Maximum number of search results made available to the summarizer prompt (default: 7)

response_lang

String

Language code for the response, or "auto" to detect the language automatically (default: "eng")

prompt

String

Name of the summarizer prompt

Outputs
Name Type Description

answer

Message

Generated response from Vectara RAG
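
To get a feel for the lexical_interpolation input, it can help to picture it as a blend between two scores. The sketch below assumes a simple linear interpolation, which matches the endpoints described for the parameter (0 uses only the embedding score, 1 uses only the lexical score). This formula is an assumption made for illustration; the exact scoring Vectara applies is not stated here. hybrid_score is a hypothetical function name.

```python
def hybrid_score(embedding_score: float, lexical_score: float, lexical_interpolation: float) -> float:
    """Blend two scores; assumes linear interpolation (illustrative only)."""
    if not 0.0 <= lexical_interpolation <= 1.0:
        raise ValueError("lexical_interpolation must be between 0 and 1")
    return (1.0 - lexical_interpolation) * embedding_score + lexical_interpolation * lexical_score


# At the endpoints, only one of the two scores contributes.
print(hybrid_score(0.8, 0.2, 0.0))
print(hybrid_score(0.8, 0.2, 1.0))
```

Under this reading, the component's default of 0.005 keeps the result almost entirely driven by the embedding score, with a small lexical contribution.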

Component code

vectara_rag.py
from langflow.custom import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.io import DropdownInput, FloatInput, IntInput, MessageTextInput, Output, SecretStrInput, StrInput
from langflow.schema.message import Message


class VectaraRagComponent(Component):
    display_name = "Vectara RAG"
    description = "Vectara's full end to end RAG"
    documentation = "https://docs.vectara.com/docs"
    icon = "Vectara"
    name = "VectaraRAG"
    SUMMARIZER_PROMPTS = [
        "vectara-summary-ext-24-05-sml",
        "vectara-summary-ext-24-05-med-omni",
        "vectara-summary-ext-24-05-large",
        "vectara-summary-ext-24-05-med",
        "vectara-summary-ext-v1.3.0",
    ]

    RERANKER_TYPES = ["mmr", "rerank_multilingual_v1", "none"]

    RESPONSE_LANGUAGES = [
        "auto",
        "eng",
        "spa",
        "fra",
        "zho",
        "deu",
        "hin",
        "ara",
        "por",
        "ita",
        "jpn",
        "kor",
        "rus",
        "tur",
        "fas",
        "vie",
        "tha",
        "heb",
        "nld",
        "ind",
        "pol",
        "ukr",
        "ron",
        "swe",
        "ces",
        "ell",
        "ben",
        "msa",
        "urd",
    ]

    field_order = ["vectara_customer_id", "vectara_corpus_id", "vectara_api_key", "search_query", "reranker"]

    inputs = [
        StrInput(name="vectara_customer_id", display_name="Vectara Customer ID", required=True),
        StrInput(name="vectara_corpus_id", display_name="Vectara Corpus ID", required=True),
        SecretStrInput(name="vectara_api_key", display_name="Vectara API Key", required=True),
        MessageTextInput(name="search_query", display_name="Search Query", info="The query to receive an answer on."),
        FloatInput(
            name="lexical_interpolation",
            display_name="Hybrid Search Factor",
            range_spec=RangeSpec(min=0.005, max=0.1, step=0.005),
            value=0.005,
            advanced=True,
            info="How much to weigh lexical scores compared to the embedding score. "
            "0 means lexical search is not used at all, and 1 means only lexical search is used.",
        ),
        MessageTextInput(
            name="filter",
            display_name="Metadata Filters",
            value="",
            advanced=True,
            info="The filter string to narrow the search to according to metadata attributes.",
        ),
        DropdownInput(
            name="reranker",
            display_name="Reranker Type",
            options=RERANKER_TYPES,
            value=RERANKER_TYPES[0],
            info="How to rerank the retrieved search results.",
        ),
        IntInput(
            name="reranker_k",
            display_name="Number of Results to Rerank",
            value=50,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
        ),
        FloatInput(
            name="diversity_bias",
            display_name="Diversity Bias",
            value=0.2,
            range_spec=RangeSpec(min=0, max=1, step=0.01),
            advanced=True,
            info="Ranges from 0 to 1, with higher values indicating greater diversity (only applies to MMR reranker).",
        ),
        IntInput(
            name="max_results",
            display_name="Max Results to Summarize",
            value=7,
            range_spec=RangeSpec(min=1, max=100, step=1),
            advanced=True,
            info="The maximum number of search results to be available to the prompt.",
        ),
        DropdownInput(
            name="response_lang",
            display_name="Response Language",
            options=RESPONSE_LANGUAGES,
            value="eng",
            advanced=True,
            info="Use the ISO 639-1 or 639-3 language code or auto to automatically detect the language.",
        ),
        DropdownInput(
            name="prompt",
            display_name="Prompt Name",
            options=SUMMARIZER_PROMPTS,
            value=SUMMARIZER_PROMPTS[0],
            advanced=True,
            info="Only vectara-summary-ext-24-05-sml is for Growth customers; "
            "all other prompts are for Scale customers only.",
        ),
    ]

    outputs = [
        Output(name="answer", display_name="Answer", method="generate_response"),
    ]

    def generate_response(
        self,
    ) -> Message:
        text_output = ""

        try:
            from langchain_community.vectorstores import Vectara
            from langchain_community.vectorstores.vectara import RerankConfig, SummaryConfig, VectaraQueryConfig
        except ImportError as e:
            msg = "Could not import Vectara. Please install it with `pip install langchain-community`."
            raise ImportError(msg) from e

        vectara = Vectara(self.vectara_customer_id, self.vectara_corpus_id, self.vectara_api_key)
        rerank_config = RerankConfig(self.reranker, self.reranker_k, self.diversity_bias)
        summary_config = SummaryConfig(
            is_enabled=True, max_results=self.max_results, response_lang=self.response_lang, prompt_name=self.prompt
        )
        config = VectaraQueryConfig(
            lambda_val=self.lexical_interpolation,
            filter=self.filter,
            summary_config=summary_config,
            rerank_config=rerank_config,
        )
        rag = vectara.as_rag(config)
        response = rag.invoke(self.search_query, config={"callbacks": self.get_langchain_callbacks()})

        text_output = response["answer"]

        return Message(text=text_output)
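
The diversity_bias input only applies to the MMR (Max Marginal Relevance) reranker. As a rough illustration of what such a bias does, here is a generic textbook-style MMR selection loop. It is not Vectara's implementation: mmr_rerank, the relevance list, and the pairwise similarity matrix are all assumptions made for this example.

```python
def mmr_rerank(relevance, similarity, diversity_bias, k):
    """Greedy MMR: return the indices of up to k items.

    relevance[i]     : relevance of item i to the query
    similarity[i][j] : similarity between items i and j
    diversity_bias   : 0 ranks by relevance only; higher values penalize
                       items similar to ones already selected
    """
    selected = []
    candidates = list(range(len(relevance)))

    while candidates and len(selected) < k:

        def mmr_score(i):
            # Redundancy is the highest similarity to anything already chosen.
            redundancy = max((similarity[i][j] for j in selected), default=0.0)
            return (1.0 - diversity_bias) * relevance[i] - diversity_bias * redundancy

        best = max(candidates, key=mmr_score)
        selected.append(best)
        candidates.remove(best)

    return selected


# Items 0 and 1 are near-duplicates; item 2 is less relevant but different.
relevance = [0.9, 0.85, 0.5]
similarity = [
    [1.0, 0.95, 0.1],
    [0.95, 1.0, 0.1],
    [0.1, 0.1, 1.0],
]
print(mmr_rerank(relevance, similarity, diversity_bias=0.0, k=2))
print(mmr_rerank(relevance, similarity, diversity_bias=0.5, k=2))
```

With no bias the two near-duplicates are returned together; with a bias of 0.5 the second slot goes to the dissimilar item instead.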


© 2024 DataStax