Processing components in Langflow

This Langflow feature is currently in public preview. Development is ongoing, and the features and functionality are subject to change. Langflow, and the use of such, is subject to the DataStax Preview Terms.

Processing components process and transform data within a flow.

Use a processing component in a flow

The Split Text processing component in this flow splits the incoming Data into chunks to be embedded into the vector store component.

The component offers control over chunk size, overlap, and separator, which affect context and granularity in vector store retrieval results.

vector store document ingestion

Alter Metadata

This component is in Legacy. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component modifies metadata of input objects. It can add new metadata, update existing metadata, and remove specified metadata fields. The component works with both Message and Data objects, and can also create a new Data object from user-provided text.

Parameters

Inputs
Name Display Name Info

input_value

Input

Objects to which Metadata should be added.

text_in

User Text

Text input; the value will be in the 'text' attribute of the Data object. Empty text entries are ignored.

metadata

Metadata

Metadata to add to each object.

remove_fields

Fields to Remove

Metadata fields to remove.

Outputs
Name Display Name Info

data

Data

List of Input objects, each with added Metadata.

Component code

alter_metadata.py
from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import MessageTextInput
from langflow.io import HandleInput, NestedDictInput, Output, StrInput
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame


class AlterMetadataComponent(Component):
    display_name = "Alter Metadata"
    description = "Adds/Removes Metadata Dictionary on inputs"
    icon = "merge"
    name = "AlterMetadata"
    legacy = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Object(s) to which Metadata should be added",
            required=False,
            input_types=["Message", "Data"],
            is_list=True,
        ),
        StrInput(
            name="text_in",
            display_name="User Text",
            info="Text input; value will be in 'text' attribute of Data object. Empty text entries are ignored.",
            required=False,
        ),
        NestedDictInput(
            name="metadata",
            display_name="Metadata",
            info="Metadata to add to each object",
            input_types=["Data"],
            required=True,
        ),
        MessageTextInput(
            name="remove_fields",
            display_name="Fields to Remove",
            info="Metadata Fields to Remove",
            required=False,
            is_list=True,
        ),
    ]

    outputs = [
        Output(
            name="data",
            display_name="Data",
            info="List of Input objects each with added Metadata",
            method="process_output",
        ),
        Output(
            display_name="DataFrame",
            name="dataframe",
            info="Data objects as a DataFrame, with metadata as columns",
            method="as_dataframe",
        ),
    ]

    def _as_clean_dict(self, obj):
        """Convert a Data object or a standard dictionary to a standard dictionary."""
        if isinstance(obj, dict):
            as_dict = obj
        elif isinstance(obj, Data):
            as_dict = obj.data
        else:
            msg = f"Expected a Data object or a dictionary but got {type(obj)}."
            raise TypeError(msg)

        return {k: v for k, v in (as_dict or {}).items() if k and k.strip()}

    def process_output(self) -> list[Data]:
        # Ensure metadata is a dictionary, filtering out any empty keys
        metadata = self._as_clean_dict(self.metadata)

        # Convert text_in to a Data object if it exists, and initialize our list of Data objects
        data_objects = [Data(text=self.text_in)] if self.text_in else []

        # Append existing Data objects from input_value, if any
        if self.input_value:
            data_objects.extend(self.input_value)

        # Update each Data object with the new metadata, preserving existing fields
        for data in data_objects:
            data.data.update(metadata)

        # Handle removal of fields specified in remove_fields
        if self.remove_fields:
            fields_to_remove = {field.strip() for field in self.remove_fields if field.strip()}

            # Remove specified fields from each Data object's metadata
            for data in data_objects:
                data.data = {k: v for k, v in data.data.items() if k not in fields_to_remove}

        # Set the status for tracking/debugging purposes
        self.status = data_objects
        return data_objects

    def as_dataframe(self) -> DataFrame:
        """Convert the processed data objects into a DataFrame.

        Returns:
            DataFrame: A DataFrame where each row corresponds to a Data object,
                    with metadata fields as columns.
        """
        data_list = self.process_output()
        return DataFrame(data_list)

Combine data

Prior to Langflow version 1.1.3, this component was named Merge Data.

This component combines multiple data objects into a unified list of data objects.

Parameters

Inputs
Name Display Name Info

data_inputs

data Inputs

A list of data input objects to be merged.

Outputs
Name Display Name Info

merged_data

Merged data

The resulting list of merged data objects with consistent keys.

Component code

merge_data.py
from enum import Enum
from typing import cast

from loguru import logger

from langflow.custom.custom_component.component import Component
from langflow.io import DataInput, DropdownInput, Output
from langflow.schema.dataframe import DataFrame


class DataOperation(str, Enum):
    CONCATENATE = "Concatenate"
    APPEND = "Append"
    MERGE = "Merge"
    JOIN = "Join"


class MergeDataComponent(Component):
    display_name = "Combine Data"
    description = "Combines data using different operations"
    icon = "merge"
    MIN_INPUTS_REQUIRED = 2
    legacy = True

    inputs = [
        DataInput(name="data_inputs", display_name="Data Inputs", info="Data to combine", is_list=True, required=True),
        DropdownInput(
            name="operation",
            display_name="Operation Type",
            options=[op.value for op in DataOperation],
            value=DataOperation.CONCATENATE.value,
        ),
    ]
    outputs = [Output(display_name="DataFrame", name="combined_data", method="combine_data")]

    def combine_data(self) -> DataFrame:
        if not self.data_inputs or len(self.data_inputs) < self.MIN_INPUTS_REQUIRED:
            empty_dataframe = DataFrame()
            self.status = empty_dataframe
            return empty_dataframe

        operation = DataOperation(self.operation)
        try:
            combined_dataframe = self._process_operation(operation)
            self.status = combined_dataframe
        except Exception as e:
            logger.error(f"Error during operation {operation}: {e!s}")
            raise
        else:
            return combined_dataframe

    def _process_operation(self, operation: DataOperation) -> DataFrame:
        if operation == DataOperation.CONCATENATE:
            combined_data: dict[str, str | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in combined_data:
                        if isinstance(combined_data[key], str) and isinstance(value, str):
                            combined_data[key] = f"{combined_data[key]}\n{value}"
                        else:
                            combined_data[key] = value
                    else:
                        combined_data[key] = value
            return DataFrame([combined_data])

        if operation == DataOperation.APPEND:
            rows = [data_input.data for data_input in self.data_inputs]
            return DataFrame(rows)

        if operation == DataOperation.MERGE:
            result_data: dict[str, str | list[str] | object] = {}
            for data_input in self.data_inputs:
                for key, value in data_input.data.items():
                    if key in result_data and isinstance(value, str):
                        if isinstance(result_data[key], list):
                            cast("list[str]", result_data[key]).append(value)
                        else:
                            result_data[key] = [result_data[key], value]
                    else:
                        result_data[key] = value
            return DataFrame([result_data])

        if operation == DataOperation.JOIN:
            combined_data = {}
            for idx, data_input in enumerate(self.data_inputs, 1):
                for key, value in data_input.data.items():
                    new_key = f"{key}_doc{idx}" if idx > 1 else key
                    combined_data[new_key] = value
            return DataFrame([combined_data])

        return DataFrame()

Combine Text

This component concatenates two text sources into a single text chunk using a specified delimiter.

Parameters

Inputs
Name Display Name Info

first_text

First Text

The first text input to concatenate.

second_text

Second Text

The second text input to concatenate.

delimiter

Delimiter

A string used to separate the two text inputs. Defaults to a space.

Outputs
Name Display Name Info

message

Message

A Message object containing the combined text.

Component code

combine_text.py
from langflow.custom.custom_component.component import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.message import Message


class CombineTextComponent(Component):
    display_name = "Combine Text"
    description = "Concatenate two text sources into a single text chunk using a specified delimiter."
    icon = "merge"
    name = "CombineText"
    legacy: bool = True

    inputs = [
        MessageTextInput(
            name="text1",
            display_name="First Text",
            info="The first text input to concatenate.",
        ),
        MessageTextInput(
            name="text2",
            display_name="Second Text",
            info="The second text input to concatenate.",
        ),
        MessageTextInput(
            name="delimiter",
            display_name="Delimiter",
            info="A string used to separate the two text inputs. Defaults to a whitespace.",
            value=" ",
        ),
    ]

    outputs = [
        Output(display_name="Combined Text", name="combined_text", method="combine_texts"),
    ]

    def combine_texts(self) -> Message:
        combined = self.delimiter.join([self.text1, self.text2])
        self.status = combined
        return Message(text=combined)

Create data

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component dynamically creates a Data object with a specified number of fields.

Parameters

Inputs
Name Display Name Info

number_of_fields

Number of Fields

The number of fields to be added to the record.

text_key

Text Key

Key that identifies the field to be used as the text content.

text_key_validator

Text Key Validator

If enabled, checks if the given 'Text Key' is present in the given 'Data'.

Outputs
Name Display Name Info

data

Data

A Data object created with the specified fields and text key.

Component code

create_data.py
from typing import Any

from langflow.custom.custom_component.component import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import BoolInput, DictInput, IntInput, MessageTextInput
from langflow.io import Output
from langflow.schema.data import Data
from langflow.schema.dotdict import dotdict


class CreateDataComponent(Component):
    display_name: str = "Create Data"
    description: str = "Dynamically create a Data with a specified number of fields."
    name: str = "CreateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    legacy = True
    icon = "ListFilter"

    inputs = [
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=1,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        if field_name == "number_of_fields":
            default_keys = ["code", "_type", "number_of_fields", "text_key", "text_key_validator"]
            try:
                field_value_int = int(field_value)
            except ValueError:
                return build_config
            existing_fields = {}
            if field_value_int > self.MAX_FIELDS:
                build_config["number_of_fields"]["value"] = self.MAX_FIELDS
                msg = (
                    f"Number of fields cannot exceed {self.MAX_FIELDS}. "
                    "Please adjust the number of fields to be within the allowed limit."
                )
                raise ValueError(msg)
            if len(build_config) > len(default_keys):
                # back up the existing template fields
                for key in build_config.copy():
                    if key not in default_keys:
                        existing_fields[key] = build_config.pop(key)

            for i in range(1, field_value_int + 1):
                key = f"field_{i}_key"
                if key in existing_fields:
                    field = existing_fields[key]
                    build_config[key] = field
                else:
                    field = DictInput(
                        display_name=f"Field {i}",
                        name=key,
                        info=f"Key for field {i}.",
                        input_types=["Message", "Data"],
                    )
                    build_config[field.name] = field.to_dict()

            build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data:
        data = self.get_data()
        return_data = Data(data=data, text_key=self.text_key)
        self.status = return_data
        if self.text_key_validator:
            self.validate_text_key()
        return return_data

    def get_data(self):
        """Function to get the Data from the attributes."""
        data = {}
        for value_dict in self._attributes.values():
            if isinstance(value_dict, dict):
                # Check if the value of the value_dict is a Data
                value_dict_ = {
                    key: value.get_text() if isinstance(value, Data) else value for key, value in value_dict.items()
                }
                data.update(value_dict_)
        return data

    def validate_text_key(self) -> None:
        """This function validates that the Text Key is one of the keys in the Data."""
        data_keys = self.get_data().keys()
        if self.text_key not in data_keys and self.text_key != "":
            formatted_data_keys = ", ".join(data_keys)
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: '{formatted_data_keys}'"
            raise ValueError(msg)

DataFrame operations

This component performs various operations on Pandas DataFrames. Here’s an overview of the available operations and their inputs:

Operations
Operation Description Required Inputs

Add Column

Adds a new column with a constant value

new_column_name, new_column_value

Drop Column

Removes a specified column

column_name

Filter

Filters rows based on column value

column_name, filter_value

Head

Returns first n rows

num_rows

Rename Column

Renames an existing column

column_name, new_column_name

Replace Value

Replaces values in a column

column_name, replace_value, replacement_value

Select Columns

Selects specific columns

columns_to_select

Sort

Sorts DataFrame by column

column_name, ascending

Tail

Returns last n rows

num_rows

Parameters

Inputs
Name Display Name Info

df

DataFrame

The input DataFrame to operate on.

operation

Operation

Select the DataFrame operation to perform. Options: Add Column, Drop Column, Filter, Head, Rename Column, Replace Value, Select Columns, Sort, Tail

column_name

Column Name

The column name to use for the operation.

filter_value

Filter Value

The value to filter rows by.

ascending

Sort Ascending

Whether to sort in ascending order.

new_column_name

New Column Name

The new column name when renaming or adding a column.

new_column_value

New Column Value

The value to populate the new column with.

columns_to_select

Columns to Select

List of column names to select.

num_rows

Number of Rows

Number of rows to return (for head/tail). Default: 5

replace_value

Value to Replace

The value to replace in the column.

replacement_value

Replacement Value

The value to replace with.

Outputs
Name Display Name Info

output

DataFrame

The resulting DataFrame after the operation.

Component code

dataframe_operations.py
from langflow.custom.custom_component.component import Component
from langflow.io import BoolInput, DataFrameInput, DropdownInput, IntInput, MessageTextInput, Output, StrInput
from langflow.schema.dataframe import DataFrame


class DataFrameOperationsComponent(Component):
    display_name = "DataFrame Operations"
    description = "Perform various operations on a DataFrame."
    icon = "table"

    # Available operations
    OPERATION_CHOICES = [
        "Add Column",
        "Drop Column",
        "Filter",
        "Head",
        "Rename Column",
        "Replace Value",
        "Select Columns",
        "Sort",
        "Tail",
    ]

    inputs = [
        DataFrameInput(
            name="df",
            display_name="DataFrame",
            info="The input DataFrame to operate on.",
        ),
        DropdownInput(
            name="operation",
            display_name="Operation",
            options=OPERATION_CHOICES,
            info="Select the DataFrame operation to perform.",
            real_time_refresh=True,
        ),
        StrInput(
            name="column_name",
            display_name="Column Name",
            info="The column name to use for the operation.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter rows by.",
            dynamic=True,
            show=False,
        ),
        BoolInput(
            name="ascending",
            display_name="Sort Ascending",
            info="Whether to sort in ascending order.",
            dynamic=True,
            show=False,
            value=True,
        ),
        StrInput(
            name="new_column_name",
            display_name="New Column Name",
            info="The new column name when renaming or adding a column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="new_column_value",
            display_name="New Column Value",
            info="The value to populate the new column with.",
            dynamic=True,
            show=False,
        ),
        StrInput(
            name="columns_to_select",
            display_name="Columns to Select",
            dynamic=True,
            is_list=True,
            show=False,
        ),
        IntInput(
            name="num_rows",
            display_name="Number of Rows",
            info="Number of rows to return (for head/tail).",
            dynamic=True,
            show=False,
            value=5,
        ),
        MessageTextInput(
            name="replace_value",
            display_name="Value to Replace",
            info="The value to replace in the column.",
            dynamic=True,
            show=False,
        ),
        MessageTextInput(
            name="replacement_value",
            display_name="Replacement Value",
            info="The value to replace with.",
            dynamic=True,
            show=False,
        ),
    ]

    outputs = [
        Output(
            display_name="DataFrame",
            name="output",
            method="perform_operation",
            info="The resulting DataFrame after the operation.",
        )
    ]

    def update_build_config(self, build_config, field_value, field_name=None):
        # Hide all dynamic fields by default
        dynamic_fields = [
            "column_name",
            "filter_value",
            "ascending",
            "new_column_name",
            "new_column_value",
            "columns_to_select",
            "num_rows",
            "replace_value",
            "replacement_value",
        ]
        for field in dynamic_fields:
            build_config[field]["show"] = False

        # Show relevant fields based on the selected operation
        if field_name == "operation":
            if field_value == "Filter":
                build_config["column_name"]["show"] = True
                build_config["filter_value"]["show"] = True
            elif field_value == "Sort":
                build_config["column_name"]["show"] = True
                build_config["ascending"]["show"] = True
            elif field_value == "Drop Column":
                build_config["column_name"]["show"] = True
            elif field_value == "Rename Column":
                build_config["column_name"]["show"] = True
                build_config["new_column_name"]["show"] = True
            elif field_value == "Add Column":
                build_config["new_column_name"]["show"] = True
                build_config["new_column_value"]["show"] = True
            elif field_value == "Select Columns":
                build_config["columns_to_select"]["show"] = True
            elif field_value in {"Head", "Tail"}:
                build_config["num_rows"]["show"] = True
            elif field_value == "Replace Value":
                build_config["column_name"]["show"] = True
                build_config["replace_value"]["show"] = True
                build_config["replacement_value"]["show"] = True

        return build_config

    def perform_operation(self) -> DataFrame:
        dataframe_copy = self.df.copy()
        operation = self.operation

        if operation == "Filter":
            return self.filter_rows_by_value(dataframe_copy)
        if operation == "Sort":
            return self.sort_by_column(dataframe_copy)
        if operation == "Drop Column":
            return self.drop_column(dataframe_copy)
        if operation == "Rename Column":
            return self.rename_column(dataframe_copy)
        if operation == "Add Column":
            return self.add_column(dataframe_copy)
        if operation == "Select Columns":
            return self.select_columns(dataframe_copy)
        if operation == "Head":
            return self.head(dataframe_copy)
        if operation == "Tail":
            return self.tail(dataframe_copy)
        if operation == "Replace Value":
            return self.replace_values(dataframe_copy)
        msg = f"Unsupported operation: {operation}"

        raise ValueError(msg)

    # Existing methods
    def filter_rows_by_value(self, df: DataFrame) -> DataFrame:
        return DataFrame(df[df[self.column_name] == self.filter_value])

    def sort_by_column(self, df: DataFrame) -> DataFrame:
        return DataFrame(df.sort_values(by=self.column_name, ascending=self.ascending))

    def drop_column(self, df: DataFrame) -> DataFrame:
        return DataFrame(df.drop(columns=[self.column_name]))

    def rename_column(self, df: DataFrame) -> DataFrame:
        return DataFrame(df.rename(columns={self.column_name: self.new_column_name}))

    def add_column(self, df: DataFrame) -> DataFrame:
        df[self.new_column_name] = [self.new_column_value] * len(df)
        return DataFrame(df)

    def select_columns(self, df: DataFrame) -> DataFrame:
        columns = [col.strip() for col in self.columns_to_select]
        return DataFrame(df[columns])

    # New methods
    def head(self, df: DataFrame) -> DataFrame:
        return DataFrame(df.head(self.num_rows))

    def tail(self, df: DataFrame) -> DataFrame:
        return DataFrame(df.tail(self.num_rows))

    def replace_values(self, df: DataFrame) -> DataFrame:
        df[self.column_name] = df[self.column_name].replace(self.replace_value, self.replacement_value)
        return DataFrame(df)

Data to message

This component is in Legacy, which means it is no longer in active development as of Langflow version 1.3. Instead, use the Parser component.

This component converts data objects into plain text using a specified template.

Parameters

Inputs
Name Display Name Info

data

data

The data to convert to text.

template

Template

The template to use for formatting the data. It can contain the keys {text}, {data} or any other key in the data.

sep

Separator

The separator to use between multiple data items.

Outputs
Name Display Name Info

text

Text

The resulting formatted text string as a message object.

Component code

parse_data.py
from langflow.custom.custom_component.component import Component
from langflow.helpers.data import data_to_text, data_to_text_list
from langflow.io import DataInput, MultilineInput, Output, StrInput
from langflow.schema.data import Data
from langflow.schema.message import Message


class ParseDataComponent(Component):
    display_name = "Data to Message"
    description = "Convert Data objects into Messages using any {field_name} from input data."
    icon = "message-square"
    name = "ParseData"
    legacy = True
    metadata = {
        "legacy_name": "Parse Data",
    }

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The data to convert to text.",
            is_list=True,
            required=True,
        ),
        MultilineInput(
            name="template",
            display_name="Template",
            info="The template to use for formatting the data. "
            "It can contain the keys {text}, {data} or any other key in the Data.",
            value="{text}",
            required=True,
        ),
        StrInput(name="sep", display_name="Separator", advanced=True, value="\n"),
    ]

    outputs = [
        Output(
            display_name="Message",
            name="text",
            info="Data as a single Message, with each input Data separated by Separator",
            method="parse_data",
        ),
        Output(
            display_name="Data List",
            name="data_list",
            info="Data as a list of new Data, each having `text` formatted by Template",
            method="parse_data_as_list",
        ),
    ]

    def _clean_args(self) -> tuple[list[Data], str, str]:
        data = self.data if isinstance(self.data, list) else [self.data]
        template = self.template
        sep = self.sep
        return data, template, sep

    def parse_data(self) -> Message:
        data, template, sep = self._clean_args()
        result_string = data_to_text(template, data, sep)
        self.status = result_string
        return Message(text=result_string)

    def parse_data_as_list(self) -> list[Data]:
        data, template, _ = self._clean_args()
        text_list, data_list = data_to_text_list(template, data)
        for item, text in zip(data_list, text_list, strict=True):
            item.set_text(text)
        self.status = data_list
        return data_list

Filter data

This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.

This component filters a data object based on a list of specified keys. This component allows for selective extraction of data from a data object, retaining only the key-value pairs that match the provided filter criteria.

Parameters

Inputs
Name Display Name Info

data

data

data object to filter

filter_criteria

Filter Criteria

List of keys to filter by.

Outputs
Name Display Name Info

filtered_data

Filtered data

The resulting filtered data object.

Component code

filter_data.py
from langflow.custom.custom_component.component import Component
from langflow.io import DataInput, MessageTextInput, Output
from langflow.schema.data import Data


class FilterDataComponent(Component):
    display_name = "Filter Data"
    description = "Filters a Data object based on a list of keys."
    icon = "filter"
    beta = True
    name = "FilterData"
    legacy = True

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="Data object to filter.",
        ),
        MessageTextInput(
            name="filter_criteria",
            display_name="Filter Criteria",
            info="List of keys to filter by.",
            is_list=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def filter_data(self) -> Data:
        filter_criteria: list[str] = self.filter_criteria
        data = self.data.data if isinstance(self.data, Data) else {}

        # Filter the data
        filtered = {key: value for key, value in data.items() if key in filter_criteria}

        # Create a new Data object with the filtered data
        filtered_data = Data(data=filtered)
        self.status = filtered_data
        return filtered_data

Filter Values

This component is in Beta as of Langflow version 1.1.3, and is not yet fully supported.

This component filters a list of data items based on a specified key, filter value, and comparison operator.

Parameters

Inputs
Name Display Name Info

input_data

Input data

The list of data items to filter.

filter_key

Filter Key

The key to filter on (for example, 'route').

filter_value

Filter Value

The value to filter by (for example, 'CMIP').

operator

Comparison Operator

The operator to apply for comparing the values.

Outputs
Name Display Name Info

filtered_data

Filtered data

The resulting list of filtered data items.

Component code

filter_data_values.py
from typing import Any

from langflow.custom.custom_component.component import Component
from langflow.io import DataInput, DropdownInput, MessageTextInput, Output
from langflow.schema.data import Data


class DataFilterComponent(Component):
    display_name = "Filter Values"
    description = (
        "Filter a list of data items based on a specified key, filter value,"
        " and comparison operator. Check advanced options to select match comparision."
    )
    icon = "filter"
    beta = True
    name = "FilterDataValues"
    legacy = True

    inputs = [
        DataInput(name="input_data", display_name="Input Data", info="The list of data items to filter.", is_list=True),
        MessageTextInput(
            name="filter_key",
            display_name="Filter Key",
            info="The key to filter on (e.g., 'route').",
            value="route",
            input_types=["Data"],
        ),
        MessageTextInput(
            name="filter_value",
            display_name="Filter Value",
            info="The value to filter by (e.g., 'CMIP').",
            value="CMIP",
            input_types=["Data"],
        ),
        DropdownInput(
            name="operator",
            display_name="Comparison Operator",
            options=["equals", "not equals", "contains", "starts with", "ends with"],
            info="The operator to apply for comparing the values.",
            value="equals",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def compare_values(self, item_value: Any, filter_value: str, operator: str) -> bool:
        if operator == "equals":
            return str(item_value) == filter_value
        if operator == "not equals":
            return str(item_value) != filter_value
        if operator == "contains":
            return filter_value in str(item_value)
        if operator == "starts with":
            return str(item_value).startswith(filter_value)
        if operator == "ends with":
            return str(item_value).endswith(filter_value)
        return False

    def filter_data(self) -> list[Data]:
        # Extract inputs
        input_data: list[Data] = self.input_data
        filter_key: str = self.filter_key.text
        filter_value: str = self.filter_value.text
        operator: str = self.operator

        # Validate inputs
        if not input_data:
            self.status = "Input data is empty."
            return []

        if not filter_key or not filter_value:
            self.status = "Filter key or value is missing."
            return input_data

        # Filter the data
        filtered_data = []
        for item in input_data:
            if isinstance(item.data, dict) and filter_key in item.data:
                if self.compare_values(item.data[filter_key], filter_value, operator):
                    filtered_data.append(item)
            else:
                self.status = f"Warning: Some items don't have the key '{filter_key}' or are not dictionaries."

        self.status = filtered_data
        return filtered_data

JSON Cleaner

This component is in Legacy. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component cleans JSON strings to ensure they are fully compliant with the JSON specification.

Parameters

Inputs
Name Display Name Info

json_str

JSON String

The JSON string to be cleaned. This can be a raw, potentially malformed JSON string produced by language models or other sources that may not fully comply with JSON specifications.

remove_control_chars

Remove Control Characters

If set to True, this option removes control characters (ASCII characters 0-31 and 127) from the JSON string. This can help eliminate invisible characters that might cause parsing issues or make the JSON invalid.

normalize_unicode

Normalize Unicode

When enabled, this option normalizes Unicode characters in the JSON string to their canonical composition form (NFC). This ensures consistent representation of Unicode characters across different systems and prevents potential issues with character encoding.

validate_json

Validate JSON

If set to True, this option attempts to parse the JSON string to ensure it is well-formed before applying the final repair operation. It raises a ValueError if the JSON is invalid, allowing for early detection of major structural issues in the JSON.

Outputs
Name Display Name Info

output

Cleaned JSON String

The resulting cleaned, repaired, and validated JSON string that fully complies with the JSON specification.

Component code

json_cleaner.py
import json
import unicodedata

from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import BoolInput, MessageTextInput
from langflow.schema.message import Message
from langflow.template.field.base import Output


class JSONCleaner(Component):
    icon = "braces"
    display_name = "JSON Cleaner"
    description = (
        "Cleans the messy and sometimes incorrect JSON strings produced by LLMs "
        "so that they are fully compliant with the JSON spec."
    )
    legacy = True
    inputs = [
        MessageTextInput(
            name="json_str", display_name="JSON String", info="The JSON string to be cleaned.", required=True
        ),
        BoolInput(
            name="remove_control_chars",
            display_name="Remove Control Characters",
            info="Remove control characters from the JSON string.",
            required=False,
        ),
        BoolInput(
            name="normalize_unicode",
            display_name="Normalize Unicode",
            info="Normalize Unicode characters in the JSON string.",
            required=False,
        ),
        BoolInput(
            name="validate_json",
            display_name="Validate JSON",
            info="Validate the JSON string to ensure it is well-formed.",
            required=False,
        ),
    ]

    outputs = [
        Output(display_name="Cleaned JSON String", name="output", method="clean_json"),
    ]

    def clean_json(self) -> Message:
        try:
            from json_repair import repair_json
        except ImportError as e:
            msg = "Could not import the json_repair package. Please install it with `pip install json_repair`."
            raise ImportError(msg) from e

        """Clean the input JSON string based on provided options and return the cleaned JSON string."""
        json_str = self.json_str
        remove_control_chars = self.remove_control_chars
        normalize_unicode = self.normalize_unicode
        validate_json = self.validate_json

        start = json_str.find("{")
        end = json_str.rfind("}")
        if start == -1 or end == -1:
            msg = "Invalid JSON string: Missing '{' or '}'"
            raise ValueError(msg)
        try:
            json_str = json_str[start : end + 1]

            if remove_control_chars:
                json_str = self._remove_control_characters(json_str)
            if normalize_unicode:
                json_str = self._normalize_unicode(json_str)
            if validate_json:
                json_str = self._validate_json(json_str)

            cleaned_json_str = repair_json(json_str)
            result = str(cleaned_json_str)

            self.status = result
            return Message(text=result)
        except Exception as e:
            msg = f"Error cleaning JSON string: {e}"
            raise ValueError(msg) from e

    def _remove_control_characters(self, s: str) -> str:
        """Remove control characters from the string."""
        return s.translate(self.translation_table)

    def _normalize_unicode(self, s: str) -> str:
        """Normalize Unicode characters in the string."""
        return unicodedata.normalize("NFC", s)

    def _validate_json(self, s: str) -> str:
        """Validate the JSON string."""
        try:
            json.loads(s)
        except json.JSONDecodeError as e:
            msg = f"Invalid JSON string: {e}"
            raise ValueError(msg) from e
        return s

    def __init__(self, *args, **kwargs):
        # Create a translation table that maps control characters to None
        super().__init__(*args, **kwargs)
        self.translation_table = str.maketrans("", "", "".join(chr(i) for i in range(32)) + chr(127))

Lambda filter

This component uses an LLM to generate a Lambda function for filtering or transforming structured data.

To use the Lambda filter component, you must connect it to a Language Model component, which the component uses to generate a function based on the natural language instructions in the Instructions field.

This example gets JSON data from the https://jsonplaceholder.typicode.com/users API endpoint. The Instructions field in the Lambda filter component specifies the task extract emails. The connected LLM creates a filter based on the instructions, and successfully extracts a list of email addresses from the JSON data.

component lambda filter

Parameters

Inputs
Name Display Name Info

data

Data

The structured data to filter or transform using a Lambda function.

llm

Language Model

The connection port for a Model component.

filter_instruction

Instructions

Natural language instructions for how to filter or transform the data using a Lambda function, such as Filter the data to only include items where the 'status' is 'active'.

sample_size

Sample Size

For large datasets, the number of characters to sample from the dataset head and tail.

max_size

Max Size

The number of characters for the data to be considered "large", which triggers sampling by the sample_size value.

Outputs
Name Display Name Info

filtered_data

Filtered Data

The filtered or transformed data object.

dataframe

DataFrame

The filtered data returned as a DataFrame.

Component code

lambda_filter.py
from __future__ import annotations

import json
import re
from typing import TYPE_CHECKING, Any

from langflow.custom.custom_component.component import Component
from langflow.io import DataInput, HandleInput, IntInput, MultilineInput, Output
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.utils.data_structure import get_data_structure

if TYPE_CHECKING:
    from collections.abc import Callable


class LambdaFilterComponent(Component):
    display_name = "Smart Function"
    description = "Uses an LLM to generate a function for filtering or transforming structured data."
    icon = "test-tube-diagonal"
    name = "Smart Function"

    inputs = [
        DataInput(
            name="data",
            display_name="Data",
            info="The structured data to filter or transform using a lambda function.",
            is_list=True,
            required=True,
        ),
        HandleInput(
            name="llm",
            display_name="Language Model",
            info="Connect the 'Language Model' output from your LLM component here.",
            input_types=["LanguageModel"],
            required=True,
        ),
        MultilineInput(
            name="filter_instruction",
            display_name="Instructions",
            info=(
                "Natural language instructions for how to filter or transform the data using a lambda function. "
                "Example: Filter the data to only include items where the 'status' is 'active'."
            ),
            value="Filter the data to...",
            required=True,
        ),
        IntInput(
            name="sample_size",
            display_name="Sample Size",
            info="For large datasets, number of items to sample from head/tail.",
            value=1000,
            advanced=True,
        ),
        IntInput(
            name="max_size",
            display_name="Max Size",
            info="Number of characters for the data to be considered large.",
            value=30000,
            advanced=True,
        ),
    ]

    outputs = [
        Output(
            display_name="Filtered Data",
            name="filtered_data",
            method="filter_data",
        ),
        Output(
            display_name="DataFrame",
            name="dataframe",
            method="as_dataframe",
        ),
    ]

    def get_data_structure(self, data):
        """Extract the structure of a dictionary, replacing values with their types."""
        return {k: get_data_structure(v) for k, v in data.items()}

    def _validate_lambda(self, lambda_text: str) -> bool:
        """Validate the provided lambda function text."""
        # Return False if the lambda function does not start with 'lambda' or does not contain a colon
        return lambda_text.strip().startswith("lambda") and ":" in lambda_text

    async def filter_data(self) -> list[Data]:
        self.log(str(self.data))
        data = self.data[0].data if isinstance(self.data, list) else self.data.data

        dump = json.dumps(data)
        self.log(str(data))

        llm = self.llm
        instruction = self.filter_instruction
        sample_size = self.sample_size

        # Get data structure and samples
        data_structure = self.get_data_structure(data)
        dump_structure = json.dumps(data_structure)
        self.log(dump_structure)

        # For large datasets, sample from head and tail
        if len(dump) > self.max_size:
            data_sample = (
                f"Data is too long to display... \n\n First lines (head): {dump[:sample_size]} \n\n"
                f" Last lines (tail): {dump[-sample_size:]})"
            )
        else:
            data_sample = dump

        self.log(data_sample)

        prompt = f"""Given this data structure and examples, create a Python lambda function that
                    implements the following instruction:

                    Data Structure:
                    {dump_structure}

                    Example Items:
                    {data_sample}

                    Instruction: {instruction}

                    Return ONLY the lambda function and nothing else. No need for ```python or whatever.
                    Just a string starting with lambda.
                    """

        response = await llm.ainvoke(prompt)
        response_text = response.content if hasattr(response, "content") else str(response)
        self.log(response_text)

        # Extract lambda using regex
        lambda_match = re.search(r"lambda\s+\w+\s*:.*?(?=\n|$)", response_text)
        if not lambda_match:
            msg = f"Could not find lambda in response: {response_text}"
            raise ValueError(msg)

        lambda_text = lambda_match.group().strip()
        self.log(lambda_text)

        # Validation is commented out as requested
        if not self._validate_lambda(lambda_text):
            msg = f"Invalid lambda format: {lambda_text}"
            raise ValueError(msg)

        # Create and apply the function
        fn: Callable[[Any], Any] = eval(lambda_text)  # noqa: S307

        # Apply the lambda function to the data
        processed_data = fn(data)

        # If it's a dict, wrap it in a Data object
        if isinstance(processed_data, dict):
            return [Data(**processed_data)]
        # If it's a list, convert each item to a Data object
        if isinstance(processed_data, list):
            return [Data(**item) if isinstance(item, dict) else Data(text=str(item)) for item in processed_data]
        # If it's anything else, convert to string and wrap in a Data object
        return [Data(text=str(processed_data))]

    async def as_dataframe(self) -> DataFrame:
        """Return filtered data as a DataFrame."""
        filtered_data = await self.filter_data()
        return DataFrame(filtered_data)

LLM Router

This component routes requests to the most appropriate LLM based on the OpenRouter model specifications.

Parameters

Inputs
Name Display Name Info

models

Language Models

List of LLMs to route between.

input_value

Input

The input message to be routed.

judge_llm

Judge LLM

LLM that will evaluate and select the most appropriate model.

optimization

Optimization

Optimization preference (quality/speed/cost/balanced).

Outputs
Name Display Name Info

output

Output

The response from the selected model.

selected_model

Selected Model

Name of the chosen model.

Component code

llm_router.py
import asyncio
import http  # Added for HTTPStatus
import json
from typing import Any

import aiohttp

from langflow.base.models.chat_result import get_chat_result
from langflow.base.models.model_utils import get_model_name
from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import BoolInput, DropdownInput, HandleInput, IntInput, MultilineInput
from langflow.schema.data import Data
from langflow.schema.message import Message
from langflow.template.field.base import Output


class LLMRouterComponent(Component):
    display_name = "LLM Router"
    description = "Routes the input to the most appropriate LLM based on OpenRouter model specifications"
    icon = "git-branch"

    # Constants for magic values
    MAX_DESCRIPTION_LENGTH = 500
    QUERY_PREVIEW_MAX_LENGTH = 1000

    inputs = [
        HandleInput(
            name="models",
            display_name="Language Models",
            input_types=["LanguageModel"],
            required=True,
            is_list=True,
            info="List of LLMs to route between",
        ),
        MultilineInput(
            name="input_value",
            display_name="Input",
            required=True,
            info="The input message to be routed",
        ),
        HandleInput(
            name="judge_llm",
            display_name="Judge LLM",
            input_types=["LanguageModel"],
            required=True,
            info="LLM that will evaluate and select the most appropriate model",
        ),
        DropdownInput(
            name="optimization",
            display_name="Optimization",
            options=["quality", "speed", "cost", "balanced"],
            value="balanced",
            info="Optimization preference for model selection",
        ),
        BoolInput(
            name="use_openrouter_specs",
            display_name="Use OpenRouter Specs",
            value=True,
            info=(
                "Fetch model specifications from OpenRouter API for enhanced routing decisions. "
                "If false, only model names will be used."
            ),
            advanced=True,
        ),
        IntInput(
            name="timeout",
            display_name="API Timeout",
            value=10,
            info="Timeout for API requests in seconds",
            advanced=True,
        ),
        BoolInput(
            name="fallback_to_first",
            display_name="Fallback to First Model",
            value=True,
            info="Use first model as fallback when routing fails",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Output", name="output", method="route_to_model"),
        Output(
            display_name="Selected Model Info",
            name="selected_model_info",
            method="get_selected_model_info",
            types=["Data"],
        ),
        Output(
            display_name="Routing Decision",
            name="routing_decision",
            method="get_routing_decision",
        ),
    ]

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._selected_model_name: str | None = None
        self._selected_api_model_id: str | None = None
        self._routing_decision: str = ""
        self._models_api_cache: dict[str, dict[str, Any]] = {}
        self._model_name_to_api_id: dict[str, str] = {}

    def _simplify_model_name(self, name: str) -> str:
        """Simplify model name for matching by lowercasing and removing non-alphanumerics."""
        return "".join(c.lower() for c in name if c.isalnum())

    async def _fetch_openrouter_models_data(self) -> None:
        """Fetch all models from OpenRouter API and cache them along with name mappings."""
        if self._models_api_cache and self._model_name_to_api_id:
            return

        if not self.use_openrouter_specs:
            self.log("OpenRouter specs are disabled. Skipping fetch.")
            return

        try:
            self.status = "Fetching OpenRouter model specifications..."
            self.log("Fetching all model specifications from OpenRouter API: https://openrouter.ai/api/v1/models")
            async with (
                aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session,
                session.get("https://openrouter.ai/api/v1/models") as response,
            ):
                if response.status == http.HTTPStatus.OK:
                    data = await response.json()
                    models_list = data.get("data", [])

                    _models_api_cache_temp = {}
                    _model_name_to_api_id_temp = {}

                    for model_data in models_list:
                        api_model_id = model_data.get("id")
                        if not api_model_id:
                            continue

                        _models_api_cache_temp[api_model_id] = model_data
                        _model_name_to_api_id_temp[api_model_id] = api_model_id

                        api_model_name = model_data.get("name")
                        if api_model_name:
                            _model_name_to_api_id_temp[api_model_name] = api_model_id
                            simplified_api_name = self._simplify_model_name(api_model_name)
                            _model_name_to_api_id_temp[simplified_api_name] = api_model_id

                        hugging_face_id = model_data.get("hugging_face_id")
                        if hugging_face_id:
                            _model_name_to_api_id_temp[hugging_face_id] = api_model_id
                            simplified_hf_id = self._simplify_model_name(hugging_face_id)
                            _model_name_to_api_id_temp[simplified_hf_id] = api_model_id

                        if "/" in api_model_id:
                            try:
                                model_name_part_of_id = api_model_id.split("/", 1)[1]
                                if model_name_part_of_id:
                                    _model_name_to_api_id_temp[model_name_part_of_id] = api_model_id
                                    simplified_part_id = self._simplify_model_name(model_name_part_of_id)
                                    _model_name_to_api_id_temp[simplified_part_id] = api_model_id
                            except IndexError:
                                pass  # Should not happen if '/' is present

                    self._models_api_cache = _models_api_cache_temp
                    self._model_name_to_api_id = _model_name_to_api_id_temp
                    log_msg = (
                        f"Successfully fetched and cached {len(self._models_api_cache)} "
                        f"model specifications from OpenRouter."
                    )
                    self.log(log_msg)
                else:
                    err_text = await response.text()
                    self.log(f"Failed to fetch OpenRouter models: HTTP {response.status} - {err_text}")
                    self._models_api_cache = {}
                    self._model_name_to_api_id = {}
        except aiohttp.ClientError as e:
            self.log(f"AIOHTTP ClientError fetching OpenRouter models: {e!s}", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        except asyncio.TimeoutError:
            self.log("Timeout fetching OpenRouter model specifications.", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        except json.JSONDecodeError as e:
            self.log(f"JSON decode error fetching OpenRouter models: {e!s}", "error")
            self._models_api_cache = {}
            self._model_name_to_api_id = {}
        finally:
            self.status = ""

    def _get_api_model_id_for_langflow_model(self, langflow_model_name: str) -> str | None:
        """Attempt to find the OpenRouter API ID for a given Langflow model name."""
        if not langflow_model_name:
            return None

        potential_names_to_check = [langflow_model_name, self._simplify_model_name(langflow_model_name)]

        if langflow_model_name.startswith("models/"):
            name_without_prefix = langflow_model_name[len("models/") :]
            potential_names_to_check.append(name_without_prefix)
            potential_names_to_check.append(self._simplify_model_name(name_without_prefix))

        elif langflow_model_name.startswith("community_models/"):
            name_without_prefix = langflow_model_name[len("community_models/") :]
            potential_names_to_check.append(name_without_prefix)
            simplified_no_prefix = self._simplify_model_name(name_without_prefix)
            potential_names_to_check.append(simplified_no_prefix)

        elif langflow_model_name.startswith("community_models/"):
            name_without_prefix = langflow_model_name[len("community_models/") :]
            potential_names_to_check.append(name_without_prefix)
            simplified_no_prefix_comm = self._simplify_model_name(name_without_prefix)
            potential_names_to_check.append(simplified_no_prefix_comm)

        unique_names_to_check = list(dict.fromkeys(potential_names_to_check))

        for name_variant in unique_names_to_check:
            if name_variant in self._model_name_to_api_id:
                return self._model_name_to_api_id[name_variant]

        self.log(
            f"Could not map Langflow model name '{langflow_model_name}' "
            f"(tried variants: {unique_names_to_check}) to an OpenRouter API ID."
        )
        return None

    def _get_model_specs_dict(self, langflow_model_name: str) -> dict[str, Any]:
        """Get a dictionary of relevant model specifications for a given Langflow model name."""
        if not self.use_openrouter_specs or not self._models_api_cache:
            return {
                "id": langflow_model_name,
                "name": langflow_model_name,
                "description": "Specifications not available.",
            }

        api_model_id = self._get_api_model_id_for_langflow_model(langflow_model_name)

        if not api_model_id or api_model_id not in self._models_api_cache:
            log_msg = (
                f"No cached API data found for Langflow model '{langflow_model_name}' "
                f"(mapped API ID: {api_model_id}). Returning basic info."
            )
            self.log(log_msg)
            return {
                "id": langflow_model_name,
                "name": langflow_model_name,
                "description": "Full specifications not found in cache.",
            }

        model_data = self._models_api_cache[api_model_id]
        top_provider_data = model_data.get("top_provider", {})
        architecture_data = model_data.get("architecture", {})
        pricing_data = model_data.get("pricing", {})
        description = model_data.get("description", "No description available")
        truncated_description = (
            description[: self.MAX_DESCRIPTION_LENGTH - 3] + "..."
            if len(description) > self.MAX_DESCRIPTION_LENGTH
            else description
        )

        specs = {
            "id": model_data.get("id"),
            "name": model_data.get("name"),
            "description": truncated_description,
            "context_length": top_provider_data.get("context_length") or model_data.get("context_length"),
            "max_completion_tokens": (
                top_provider_data.get("max_completion_tokens") or model_data.get("max_completion_tokens")
            ),
            "tokenizer": architecture_data.get("tokenizer"),
            "input_modalities": architecture_data.get("input_modalities", []),
            "output_modalities": architecture_data.get("output_modalities", []),
            "pricing_prompt": pricing_data.get("prompt"),
            "pricing_completion": pricing_data.get("completion"),
            "is_moderated": top_provider_data.get("is_moderated"),
            "supported_parameters": model_data.get("supported_parameters", []),
        }
        return {k: v for k, v in specs.items() if v is not None}

    def _create_system_prompt(self) -> str:
        """Create system prompt for the judge LLM."""
        return """\
You are an expert AI model selection specialist. Your task is to analyze the user's input query,
their optimization preference, and a list of available models with their specifications,
then select the most appropriate model.

Each model will be presented as a JSON object with its capabilities and characteristics.

Your decision should be based on:
1. Task complexity and requirements derived from the user's query.
2. Context length needed for the input.
3. Model capabilities (e.g., context window, input/output modalities, tokenizer).
4. Pricing considerations, if relevant to the optimization preference.
5. User's stated optimization preference (quality, speed, cost, balanced).

Return ONLY the index number (0, 1, 2, etc.) of the best model from the provided list.
Do not provide any explanation or reasoning, just the index number.
If multiple models seem equally suitable according to the preference, you may pick the first one that matches.
If no model seems suitable, pick the first model in the list (index 0) as a fallback."""

    async def route_to_model(self) -> Message:
        """Main routing method."""
        if not self.models or not self.input_value or not self.judge_llm:
            error_msg = "Missing required inputs: models, input_value, or judge_llm"
            self.status = error_msg
            self.log(f"Validation Error: {error_msg}", "error")
            raise ValueError(error_msg)

        successful_result: Message | None = None
        try:
            self.log(f"Starting model routing with {len(self.models)} available Langflow models.")
            self.log(f"Optimization preference: {self.optimization}")
            self.log(f"Input length: {len(self.input_value)} characters")

            if self.use_openrouter_specs and not self._models_api_cache:
                await self._fetch_openrouter_models_data()

            system_prompt_content = self._create_system_prompt()
            system_message = {"role": "system", "content": system_prompt_content}

            self.status = "Analyzing available models and preparing specifications..."
            model_specs_for_judge = []
            for i, langflow_model_instance in enumerate(self.models):
                langflow_model_name = get_model_name(langflow_model_instance)
                if not langflow_model_name:
                    self.log(f"Warning: Could not determine name for model at index {i}. Using placeholder.", "warning")
                    spec_dict = {
                        "id": f"unknown_model_{i}",
                        "name": f"Unknown Model {i}",
                        "description": "Name could not be determined.",
                    }
                else:
                    spec_dict = self._get_model_specs_dict(langflow_model_name)

                model_specs_for_judge.append({"index": i, "langflow_name": langflow_model_name, "specs": spec_dict})
                self.log(
                    f"Prepared specs for Langflow model {i} ('{langflow_model_name}'): {spec_dict.get('name', 'N/A')}"
                )

            estimated_tokens = len(self.input_value.split()) * 1.3
            self.log(f"Estimated input tokens: {int(estimated_tokens)}")

            query_preview = self.input_value[: self.QUERY_PREVIEW_MAX_LENGTH]
            if len(self.input_value) > self.QUERY_PREVIEW_MAX_LENGTH:
                query_preview += "..."

            user_message_content = f"""User Query: "{query_preview}"
Optimization Preference: {self.optimization}
Estimated Input Tokens: ~{int(estimated_tokens)}

Available Models (JSON list):
{json.dumps(model_specs_for_judge, indent=2)}

Based on the user query, optimization preference, and the detailed model specifications,
select the index of the most appropriate model.
Return ONLY the index number:"""

            user_message = {"role": "user", "content": user_message_content}

            self.log("Requesting model selection from judge LLM...")
            self.status = "Judge LLM analyzing options..."

            response = await self.judge_llm.ainvoke([system_message, user_message])
            selected_index, chosen_model_instance = self._parse_judge_response(response.content.strip())
            self._selected_model_name = get_model_name(chosen_model_instance)
            if self._selected_model_name:
                self._selected_api_model_id = (
                    self._get_api_model_id_for_langflow_model(self._selected_model_name) or self._selected_model_name
                )
            else:
                self._selected_api_model_id = "unknown_model"

            specs_source = (
                "OpenRouter API"
                if self.use_openrouter_specs and self._models_api_cache
                else "Basic (Langflow model names only)"
            )
            self._routing_decision = f"""Model Selection Decision:
- Selected Model Index: {selected_index}
- Selected Langflow Model Name: {self._selected_model_name}
- Selected API Model ID (if resolved): {self._selected_api_model_id}
- Optimization Preference: {self.optimization}
- Input Query Length: {len(self.input_value)} characters (~{int(estimated_tokens)} tokens)
- Number of Models Considered: {len(self.models)}
- Specifications Source: {specs_source}"""

            log_msg = (
                f"DECISION by Judge LLM: Selected model index {selected_index} -> "
                f"Langflow Name: '{self._selected_model_name}', API ID: '{self._selected_api_model_id}'"
            )
            self.log(log_msg)

            self.status = f"Generating response with: {self._selected_model_name}"
            input_message_obj = Message(text=self.input_value)

            raw_result = get_chat_result(
                runnable=chosen_model_instance,
                input_value=input_message_obj,
            )
            result = Message(text=str(raw_result)) if not isinstance(raw_result, Message) else raw_result

            self.status = f"Successfully routed to: {self._selected_model_name}"
            successful_result = result

        except (ValueError, TypeError, AttributeError, KeyError, RuntimeError) as e:
            error_msg = f"Routing error: {type(e).__name__} - {e!s}"
            self.log(f"{error_msg}", "error")
            self.log("Detailed routing error occurred. Check logs for details.", "error")
            self.status = error_msg

            if self.fallback_to_first and self.models:
                self.log("Activating fallback to first model due to error.", "warning")
                chosen_model_instance = self.models[0]
                self._selected_model_name = get_model_name(chosen_model_instance)
                if self._selected_model_name:
                    mapped_id = self._get_api_model_id_for_langflow_model(self._selected_model_name)
                    self._selected_api_model_id = mapped_id or self._selected_model_name
                else:
                    self._selected_api_model_id = "fallback_model"
                self._routing_decision = f"""Fallback Decision:
- Error During Routing: {error_msg}
- Fallback Model Langflow Name: {self._selected_model_name}
- Fallback Model API ID (if resolved): {self._selected_api_model_id}
- Reason: Automatic fallback enabled"""

                self.status = f"Fallback: Using {self._selected_model_name}"
                input_message_obj = Message(text=self.input_value)

                raw_fallback_result = get_chat_result(
                    runnable=chosen_model_instance,
                    input_value=input_message_obj,
                )
                if not isinstance(raw_fallback_result, Message):
                    successful_result = Message(text=str(raw_fallback_result))
                else:
                    successful_result = raw_fallback_result
            else:
                self.log("No fallback model available or fallback disabled. Raising error.", "error")
                raise

        if successful_result is None:
            error_message = "Unexpected state in route_to_model: No result produced."
            self.log(f"Error: {error_message}", "error")
            raise RuntimeError(error_message)
        return successful_result

    def _parse_judge_response(self, response_content: str) -> tuple[int, Any]:
        """Parse the judge's response to extract model index."""
        try:
            cleaned_response = "".join(filter(str.isdigit, response_content.strip()))
            if not cleaned_response:
                self.log(f"Judge LLM response was non-numeric: '{response_content}'. Defaulting to index 0.", "warning")
                return 0, self.models[0]

            selected_index = int(cleaned_response)

            if 0 <= selected_index < len(self.models):
                self.log(f"Judge LLM selected index: {selected_index}")
                return selected_index, self.models[selected_index]
            log_msg = (
                f"Judge LLM selected index {selected_index} is out of bounds "
                f"(0-{len(self.models) - 1}). Defaulting to index 0."
            )
            self.log(log_msg, "warning")
            return 0, self.models[0]

        except ValueError:
            self.log(
                f"Could not parse judge LLM response to integer: '{response_content}'. Defaulting to index 0.",
                "warning",
            )
            return 0, self.models[0]
        except (AttributeError, IndexError) as e:
            self.log(f"Error parsing judge response '{response_content}': {e!s}. Defaulting to index 0.", "error")
            return 0, self.models[0]

    def get_selected_model_info(self) -> list[Data]:
        """Return detailed information about the selected model as a list of Data objects."""
        if self._selected_model_name:
            specs_dict = self._get_model_specs_dict(self._selected_model_name)
            if "langflow_name" not in specs_dict:
                specs_dict["langflow_model_name_used_for_lookup"] = self._selected_model_name
            if self._selected_api_model_id and specs_dict.get("id") != self._selected_api_model_id:
                specs_dict["resolved_api_model_id"] = self._selected_api_model_id
            data_output = [Data(data=specs_dict)]
            self.status = data_output
            return data_output

        data_output = [Data(data={"info": "No model selected yet - run the router first."})]
        self.status = data_output
        return data_output

    def get_routing_decision(self) -> Message:
        """Return the comprehensive routing decision explanation."""
        if self._routing_decision:
            message_output = Message(text=f"{self._routing_decision}")
            self.status = message_output
            return message_output

        message_output = Message(text="No routing decision made yet - run the router first.")
        self.status = message_output
        return message_output

Message to data

This component converts a message object to a data object.

Parameters

Inputs
Name Display Name Info

message

message

The message object to convert to a data object.

Outputs
Name Display Name Info

data

data

The resulting data object converted from the input message.

Component code

message_to_data.py
from loguru import logger

from langflow.custom.custom_component.component import Component
from langflow.io import MessageInput, Output
from langflow.schema.data import Data
from langflow.schema.message import Message


class MessageToDataComponent(Component):
    display_name = "Message to Data"
    description = "Convert a Message object to a Data object"
    icon = "message-square-share"
    beta = True
    name = "MessagetoData"
    legacy = True

    inputs = [
        MessageInput(
            name="message",
            display_name="Message",
            info="The Message object to convert to a Data object",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="convert_message_to_data"),
    ]

    def convert_message_to_data(self) -> Data:
        if isinstance(self.message, Message):
            # Convert Message to Data
            return Data(data=self.message.data)

        msg = "Error converting Message to Data: Input must be a Message object"
        logger.opt(exception=True).debug(msg)
        self.status = msg
        return Data(data={"error": msg})

Parse DataFrame

This component is in Legacy, which means it is no longer in active development as of Langflow version 1.3. Instead, use the Parser component.

This component converts a DataFrame into plain text following a specified template. Each column in the DataFrame is treated as a possible template key, for example, {col_name}.

Parameters

Inputs
Name Display Name Info

df

DataFrame

The DataFrame to convert to text rows.

template

Template

The template for formatting each row. Use placeholders matching column names in the DataFrame, for example, {col1}, {col2}.

sep

Separator

String that joins all row texts when building the single Text output.

Outputs
Name Display Name Info

text

Text

All rows combined into a single text, each row formatted by the template and separated by the separator value defined in sep.

Component code

parse_dataframe.py
from langflow.custom.custom_component.component import Component
from langflow.io import DataFrameInput, MultilineInput, Output, StrInput
from langflow.schema.message import Message


class ParseDataFrameComponent(Component):
    display_name = "Parse DataFrame"
    description = (
        "Convert a DataFrame into plain text following a specified template. "
        "Each column in the DataFrame is treated as a possible template key, e.g. {col_name}."
    )
    icon = "braces"
    name = "ParseDataFrame"
    legacy = True

    inputs = [
        DataFrameInput(name="df", display_name="DataFrame", info="The DataFrame to convert to text rows."),
        MultilineInput(
            name="template",
            display_name="Template",
            info=(
                "The template for formatting each row. "
                "Use placeholders matching column names in the DataFrame, for example '{col1}', '{col2}'."
            ),
            value="{text}",
        ),
        StrInput(
            name="sep",
            display_name="Separator",
            advanced=True,
            value="\n",
            info="String that joins all row texts when building the single Text output.",
        ),
    ]

    outputs = [
        Output(
            display_name="Text",
            name="text",
            info="All rows combined into a single text, each row formatted by the template and separated by `sep`.",
            method="parse_data",
        ),
    ]

    def _clean_args(self):
        dataframe = self.df
        template = self.template or "{text}"
        sep = self.sep or "\n"
        return dataframe, template, sep

    def parse_data(self) -> Message:
        """Converts each row of the DataFrame into a formatted string using the template.

        then joins them with `sep`. Returns a single combined string as a Message.
        """
        dataframe, template, sep = self._clean_args()

        lines = []
        # For each row in the DataFrame, build a dict and format
        for _, row in dataframe.iterrows():
            row_dict = row.to_dict()
            text_line = template.format(**row_dict)  # e.g. template="{text}", row_dict={"text": "Hello"}
            lines.append(text_line)

        # Join all lines with the provided separator
        result_string = sep.join(lines)
        self.status = result_string  # store in self.status for UI logs
        return Message(text=result_string)

Parse JSON

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component converts and extracts JSON fields using JQ queries.

Parameters

Inputs
Name Display Name Info

input_value

Input

The data object to filter. It can be a message or data object.

query

JQ Query

JQ Query to filter the data. The input is always a JSON list.

Outputs
Name Display Name Info

filtered_data

Filtered data

Filtered data as a list of data objects.

Component code

parse_json_data.py
import json
from json import JSONDecodeError

import jq
from json_repair import repair_json
from loguru import logger

from langflow.custom.custom_component.component import Component
from langflow.inputs.inputs import HandleInput, MessageTextInput
from langflow.io import Output
from langflow.schema.data import Data
from langflow.schema.message import Message


class ParseJSONDataComponent(Component):
    display_name = "Parse JSON"
    description = "Convert and extract JSON fields."
    icon = "braces"
    name = "ParseJSONData"
    legacy: bool = True

    inputs = [
        HandleInput(
            name="input_value",
            display_name="Input",
            info="Data object to filter.",
            required=True,
            input_types=["Message", "Data"],
        ),
        MessageTextInput(
            name="query",
            display_name="JQ Query",
            info="JQ Query to filter the data. The input is always a JSON list.",
            required=True,
        ),
    ]

    outputs = [
        Output(display_name="Filtered Data", name="filtered_data", method="filter_data"),
    ]

    def _parse_data(self, input_value) -> str:
        if isinstance(input_value, Message) and isinstance(input_value.text, str):
            return input_value.text
        if isinstance(input_value, Data):
            return json.dumps(input_value.data)
        return str(input_value)

    def filter_data(self) -> list[Data]:
        to_filter = self.input_value
        if not to_filter:
            return []
        # Check if input is a list
        if isinstance(to_filter, list):
            to_filter = [self._parse_data(f) for f in to_filter]
        else:
            to_filter = self._parse_data(to_filter)

        # If input is not a list, don't wrap it in a list
        if not isinstance(to_filter, list):
            to_filter = repair_json(to_filter)
            try:
                to_filter_as_dict = json.loads(to_filter)
            except JSONDecodeError:
                try:
                    to_filter_as_dict = json.loads(repair_json(to_filter))
                except JSONDecodeError as e:
                    msg = f"Invalid JSON: {e}"
                    raise ValueError(msg) from e
        else:
            to_filter = [repair_json(f) for f in to_filter]
            to_filter_as_dict = []
            for f in to_filter:
                try:
                    to_filter_as_dict.append(json.loads(f))
                except JSONDecodeError:
                    try:
                        to_filter_as_dict.append(json.loads(repair_json(f)))
                    except JSONDecodeError as e:
                        msg = f"Invalid JSON: {e}"
                        raise ValueError(msg) from e
            to_filter = to_filter_as_dict

        full_filter_str = json.dumps(to_filter_as_dict)

        logger.info("to_filter: ", to_filter)

        results = jq.compile(self.query).input_text(full_filter_str).all()
        logger.info("results: ", results)
        return [Data(data=value) if isinstance(value, dict) else Data(text=str(value)) for value in results]

Parser

This component formats DataFrame or Data objects into text using templates, with an option to convert inputs directly to strings using stringify.

To use this component, create variables for values in the template the same way you would in a Prompt component. For DataFrames, use column names, for example Name: {Name}. For Data objects, use {text}.

Parameters

Inputs
Name Display Name Info

stringify

Stringify

Enable to convert input to a string instead of using a template.

template

Template

Template for formatting using variables in curly brackets. For DataFrames, use column names, such as `Name: {Name}. For Data objects, use `{text}.

input_data

Data or DataFrame

The input to parse - accepts either a DataFrame or Data object.

sep

Separator

String used to separate rows/items. Default: newline.

clean_data

Clean Data

When stringify is enabled, cleans data by removing empty rows and lines.

Outputs
Name Display Name Info

parsed_text

Parsed Text

The resulting formatted text as a message object.

Component code

parser.py
from langflow.custom.custom_component.component import Component
from langflow.helpers.data import safe_convert
from langflow.inputs.inputs import BoolInput, HandleInput, MessageTextInput, MultilineInput, TabInput
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.schema.message import Message
from langflow.template.field.base import Output


class ParserComponent(Component):
    display_name = "Parser"
    description = "Extracts text using a template."
    icon = "braces"

    inputs = [
        HandleInput(
            name="input_data",
            display_name="Data or DataFrame",
            input_types=["DataFrame", "Data"],
            info="Accepts either a DataFrame or a Data object.",
            required=True,
        ),
        TabInput(
            name="mode",
            display_name="Mode",
            options=["Parser", "Stringify"],
            value="Parser",
            info="Convert into raw string instead of using a template.",
            real_time_refresh=True,
        ),
        MultilineInput(
            name="pattern",
            display_name="Template",
            info=(
                "Use variables within curly brackets to extract column values for DataFrames "
                "or key values for Data."
                "For example: `Name: {Name}, Age: {Age}, Country: {Country}`"
            ),
            value="Text: {text}",  # Example default
            dynamic=True,
            show=True,
            required=True,
        ),
        MessageTextInput(
            name="sep",
            display_name="Separator",
            advanced=True,
            value="\n",
            info="String used to separate rows/items.",
        ),
    ]

    outputs = [
        Output(
            display_name="Parsed Text",
            name="parsed_text",
            info="Formatted text output.",
            method="parse_combined_text",
        ),
    ]

    def update_build_config(self, build_config, field_value, field_name=None):
        """Dynamically hide/show `template` and enforce requirement based on `stringify`."""
        if field_name == "mode":
            build_config["pattern"]["show"] = self.mode == "Parser"
            build_config["pattern"]["required"] = self.mode == "Parser"
            if field_value:
                clean_data = BoolInput(
                    name="clean_data",
                    display_name="Clean Data",
                    info=(
                        "Enable to clean the data by removing empty rows and lines "
                        "in each cell of the DataFrame/ Data object."
                    ),
                    value=True,
                    advanced=True,
                    required=False,
                )
                build_config["clean_data"] = clean_data.to_dict()
            else:
                build_config.pop("clean_data", None)

        return build_config

    def _clean_args(self):
        """Prepare arguments based on input type."""
        input_data = self.input_data

        match input_data:
            case list() if all(isinstance(item, Data) for item in input_data):
                msg = "List of Data objects is not supported."
                raise ValueError(msg)
            case DataFrame():
                return input_data, None
            case Data():
                return None, input_data
            case dict() if "data" in input_data:
                try:
                    if "columns" in input_data:  # Likely a DataFrame
                        return DataFrame.from_dict(input_data), None
                    # Likely a Data object
                    return None, Data(**input_data)
                except (TypeError, ValueError, KeyError) as e:
                    msg = f"Invalid structured input provided: {e!s}"
                    raise ValueError(msg) from e
            case _:
                msg = f"Unsupported input type: {type(input_data)}. Expected DataFrame or Data."
                raise ValueError(msg)

    def parse_combined_text(self) -> Message:
        """Parse all rows/items into a single text or convert input to string if `stringify` is enabled."""
        # Early return for stringify option
        if self.mode == "Stringify":
            return self.convert_to_string()

        df, data = self._clean_args()

        lines = []
        if df is not None:
            for _, row in df.iterrows():
                formatted_text = self.pattern.format(**row.to_dict())
                lines.append(formatted_text)
        elif data is not None:
            formatted_text = self.pattern.format(**data.data)
            lines.append(formatted_text)

        combined_text = self.sep.join(lines)
        self.status = combined_text
        return Message(text=combined_text)

    def convert_to_string(self) -> Message:
        """Convert input data to string with proper error handling."""
        result = ""
        if isinstance(self.input_data, list):
            result = "\n".join([safe_convert(item, clean_data=self.clean_data or False) for item in self.input_data])
        else:
            result = safe_convert(self.input_data or False)
        self.log(f"Converted to string with length: {len(result)}")

        message = Message(text=result)
        self.status = message
        return message

Regex Extractor

This component extracts patterns from text using regular expressions.

Parameters

Inputs
Name Display Name Info

input_text

Input Text

The text to analyze. Type: MessageTextInput, Required: true

pattern

Regex Pattern

The regular expression pattern to match. Type: MessageTextInput, Required: true

Outputs
Name Display Name Info

data

Data

List of extracted matches as Data objects. Method: extract_matches

text

Message

Formatted text of all matches. Method: get_matches_text

Component code

regex.py
import re

from langflow.custom.custom_component.component import Component
from langflow.io import MessageTextInput, Output
from langflow.schema.data import Data
from langflow.schema.message import Message


class RegexExtractorComponent(Component):
    display_name = "Regex Extractor"
    description = "Extract patterns from text using regular expressions."
    icon = "regex"
    legacy = True

    inputs = [
        MessageTextInput(
            name="input_text",
            display_name="Input Text",
            info="The text to analyze",
            required=True,
        ),
        MessageTextInput(
            name="pattern",
            display_name="Regex Pattern",
            info="The regular expression pattern to match",
            value=r"",
            required=True,
            tool_mode=True,
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="extract_matches"),
        Output(display_name="Message", name="text", method="get_matches_text"),
    ]

    def extract_matches(self) -> list[Data]:
        if not self.pattern or not self.input_text:
            self.status = []
            return []

        try:
            # Compile regex pattern
            pattern = re.compile(self.pattern)

            # Find all matches in the input text
            matches = pattern.findall(self.input_text)

            # Filter out empty matches
            filtered_matches = [match for match in matches if match]  # Remove empty matches

            # Return empty list for no matches, or list of matches if found
            result: list = [] if not filtered_matches else [Data(data={"match": match}) for match in filtered_matches]

        except re.error as e:
            error_message = f"Invalid regex pattern: {e!s}"
            result = [Data(data={"error": error_message})]
        except ValueError as e:
            error_message = f"Error extracting matches: {e!s}"
            result = [Data(data={"error": error_message})]

        self.status = result
        return result

    def get_matches_text(self) -> Message:
        """Get matches as a formatted text message."""
        matches = self.extract_matches()

        if not matches:
            message = Message(text="No matches found")
            self.status = message
            return message

        if "error" in matches[0].data:
            message = Message(text=matches[0].data["error"])
            self.status = message
            return message

        result = "\n".join(match.data["match"] for match in matches)
        message = Message(text=result)
        self.status = message
        return message

Save to File

This component saves DataFrames, Data, or Messages to various file formats.

Parameters

Inputs
Name Display Name Info

input_type

Input Type

Select the type of input to save. Options: ["DataFrame", "Data", "Message"]. Type: DropdownInput

df

DataFrame

The DataFrame to save. Type: DataFrameInput

data

Data

The Data object to save. Type: DataInput

message

Message

The Message to save. Type: MessageInput

file_format

File Format

Select the file format to save the input. Options for DataFrame/Data: ["csv", "excel", "json", "markdown"]. Options for Message: ["txt", "json", "markdown"]. Type: DropdownInput

file_path

File Path (including filename)

The full file path (including filename and extension). Type: StrInput

Outputs
Name Display Name Info

confirmation

Confirmation

Confirmation message after saving the file. Method: save_to_file

Component code

save_to_file.py
404: Not Found

Select Data

This component is in Legacy as of Langflow version 1.1.3. Legacy components can be used in flows, but may not work due to Langflow core updates.

This component selects a single data item from a list.

Parameters

Inputs
Name Display Name Info

data_list

Data List

List of data to select from.

data_index

Data Index

Index of the data to select.

Outputs
Name Display Name Info

selected_data

Selected Data

The selected Data object.

Component code

select_data.py
from langflow.custom.custom_component.component import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import DataInput, IntInput
from langflow.io import Output
from langflow.schema.data import Data


class SelectDataComponent(Component):
    display_name: str = "Select Data"
    description: str = "Select a single data from a list of data."
    name: str = "SelectData"
    icon = "prototypes"
    legacy = True

    inputs = [
        DataInput(
            name="data_list",
            display_name="Data List",
            info="List of data to select from.",
            is_list=True,  # Specify that this input takes a list of Data objects
        ),
        IntInput(
            name="data_index",
            display_name="Data Index",
            info="Index of the data to select.",
            value=0,  # Will be populated dynamically based on the length of data_list
            range_spec=RangeSpec(min=0, max=15, step=1, step_type="int"),
        ),
    ]

    outputs = [
        Output(display_name="Selected Data", name="selected_data", method="select_data"),
    ]

    async def select_data(self) -> Data:
        # Retrieve the selected index from the dropdown
        selected_index = int(self.data_index)
        # Get the data list

        # Validate that the selected index is within bounds
        if selected_index < 0 or selected_index >= len(self.data_list):
            msg = f"Selected index {selected_index} is out of range."
            raise ValueError(msg)

        # Return the selected Data object
        selected_data = self.data_list[selected_index]
        self.status = selected_data  # Update the component status to reflect the selected data
        return selected_data

Split text

This component splits text into chunks based on specified criteria.

Parameters

Inputs
Name Display Name Info

data_inputs

Input Documents

The data to split. The component accepts Data or DataFrame objects.

chunk_overlap

Chunk Overlap

The number of characters to overlap between chunks. Default: 200.

chunk_size

Chunk Size

The maximum number of characters in each chunk. Default: 1000.

separator

Separator

The character to split on. Default: newline.

text_key

Text Key

The key to use for the text column (advanced). Default: text.

Outputs
Name Display Name Info

chunks

Chunks

List of split text chunks as Data objects.

dataframe

DataFrame

List of split text chunks as DataFrame objects.

Component code

split_text.py
from langchain_text_splitters import CharacterTextSplitter

from langflow.custom.custom_component.component import Component
from langflow.io import DropdownInput, HandleInput, IntInput, MessageTextInput, Output
from langflow.schema.data import Data
from langflow.schema.dataframe import DataFrame
from langflow.utils.util import unescape_string


class SplitTextComponent(Component):
    display_name: str = "Split Text"
    description: str = "Split text into chunks based on specified criteria."
    icon = "scissors-line-dashed"
    name = "SplitText"

    inputs = [
        HandleInput(
            name="data_inputs",
            display_name="Data or DataFrame",
            info="The data with texts to split in chunks.",
            input_types=["Data", "DataFrame"],
            required=True,
        ),
        IntInput(
            name="chunk_overlap",
            display_name="Chunk Overlap",
            info="Number of characters to overlap between chunks.",
            value=200,
        ),
        IntInput(
            name="chunk_size",
            display_name="Chunk Size",
            info=(
                "The maximum length of each chunk. Text is first split by separator, "
                "then chunks are merged up to this size. "
                "Individual splits larger than this won't be further divided."
            ),
            value=1000,
        ),
        MessageTextInput(
            name="separator",
            display_name="Separator",
            info=(
                "The character to split on. Use \\n for newline. "
                "Examples: \\n\\n for paragraphs, \\n for lines, . for sentences"
            ),
            value="\n",
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="The key to use for the text column.",
            value="text",
            advanced=True,
        ),
        DropdownInput(
            name="keep_separator",
            display_name="Keep Separator",
            info="Whether to keep the separator in the output chunks and where to place it.",
            options=["False", "True", "Start", "End"],
            value="False",
            advanced=True,
        ),
    ]

    outputs = [
        Output(display_name="Chunks", name="dataframe", method="split_text"),
    ]

    def _docs_to_data(self, docs) -> list[Data]:
        return [Data(text=doc.page_content, data=doc.metadata) for doc in docs]

    def _fix_separator(self, separator: str) -> str:
        """Fix common separator issues and convert to proper format."""
        if separator == "/n":
            return "\n"
        if separator == "/t":
            return "\t"
        return separator

    def split_text_base(self):
        separator = self._fix_separator(self.separator)
        separator = unescape_string(separator)

        if isinstance(self.data_inputs, DataFrame):
            if not len(self.data_inputs):
                msg = "DataFrame is empty"
                raise TypeError(msg)

            self.data_inputs.text_key = self.text_key
            try:
                documents = self.data_inputs.to_lc_documents()
            except Exception as e:
                msg = f"Error converting DataFrame to documents: {e}"
                raise TypeError(msg) from e
        else:
            if not self.data_inputs:
                msg = "No data inputs provided"
                raise TypeError(msg)

            documents = []
            if isinstance(self.data_inputs, Data):
                self.data_inputs.text_key = self.text_key
                documents = [self.data_inputs.to_lc_document()]
            else:
                try:
                    documents = [input_.to_lc_document() for input_ in self.data_inputs if isinstance(input_, Data)]
                    if not documents:
                        msg = f"No valid Data inputs found in {type(self.data_inputs)}"
                        raise TypeError(msg)
                except AttributeError as e:
                    msg = f"Invalid input type in collection: {e}"
                    raise TypeError(msg) from e
        try:
            # Convert string 'False'/'True' to boolean
            keep_sep = self.keep_separator
            if isinstance(keep_sep, str):
                if keep_sep.lower() == "false":
                    keep_sep = False
                elif keep_sep.lower() == "true":
                    keep_sep = True
                # 'start' and 'end' are kept as strings

            splitter = CharacterTextSplitter(
                chunk_overlap=self.chunk_overlap,
                chunk_size=self.chunk_size,
                separator=separator,
                keep_separator=keep_sep,
            )
            return splitter.split_documents(documents)
        except Exception as e:
            msg = f"Error splitting text: {e}"
            raise TypeError(msg) from e

    def split_text(self) -> DataFrame:
        return DataFrame(self._docs_to_data(self.split_text_base()))

Update data

The Update data component dynamically updates or appends data with specified fields.

Parameters

Inputs
Name Display Name Info

old_data

data

The records to update. It can be a single data object or a list of data objects.

number_of_fields

Number of Fields

Number of fields to be added to the record (range: 1-15).

text_key

Text Key

Key that identifies the field to be used as the text content.

text_key_validator

Text Key Validator

If enabled, checks if the given 'Text Key' is present in the given 'data' object.

Outputs
Name Display Name Info

data

data

The resulting updated data objects.

Component code

update_data.py
from typing import Any

from langflow.custom.custom_component.component import Component
from langflow.field_typing.range_spec import RangeSpec
from langflow.inputs.inputs import (
    BoolInput,
    DataInput,
    DictInput,
    IntInput,
    MessageTextInput,
)
from langflow.io import Output
from langflow.schema.data import Data
from langflow.schema.dotdict import dotdict


class UpdateDataComponent(Component):
    display_name: str = "Update Data"
    description: str = "Dynamically update or append data with the specified fields."
    name: str = "UpdateData"
    MAX_FIELDS = 15  # Define a constant for maximum number of fields
    icon = "FolderSync"
    legacy = True

    inputs = [
        DataInput(
            name="old_data",
            display_name="Data",
            info="The record to update.",
            is_list=True,  # Changed to True to handle list of Data objects
            required=True,
        ),
        IntInput(
            name="number_of_fields",
            display_name="Number of Fields",
            info="Number of fields to be added to the record.",
            real_time_refresh=True,
            value=0,
            range_spec=RangeSpec(min=1, max=MAX_FIELDS, step=1, step_type="int"),
        ),
        MessageTextInput(
            name="text_key",
            display_name="Text Key",
            info="Key that identifies the field to be used as the text content.",
            advanced=True,
        ),
        BoolInput(
            name="text_key_validator",
            display_name="Text Key Validator",
            advanced=True,
            info="If enabled, checks if the given 'Text Key' is present in the given 'Data'.",
        ),
    ]

    outputs = [
        Output(display_name="Data", name="data", method="build_data"),
    ]

    def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None):
        """Update the build configuration when the number of fields changes.

        Args:
            build_config (dotdict): The current build configuration.
            field_value (Any): The new value for the field.
            field_name (Optional[str]): The name of the field being updated.
        """
        if field_name == "number_of_fields":
            default_keys = {
                "code",
                "_type",
                "number_of_fields",
                "text_key",
                "old_data",
                "text_key_validator",
            }
            try:
                field_value_int = int(field_value)
            except ValueError:
                return build_config

            if field_value_int > self.MAX_FIELDS:
                build_config["number_of_fields"]["value"] = self.MAX_FIELDS
                msg = f"Number of fields cannot exceed {self.MAX_FIELDS}. Try using a Component to combine two Data."
                raise ValueError(msg)

            existing_fields = {}
            # Back up the existing template fields
            for key in list(build_config.keys()):
                if key not in default_keys:
                    existing_fields[key] = build_config.pop(key)

            for i in range(1, field_value_int + 1):
                key = f"field_{i}_key"
                if key in existing_fields:
                    field = existing_fields[key]
                    build_config[key] = field
                else:
                    field = DictInput(
                        display_name=f"Field {i}",
                        name=key,
                        info=f"Key for field {i}.",
                        input_types=["Message", "Data"],
                    )
                    build_config[field.name] = field.to_dict()

            build_config["number_of_fields"]["value"] = field_value_int
        return build_config

    async def build_data(self) -> Data | list[Data]:
        """Build the updated data by combining the old data with new fields."""
        new_data = self.get_data()
        if isinstance(self.old_data, list):
            for data_item in self.old_data:
                if not isinstance(data_item, Data):
                    continue  # Skip invalid items
                data_item.data.update(new_data)
                if self.text_key:
                    data_item.text_key = self.text_key
                self.validate_text_key(data_item)
            self.status = self.old_data
            return self.old_data  # Returns List[Data]
        if isinstance(self.old_data, Data):
            self.old_data.data.update(new_data)
            if self.text_key:
                self.old_data.text_key = self.text_key
            self.status = self.old_data
            self.validate_text_key(self.old_data)
            return self.old_data  # Returns Data
        msg = "old_data is not a Data object or list of Data objects."
        raise ValueError(msg)

    def get_data(self):
        """Function to get the Data from the attributes."""
        data = {}
        default_keys = {
            "code",
            "_type",
            "number_of_fields",
            "text_key",
            "old_data",
            "text_key_validator",
        }
        for attr_name, attr_value in self._attributes.items():
            if attr_name in default_keys:
                continue  # Skip default attributes
            if isinstance(attr_value, dict):
                for key, value in attr_value.items():
                    data[key] = value.get_text() if isinstance(value, Data) else value
            elif isinstance(attr_value, Data):
                data[attr_name] = attr_value.get_text()
            else:
                data[attr_name] = attr_value
        return data

    def validate_text_key(self, data: Data) -> None:
        """This function validates that the Text Key is one of the keys in the Data."""
        data_keys = data.data.keys()
        if self.text_key and self.text_key not in data_keys:
            msg = f"Text Key: '{self.text_key}' not found in the Data keys: {', '.join(data_keys)}"
            raise ValueError(msg)

Was this helpful?

Give Feedback

How can we improve the documentation?

© 2025 DataStax | Privacy policy | Terms of use | Manage Privacy Choices

Apache, Apache Cassandra, Cassandra, Apache Tomcat, Tomcat, Apache Lucene, Apache Solr, Apache Hadoop, Hadoop, Apache Pulsar, Pulsar, Apache Spark, Spark, Apache TinkerPop, TinkerPop, Apache Kafka and Kafka are either registered trademarks or trademarks of the Apache Software Foundation or its subsidiaries in Canada, the United States and/or other countries. Kubernetes is the registered trademark of the Linux Foundation.

General Inquiries: +1 (650) 389-6000, info@datastax.com